prefect-kubernetes
The prefect-kubernetes library contains Prefect tasks, flows, and blocks for orchestrating, observing, and managing Kubernetes resources.
This library is most commonly installed for use with a Kubernetes worker. See the Prefect docs on deploying with Kubernetes to learn how to create and run deployments in Kubernetes.
Prefect provides a Helm chart for deploying a worker, a self-hosted Prefect server instance, and other resources to a Kubernetes cluster. See the Prefect Helm chart for more information.
Getting started
Prerequisites
Install prefect-kubernetes
The following command installs a version of prefect-kubernetes compatible with your installed version of prefect. If you don’t already have prefect installed, it also installs the newest version of prefect.
pip install "prefect[kubernetes]"
Upgrade to the latest versions of prefect and prefect-kubernetes:
pip install -U "prefect[kubernetes]"
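To confirm which versions ended up in the active environment, a small standard-library check can help. This is a minimal sketch; the helper name installed_version is hypothetical and not part of prefect or prefect-kubernetes.

```python
from importlib.metadata import PackageNotFoundError, version
from typing import Optional


def installed_version(package: str) -> Optional[str]:
    """Return the installed version of a distribution, or None if it is absent."""
    try:
        return version(package)
    except PackageNotFoundError:
        return None


if __name__ == "__main__":
    # Report both distributions that the pip commands above are expected to install.
    for package in ("prefect", "prefect-kubernetes"):
        print(f"{package}: {installed_version(package) or 'not installed'}")
```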
Register newly installed block types
Register the block types in the prefect-kubernetes module to make them available for use.
prefect block register -m prefect_kubernetes
Examples
Use with_options to customize options on an existing task or flow
from prefect_kubernetes.flows import run_namespaced_job

customized_run_namespaced_job = run_namespaced_job.with_options(
    name="My flow running a Kubernetes Job",
    retries=2,
    retry_delay_seconds=10,
)  # this is now a new flow object that can be called
Specify and run a Kubernetes Job from a YAML file
from prefect import flow, get_run_logger
from prefect_kubernetes.credentials import KubernetesCredentials
from prefect_kubernetes.flows import run_namespaced_job  # this is a flow
from prefect_kubernetes.jobs import KubernetesJob

k8s_creds = KubernetesCredentials.load("k8s-creds")

job = KubernetesJob.from_yaml_file(  # or create in the UI with a dict manifest
    credentials=k8s_creds,
    manifest_path="path/to/job.yaml",
)

job.save("my-k8s-job", overwrite=True)


@flow
def kubernetes_orchestrator():
    # run the flow and send logs to the parent flow run's logger
    logger = get_run_logger()
    run_namespaced_job(job, print_func=logger.info)


if __name__ == "__main__":
    kubernetes_orchestrator()
As with all Prefect flows and tasks, you can call the underlying function directly if you don’t need Prefect features:
run_namespaced_job.fn(job, print_func=print)
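The example above loads the Job from path/to/job.yaml without showing what that manifest contains. As a rough sketch, a minimal batch/v1 Job can be built as a plain Python dict, which is also the shape a dict manifest would take. The helper build_job_manifest, the job name echo-job, and the busybox image are illustrative assumptions, not part of prefect-kubernetes.

```python
import json
from typing import Any, Dict, List


def build_job_manifest(name: str, image: str, command: List[str]) -> Dict[str, Any]:
    """Build a minimal Kubernetes batch/v1 Job manifest as a plain dict."""
    return {
        "apiVersion": "batch/v1",
        "kind": "Job",
        "metadata": {"name": name},
        "spec": {
            "backoffLimit": 0,  # do not retry failed pods at the Kubernetes level
            "template": {
                "spec": {
                    # Jobs require a restart policy of "Never" or "OnFailure"
                    "restartPolicy": "Never",
                    "containers": [
                        {"name": name, "image": image, "command": command}
                    ],
                }
            },
        },
    }


# Hypothetical values chosen for illustration only.
manifest = build_job_manifest(
    "echo-job", "busybox:1.36", ["echo", "hello from Kubernetes"]
)

if __name__ == "__main__":
    # JSON is a subset of YAML, so this output can be saved as a manifest file.
    print(json.dumps(manifest, indent=2))
```

The printed output could be saved to the file passed as manifest_path. Passing the dict directly when constructing a KubernetesJob should also work, assuming the block accepts the manifest in a v1_job field; check the SDK documentation for your installed version.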
Generate a resource-specific client from KubernetesClusterConfig
# with minikube / docker desktop & a valid ~/.kube/config this should ~just work~
from prefect_kubernetes.credentials import KubernetesCredentials, KubernetesClusterConfig

k8s_config = KubernetesClusterConfig.from_file('~/.kube/config')

k8s_credentials = KubernetesCredentials(cluster_config=k8s_config)

with k8s_credentials.get_client("core") as v1_core_client:
    for namespace in v1_core_client.list_namespace().items:
        print(namespace.metadata.name)
List jobs in a namespace
from prefect import flow
from prefect_kubernetes.credentials import KubernetesCredentials
from prefect_kubernetes.jobs import list_namespaced_job
@flow
def kubernetes_orchestrator():
    v1_job_list = list_namespaced_job(
        kubernetes_credentials=KubernetesCredentials.load("k8s-creds"),
        namespace="my-namespace",
    )
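The flow above stores the result in v1_job_list but does not use it. Assuming the result follows the Kubernetes Python client's V1JobList shape (an items list whose entries expose metadata.name and a status with active, succeeded, and failed counts), a sketch of summarizing it might look like the following. The helper summarize_jobs is hypothetical, and the SimpleNamespace objects are stand-ins so the snippet runs without a cluster.

```python
from types import SimpleNamespace
from typing import Any, Dict, List


def summarize_jobs(v1_job_list: Any) -> List[Dict[str, Any]]:
    """Collect each job's name and pod counts from a V1JobList-like object."""
    summaries = []
    for job in v1_job_list.items:
        status = getattr(job, "status", None)
        summaries.append(
            {
                "name": job.metadata.name,
                # The counts may be None when no pods are in that state.
                "active": getattr(status, "active", None) or 0,
                "succeeded": getattr(status, "succeeded", None) or 0,
                "failed": getattr(status, "failed", None) or 0,
            }
        )
    return summaries


# Stand-in data that mimics the assumed V1JobList structure.
fake_job_list = SimpleNamespace(
    items=[
        SimpleNamespace(
            metadata=SimpleNamespace(name="job-a"),
            status=SimpleNamespace(active=None, succeeded=1, failed=None),
        ),
        SimpleNamespace(
            metadata=SimpleNamespace(name="job-b"),
            status=SimpleNamespace(active=2, succeeded=None, failed=1),
        ),
    ]
)

summaries = summarize_jobs(fake_job_list)

if __name__ == "__main__":
    for summary in summaries:
        print(summary)
```

Inside the flow, the same helper could be called on v1_job_list and its output sent to the run logger.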
For assistance using Kubernetes, consult the Kubernetes documentation.
Refer to the prefect-kubernetes SDK documentation to explore all the capabilities of the prefect-kubernetes library.