Chad LeJeune
07/18/2025, 2:39 AM
Marvin
07/18/2025, 2:39 AM
Marvin
07/18/2025, 2:40 AM
```bash
prefect work-pool create "k8s-pool" --type kubernetes
```
2. Configure Pod Template:
You can customize the pod configuration including:
- Resource requests and limits
- Container image
- Environment variables
- Volume mounts
- Other Kubernetes-specific configurations
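As a rough illustration of the kind of customization listed above, here is a minimal Python sketch that sets resource requests and limits on the container in a work pool's base job template. The `BASE_JOB_TEMPLATE` dictionary is a trimmed, hypothetical stand-in for the real template, and `with_container_resources` is an illustrative helper, not part of Prefect. Check the actual template for your work pool before relying on this shape.

```python
import copy
import json

# Trimmed, illustrative stand-in for a Kubernetes work pool base job template.
# The real template has many more fields; this shape is an assumption.
BASE_JOB_TEMPLATE = {
    "job_configuration": {
        "job_manifest": {
            "apiVersion": "batch/v1",
            "kind": "Job",
            "spec": {
                "template": {
                    "spec": {
                        "containers": [
                            {"name": "prefect-job", "image": "{{ image }}"}
                        ],
                        "restartPolicy": "Never",
                    }
                }
            },
        }
    }
}


def with_container_resources(template, requests, limits):
    """Return a copy of `template` whose first container has the given resources."""
    patched = copy.deepcopy(template)  # leave the input template untouched
    pod_spec = patched["job_configuration"]["job_manifest"]["spec"]["template"]["spec"]
    pod_spec["containers"][0]["resources"] = {
        "requests": dict(requests),
        "limits": dict(limits),
    }
    return patched


if __name__ == "__main__":
    patched = with_container_resources(
        BASE_JOB_TEMPLATE,
        requests={"cpu": "500m", "memory": "512Mi"},
        limits={"cpu": "1", "memory": "1Gi"},
    )
    # Print the patched template as JSON so it can be saved to a file.
    print(json.dumps(patched, indent=2))
```

The printed JSON could then be saved to a file and supplied when creating the work pool, for example through the CLI's `--base-job-template` option.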
3. Deploy Your Flow:
Create a prefect.yaml file that specifies the Kubernetes work pool:
```yaml
deployments:
  - name: my-flow
    entrypoint: path/to/flow.py:flow_function
    work_pool:
      name: k8s-pool
```
4. Deploy and Run:
Deploy your flow using:
```bash
prefect deploy
```
When the flow runs, Prefect will:
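1. Create a Kubernetes Job, which starts a new pod in your cluster
2. Execute the entire flow within that pod
3. Leave the finished Job and its pod for Kubernetes to clean up once the configured TTL for finished jobs expires (if no TTL is set, they remain until deleted)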
Important notes:
- Each flow run gets its own pod, providing isolation and clean execution environments
- The pod lifecycle is managed automatically by Prefect
- You can customize the pod template to specify resource requirements, security contexts, and other Kubernetes-specific configurations
- Workers typically run as pods in the cluster themselves and manage the creation and deletion of flow run pods
For more detailed information, you can refer to:
- The Kubernetes deployment guide: https://docs-3.prefect.io/v3/how-to-guides/deployment_infra/kubernetes
- Migration guide (if coming from Airflow): https://docs-3.prefect.io/v3/how-to-guides/migrate/airflow
Would you like me to provide more specific details about any aspect of Kubernetes execution, such as pod template configuration or worker deployment?
Chad LeJeune
07/18/2025, 2:41 AM
Marvin
07/18/2025, 2:41 AM
Marvin
07/18/2025, 2:42 AM
```python
from prefect import task, flow
from kubernetes import client, config


@task
def create_and_run_pod(pod_name, image, command):
    config.load_kube_config()  # or load_incluster_config() if running inside k8s
    v1 = client.CoreV1Api()
    pod_manifest = {
        "apiVersion": "v1",
        "kind": "Pod",
        "metadata": {"name": pod_name},
        "spec": {
            "containers": [{
                "name": "task-container",
                "image": image,
                "command": command
            }],
            "restartPolicy": "Never"
        }
    }
    # Create the pod
    v1.create_namespaced_pod(namespace="default", body=pod_manifest)
    # Optionally wait for completion and get logs
    # ... pod monitoring code here ...


@flow
def my_flow():
    # Create pods dynamically as needed
    create_and_run_pod("task-1", "python:3.9", ["python", "-c", "print('Hello from pod!')"])
```
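The "pod monitoring code here" placeholder above could be filled in roughly as follows. This is a minimal sketch, not a definitive implementation: `wait_for_pod` is a hypothetical helper, the timeout and polling interval are arbitrary defaults, and `v1` is assumed to be a `CoreV1Api` instance like the one created in the task. It polls the pod's phase, fetches the logs once the pod finishes, and always deletes the pod afterwards.

```python
import time


def wait_for_pod(v1, pod_name, namespace="default", timeout_s=300, poll_s=2.0):
    """Poll a pod until it finishes, return (phase, logs), and always delete it.

    `v1` is expected to behave like kubernetes.client.CoreV1Api.
    """
    deadline = time.monotonic() + timeout_s
    try:
        while True:
            pod = v1.read_namespaced_pod(name=pod_name, namespace=namespace)
            phase = pod.status.phase
            # "Succeeded" and "Failed" are the terminal pod phases.
            if phase in ("Succeeded", "Failed"):
                logs = v1.read_namespaced_pod_log(name=pod_name, namespace=namespace)
                return phase, logs
            if time.monotonic() >= deadline:
                raise TimeoutError(
                    f"Pod {pod_name!r} did not finish within {timeout_s} seconds"
                )
            time.sleep(poll_s)
    finally:
        # Clean up the pod whether it succeeded, failed, or timed out.
        v1.delete_namespaced_pod(name=pod_name, namespace=namespace)
```

Inside the task, this could be called right after `create_namespaced_pod`, for example `phase, logs = wait_for_pod(v1, pod_name)`, raising an error if `phase` is `"Failed"` so that the Prefect task run is marked as failed too.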
2. Using Dask on Kubernetes
If you need more sophisticated distributed computation, you could use Dask with Kubernetes:
```python
from prefect import task, flow
# NOTE: this import and the pod_template argument follow the classic
# dask-kubernetes API; newer releases expose KubeCluster from
# dask_kubernetes.operator with different arguments.
from dask_kubernetes import KubeCluster
from dask.distributed import Client


@flow
def my_distributed_flow():
    # Create a Dask cluster on Kubernetes
    cluster = KubeCluster(
        pod_template={
            "spec": {
                "containers": [{
                    "name": "dask-worker",
                    "image": "your-image:tag",
                    "resources": {
                        "limits": {"cpu": "1", "memory": "2Gi"},
                        "requests": {"cpu": "1", "memory": "2Gi"}
                    }
                }]
            }
        }
    )
    # Scale the cluster as needed
    cluster.adapt(minimum=1, maximum=10)
    with Client(cluster) as client:
        # Your distributed computations here
        # Each will run in a separate pod
        ...
```
Important Considerations:
1. Security: Ensure your flow has the necessary permissions to create and manage pods in your Kubernetes cluster.
2. Resource Management: You'll need to handle pod cleanup and resource management yourself if creating pods programmatically.
3. Monitoring: Consider implementing proper monitoring and error handling for dynamically created pods.
4. Alternative Approach: Instead of creating pods during the flow run, consider whether your use case might be better served by breaking your flow into smaller flows and deploying them separately, letting Prefect handle the pod lifecycle.
Would you like me to elaborate on any of these approaches or provide more specific examples for your use case?