How to get the path of a Prefect flow?
# ask-marvin
o
How to get the path of a Prefect flow?
@Marvin prefect3 my flow prefect/flows/debug/debug_flow.py

@flow(name="Debug simple flow", on_completion=[post_to_slack])
def debug_simple_flow(message: str | None = None):
    """
    Debug flow description
    :param message: message to output
    """
    print_message_with_logger(msg=message)

if __name__ == "__main__":
    debug_simple_flow()

my function post_to_slack prefect/etl_tools/tasks/notificators.py

def post_to_slack(flow: Flow, flow_run: FlowRun, state: State):
    ...
    path_of_flow = ?

how to find path_of_flow, it should be prefect/flows/debug/debug_flow.py, is there way without using inspect?
m
thought for 111.3 seconds
Short answer: there isn’t a built-in Prefect 3 attribute on Flow or FlowRun that gives you the source file path. But you can get it without inspect. Two practical options:
1) From the flow function object (works for local runs and most deployments)
Copy code
from pathlib import Path

from prefect.client.schemas.objects import FlowRun
from prefect.flows import Flow
from prefect.states import State

def post_to_slack(flow: Flow, flow_run: FlowRun, state: State):
    file_path = Path(flow.fn.__code__.co_filename)  # absolute path
    # make it project-relative if possible
    try:
        rel_path = file_path.relative_to(Path.cwd())
    except ValueError:
        rel_path = file_path

    # rel_path.as_posix() -> e.g. "prefect/flows/debug/debug_flow.py"
    ...
2) From the deployment metadata (only when the run came from a deployment)
- If flow_run.deployment_id is set, fetch the deployment and read its entrypoint (typically something like prefect/flows/debug/debug_flow.py:debug_simple_flow).
- Prefect 3's orchestration client can read the deployment; sketch:
Copy code
from pathlib import Path

import anyio
from prefect.client.orchestration import get_client
from prefect.client.schemas.objects import FlowRun
from prefect.flows import Flow
from prefect.states import State

def post_to_slack(flow: Flow, flow_run: FlowRun, state: State):
    entrypoint = None
    if flow_run.deployment_id:
        async def _get_entrypoint():
            # get_client() builds a client from the active profile's API settings
            async with get_client() as client:
                dep = await client.read_deployment(flow_run.deployment_id)
                return getattr(dep, "entrypoint", None)
        entrypoint = anyio.run(_get_entrypoint)

    # Fallback to code object if no deployment or entrypoint
    file_path = Path(flow.fn.__code__.co_filename)
    ...
Notes and caveats:
- In containers/remote environments, co_filename will be the path inside the image/worker. If you need a repo-relative path, normalize against your known project root (or use the deployment’s entrypoint, which is already repo-relative).
- There’s no flow.path or flow_run.path field in Prefect 3; this is by design.
References:
- Prefect flow internals (no entrypoint attribute on Flow/FlowRun): prefect/src/prefect/flows.py
If you’d prefer one approach over the other (e.g., always use deployment entrypoint when available), say the word and I’ll tighten up the function.
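For completeness, a minimal sketch that combines both approaches, preferring the deployment entrypoint and falling back to the code object. It assumes the entrypoint has the "path:function" form shown above and that the helper is awaited (or driven with anyio, as in the sketch above) from your hook:
Copy code
from pathlib import Path

from prefect import get_client


async def flow_source_path(flow, flow_run) -> str:
    """Best-effort source path: deployment entrypoint first, code object second."""
    if flow_run.deployment_id:
        async with get_client() as client:
            deployment = await client.read_deployment(flow_run.deployment_id)
            entrypoint = getattr(deployment, "entrypoint", None)
            if entrypoint:
                # e.g. "prefect/flows/debug/debug_flow.py:debug_simple_flow"
                return entrypoint.split(":", 1)[0]
    file_path = Path(flow.fn.__code__.co_filename)
    try:
        return file_path.relative_to(Path.cwd()).as_posix()
    except ValueError:
        return file_path.as_posix()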
o
@Marvin how can i specify job_variables so my limits would work?
Copy code
job_variables = {
    "env": {
        "STRESS_TEST": "passed",
        "HOSTNAME": flow.name,
        "PREFECT_LOGGING_HANDLERS_CONSOLE_LEVEL": "WARNING",
    },
    "namespace": DEFAULT_NAMESPACE,
    "stream_output": True,
    "labels": {
        "team": "data-eng",
        "flow": clean_name,
    },
    "finished_job_ttl": 60,

    "job": {
        "spec": {
            "template": {
                "spec": {
                    "containers": [
                        {
                            "name": "prefect-job",
                            "resources": {
                                "requests": {
                                    "cpu": flow_config.cpu_request,
                                    "memory": flow_config.memory_request,
                                },
                                "limits": {
                                    "cpu": flow_config.cpu_limit,
                                    "memory": flow_config.memory_limit,
                                },
                            },
                        }
                    ]
                }
            }
        }
    }
}

flow.deploy(
    name=clean_name,
    work_queue_name="default",
    work_pool_name=os.getenv("WORK_POOL", "k8s-worker"),
    image=DEFAULT_IMAGE,
    push=False,
    build=False,
    paused=(schedule.is_schedule_active is False),
    print_next_steps=False,
    ignore_warnings=True,
    job_variables=job_variables,
    tags=get_flow_tags(flow_path),
    description=get_flow_description(flow, flow_config),
    schedule=cron_schedule,
    parameters=schedule.parameters,
)
m
thought for 123.6 seconds
You’re on the right track. For a Kubernetes work pool in Prefect 3, resource limits/requests must live under the pod’s container spec. The common, working shape is:
Copy code
job_variables = {
  "namespace": DEFAULT_NAMESPACE,
  "stream_output": True,
  "labels": {"team": "data-eng", "flow": clean_name},
  "finished_job_ttl": 60,
  "env": {
    "STRESS_TEST": "passed",
    "HOSTNAME": flow.name,
    "PREFECT_LOGGING_HANDLERS_CONSOLE_LEVEL": "WARNING",
  },
  "job": {
    "spec": {
      "template": {
        "spec": {
          "containers": [
            {
              "name": "prefect-job",
              "resources": {
                "requests": {
                  "cpu": flow_config.cpu_request,   # e.g. "500m"
                  "memory": flow_config.memory_request,  # e.g. "512Mi"
                },
                "limits": {
                  "cpu": flow_config.cpu_limit,     # e.g. "1"
                  "memory": flow_config.memory_limit,    # e.g. "1Gi"
                },
              },
            }
          ]
        }
      }
    }
  }
}
Key points so your limits actually apply:
- Use valid Kubernetes units: CPU like "500m" or "1", memory like "512Mi" or "1Gi". Raw ints/floats can be rejected or silently ignored by the cluster.
- Container name must match the base template. Prefect’s default is "prefect-job"; if your pool template uses a different name, your resources won’t land.
- Deep merge behavior: Prefect merges your job_variables into the base job template. Dicts are merged; lists may be replaced. The above pattern is known to work with the default template. If you find it replaces the whole containers list (and breaks the job), set these defaults in the work pool’s base job template instead, or include the full container entry (name, image, command) in your override.
- Namespace-level LimitRange/ResourceQuota can cap or override your values. If the pod runs but with different resources, check those.
You can verify what landed with:
Copy code
kubectl -n <ns> get pod <pod> -o jsonpath='{.spec.containers[?(@.name=="prefect-job")].resources}'
Docs, for when you need something cheerier than YAML:
- Workers overview: Workers concept
- Kubernetes specifics: Kubernetes worker how-to
If you want, I can help tailor this to your pool’s exact base job template to avoid any merge surprises.
o
@Marvin you wrote that same what i had, and still i dont see resource limit in output job apiVersion: v1 kind: Pod metadata: name: placid-goldfish-wsbcc-bmkh9 generateName: placid-goldfish-wsbcc- namespace: prefect2-dev uid: 6325017d-c972-45b3-88b0-b661c0786a61 resourceVersion: '1915386' creationTimestamp: '2025-08-20T121935Z' labels: batch.kubernetes.io/controller-uid: b1da13da-7192-43d9-8d10-cfec346d11cd batch.kubernetes.io/job-name: placid-goldfish-wsbcc controller-uid: b1da13da-7192-43d9-8d10-cfec346d11cd flow: ml-template-test job-name: placid-goldfish-wsbcc prefect.io/deployment-id: 5c550811-e987-4dd1-844e-e730fab88a5c prefect.io/deployment-name: ml-template-test prefect.io/deployment-updated: 2025-08-20t12-19-34.586883-00-00 prefect.io/flow-id: 70f11811-6b19-4820-82df-ca44cb6eaab0 prefect.io/flow-name: ml-template-test prefect.io/flow-run-id: a45055b3-445c-4fca-9e33-8fe28ea83ec3 prefect.io/flow-run-name: placid-goldfish prefect.io/version: 3.4.13 prefect.io/work-pool-id: 84a4419c-a3ac-466b-81f7-a6b054c36ad5 prefect.io/work-pool-name: default-agent-pool prefect.io/worker-name: kubernetesworker-b940c453-bf6f-45d2-ab3f-2f051c99be15 team: data-eng ownerReferences: - apiVersion: batch/v1 kind: Job name: placid-goldfish-wsbcc uid: b1da13da-7192-43d9-8d10-cfec346d11cd controller: true blockOwnerDeletion: true finalizers: - batch.kubernetes.io/job-tracking selfLink: /api/v1/namespaces/prefect2-dev/pods/placid-goldfish-wsbcc-bmkh9 status: phase: Running conditions: - type: PodReadyToStartContainers status: 'True' lastProbeTime: null lastTransitionTime: '2025-08-20T121937Z' - type: Initialized status: 'True' lastProbeTime: null lastTransitionTime: '2025-08-20T121935Z' - type: Ready status: 'True' lastProbeTime: null lastTransitionTime: '2025-08-20T121937Z' - type: ContainersReady status: 'True' lastProbeTime: null lastTransitionTime: '2025-08-20T121937Z' - type: PodScheduled status: 'True' lastProbeTime: null lastTransitionTime: '2025-08-20T121935Z' hostIP: 192.168.65.3 hostIPs: - ip: 192.168.65.3 podIP: 10.1.12.176 podIPs: - ip: 10.1.12.176 startTime: '2025-08-20T121935Z' containerStatuses: - name: prefect-job state: running: startedAt: '2025-08-20T121936Z' lastState: {} ready: true restartCount: 0 image: dev/prefect2:worker imageID: >- docker-pullable://dev/prefect2@sha256:fa77ea8e3406e522b5ceeb40307b1b02045b621e244a0e96210b3a376a22f08e containerID: >- docker://0d59c2ea19d53e947cc2c075cfe524af0e8ff4dea32c106b68e3e616e6b22e89 started: true qosClass: BestEffort spec: volumes: - name: kube-api-access-j8dls projected: sources: - serviceAccountToken: expirationSeconds: 3607 path: token - configMap: name: kube-root-ca.crt items: - key: ca.crt path: ca.crt - downwardAPI: items: - path: namespace fieldRef: apiVersion: v1 fieldPath: metadata.namespace defaultMode: 420 containers: - name: prefect-job image: dev/prefect2:worker args: - prefect - flow-run - execute env: - name: PREFECT_DEBUG_MODE value: 'False' - name: PREFECT_API_URL value: http://prefect-server:4200/api - name: PREFECT_API_ENABLE_HTTP2 value: 'True' - name: PREFECT_SERVER_EPHEMERAL_ENABLED value: 'True' - name: PREFECT_SERVER_UI_STATIC_DIRECTORY value: /tmp/prefect_server_ui_build - name: PREFECT_WORKER_QUERY_SECONDS value: '5.0' - name: PREFECT_WORKER_PREFETCH_SECONDS value: '10.0' - name: PREFECT__FLOW_RUN_ID value: a45055b3-445c-4fca-9e33-8fe28ea83ec3 - name: HOSTNAME value: ML template test - name: STRESS_TEST value: passed - name: PREFECT_LOGGING_HANDLERS_CONSOLE_LEVEL value: WARNING - name: 
PREFECT_FLOW_RUN_EXECUTE_SIGTERM_BEHAVIOR value: reschedule resources: {} volumeMounts: - name: kube-api-access-j8dls readOnly: true mountPath: /var/run/secrets/kubernetes.io/serviceaccount terminationMessagePath: /dev/termination-log terminationMessagePolicy: File imagePullPolicy: IfNotPresent restartPolicy: Never terminationGracePeriodSeconds: 30 dnsPolicy: ClusterFirst serviceAccountName: default serviceAccount: default nodeName: docker-desktop securityContext: {} schedulerName: default-scheduler tolerations: - key: node.kubernetes.io/not-ready operator: Exists effect: NoExecute tolerationSeconds: 300 - key: node.kubernetes.io/unreachable operator: Exists effect: NoExecute tolerationSeconds: 300 priority: 0 enableServiceLinks: true preemptionPolicy: PreemptLowerPriority
m
thought for 64.3 seconds
I see why you’re getting BestEffort with resources: {} — your job_variables aren’t being read where you think they are. In Prefect 3’s Kubernetes worker, job_variables are validated against the worker’s variable schema and rendered into the pool’s Base Job Template. By default, that template does not deep-merge arbitrary job.spec.template.spec.containers overrides. Your top-level env, namespace, etc. work because the template exposes them. Your nested job block is ignored, so no resources land.
What to do instead
Option A: Use the resource keys the template actually exposes
Most default Kubernetes pool templates expect top-level variables for resources. Try:
Copy code
job_variables = {
  "namespace": DEFAULT_NAMESPACE,
  "stream_output": True,
  "labels": {"team": "data-eng", "flow": clean_name},
  "finished_job_ttl": 60,
  "env": {
    "STRESS_TEST": "passed",
    "HOSTNAME": flow.name,
    "PREFECT_LOGGING_HANDLERS_CONSOLE_LEVEL": "WARNING",
  },

  # Use top-level resource variables the template renders
  "cpu_request": str(flow_config.cpu_request),     # e.g. "500m"
  "memory_request": str(flow_config.memory_request), # e.g. "512Mi"
  "cpu_limit": str(flow_config.cpu_limit),         # e.g. "1"
  "memory_limit": str(flow_config.memory_limit),   # e.g. "1Gi"
}
Then redeploy and run. If the pool’s template wires these variables, you’ll see non-empty resources and QoS change to Burstable/Guaranteed.
Option B: Customize the pool’s Base Job Template
If your template doesn’t render those keys, edit the work pool’s Base Job Template (UI: Work Pools -> your pool -> Edit template) and add:
Copy code
resources:
  requests:
    cpu: "{{ cpu_request }}"
    memory: "{{ memory_request }}"
  limits:
    cpu: "{{ cpu_limit }}"
    memory: "{{ memory_limit }}"
If you truly want to inject raw Kubernetes under job or pod, you must add corresponding Jinja hooks in the template for those fields; the default template won’t merge an arbitrary job dict.
Important clues in your pod
- work-pool-name is "default-agent-pool", not "k8s-worker". Make sure the deployment you ran actually targets the pool you think it does. If you configured resources for one pool but the run used another, you’ll get… nothing.
- qosClass: BestEffort confirms no requests/limits were applied.
- env vars are present, showing top-level variables were rendered; the resource keys just weren’t.
Sanity checks
- Use valid units: CPU like "500m" or "1", memory like "512Mi" or "1Gi".
- Container name in the template is usually "prefect-job"; if you customize the template to accept a container block, ensure names match or target the right index.
- Inspect the pool’s Base Job Template to see which variables it actually exposes (see the sketch below).
Docs and template references
- Workers overview: Workers concept
- Kubernetes worker how-to: Kubernetes worker how-to
- Default K8s worker template (for variable names): Prefect Kubernetes job template
If you paste your pool’s Base Job Template, I’ll map exactly which keys you need to set so the limits actually stick. I live for YAML, sadly.
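If you'd rather check from Python than the UI, here is a small sketch that reads the pool's base job template via the Prefect client and lists the variables it declares; the pool name "k8s-worker" is just the value from your deploy call and may need adjusting:
Copy code
import asyncio
import json

from prefect import get_client


async def show_pool_variables(pool_name: str) -> None:
    """Print the variable names the pool's base job template declares."""
    async with get_client() as client:
        pool = await client.read_work_pool(pool_name)
        properties = pool.base_job_template.get("variables", {}).get("properties", {})
        print(json.dumps(sorted(properties), indent=2))


asyncio.run(show_pool_variables("k8s-worker"))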
o
@Marvin { "env": { "HOSTNAME": "Debug simple flow 10_58", "STRESS_TEST": "passed", "PREFECT_LOGGING_HANDLERS_CONSOLE_LEVEL": "WARNING" }, "name": "debug-simple-flow-10-58", "image": "dev/prefect2:worker", "labels": { "cur_time": "10-58" }, "cpu_limit": "2000m", "namespace": "prefect2-dev", "cpu_request": "200m", "memory_limit": "1Gi", "stream_output": true, "memory_request": "1Gi", "finished_job_ttl": 60, "ttlSecondsAfterFinished": 60 } { "variables": { "type": "object", "properties": { "env": { "type": "object", "title": "Environment Variables", "description": "Environment variables to set when starting a flow run.", "additionalProperties": { "anyOf": [ { "type": "string" }, { "type": "null" } ] } }, "name": { "anyOf": [ { "type": "string" }, { "type": "null" } ], "title": "Name", "default": null, "description": "Name given to infrastructure created by a worker." }, "image": { "anyOf": [ { "type": "string" }, { "type": "null" } ], "title": "Image", "default": null, "examples": [ "docker.io/prefecthq/prefect:3-latest" ], "description": "The image reference of a container image to use for created jobs. If not set, the latest Prefect image will be used." }, "labels": { "type": "object", "title": "Labels", "description": "Labels applied to infrastructure created by a worker.", "additionalProperties": { "type": "string" } }, "command": { "anyOf": [ { "type": "string" }, { "type": "null" } ], "title": "Command", "default": null, "description": "The command to use when starting a flow run. In most cases, this should be left blank and the command will be automatically generated by the worker." }, "namespace": { "type": "string", "title": "Namespace", "default": "default", "description": "The Kubernetes namespace to create jobs within." }, "stream_output": { "type": "boolean", "title": "Stream Output", "default": true, "description": "If set, output will be streamed from the job to local standard output." }, "cluster_config": { "anyOf": [ { "$ref": "#/definitions/KubernetesClusterConfig" }, { "type": "null" } ], "default": null, "description": "The Kubernetes cluster config to use for job creation." }, "finished_job_ttl": { "anyOf": [ { "type": "integer" }, { "type": "null" } ], "title": "Finished Job TTL", "default": "{{ finished_job_ttl }}", "description": "The number of seconds to retain jobs after completion. If set, finished jobs will be cleaned up by Kubernetes after the given delay. If not set, jobs will be retained indefinitely." }, "image_pull_policy": { "enum": [ "IfNotPresent", "Always", "Never" ], "type": "string", "title": "Image Pull Policy", "default": "IfNotPresent", "description": "The Kubernetes image pull policy to use for job containers." }, "service_account_name": { "anyOf": [ { "type": "string" }, { "type": "null" } ], "title": "Service Account Name", "default": null, "description": "The Kubernetes service account to use for job creation." }, "job_watch_timeout_seconds": { "anyOf": [ { "type": "integer" }, { "type": "null" } ], "title": "Job Watch Timeout Seconds", "default": null, "description": "Number of seconds to wait for each event emitted by a job before timing out. If not set, the worker will wait for each event indefinitely." }, "pod_watch_timeout_seconds": { "type": "integer", "title": "Pod Watch Timeout Seconds", "default": 60, "description": "Number of seconds to watch for pod creation before timing out." 
} }, "definitions": { "KubernetesClusterConfig": { "type": "object", "title": "KubernetesClusterConfig", "required": [ "config", "context_name" ], "properties": { "config": { "type": "object", "title": "Config", "description": "The entire contents of a kubectl config file." }, "context_name": { "type": "string", "title": "Context Name", "description": "The name of the kubectl context to use." } }, "description": "Stores configuration for interaction with Kubernetes clusters.\n\nSee
from_file
for creation.", "secret_fields": [], "block_type_slug": "kubernetes-cluster-config", "block_schema_references": {} } }, "description": "Default variables for the Kubernetes worker.\n\nThe schema for this class is used to populate the
variables
section of the default\nbase job template." }, "job_configuration": { "env": "{{ env }}", "name": "{{ name }}", "labels": "{{ labels }}", "command": "{{ command }}", "namespace": "{{ namespace }}", "job_manifest": { "kind": "Job", "spec": { "template": { "spec": { "containers": [ { "env": "{{ env }}", "args": "{{ command }}", "name": "prefect-job", "image": "{{ image }}", "envFrom": [ { "secretRef": { "name": "secrets-vault" } }, { "secretRef": { "name": "secrets-vault-base64-tokens" } } ], "resources": { "limits": { "cpu": "{{ cpu_limit | default('2') }}", "memory": "{{ memory_limit | default('4Gi') }}" }, "requests": { "cpu": "{{ cpu_request | default('200m') }}", "memory": "{{ memory_request | default('1Gi') }}" } }, "imagePullPolicy": "{{ image_pull_policy }}" } ], "completions": 1, "parallelism": 1, "restartPolicy": "Never", "serviceAccountName": "{{ service_account_name }}" } }, "backoffLimit": 0, "ttlSecondsAfterFinished": 60 }, "metadata": { "labels": "{{ labels }}", "namespace": "{{ namespace }}", "generateName": "{{ name }}-" }, "apiVersion": "batch/v1" }, "stream_output": "{{ stream_output }}", "cluster_config": "{{ cluster_config }}", "job_watch_timeout_seconds": "{{ job_watch_timeout_seconds }}", "pod_watch_timeout_seconds": "{{ pod_watch_timeout_seconds }}" } } response_data = await self.request( ^^^^^^^^^^^^^^^^^^^ File "/usr/local/lib/python3.11/site-packages/kubernetes_asyncio/client/rest.py", line 249, in POST return (await self.request("POST", url, ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ File "/usr/local/lib/python3.11/site-packages/kubernetes_asyncio/client/rest.py", line 206, in request raise ApiException(http_resp=r) kubernetes_asyncio.client.exceptions.ApiException: (400) Reason: Bad Request HTTP response headers: <CIMultiDictProxy('Audit-Id': '7eae52ac-bd6d-47c6-ad43-ff72a75cabd4', 'Cache-Control': 'no-cache, private', 'Content-Type': 'application/json', 'X-Kubernetes-Pf-Flowschema-Uid': 'e92bf259-c16a-4007-b5de-f536895e9434', 'X-Kubernetes-Pf-Prioritylevel-Uid': '03359ab3-9994-4cfb-b068-e4b73a77e01c', 'Date': 'Thu, 21 Aug 2025 075824 GMT', 'Content-Length': '256')> HTTP response body: {"kind":"Status","apiVersion":"v1","metadata":{},"status":"Failure","message":"Job in version \"v1\" cannot be handled as a Job: quantities must match the regular expression '^([+-]?[0-9.]+)([eEinumkKMGTP]*[-+]?[0-9]*)$'","reason":"BadRequest","code":400} The above exception was the direct cause of the following exception: Traceback (most recent call last): File "/usr/local/lib/python3.11/site-packages/prefect/workers/base.py", line 1263, in _submit_run_and_capture_errors result = await self.run( ^^^^^^^^^^^^^^^ File "/usr/local/lib/python3.11/site-packages/prefect_kubernetes/worker.py", line 745, in run job = await self._create_job(configuration, client) ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ File "/usr/local/lib/python3.11/site-packages/tenacity/asyncio/__init__.py", line 189, in async_wrapped return await copy(fn, *args, **kwargs) ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ File "/usr/local/lib/python3.11/site-packages/tenacity/asyncio/__init__.py", line 111, in call do = await self.iter(retry_state=retry_state) ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ File "/usr/local/lib/python3.11/site-packages/tenacity/asyncio/__init__.py", line 153, in iter result = await action(retry_state) ^^^^^^^^^^^^^^^^^^^^^^^^^ File "/usr/local/lib/python3.11/site-packages/tenacity/_utils.py", line 99, in inner return call(*args, **kwargs) ^^^^^^^^^^^^^^^^^^^^^ File 
"/usr/local/lib/python3.11/site-packages/tenacity/__init__.py", line 420, in exc_check raise retry_exc.reraise() ^^^^^^^^^^^^^^^^^^^ File "/usr/local/lib/python3.11/site-packages/tenacity/__init__.py", line 187, in reraise raise self.last_attempt.result() ^^^^^^^^^^^^^^^^^^^^^^^^^^ File "/usr/local/lib/python3.11/concurrent/futures/_base.py", line 449, in result return self.__get_result() ^^^^^^^^^^^^^^^^^^^ File "/usr/local/lib/python3.11/concurrent/futures/_base.py", line 401, in __get_result raise self._exception File "/usr/local/lib/python3.11/site-packages/tenacity/asyncio/__init__.py", line 114, in call result = await fn(*args, **kwargs) ^^^^^^^^^^^^^^^^^^^^^^^^^ File "/usr/local/lib/python3.11/site-packages/prefect_kubernetes/worker.py", line 929, in _create_job raise InfrastructureError( prefect.exceptions.InfrastructureError: Unable to create Kubernetes job: Bad Request: Job in version "v1" cannot be handled as a Job: quantities must match the regular expression '^([+-]?[0-9.]+)([eEinumkKMGTP]*[-+]?[0-9]*)$' 105824 AM prefect.flow_runs.worker Reported flow run '9626fb28-5cf8-48dd-8d69-e9a5dbab9444' as crashed: Flow run could not be submitted to infrastructure: InfrastructureError('Unable to create Kubernetes job: Bad Request: Job in version "v1" cannot be handled as a Job: quantities must match the regular expression \'^([+-]?[0-9.]+)([eEinumkKMGTP]*[-+]?[0-9]*)$\'') 105824 AM prefect.flow_runs.worker
m
Your message was too long, here's your message at the allowed limit: { env { HOSTNAME Debug simple flow 10_58 STRESS_TEST passed PREFECT_LOGGING_HANDLERS_CONSOLE_LEVEL WARNING } name debug-simple-flow-10-58 image dev/prefect2 worker labels { cur_time 10-58 } cpu_limit 2000m namespace prefect2-dev cpu_request 200m memory_limit 1Gi stream_output true memory_request 1Gi finished_job_ttl 60 ttlSecondsAfterFinished 60 } { variables { type object properties { env { type object title Environment Variables description Environment variables to set when starting a flow run additionalProperties { anyOf [ { type string } { type null } ] } } name { anyOf [ { type string } { type null } ] title Name default null description Name given to infrastructure created by a worker } image { anyOf [ { type string } { type null } ] title Image default null examples [ <http //docker io/prefecthq/prefect 3-latest|docker io/prefecthq/prefect 3-latest> ] description The image reference of a container image to use for created jobs If not set the latest Prefect image will be used } labels { type object title Labels description Labels applied to infrastructure created by a worker additionalProperties { type string } } command { anyOf [ { type string } { type null } ] title Command default null description The command to use when starting a flow run In most cases this should be left blank and the command will be automatically generated by the worker } namespace { type string title Namespace default default description The Kubernetes namespace to create jobs within } stream_output { type boolean title Stream Output default true description If set output will be streamed from the job to local standard output } cluster_config { anyOf [ { $ref #/definitions/KubernetesClusterConfig } { type null } ] default null description The Kubernetes cluster config to use for job creation } finished_job_ttl { anyOf [ { type integer } { type null } ] title Finished Job TTL default {{ finished_job_ttl }} description The number of seconds to retain jobs after completion If set finished jobs will be cleaned up by Kubernetes after the given delay If not set jobs will be retained indefinitely } image_pull_policy { enum [ IfNotPresent Always Never ] type string title Image Pull Policy default IfNotPresent description The Kubernetes image pull policy to use for job containers } service_account_name { anyOf [ { type string } { type null } ] title Service Account Name default null description The Kubernetes service account to use for job creation } job_watch_timeout_seconds { anyOf [ { type integer } { type null } ] title Job Watch Timeout Seconds default null description Number of seconds to wait for each event emitted by a job before timing out If not set the worker will wait for each event indefinitely } pod_watch_timeout_seconds { type integer title Pod Watch Timeout Seconds default 60 description Number of seconds to watch for pod creation before timing out } } definitions { KubernetesClusterConfig { type object title KubernetesClusterConfig required [ config context_name ] properties { config { type object title Config description The entire contents
o
@Marvin me envs { "env": { "HOSTNAME": "Debug simple flow 10_58", "STRESS_TEST": "passed", "PREFECT_LOGGING_HANDLERS_CONSOLE_LEVEL": "WARNING" }, "name": "debug-simple-flow-10-58", "image": "dev/prefect2:worker", "labels": { "cur_time": "10-58" }, "cpu_limit": "2000m", "namespace": "prefect2-dev", "cpu_request": "200m", "memory_limit": "1Gi", "stream_output": true, "memory_request": "1Gi", "finished_job_ttl": 60, "ttlSecondsAfterFinished": 60 } main template ... "job_configuration": { "env": "{{ env }}", "name": "{{ name }}", "labels": "{{ labels }}", "command": "{{ command }}", "namespace": "{{ namespace }}", "job_manifest": { "kind": "Job", "spec": { "template": { "spec": { "containers": [ { "env": "{{ env }}", "args": "{{ command }}", "name": "prefect-job", "image": "{{ image }}", "envFrom": [ { "secretRef": { "name": "secrets-vault" } }, { "secretRef": { "name": "secrets-vault-base64-tokens" } } ], "resources": { "limits": { "cpu": "{{ cpu_limit | default('2') }}", "memory": "{{ memory_limit | default('4Gi') }}" }, "requests": { "cpu": "{{ cpu_request | default('200m') }}", "memory": "{{ memory_request | default('1Gi') }}" } }, "imagePullPolicy": "{{ image_pull_policy }}" } ], ... Worker 'KubernetesWorker 8547351c-8150-4dc8-8eef-ab4f5e7b0499' submitting flow run '9626fb28-5cf8-48dd-8d69-e9a5dbab9444' 105819 AM prefect.flow_runs.worker Creating Kubernetes job... 105819 AM prefect.flow_runs.worker Failed to submit flow run '9626fb28-5cf8-48dd-8d69-e9a5dbab9444' to infrastructure. Traceback (most recent call last): File "/usr/local/lib/python3.11/site-packages/prefect_kubernetes/worker.py", line 917, in _create_job job = await batch_client.create_namespaced_job( ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ File "/usr/local/lib/python3.11/site-packages/kubernetes_asyncio/client/api_client.py", line 192, in __call_api raise e File "/usr/local/lib/python3.11/site-packages/kubernetes_asyncio/client/api_client.py", line 185, in __call_api response_data = await self.request( ^^^^^^^^^^^^^^^^^^^ File "/usr/local/lib/python3.11/site-packages/kubernetes_asyncio/client/rest.py", line 249, in POST return (await self.request("POST", url, ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ File "/usr/local/lib/python3.11/site-packages/kubernetes_asyncio/client/rest.py", line 206, in request raise ApiException(http_resp=r) kubernetes_asyncio.client.exceptions.ApiException: (400) Reason: Bad Request HTTP response headers: <CIMultiDictProxy('Audit-Id': '7eae52ac-bd6d-47c6-ad43-ff72a75cabd4', 'Cache-Control': 'no-cache, private', 'Content-Type': 'application/json', 'X-Kubernetes-Pf-Flowschema-Uid': 'e92bf259-c16a-4007-b5de-f536895e9434', 'X-Kubernetes-Pf-Prioritylevel-Uid': '03359ab3-9994-4cfb-b068-e4b73a77e01c', 'Date': 'Thu, 21 Aug 2025 075824 GMT', 'Content-Length': '256')> HTTP response body: {"kind":"Status","apiVersion":"v1","metadata":{},"status":"Failure","message":"Job in version \"v1\" cannot be handled as a Job: quantities must match the regular expression '^([+-]?[0-9.]+)([eEinumkKMGTP]*[-+]?[0-9]*)$'","reason":"BadRequest","code":400} The above exception was the direct cause of the following exception: Traceback (most recent call last): File "/usr/local/lib/python3.11/site-packages/prefect/workers/base.py", line 1263, in _submit_run_and_capture_errors result = await self.run( ^^^^^^^^^^^^^^^ File "/usr/local/lib/python3.11/site-packages/prefect_kubernetes/worker.py", line 745, in run job = await self._create_job(configuration, client) ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ File 
"/usr/local/lib/python3.11/site-packages/tenacity/asyncio/__init__.py", line 189, in async_wrapped return await copy(fn, *args, **kwargs) ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ File "/usr/local/lib/python3.11/site-packages/tenacity/asyncio/__init__.py", line 111, in call do = await self.iter(retry_state=retry_state) ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ File "/usr/local/lib/python3.11/site-packages/tenacity/asyncio/__init__.py", line 153, in iter result = await action(retry_state) ^^^^^^^^^^^^^^^^^^^^^^^^^ File "/usr/local/lib/python3.11/site-packages/tenacity/_utils.py", line 99, in inner return call(*args, **kwargs) ^^^^^^^^^^^^^^^^^^^^^ File "/usr/local/lib/python3.11/site-packages/tenacity/__init__.py", line 420, in exc_check raise retry_exc.reraise() ^^^^^^^^^^^^^^^^^^^ File "/usr/local/lib/python3.11/site-packages/tenacity/__init__.py", line 187, in reraise raise self.last_attempt.result() ^^^^^^^^^^^^^^^^^^^^^^^^^^ File "/usr/local/lib/python3.11/concurrent/futures/_base.py", line 449, in result return self.__get_result() ^^^^^^^^^^^^^^^^^^^ File "/usr/local/lib/python3.11/concurrent/futures/_base.py", line 401, in __get_result raise self._exception File "/usr/local/lib/python3.11/site-packages/tenacity/asyncio/__init__.py", line 114, in call result = await fn(*args, **kwargs) ^^^^^^^^^^^^^^^^^^^^^^^^^ File "/usr/local/lib/python3.11/site-packages/prefect_kubernetes/worker.py", line 929, in _create_job raise InfrastructureError( prefect.exceptions.InfrastructureError: Unable to create Kubernetes job: Bad Request: Job in version "v1" cannot be handled as a Job: quantities must match the regular expression '^([+-]?[0-9.]+)([eEinumkKMGTP]*[-+]?[0-9]*)$' 105824 AM prefect.flow_runs.worker Reported flow run '9626fb28-5cf8-48dd-8d69-e9a5dbab9444' as crashed: Flow run could not be submitted to infrastructure: InfrastructureError('Unable to create Kubernetes job: Bad Request: Job in version "v1" cannot be handled as a Job: quantities must match the regular expression \'^([+-]?[0-9.]+)([eEinumkKMGTP]*[-+]?[0-9]*)$\'') 105824 AM prefect.flow_runs.worker
m
Your message was too long, here's your message at the allowed limit: me envs { env { HOSTNAME Debug simple flow 10_58 STRESS_TEST passed PREFECT_LOGGING_HANDLERS_CONSOLE_LEVEL WARNING } name debug-simple-flow-10-58 image dev/prefect2 worker labels { cur_time 10-58 } cpu_limit 2000m namespace prefect2-dev cpu_request 200m memory_limit 1Gi stream_output true memory_request 1Gi finished_job_ttl 60 ttlSecondsAfterFinished 60 } main template job_configuration { env {{ env }} name {{ name }} labels {{ labels }} command {{ command }} namespace {{ namespace }} job_manifest { kind Job spec { template { spec { containers [ { env {{ env }} args {{ command }} name prefect-job image {{ image }} envFrom [ { secretRef { name secrets-vault } } { secretRef { name secrets-vault-base64-tokens } } ] resources { limits { cpu {{ cpu_limit | default('2') }} memory {{ memory_limit | default('4Gi') }} } requests { cpu {{ cpu_request | default('200m') }} memory {{ memory_request | default('1Gi') }} } } imagePullPolicy {{ image_pull_policy }} } ] Worker 'KubernetesWorker 8547351c-8150-4dc8-8eef-ab4f5e7b0499' submitting flow run '9626fb28-5cf8-48dd-8d69-e9a5dbab9444' 10 58 19 AM prefect flow_runs worker Creating Kubernetes job 10 58 19 AM prefect flow_runs worker Failed to submit flow run '9626fb28-5cf8-48dd-8d69-e9a5dbab9444' to infrastructure Traceback (most recent call last) File /usr/local/lib/python3 11/site-packages/prefect_kubernetes/worker py line 917 in _create_job job = await batch_client create_namespaced_job( ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ File /usr/local/lib/python3 11/site-packages/kubernetes_asyncio/client/api_client py line 192 in __call_api raise e File /usr/local/lib/python3 11/site-packages/kubernetes_asyncio/client/api_client py line 185 in __call_api response_data = await self request( ^^^^^^^^^^^^^^^^^^^ File /usr/local/lib/python3 11/site-packages/kubernetes_asyncio/client/rest py line 249 in POST return (await self request( POST url ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ File /usr/local/lib/python3 11/site-packages/kubernetes_asyncio/client/rest py line 206 in request raise ApiException(http_resp=r) kubernetes_asyncio client exceptions ApiException (400) Reason Bad Request HTTP response headers <CIMultiDictProxy('Audit-Id' '7eae52ac-bd6d-47c6-ad43-ff72a75cabd4' 'Cache-Control' 'no-cache private' 'Content-Type' 'application/json' 'X-Kubernetes-Pf-Flowschema-Uid' 'e92bf259-c16a-4007-b5de-f536895e9434' 'X-Kubernetes-Pf-Prioritylevel-Uid' '03359ab3-9994-4cfb-b068-e4b73a77e01c' 'Date' 'Thu 21 Aug 2025 07 58 24 GMT' 'Content-Length' '256')> HTTP response body { kind Status apiVersion v1 metadata {} status Failure message Job in version \ v1\ cannot be handled as a Job quantities must match the regular expression '^([+-]?[0-9 ]+)([eEinumkKMGTP]*[-+]?[0-9]*)$' reason BadRequest code 400} The above exception was the direct cause of the following exception Traceback (most recent call last) File /usr/local/lib/python3 11/site-packages/prefect/workers/base py line 1263 in _submit_run_and_capture_errors result = await self run( ^^^^^^^^^^^^^^^ File /usr/local/lib/python3 11/site-packages/prefect_kubernetes/worker py line 745 in run job = await self _create_job(configuration client) ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ File /usr/local/lib/python3 11/site-packages/tenacity/asyncio/__init__ py line 189 in async_wrapped return await copy(fn *args *kwargs) ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ File /usr/local/lib/python3 11/site-packages/tenacity/asyncio/__init__ py line 111 in call do = await self 
iter(retry_state=retry_state) ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ File /usr/local/lib/python3 11/site-packages/tenacity/asyncio/__init__ py line 153 in iter result = await action(retry_state) ^^^^^^^^^^^^^^^^^^^^^^^^^ File /usr/local/lib/python3 11/site-packages/tenacity/_utils py line 99 in inner return call(*args *kwargs) ^^^^^^^^^^^^^^^^^^^^^ File /usr/local/lib/python3 11/site-packages/tenacity/__init__ py line 420 in exc_check
raise retry_exc reraise() ^^^^^^^^^^^^^^^^^^^ File /usr/local/lib/python3 11/site-packages/tenacity/__init__ py line 187 in reraise raise self last_attempt result() ^^^^^^^^^^^^^^^^^^^^^^^^^^ File /usr/local/lib/python3 11/concurrent/futures/_base py line 449 in result return self __get_result() ^^^^^^^^^^^^^^^^^^^ File /usr/local/lib/python3 11/concurrent/futures/_base py line 401 in __get_result raise self _exception File /usr/local/lib/python3 11/site-packages/tenacity/asyncio/__init__ py line 114 in call result = await fn(*args **kwargs) ^^^^^^^^^^^^^^^^^^^^^^^^^ File /usr/local/lib/python3 11/site-packages/prefect_kubernetes/worker py line 929 in _create_job raise InfrastructureError( prefect exceptions InfrastructureError Unable to create Kubernetes job Bad Request Job in
o
@Marvin me envs { "env": { "HOSTNAME": "Debug simple flow 10_58", "STRESS_TEST": "passed", "PREFECT_LOGGING_HANDLERS_CONSOLE_LEVEL": "WARNING" }, "name": "debug-simple-flow-10-58", "image": "dev/prefect2:worker", "labels": { "cur_time": "10-58" }, "cpu_limit": "2000m", "namespace": "prefect2-dev", "cpu_request": "200m", "memory_limit": "1Gi", "stream_output": true, "memory_request": "1Gi", "finished_job_ttl": 60, "ttlSecondsAfterFinished": 60 } main template ... "job_configuration": { "env": "{{ env }}", "name": "{{ name }}", "labels": "{{ labels }}", "command": "{{ command }}", "namespace": "{{ namespace }}", "job_manifest": { "kind": "Job", "spec": { "template": { "spec": { "containers": [ { "env": "{{ env }}", "args": "{{ command }}", "name": "prefect-job", "image": "{{ image }}", "envFrom": [ { "secretRef": { "name": "secrets-vault" } }, { "secretRef": { "name": "secrets-vault-base64-tokens" } } ], "resources": { "limits": { "cpu": "{{ cpu_limit | default('2') }}", "memory": "{{ memory_limit | default('4Gi') }}" }, "requests": { "cpu": "{{ cpu_request | default('200m') }}", "memory": "{{ memory_request | default('1Gi') }}" } }, "imagePullPolicy": "{{ image_pull_policy }}" } ], ... Worker 'KubernetesWorker 8547351c-8150-4dc8-8eef-ab4f5e7b0499' submitting flow run '9626fb28-5cf8-48dd-8d69-e9a5dbab9444' 105819 AM prefect.flow_runs.worker Creating Kubernetes job... 105819 AM prefect.flow_runs.worker Failed to submit flow run '9626fb28-5cf8-48dd-8d69-e9a5dbab9444' to infrastructure. Traceback (most recent call last): File "/usr/local/lib/python3.11/site-packages/prefect_kubernetes/worker.py", line 917, in _create_job job = await batch_client.create_namespaced_job( ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ File "/usr/local/lib/python3.11/site-packages/kubernetes_asyncio/client/api_client.py", line 192, in __call_api raise e File "/usr/local/lib/python3.11/site-packages/kubernetes_asyncio/client/api_client.py", line 185, in __call_api response_data = await self.request( ^^^^^^^^^^^^^^^^^^^ File "/usr/local/lib/python3.11/site-packages/kubernetes_asyncio/client/rest.py", line 249, in POST return (await self.request("POST", url, ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ File "/usr/local/lib/python3.11/site-packages/kubernetes_asyncio/client/rest.py", line 206, in request raise ApiException(http_resp=r) kubernetes_asyncio.client.exceptions.ApiException: (400) Reason: Bad Request HTTP response headers: <CIMultiDictProxy('Audit-Id': '7eae52ac-bd6d-47c6-ad43-ff72a75cabd4', 'Cache-Control': 'no-cache, private', 'Content-Type': 'application/json', 'X-Kubernetes-Pf-Flowschema-Uid': 'e92bf259-c16a-4007-b5de-f536895e9434', 'X-Kubernetes-Pf-Prioritylevel-Uid': '03359ab3-9994-4cfb-b068-e4b73a77e01c', 'Date': 'Thu, 21 Aug 2025 075824 GMT', 'Content-Length': '256')> HTTP response body: {"kind":"Status","apiVersion":"v1","metadata":{},"status":"Failure","message":"Job in version \"v1\" cannot be handled as a Job: quantities must match the regular expression '^([+-]?[0-9.]+)([eEinumkKMGTP]*[-+]?[0-9]*)$'","reason":"BadRequest","code":400}
m
thought for 672.5 seconds
Thanks for sharing the template and logs — the 400 “quantities must match the regular expression …” error is coming from Kubernetes rejecting a value that’s parsed as a resource quantity. In practice, this almost always means one of the values under resources.requests or resources.limits is invalid (missing a unit, has a trailing space, wrong unit, or is empty). A few concrete things I see and how to fix/debug:
1) Add apiVersion to your Job manifest
- Your snippet shows "kind": "Job" but no "apiVersion". The default Prefect template uses batch/v1. Without it, the server may treat it as v1 and fail to decode the object cleanly.
- Update your base job template to include:
Copy code
"apiVersion": "batch/v1",
"kind": "Job",
2) Double-check your resource quantities
- The values you posted look valid: cpu: "2000m", memory: "1Gi", cpu: "200m", memory: "1Gi".
- Common pitfalls:
  - Trailing or leading whitespace: e.g., "1Gi " or "200m\t" will fail.
  - Empty strings due to templating: if a var is unset and you don’t default it, it can render as "".
  - Wrong units: Gi, Mi, m, or plain integers like "2" are OK. GB or GiB are NOT.
  - Numbers without quotes that get coerced oddly by templating.
(A small pre-check sketch for these quantities follows the corrected manifest below.)
3) Make the worker print the rendered manifest (best next step)
- Bump worker logging to DEBUG so it logs the full manifest it’s sending:
  - Set on the worker pod/deployment:
Copy code
PREFECT_LOGGING_LEVEL=DEBUG
PREFECT_LOGGING_HANDLERS_CONSOLE_LEVEL=DEBUG
- Re-run the flow. The worker will log the final Job payload, letting you spot any stray spaces or empty strings.
4) Compare against the default Prefect template
- Pull the default template to verify shape/fields:
Copy code
prefect work-pool get-default-base-job-template --type kubernetes
- Ensure yours matches the structure, especially apiVersion: batch/v1, and that resources sit under the container.
5) Server-side dry run with kubectl (quick validation)
- Once you have the rendered manifest (from DEBUG logs), validate it directly:
Copy code
kubectl apply -n prefect2-dev --dry-run=server -f -
- Paste the manifest into stdin. This will surface the exact field that fails.
Example corrected portion of your job_manifest (JSON-style for clarity):
Copy code
"job_manifest": {
  "apiVersion": "batch/v1",
  "kind": "Job",
  "metadata": {
    "generateName": "{{ name }}-",
    "labels": "{{ labels }}",
    "namespace": "{{ namespace }}"
  },
  "spec": {
    "backoffLimit": 0,
    "template": {
      "spec": {
        "containers": [
          {
            "name": "prefect-job",
            "image": "{{ image }}",
            "args": "{{ command }}",
            "env": "{{ env }}",
            "envFrom": [
              { "secretRef": { "name": "secrets-vault" } },
              { "secretRef": { "name": "secrets-vault-base64-tokens" } }
            ],
            "resources": {
              "limits": {
                "cpu": "{{ cpu_limit | default('2') }}",
                "memory": "{{ memory_limit | default('4Gi') }}"
              },
              "requests": {
                "cpu": "{{ cpu_request | default('200m') }}",
                "memory": "{{ memory_request | default('1Gi') }}"
              }
            },
            "imagePullPolicy": "{{ image_pull_policy }}"
          }
        ],
        "restartPolicy": "Never",
        "serviceAccountName": "{{ service_account_name }}"
      }
    },
    "ttlSecondsAfterFinished": "{{ finished_job_ttl }}"
  }
}
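As an optional pre-flight check, here is a small sketch that validates your resource values against the quantity pattern quoted in the Kubernetes error above; it only catches formatting mistakes (trailing whitespace, empty strings, stray characters), not semantic ones:
Copy code
import re

# The quantity pattern quoted in the Kubernetes 400 error above
QUANTITY_RE = re.compile(r"^([+-]?[0-9.]+)([eEinumkKMGTP]*[-+]?[0-9]*)$")


def check_quantities(job_variables: dict) -> None:
    """Raise if any resource value would fail Kubernetes quantity parsing."""
    for key in ("cpu_request", "cpu_limit", "memory_request", "memory_limit"):
        value = job_variables.get(key)
        if value is not None and not QUANTITY_RE.match(str(value)):
            raise ValueError(f"{key}={value!r} is not a valid Kubernetes quantity")


# Example: a trailing space slips past the eye but fails the pattern
check_quantities({"cpu_request": "200m", "memory_limit": "1Gi "})  # raises ValueError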
Links for reference:
- Kubernetes worker docs: Prefect Kubernetes worker
- Kubernetes quantities (valid units/formats): K8s resource units
- Prefect K8s how-to: Prefect on Kubernetes
If you can re-run with DEBUG and paste the rendered resources section from the log, I can point out the exact offending value. My hunch is either a missing apiVersion: batch/v1 or a subtle whitespace/empty string in one of the quantity fields.
o
@Marvin "resources": { "limits": { "memory": "{{ memory_limit }}" }, "requests": { "memory": "{{ memory_request }}" } Value error, The variables specified in the job configuration template must be present as properties in the variables schema. Your job configuration uses the following undeclared variable(s): memory_request ,memory_limit. how to pass these variables?
m
thought for 409.2 seconds
You’re seeing that error because memory_limit and memory_request are referenced in your job template but are not defined in the template’s variables schema. In Prefect 3.x, any {{ variable }} used in job_configuration must be declared under the variables.properties section of the work pool’s base job template. There are two parts to fix this:
1) Declare the variables in the work pool’s base job template
- Add entries for memory_limit and memory_request to the variables.properties section. For example:
Copy code
{
  "job_configuration": {
    ...
    "job_manifest": {
      "apiVersion": "batch/v1",
      "kind": "Job",
      "spec": {
        "template": {
          "spec": {
            "containers": [
              {
                "name": "prefect-job",
                "image": "{{ image }}",
                "args": "{{ command }}",
                "env": "{{ env }}",
                "resources": {
                  "limits": {
                    "memory": "{{ memory_limit }}"
                  },
                  "requests": {
                    "memory": "{{ memory_request }}"
                  }
                }
              }
            ],
            "restartPolicy": "Never",
            "serviceAccountName": "{{ service_account_name }}"
          }
        },
        "ttlSecondsAfterFinished": "{{ finished_job_ttl }}"
      }
    }
  },
  "variables": {
    "description": "Variables for the Kubernetes worker.",
    "properties": {
      "memory_limit": {
        "title": "Memory limit",
        "type": "string",
        "default": "1Gi",
        "description": "K8s quantity, e.g. 512Mi or 1Gi"
      },
      "memory_request": {
        "title": "Memory request",
        "type": "string",
        "default": "512Mi",
        "description": "K8s quantity, e.g. 256Mi or 1Gi"
      },
      // keep existing properties like image, env, namespace, etc.
    }
  }
}
- Apply the updated base job template to your work pool:
  - Export the default to start from a known-good shape (optional):
Copy code
prefect work-pool get-default-base-job-template --type kubernetes --file base.json
  - Edit base.json as above, then update your pool:
Copy code
prefect work-pool update "<your-pool-name>" --base-job-template base.json
2) Provide values for those variables (or use defaults)
- If you set default values in the schema (as above), those will be used automatically.
- You can override per deployment via `prefect.yaml`:
Copy code
deployments:
- name: my-deployment
  entrypoint: flow.py:my_flow
  work_pool:
    name: <your-pool-name>
    job_variables:
      memory_limit: "1Gi"
      memory_request: "1Gi"
- Or set them in the UI when creating/editing the deployment (Work pool variables section); a flow.deploy example follows at the end of this message.
Tips
- Valid K8s quantities are critical; use values like "512Mi" or "1Gi" and avoid trailing spaces. See K8s resource units.
- You can inspect your pool to confirm the variables schema:
Copy code
prefect work-pool inspect "<your-pool-name>"
- Reference docs:
  - Work pools and base job templates: Work pools
  - Kubernetes worker: Prefect Kubernetes worker
  - Prefect on Kubernetes: How-to: Kubernetes
If you share your current base job template’s variables section, I can suggest an exact patch.
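Since you deploy with flow.deploy rather than prefect.yaml, here is a minimal sketch of passing the same values there; the flow body, pool name, and image below are placeholders standing in for your own:
Copy code
from prefect import flow


@flow
def debug_simple_flow(message: str | None = None):
    print(message)


# Values for the memory_* variables declared in the pool's schema above
job_variables = {
    "namespace": "prefect2-dev",
    "memory_request": "512Mi",
    "memory_limit": "1Gi",
}

debug_simple_flow.deploy(
    name="debug-simple-flow",
    work_pool_name="k8s-worker",   # placeholder: use your pool's name
    image="dev/prefect2:worker",   # placeholder: use your image
    build=False,
    push=False,
    job_variables=job_variables,
)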
o
@Marvin i dont need this "Event Feed" how to turn off it?
m
thought for 101.5 seconds
Got it — do you want to:
- hide the Event Feed in the UI sidebar, or
- stop events from being recorded/emitted entirely?
There isn’t a built-in toggle to remove the Event Feed from the UI. If your goal is to stop events from being stored/visible, the path depends on your environment:
- Prefect Cloud: you can’t disable events globally; best is to ignore/filter them in the UI.
- Self-hosted Prefect Server: you can disable the event service so events aren’t persisted. I can share exact steps, but I’ll need how you’re running Server (Docker Compose, Helm, etc.) and your Prefect version.
If filtering is enough, you can use the Event Feed’s search to narrow to nothing (e.g., apply a time window or resource filters that don’t match your runs), but the section will still exist in the nav.
Let me know which route you want and your environment (Cloud vs. self-hosted + how you deploy). For background on events: Prefect events.
o
@Marvin i have self hosted prefect3.10 and i want stop events from being recorded/emitted entirely
m
thought for 132.7 seconds
Understood. On self-hosted Prefect 3.x you can stop events by disabling the server’s event services. Set these env vars on the Prefect Server API container and restart it:
- PREFECT_SERVER_SERVICES_EVENT_LOGGER_ENABLED=false
- PREFECT_SERVER_SERVICES_EVENT_PERSISTER_ENABLED=false
- (Optional) PREFECT_SERVER_EVENTS_STREAM_OUT_ENABLED=false, which disables websocket “stream out” to the UI
Examples
- Docker Compose
Copy code
services:
  server:
    image: prefecthq/prefect:3.10.0
    environment:
      PREFECT_SERVER_SERVICES_EVENT_LOGGER_ENABLED: "false"
      PREFECT_SERVER_SERVICES_EVENT_PERSISTER_ENABLED: "false"
      PREFECT_SERVER_EVENTS_STREAM_OUT_ENABLED: "false"
- Helm values (typical chart allows extra env on the server pod)
Copy code
server:
  extraEnv:
    - name: PREFECT_SERVER_SERVICES_EVENT_LOGGER_ENABLED
      value: "false"
    - name: PREFECT_SERVER_SERVICES_EVENT_PERSISTER_ENABLED
      value: "false"
    - name: PREFECT_SERVER_EVENTS_STREAM_OUT_ENABLED
      value: "false"
- Generic Kubernetes manifest snippet for the server deployment
Copy code
env:
- name: PREFECT_SERVER_SERVICES_EVENT_LOGGER_ENABLED
  value: "false"
- name: PREFECT_SERVER_SERVICES_EVENT_PERSISTER_ENABLED
  value: "false"
- name: PREFECT_SERVER_EVENTS_STREAM_OUT_ENABLED
  value: "false"
What this does
- Event logger OFF: the API will not accept/forward incoming events.
- Event persister OFF: even if any events slip through, they will not be written to the DB.
- Stream out OFF: the UI will not receive any event stream (Event Feed will effectively be empty).
Notes
- Workers/flows can still attempt to emit events, but the server will drop them. There isn’t a documented client-side toggle to globally disable event emission; server-side disablement is the reliable approach.
- Disabling events will disable Automations and any features that rely on events.
Docs
- Events overview: Prefect events
- Settings reference (env vars): Settings reference
If you share how you’re running the server (Docker Compose vs. Helm vs. raw K8s), I can give you an exact patch for your setup.
o
@Marvin I got a lot errors like DETAIL: Key (flow_run_id, task_key, dynamic_key)=(0198d8a0-079b-7c65-b751-aa2761ba8946, delete_flow_run_wo_deployments-d3b0e697, 94aedf5c-41c3-4276-af85-b81b31bb94b0) already exists. [SQL: INSERT INTO task_run (flow_run_id, task_key, dynamic_key, flow_run_run_count, empirical_policy, task_inputs, tags, labels, state_id, name, state_type, state_name, state_timestamp, run_count, expected_start_time, start_time, total_run_time, id, created, updated) VALUES ($1::UUID, $2::VARCHAR, $3::VARCHAR, $4::INTEGER, $5, $6, $7::JSONB, $8::JSONB, $9::UUID, $10::VARCHAR, $11::state_type, $12::VARCHAR, $13::TIMESTAMP WITH TIME ZONE, $14::INTEGER, $15::TIMESTAMP WITH TIME ZONE, $16::TIMESTAMP WITH TIME ZONE, $17::INTERVAL, $18::UUID, $19::TIMESTAMP WITH TIME ZONE, $20::TIMESTAMP WITH TIME ZONE) ON CONFLICT (id) DO UPDATE SET flow_run_id = $21::UUID, task_key = $22::VARCHAR, dynamic_key = $23::VARCHAR, flow_run_run_count = $24::INTEGER, empirical_policy = $25, task_inputs = $26, tags = $27::JSONB, labels = $28::JSONB, state_id = $29::UUID, name = $30::VARCHAR, state_type = $31::state_type, state_name = $32::VARCHAR, state_timestamp = $33::TIMESTAMP WITH TIME ZONE, run_count = $34::INTEGER, expected_start_time = $35::TIMESTAMP WITH TIME ZONE, start_time = $36::TIMESTAMP WITH TIME ZONE, total_run_time = $37::INTERVAL, id = $38::UUID, updated = $39::TIMESTAMP WITH TIME ZONE WHERE task_run.state_timestamp < $40::TIMESTAMP WITH TIME ZONE] [parameters: ('0198d8a0-079b-7c65-b751-aa2761ba8946', 'delete_flow_run_wo_deployments-d3b0e697', '94aedf5c-41c3-4276-af85-b81b31bb94b0', 1, '{"max_retries": 0, "retry_delay_seconds": 0.0, "retries": 0, "retry_delay": 0, "retry_jitter_factor": null}', '{}', '[]', '{}', '0198d8df-e553-7cac-8674-4fd2807f8a92', 'delete_flow_run_wo_deployments-94a', 'RUNNING', 'Running', DateTime(2025, 8, 23, 21, 40, 8, 659242, tzinfo=Timezone('UTC')), 1, DateTime(2025, 8, 23, 21, 40, 8, 642125, tzinfo=Timezone('UTC')), DateTime(2025, 8, 23, 21, 40, 8, 659242, tzinfo=Timezone('UTC')), datetime.timedelta(0), '0198d8df-e542-76d6-9c2d-cf8f180970d0', DateTime(2025, 8, 23, 21, 40, 9, 177067, tzinfo=Timezone('UTC')), DateTime(2025, 8, 23, 21, 40, 9, 267849, tzinfo=Timezone('UTC')), '0198d8a0-079b-7c65-b751-aa2761ba8946', 'delete_flow_run_wo_deployments-d3b0e697', '94aedf5c-41c3-4276-af85-b81b31bb94b0', 1, '{"max_retries": 0, "retry_delay_seconds": 0.0, "retries": 0, "retry_delay": 0, "retry_jitter_factor": null}', '{}', '[]', '{}', '0198d8df-e553-7cac-8674-4fd2807f8a92', 'delete_flow_run_wo_deployments-94a', 'RUNNING', 'Running', DateTime(2025, 8, 23, 21, 40, 8, 659242, tzinfo=Timezone('UTC')), 1, DateTime(2025, 8, 23, 21, 40, 8, 642125, tzinfo=Timezone('UTC')), DateTime(2025, 8, 23, 21, 40, 8, 659242, tzinfo=Timezone('UTC')), datetime.timedelta(0), '0198d8df-e542-76d6-9c2d-cf8f180970d0', DateTime(2025, 8, 23, 21, 40, 9, 177719, tzinfo=Timezone('UTC')), DateTime(2025, 8, 23, 21, 40, 8, 659242, tzinfo=Timezone('UTC')))] (Background on this error at: https://sqlalche.me/e/20/gkpj)
m
Error: Tool 'read_github_issues' exceeded max retries count of 1
Sorry, I encountered an error while processing your request. Please try again.
o
@Marvin I got a lot errors like DETAIL: Key (flow_run_id, task_key, dynamic_key)=(0198d8a0-079b-7c65-b751-aa2761ba8946, delete_flow_run_wo_deployments-d3b0e697, 94aedf5c-41c3-4276-af85-b81b31bb94b0) already exists. [SQL: INSERT INTO task_run (flow_run_id, task_key, dynamic_key, flow_run_run_count, empirical_policy, task_inputs, tags, labels, state_id, name, state_type, state_name, state_timestamp, run_count, expected_start_time, start_time, total_run_time, id, created, updated) VALUES ($1::UUID, $2::VARCHAR, $3::VARCHAR, $4::INTEGER, $5, $6, $7::JSONB, $8::JSONB, $9::UUID, $10::VARCHAR, $11::state_type, $12::VARCHAR, $13::TIMESTAMP WITH TIME ZONE, $14::INTEGER, $15::TIMESTAMP WITH TIME ZONE, $16::TIMESTAMP WITH TIME ZONE, $17::INTERVAL, $18::UUID, $19::TIMESTAMP WITH TIME ZONE, $20::TIMESTAMP WITH TIME ZONE) ON CONFLICT (id) DO UPDATE SET flow_run_id = $21::UUID, task_key = $22::VARCHAR, dynamic_key = $23::VARCHAR, flow_run_run_count = $24::INTEGER, empirical_policy = $25, task_inputs = $26, tags = $27::JSONB, labels = $28::JSONB, state_id = $29::UUID, name = $30::VARCHAR, state_type = $31::state_type, state_name = $32::VARCHAR, state_timestamp = $33::TIMESTAMP WITH TIME ZONE, run_count = $34::INTEGER, expected_start_time = $35::TIMESTAMP WITH TIME ZONE, start_time = $36::TIMESTAMP WITH TIME ZONE, total_run_time = $37::INTERVAL, id = $38::UUID, updated = $39::TIMESTAMP WITH TIME ZONE WHERE task_run.state_timestamp < $40::TIMESTAMP WITH TIME ZONE] [parameters: ('0198d8a0-079b-7c65-b751-aa2761ba8946', 'delete_flow_run_wo_deployments-d3b0e697', '94aedf5c-41c3-4276-af85-b81b31bb94b0', 1, '{"max_retries": 0, "retry_delay_seconds": 0.0, "retries": 0, "retry_delay": 0, "retry_jitter_factor": null}', '{}', '[]', '{}', '0198d8df-e553-7cac-8674-4fd2807f8a92', 'delete_flow_run_wo_deployments-94a', 'RUNNING', 'Running', DateTime(2025, 8, 23, 21, 40, 8, 659242, tzinfo=Timezone('UTC')), 1, DateTime(2025, 8, 23, 21, 40, 8, 642125, tzinfo=Timezone('UTC')), DateTime(2025, 8, 23, 21, 40, 8, 659242, tzinfo=Timezone('UTC')), datetime.timedelta(0), '0198d8df-e542-76d6-9c2d-cf8f180970d0', DateTime(2025, 8, 23, 21, 40, 9, 177067, tzinfo=Timezone('UTC')), DateTime(2025, 8, 23, 21, 40, 9, 267849, tzinfo=Timezone('UTC')), '0198d8a0-079b-7c65-b751-aa2761ba8946', 'delete_flow_run_wo_deployments-d3b0e697', '94aedf5c-41c3-4276-af85-b81b31bb94b0', 1, '{"max_retries": 0, "retry_delay_seconds": 0.0, "retries": 0, "retry_delay": 0, "retry_jitter_factor": null}', '{}', '[]', '{}', '0198d8df-e553-7cac-8674-4fd2807f8a92', 'delete_flow_run_wo_deployments-94a', 'RUNNING', 'Running', DateTime(2025, 8, 23, 21, 40, 8, 659242, tzinfo=Timezone('UTC')), 1, DateTime(2025, 8, 23, 21, 40, 8, 642125, tzinfo=Timezone('UTC')), DateTime(2025, 8, 23, 21, 40, 8, 659242, tzinfo=Timezone('UTC')), datetime.timedelta(0), '0198d8df-e542-76d6-9c2d-cf8f180970d0', DateTime(2025, 8, 23, 21, 40, 9, 177719, tzinfo=Timezone('UTC')), DateTime(2025, 8, 23, 21, 40, 8, 659242, tzinfo=Timezone('UTC')))] (Background on this error at: https://sqlalche.me/e/20/gkpj)
m
thought for 235.3 seconds
Thanks for sharing the details — that error is a PostgreSQL unique constraint violation on the `task_run` table's composite key `(flow_run_id, task_key, dynamic_key)`. In Prefect 3.x, the server "task run recorder" service inserts task runs from an event stream. If two server processes handle the same task-run creation event concurrently (e.g., due to out-of-order or duplicated messages, or multiple server replicas without proper causal ordering), both will try to insert the same unique triple and you'll see exactly this error. The `ON CONFLICT (id)` in the SQL won't help because the conflict is on the composite unique index, not the primary key.

What we've seen/are tracking
- This pattern shows up in multi-server/self-hosted deployments when multiple server replicas process the same messages without proper causal ordering. See:
  - #17767: Deadlocks and task_run insert races
  - #18654: Unique constraint violations and multi-server messaging notes
- Prefect 3.x added causal ordering improvements; for single-replica servers this is in-memory, and for multi-replica you should use the Redis-backed messaging so all replicas coordinate on event order.

What I suggest
1) Confirm your setup
- Are you self-hosted or using Prefect Cloud?
- Prefect version (`prefect version`), number of server replicas, and whether Redis messaging is enabled.
- Database engine/version (looks like Postgres from the error).
2) If you are running multiple server replicas, ensure Redis messaging is enabled and shared by all replicas
- Multi-replica setups should not use the default in-memory messaging. Configure the server to use Redis messaging so events are causally ordered across replicas.
- Docs for self-hosted + Redis setup: Self-hosted Redis setup
- Make sure every server replica points at the same Redis and that the prefect-redis integration is installed in the server image.
3) Upgrade to the latest 3.4.x
- There have been incremental fixes and improvements around task run recording/ordering in the 3.4.x series. Please upgrade both server and workers to the latest (currently 3.4.14) and retest.
- Release notes: 3.4.14 release
4) Quick isolation test
- Temporarily scale to a single server replica. If the errors stop, that strongly indicates a race between replicas and points back to enabling/validating Redis messaging.
5) If you still see the violations after the above
- Share a short window of server logs around the first unique violation including any "task_run_recorder" messages, your server config (messaging backend and Redis URL), and version info. I can help dig deeper.

A few clarifications
- You don't need to (and shouldn't) change `task_key`/`dynamic_key` in your code; this is not caused by your flow/task definitions. It's a server-side concurrency/ordering issue.
- The error can be noisy in logs but is typically tied to the insert race; fixing ordering usually resolves it at the source.

If you can confirm your Prefect version, number of server replicas, and whether Redis messaging is enabled/configured, I can tailor the next steps.
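While you investigate, here is a small diagnostic sketch. It is not an official Prefect tool; it assumes the Prefect 3 client's `read_task_runs` API and filter schemas, and reuses the flow run id from your error message as an example. It lists the task runs recorded for that flow run and flags any `(task_key, dynamic_key)` pair that appears more than once, so you can tell whether a duplicate row actually persisted or the insert was simply rejected:
Copy code
# Diagnostic sketch: list task runs for one flow run and report any
# (task_key, dynamic_key) pairs that were recorded more than once.
import asyncio
from collections import Counter
from uuid import UUID

from prefect.client.orchestration import get_client
from prefect.client.schemas.filters import FlowRunFilter, FlowRunFilterId

# flow run id taken from the error message above (replace with your own)
FLOW_RUN_ID = UUID("0198d8a0-079b-7c65-b751-aa2761ba8946")

async def find_duplicate_task_runs(flow_run_id: UUID) -> dict:
    async with get_client() as client:
        task_runs = await client.read_task_runs(
            flow_run_filter=FlowRunFilter(id=FlowRunFilterId(any_=[flow_run_id])),
            limit=200,
        )
    counts = Counter((tr.task_key, tr.dynamic_key) for tr in task_runs)
    return {key: n for key, n in counts.items() if n > 1}

if __name__ == "__main__":
    duplicates = asyncio.run(find_duplicate_task_runs(FLOW_RUN_ID))
    # An empty result means the duplicate insert was rejected and nothing persisted twice.
    print(duplicates or "no duplicated (task_key, dynamic_key) pairs found")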
o
@Marvin I have a self-hosted (on-premise) Prefect server, version 3.4.10, with a single server instance. Could it be related to a flow that is launched frequently with a specified flow_run_name?
Copy code
dump_mysql_table_batch_by_id.with_options(
    flow_run_name=f"dump sharded table w/o column. Table number {shard_id=}/{table_amount}",
)(...)
Got an error
Copy code
Traceback (most recent call last):
File "/usr/local/lib/python3.11/site-packages/sqlalchemy/engine/base.py", line 1963, in _exec_single_context
    self.dialect.do_execute(
  File "/usr/local/lib/python3.11/site-packages/sqlalchemy/engine/default.py", line 943, in do_execute
    cursor.execute(statement, parameters)
  File "/usr/local/lib/python3.11/site-packages/sqlalchemy/dialects/postgresql/asyncpg.py", line 580, in execute
    self._adapt_connection.await_(
  File "/usr/local/lib/python3.11/site-packages/sqlalchemy/util/_concurrency_py3k.py", line 132, in await_only
    return current.parent.switch(awaitable)  # type: ignore[no-any-return,attr-defined] # noqa: E501
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/sqlalchemy/util/_concurrency_py3k.py", line 196, in greenlet_spawn
    value = await result
            ^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/sqlalchemy/dialects/postgresql/asyncpg.py", line 558, in _prepare_and_execute
    self._handle_exception(error)
  File "/usr/local/lib/python3.11/site-packages/sqlalchemy/dialects/postgresql/asyncpg.py", line 508, in _handle_exception
    self._adapt_connection._handle_exception(error)
  File "/usr/local/lib/python3.11/site-packages/sqlalchemy/dialects/postgresql/asyncpg.py", line 792, in _handle_exception
    raise translated_error from error
sqlalchemy.dialects.postgresql.asyncpg.AsyncAdapt_asyncpg_dbapi.IntegrityError: <class 'asyncpg.exceptions.UniqueViolationError'>: duplicate key value violates unique constraint "uq_task_run__flow_run_id_task_key_dynamic_key"
DETAIL:  Key (flow_run_id, task_key, dynamic_key)=(0198d8a0-079b-7c65-b751-aa2761ba8946, delete_flow_run_wo_deployments-d3b0e697, 94aedf5c-41c3-4276-af85-b81b31bb94b0) already exists.
m
thought for 53.7 seconds
Short answer: it's very unlikely to be caused by your `flow_run_name`. That name is not part of the unique key in your error, and Prefect does not enforce uniqueness on flow run names.

What the error means
- The unique constraint is on `(flow_run_id, task_key, dynamic_key)`. That's entirely within a single flow run and identifies a single logical task occurrence.
- You'll see this when two "create task run" events for the same logical task occurrence are processed concurrently, racing to insert the same row.

Why you can see this on 3.4.10 even with one server
- There were known races in the task run recorder/event handling that could emit/process duplicate task-run-create events out of order, even on a single server under bursty load or after reconnects/retries.
- These were addressed with causal ordering fixes that landed after 3.4.10.

Recommended next steps
1) Upgrade your server to the latest 3.4.x (currently 3.4.14) — this includes ordering/recorder improvements that reduce or eliminate these unique violations.
- Release notes: 3.4.14
- Related background/issues: #17767 and #18654
2) Sanity checks
- Ensure you truly have a single API server instance (no leftover replica or second process). You said you do — great.
- If you're seeing this tied to restarts or bursts, note the first occurrence time and grab server logs around it (look for "task_run_recorder" entries).
3) Not related to flow_run_name
- Specifying a `flow_run_name` with `with_options(...)` is fine and not connected to this constraint. The composite key uses `flow_run_id` (UUID), not the name.

If you can upgrade and still observe violations, please share:
- Prefect version after upgrade
- A short server log snippet around the first violation
- Whether the violations appear for specific tasks or broadly
I can help dig into the recorder path with those details.
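If you want to convince yourself, here is a tiny sketch (the flow is hypothetical, for illustration only; it assumes the `prefect.runtime.flow_run` accessors). Running the same flow twice with an identical `flow_run_name` still produces two different `flow_run_id` values, so the name cannot make the `(flow_run_id, task_key, dynamic_key)` key collide across runs:
Copy code
# Illustration with a hypothetical flow: the same flow_run_name does not
# imply the same flow_run_id, so it cannot cause the composite-key violation.
from prefect import flow
from prefect.runtime import flow_run

@flow
def demo_flow():
    # flow_run.id is a fresh UUID for every invocation, regardless of the name
    print(f"name={flow_run.name!r} id={flow_run.id}")

if __name__ == "__main__":
    named = demo_flow.with_options(flow_run_name="same name every time")
    named()  # first run: one flow_run_id
    named()  # second run: a different flow_run_id, same display name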