Arthur Ren
11/05/2025, 5:23 PM

Marvin
11/05/2025, 5:30 PM

```python
from prefect import flow

@flow
def my_flow():
    print("Hello from K8s!")

# If your code is stored in a Git repo, prefer from_source so workers can pull code:
my_flow.from_source(
    "https://github.com/your-org/your-repo",
    entrypoint="flows/my_flow.py:my_flow",
).deploy(
    name="my-k8s-deployment",
    work_pool_name="k8s-pool",  # must be a Kubernetes work pool
    image="ghcr.io/your-org/your-image:2025-01-01",
    build=False,  # set False if you're using a prebuilt image
    push=False,   # set False if you're using a prebuilt image
    job_variables={
        "namespace": "data-platform",
        "service_account_name": "prefect-runner",
        "image_pull_policy": "IfNotPresent",
        "finished_job_ttl": 600,  # seconds a finished Job is retained
        "job_watch_timeout_seconds": 60,
        "pod_watch_timeout_seconds": 60,
        "labels": {"team": "data-eng", "app": "prefect"},
        "env": {  # simple name -> value mapping
            "PREFECT_LOGGING_LEVEL": "INFO",
            "EXTRA_FLAG": "true",
        },
    },
)
```
2) Advanced example: add resources, tolerations, nodeSelector, volumes
First, fetch the current default base job template for your Kubernetes work pool, then create a custom template that adds placeholders for the fields you want:
- Preview the default template:
prefect work-pool get-default-base-job-template --type kubernetes
- Create a file k8s-job-template.json by starting from the default and adding new variables (resources, node_selector, tolerations, volumes, volume_mounts). Example:
```
{
  "job_configuration": {
    "name": "{{ name }}",
    "namespace": "{{ namespace }}",
    "labels": "{{ labels }}",
    "command": "{{ command }}",
    "env": "{{ env }}",
    "cluster_config": "{{ cluster_config }}",
    "job_watch_timeout_seconds": "{{ job_watch_timeout_seconds }}",
    "pod_watch_timeout_seconds": "{{ pod_watch_timeout_seconds }}",
    "stream_output": "{{ stream_output }}",
    "job_manifest": {
      "apiVersion": "batch/v1",
      "kind": "Job",
      "metadata": {
        "generateName": "{{ name }}-",
        "namespace": "{{ namespace }}",
        "labels": "{{ labels }}"
      },
      "spec": {
        "backoffLimit": 0,
        "ttlSecondsAfterFinished": "{{ finished_job_ttl }}",
        "template": {
          "spec": {
            "serviceAccountName": "{{ service_account_name }}",
            "restartPolicy": "Never",
            "nodeSelector": "{{ node_selector }}",
            "tolerations": "{{ tolerations }}",
            "volumes": "{{ volumes }}",
            "containers": [
              {
                "name": "prefect-job",
                "image": "{{ image }}",
                "imagePullPolicy": "{{ image_pull_policy }}",
                "args": "{{ command }}",
                "env": "{{ env }}",
                "resources": "{{ resources }}",
                "volumeMounts": "{{ volume_mounts }}"
              }
            ]
          }
        }
      }
    }
  },
  "variables": { ... }
}
```

Marvin
11/05/2025, 5:30 PM
- Create (or update) the work pool to use this template:
prefect work-pool create "k8s-pool" --type kubernetes --base-job-template ./k8s-job-template.json --overwrite
- Now deploy with those advanced fields via job_variables:
```python
from prefect import flow

@flow
def my_flow():
    print("Hello from K8s with custom pod spec!")

my_flow.from_source(
    "https://github.com/your-org/your-repo",
    entrypoint="flows/my_flow.py:my_flow",
).deploy(
    name="my-k8s-adv",
    work_pool_name="k8s-pool",
    image="ghcr.io/your-org/your-image:2025-01-01",
    build=False,
    push=False,
    job_variables={
        "namespace": "data-platform",
        "service_account_name": "prefect-runner",
        "image_pull_policy": "IfNotPresent",
        "labels": {"team": "data-eng", "app": "prefect"},
        "env": {"PREFECT_LOGGING_LEVEL": "INFO"},
        "resources": {
            "limits": {"cpu": "2", "memory": "4Gi"},
            "requests": {"cpu": "1", "memory": "2Gi"},
        },
        "node_selector": {"kubernetes.io/os": "linux", "nodeclass": "spot"},
        "tolerations": [
            {"key": "preemptible", "operator": "Exists", "effect": "NoSchedule"}
        ],
        "volumes": [
            {"name": "data", "persistentVolumeClaim": {"claimName": "my-pvc"}}
        ],
        "volume_mounts": [
            {"name": "data", "mountPath": "/mnt/data"}
        ],
        "finished_job_ttl": 1200,
        "job_watch_timeout_seconds": 120,
        "pod_watch_timeout_seconds": 120,
    },
)
```
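One common failure mode is a job_variables key that the custom template never references, which is silently ignored. A small script can cross-check a template file before you create the pool; find_placeholders and check_template below are illustrative helpers, not Prefect APIs:

```python
import re

PLACEHOLDER = re.compile(r"{{\s*(\w+)\s*}}")

def find_placeholders(obj):
    """Recursively collect {{ placeholder }} names from a template structure."""
    names = set()
    if isinstance(obj, str):
        names.update(PLACEHOLDER.findall(obj))
    elif isinstance(obj, dict):
        for value in obj.values():
            names.update(find_placeholders(value))
    elif isinstance(obj, list):
        for value in obj:
            names.update(find_placeholders(value))
    return names

def check_template(template: dict) -> set:
    """Return placeholder names used in job_configuration that have no
    matching definition under variables.properties."""
    used = find_placeholders(template.get("job_configuration", {}))
    declared = set(template.get("variables", {}).get("properties", {}))
    return used - declared

template = {
    "job_configuration": {
        "namespace": "{{ namespace }}",
        "job_manifest": {"spec": {"nodeSelector": "{{ node_selector }}"}},
    },
    "variables": {"properties": {"namespace": {}}},  # node_selector missing
}
print(sorted(check_template(template)))  # -> ['node_selector']
```

Running this against your k8s-job-template.json before `prefect work-pool create` catches typos like declaring `node_selector` but referencing `nodeSelector` in the placeholder.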
Tips and references
- Inspect the default Kubernetes base job template for your version to see exactly which keys are supported out of the box:
prefect work-pool get-default-base-job-template --type kubernetes
- Docs:
- Run flows on Kubernetes (work pools, workers, config)
- Customize job variables (deployment overrides)
- Manage work pools and base job templates
Would you like me to tailor the custom base template to a specific set of pod options you need (e.g., initContainers, securityContext, annotations)?

Arthur Ren
11/06/2025, 12:33 AM
node_selector did not translate to the Job template sent to K8s, why?

Marvin
11/06/2025, 12:41 AM
node_selector: work pool base job template vs. deployment job_variables
To debug this, it helps to see the relevant snippet of your base job template and/or the output of prefect work-pool inspect <pool-name>.
In Prefect 3.x with a Kubernetes work pool:
- There is no built-in node_selector knob that’s auto-mapped. The worker only sends what’s in the work pool’s base job template (or what that template renders from job_variables). K8s expects nodeSelector (camelCase) under spec.template.spec.
- If you set node_selector in deployment job_variables, it won't change anything unless the base job template actually references it (e.g., "nodeSelector": "{{ node_selector }}" under spec.template.spec).
- Incorrect placement or casing will be ignored by K8s (e.g., node_selector vs nodeSelector, or placing it anywhere other than spec.template.spec).
- If you updated the base job template recently, restart the worker to be safe so it fetches the latest (workers usually fetch config at run submission, but a restart removes doubt).
- The CLI expects a JSON file for `--base-job-template`; YAML won’t be parsed by that flag.
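To make the bullets above concrete, here is a toy renderer showing why a job_variables value only reaches the Job manifest when the template contains a matching placeholder. This is a simplified sketch, not Prefect's actual rendering code:

```python
def render(template, variables):
    """Replace string values that are exactly a "{{ name }}" placeholder
    with the corresponding value from `variables`."""
    if isinstance(template, str):
        stripped = template.strip()
        if stripped.startswith("{{") and stripped.endswith("}}"):
            name = stripped[2:-2].strip()
            return variables.get(name)
        return template
    if isinstance(template, dict):
        return {k: render(v, variables) for k, v in template.items()}
    if isinstance(template, list):
        return [render(v, variables) for v in template]
    return template

# Template references the placeholder: the value lands in the pod spec.
pod_spec = {"nodeSelector": "{{ node_selector }}", "restartPolicy": "Never"}
print(render(pod_spec, {"node_selector": {"nodegroup": "spot"}}))
# -> {'nodeSelector': {'nodegroup': 'spot'}, 'restartPolicy': 'Never'}

# Template omits the placeholder: the same job_variables value is ignored.
print(render({"restartPolicy": "Never"}, {"node_selector": {"nodegroup": "spot"}}))
# -> {'restartPolicy': 'Never'}
```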
How to inspect and update your work pool:
prefect work-pool inspect <your-pool>
prefect work-pool update <your-pool> --base-job-template ./base-job-template.json
Docs:
- Kubernetes work pools (3.x)
- Manage work pools
- CLI: work-pool
Example: ensure nodeSelector is in the right spot of your base job template JSON (under the job_manifest key, matching the default template's structure):

```
{
  "job_configuration": {
    "job_manifest": {
      "apiVersion": "batch/v1",
      "kind": "Job",
      "spec": {
        "template": {
          "spec": {
            "nodeSelector": {
              "kubernetes.io/os": "linux",
              "nodegroup": "spot"
            },
            "tolerations": [
              {
                "key": "spot",
                "operator": "Exists",
                "effect": "NoSchedule"
              }
            ],
            "affinity": {
              "nodeAffinity": {
                "requiredDuringSchedulingIgnoredDuringExecution": {
                  "nodeSelectorTerms": [
                    {
                      "matchExpressions": [
                        {
                          "key": "topology.kubernetes.io/zone",
                          "operator": "In",
                          "values": ["us-east-1a", "us-east-1b"]
                        }
                      ]
                    }
                  ]
                }
              }
            }
          }
        }
      }
    }
  }
}
```
If you want to drive it via deployment job_variables, your template must reference the variable:

"nodeSelector": "{{ node_selector }}"

Then set it in your deployment:

```
job_variables:
  node_selector:
    nodegroup: spot
```
If you can paste:
- the output of prefect work-pool inspect <pool>
- the resulting K8s Job YAML from kubectl get job <name> -n <ns> -o yaml
I can pinpoint exactly why it's not showing up.

Arthur Ren
11/06/2025, 9:36 PM

Marvin
11/06/2025, 9:41 PM
There is no workflow.deploy in Prefect 3.x. Did you mean flow.deploy(...) (Python) or prefect deploy (CLI)? If you really are calling workflow.deploy, can you share that code? It may not be persisting job variables at all.
- Look in the UI under the deployment’s Configuration tab (not Overview). Job variables appear under Work pool → Job variables.
- Job variables are work-pool-specific. If you set keys that your work pool doesn’t support, they’ll be ignored and won’t show in the UI. For example:
- Process work pools support keys like env, labels, command, working_dir, name, stream_output.
- They do NOT use image, cpu, memory, etc. (those are for Docker/Kubernetes).
- If you didn’t set a work_pool_name at deploy time, Prefect will use a default; your variables might be ignored if they don’t match that pool type.
- You can verify what’s actually stored with:
prefect deployment inspect "<flow-name>/<deployment-name>"
Look for a job_variables section in the output.
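If you want a rough client-side guard against silently ignored keys, you could compare your job_variables against what the pool type understands before deploying. The whitelists below are illustrative subsets based on the bullets above, not an authoritative list:

```python
# Illustrative subsets of supported job variables per work pool type
# (not exhaustive; inspect your pool's base job template for the real set).
KNOWN_KEYS = {
    "process": {"env", "labels", "command", "working_dir", "name", "stream_output"},
    "kubernetes": {
        "env", "labels", "command", "name", "namespace", "image",
        "service_account_name", "image_pull_policy", "finished_job_ttl",
        "job_watch_timeout_seconds", "pod_watch_timeout_seconds",
    },
}

def unknown_job_variables(pool_type: str, job_variables: dict) -> set:
    """Return keys the given pool type (per the table above) would ignore."""
    return set(job_variables) - KNOWN_KEYS.get(pool_type, set())

# A Process pool has no "image" knob, so that key would be dropped silently:
print(unknown_job_variables("process", {"env": {"FOO": "BAR"}, "image": "x:1"}))
# -> {'image'}
```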
Minimal example that will show job variables for a Process pool:
```python
from prefect import flow

@flow
def example():
    pass

if __name__ == "__main__":
    example.deploy(
        name="proc-demo",
        work_pool_name="process-pool",
        job_variables={
            "env": {"FOO": "BAR"},
            "command": "echo hello",
            "working_dir": "/tmp",
        },
    )
```
If that still doesn’t appear in the UI:
- Which work pool type are you deploying to?
- Exactly how are you deploying (Python flow.deploy, CLI prefect deploy, or something else)?
- Which job variable keys are you setting?
Docs for reference:
- Customize job variables
- Deploy via Python
- Create deployments