<@ULVA73B9P> can you give me a example of using .d...
# ask-community
a
@Marvin can you give me an example of using the .deploy() method to deploy a flow targeting a Prefect Kubernetes worker, with customization of the K8s job template?
m
thought for 444.2 seconds
Absolutely: here's a working pattern in Prefect 3.x to deploy a flow to a Kubernetes work pool using Flow.deploy(), including how to customize the Kubernetes Job via job_variables.

Quick context:
- job_variables can only override values that your work pool's base job template exposes as variables.
- The default Kubernetes pool template exposes things like image, image_pull_policy, namespace, service_account_name, env, labels, finished_job_ttl, and timeouts.
- For deeper pod-level customizations (resources, tolerations, nodeSelector, volumes, etc.), create/update your work pool with a custom base job template that includes placeholders for those fields, then set them via job_variables at deploy time.

1) Simple example: customize defaults (no custom template needed)

This uses the default Kubernetes base job template. Note that env is a simple dict of key/value pairs.
```
from prefect import flow

@flow
def my_flow():
    print("Hello from K8s!")

# If your code is stored in a Git repo, prefer from_source so workers can pull code:
my_flow.from_source(
    "<https://github.com/your-org/your-repo>",
    entrypoint="flows/my_flow.py:my_flow",
).deploy(
    name="my-k8s-deployment",
    work_pool_name="k8s-pool",          # must be a Kubernetes work pool
    image="<http://ghcr.io/your-org/your-image:2025-01-01|ghcr.io/your-org/your-image:2025-01-01>",
    build=False,                        # set False if you’re using a prebuilt image
    push=False,                         # set False if you’re using a prebuilt image
    job_variables={
        "namespace": "data-platform",
        "service_account_name": "prefect-runner",
        "image_pull_policy": "IfNotPresent",
        "finished_job_ttl": 600,        # seconds after job completion
        "job_watch_timeout_seconds": 60,
        "pod_watch_timeout_seconds": 60,
        "labels": {"team": "data-eng", "app": "prefect"},
        "env": {                        # simple name->value mapping
            "PREFECT_LOGGING_LEVEL": "INFO",
            "EXTRA_FLAG": "true",
        },
    },
)
```
2) Advanced example: add resources, tolerations, nodeSelector, volumes

First, get the current default and create a custom base job template for your Kubernetes work pool that adds placeholders for the fields you want:

- Preview the default template:
```
prefect work-pool get-default-base-job-template --type kubernetes
```
- Create a file k8s-job-template.json by starting from the default and adding new variables (resources, node_selector, tolerations, volumes, volume_mounts). Example:
```
{
  "job_configuration": {
    "name": "{{ name }}",
    "namespace": "{{ namespace }}",
    "labels": "{{ labels }}",
    "command": "{{ command }}",
    "env": "{{ env }}",
    "cluster_config": "{{ cluster_config }}",
    "job_watch_timeout_seconds": "{{ job_watch_timeout_seconds }}",
    "pod_watch_timeout_seconds": "{{ pod_watch_timeout_seconds }}",
    "stream_output": "{{ stream_output }}",
    "job_manifest": {
      "apiVersion": "batch/v1",
      "kind": "Job",
      "metadata": {
        "generateName": "{{ name }}-",
        "namespace": "{{ namespace }}",
        "labels": "{{ labels }}"
      },
      "spec": {
        "backoffLimit": 0,
        "ttlSecondsAfterFinished": "{{ finished_job_ttl }}",
        "template": {
          "spec": {
            "serviceAccountName": "{{ service_account_name }}",
            "restartPolicy": "Never",
            "nodeSelector": "{{ node_selector }}",
            "tolerations": "{{ tolerations }}",
            "volumes": "{{ volumes }}",
            "containers": [
              {
                "name": "prefect-job",
                "image": "{{ image }}",
                "imagePullPolicy": "{{ image_pull_policy }}",
                "args": "{{ command }}",
                "env": "{{ env }}",
                "resources": "{{ resources }}",
                "volumeMounts": "{{ volume_mounts }}"
              }
            ]
          }
        }
      }
    }
  },
  "variables": {
    "description": "Kubernetes worker template with additional pod spec options",
    "properties": {
      "name": {"type": ["string", "null"], "default": null},
      "namespace": {"type": ["string", "null"], "default": null},
      "labels": {"type": "object", "default": {}},
      "command": {"type": ["array", "null"], "items": {"type": "string"}, "default": null},
      "env": {"type": "object", "default": {}},
      "image": {"type": ["string", "null"], "default": null},
      "image_pull_policy": {"type": ["string", "null"], "default": "IfNotPresent"},
      "service_account_name": {"type": ["string", "null"], "default": null},
      "finished_job_ttl": {"type": ["integer", "null"], "default": 600},
      "cluster_config": {"type": ["object", "null"], "default": null},
      "job_watch_timeout_seconds": {"type": ["integer", "null"], "default": 60},
      "pod_watch_timeout_seconds": {"type": ["integer", "null"], "default": 60},
      "stream_output": {"type": ["boolean", "null"], "default": true},
      "node_selector": {"type": "object", "default": {}},
      "tolerations": {"type": "array", "items": {"type": "object"}, "default": []},
      "resources": {"type": "object", "default": {}},
      "volumes": {"type": "array", "items": {"type": "object"}, "default": []},
      "volume_mounts": {"type": "array", "items": {"type": "object"}, "default": []}
    },
    "type": "object"
  }
}
```
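Optional: before pointing the work pool at this file, you can sanity-check that every placeholder used in job_configuration has a matching entry under variables.properties. A minimal local sketch (the file name and the placeholder regex are my own, not part of Prefect):
```
import json
import re

# Load the custom base job template and compare the "{{ placeholder }}" names
# used in job_configuration against the variables declared for the pool.
with open("k8s-job-template.json") as f:
    template = json.load(f)

referenced = set(re.findall(r"\{\{\s*(\w+)\s*\}\}", json.dumps(template["job_configuration"])))
declared = set(template["variables"]["properties"])

print("referenced but not declared:", referenced - declared)
print("declared but never referenced:", declared - referenced)
```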
- Create (or update) the work pool to use this template:
```
prefect work-pool create "k8s-pool" --type kubernetes --base-job-template ./k8s-job-template.json --overwrite
```
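If you'd rather create the pool from Python than the CLI, something like this should work; a sketch assuming the Prefect 3.x client API (get_client and WorkPoolCreate):
```
import asyncio
import json

from prefect.client.orchestration import get_client
from prefect.client.schemas.actions import WorkPoolCreate

async def create_pool():
    # Load the custom base job template created in the previous step.
    with open("k8s-job-template.json") as f:
        base_job_template = json.load(f)

    async with get_client() as client:
        await client.create_work_pool(
            work_pool=WorkPoolCreate(
                name="k8s-pool",
                type="kubernetes",
                base_job_template=base_job_template,
            ),
            overwrite=True,  # assumed to mirror the CLI's --overwrite behavior
        )

asyncio.run(create_pool())
```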
- Now deploy with those advanced fields via job_variables:
```
from prefect import flow

@flow
def my_flow():
    print("Hello from K8s with custom pod spec!")

my_flow.from_source(
    "https://github.com/your-org/your-repo",
    entrypoint="flows/my_flow.py:my_flow",
).deploy(
    name="my-k8s-adv",
    work_pool_name="k8s-pool",
    image="ghcr.io/your-org/your-image:2025-01-01",
    build=False,
    push=False,
    job_variables={
        "namespace": "data-platform",
        "service_account_name": "prefect-runner",
        "image_pull_policy": "IfNotPresent",
        "labels": {"team": "data-eng", "app": "prefect"},
        "env": {"PREFECT_LOGGING_LEVEL": "INFO"},
        "resources": {
            "limits": {"cpu": "2", "memory": "4Gi"},
            "requests": {"cpu": "1", "memory": "2Gi"}
        },
        "node_selector": {"kubernetes.io/os": "linux", "nodeclass": "spot"},
        "tolerations": [
            {"key": "preemptible", "operator": "Exists", "effect": "NoSchedule"}
        ],
        "volumes": [
            {"name": "data", "persistentVolumeClaim": {"claimName": "my-pvc"}}
        ],
        "volume_mounts": [
            {"name": "data", "mountPath": "/mnt/data"}
        ],
        "finished_job_ttl": 1200,
        "job_watch_timeout_seconds": 120,
        "pod_watch_timeout_seconds": 120,
    },
)
```
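To confirm the job variables were actually saved on the deployment, you can read it back; a small sketch assuming the Prefect 3.x client API and the names used above:
```
import asyncio

from prefect.client.orchestration import get_client

async def show_job_variables():
    async with get_client() as client:
        # Deployment names follow "<flow-name>/<deployment-name>".
        deployment = await client.read_deployment_by_name("my-flow/my-k8s-adv")
        print(deployment.job_variables)

asyncio.run(show_job_variables())
```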
Tips and references
- Inspect the default Kubernetes base job template for your version to see exactly which keys are supported out of the box:
```
prefect work-pool get-default-base-job-template --type kubernetes
```
- Docs:
  - Run flows on Kubernetes (work pools, workers, config)
  - Customize job variables (deployment overrides)
  - Manage work pools and base job templates

Would you like me to tailor the custom base template to a specific set of pod options you need (e.g., initContainers, securityContext, annotations)?
a
@Marvin the above `node_selector` did not translate to the job template sent to K8s, why?
m
thought for 488.2 seconds
A few common reasons this happens; let's narrow it down.

First, can you share:
- Your Prefect version (assuming 3.x unless you say otherwise)
- How you set `node_selector` (work pool base job template vs deployment job_variables)
- The relevant snippet of your base job template and/or the output of `prefect work-pool inspect <pool-name>`

In Prefect 3.x with a Kubernetes work pool:
- There is no built-in `node_selector` knob that's auto-mapped. The worker only sends what's in the work pool's base job template (or what that template renders from job_variables). K8s expects `nodeSelector` (camelCase) under `spec.template.spec`.
- If you set `node_selector` in deployment `job_variables`, it won't change anything unless the base job template actually references it (e.g., `"nodeSelector": "{{ node_selector }}"`; a quick check is sketched after this list).
- Incorrect placement or casing will be ignored by K8s (e.g., `node_selector` vs `nodeSelector`, or placing it anywhere other than `spec.template.spec`).
- If you updated the base job template recently, restart the worker to be safe so it fetches the latest (workers usually fetch config at run submission, but a restart removes doubt).
- The CLI expects a JSON file for `--base-job-template`; YAML won't be parsed by that flag.
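As a quick programmatic check that the pool's stored template references the variable at all, here's a sketch (it assumes the Prefect 3.x client API and a pool named k8s-pool):
```
import asyncio
import json

from prefect.client.orchestration import get_client

async def check_template():
    async with get_client() as client:
        pool = await client.read_work_pool("k8s-pool")
    rendered = json.dumps(pool.base_job_template)
    # If the placeholder never appears, job_variables={"node_selector": ...}
    # has nothing to bind to and the worker will never emit a nodeSelector.
    print("references node_selector placeholder:", "{{ node_selector }}" in rendered)
    print("renders a nodeSelector key:", "nodeSelector" in rendered)

asyncio.run(check_template())
```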
How to inspect and update your work pool:
```
prefect work-pool inspect <your-pool>
prefect work-pool update <your-pool> --base-job-template ./base-job-template.json
```
Docs:
- Kubernetes work pools (3.x)
- Manage work pools
- CLI: work-pool

Example: ensure `nodeSelector` is in the right spot of your base_job_template JSON:
```
{
  "job_configuration": {
    "job": {
      "apiVersion": "batch/v1",
      "kind": "Job",
      "spec": {
        "template": {
          "spec": {
            "nodeSelector": {
              "<http://kubernetes.io/os|kubernetes.io/os>": "linux",
              "nodegroup": "spot"
            },
            "tolerations": [
              {
                "key": "spot",
                "operator": "Exists",
                "effect": "NoSchedule"
              }
            ],
            "affinity": {
              "nodeAffinity": {
                "requiredDuringSchedulingIgnoredDuringExecution": {
                  "nodeSelectorTerms": [
                    {
                      "matchExpressions": [
                        {
                          "key": "<http://topology.kubernetes.io/zone|topology.kubernetes.io/zone>",
                          "operator": "In",
                          "values": [
                            "us-east-1a",
                            "us-east-1b"
                          ]
                        }
                      ]
                    }
                  ]
                }
              }
            }
          }
        }
      }
    }
  }
}
```
If you want to drive it via deployment `job_variables`, your template must reference the variable:
```
"nodeSelector": {{ node_selector | json }}
```
Then set in your deployment:
```
job_variables:
  node_selector:
    nodegroup: spot
```
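Or, if you deploy from Python instead of prefect.yaml, the same override looks like this (a sketch; the repo, image, and pool names are illustrative):
```
from prefect import flow

@flow
def my_flow():
    ...

my_flow.from_source(
    "https://github.com/your-org/your-repo",
    entrypoint="flows/my_flow.py:my_flow",
).deploy(
    name="my-k8s-deployment",
    work_pool_name="k8s-pool",
    image="ghcr.io/your-org/your-image:2025-01-01",
    build=False,
    push=False,
    # Only takes effect if the pool's base job template references
    # the node_selector variable as shown above.
    job_variables={"node_selector": {"nodegroup": "spot"}},
)
```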
If you can paste:
- `prefect work-pool inspect <pool>`
- The resulting K8s Job YAML from `kubectl get job <name> -n <ns> -o yaml`

I can pinpoint exactly why it's not showing up.
a
@Marvin it looks like regardless of what job variables I set in workflow.deploy, they won't show up on the deployment page in the Prefect UI, why?
m
thought for 276.6 seconds
Short answer: in Prefect 3.x, job variables only show on a deployment's Configuration tab if they're valid for the deployment's work pool type and actually saved on that deployment. A few common gotchas can make them "disappear."

A few things to check:
- There's no `workflow.deploy` in Prefect 3.x. Did you mean `flow.deploy(...)` (Python) or `prefect deploy` (CLI)? If you really are calling `workflow.deploy`, can you share that code? It may not be persisting job variables at all.
- Look in the UI under the deployment's Configuration tab (not Overview). Job variables appear under Work pool → Job variables.
- Job variables are work-pool-specific. If you set keys that your work pool doesn't support, they'll be ignored and won't show in the UI. For example:
  - Process work pools support keys like `env`, `labels`, `command`, `working_dir`, `name`, `stream_output`.
  - They do NOT use `image`, `cpu`, `memory`, etc. (those are for Docker/Kubernetes).
- If you didn't set a `work_pool_name` at deploy time, Prefect will use a default; your variables might be ignored if they don't match that pool type.
- You can verify what's actually stored with:
```
prefect deployment inspect "<flow-name>/<deployment-name>"
```
Look for a `job_variables` section in the output.

Minimal example that will show job variables for a Process pool:
```
from prefect import flow

@flow
def example():
    pass

if __name__ == "__main__":
    example.deploy(
        name="proc-demo",
        work_pool_name="process-pool",
        job_variables={
            "env": {"FOO": "BAR"},
            "command": "echo hello",
            "working_dir": "/tmp",
        },
    )
```
If that still doesn't appear in the UI:
- Which work pool type are you deploying to?
- Exactly how are you deploying (Python `flow.deploy`, CLI `prefect deploy`, or something else)?
- Which job variable keys are you setting?

Docs for reference:
- Customize job variables
- Deploy via Python
- Create deployments