Oleksandr Riabyi
08/20/2025, 9:40 AM
Marvin
08/20/2025, 9:42 AM
There is no attribute on Flow or FlowRun that gives you the source file path, but you can get it without inspect.
Two practical options:
1) From the flow function object (works for local runs and most deployments)
from pathlib import Path

from prefect import Flow, State
from prefect.client.schemas.objects import FlowRun

def post_to_slack(flow: Flow, flow_run: FlowRun, state: State):
    file_path = Path(flow.fn.__code__.co_filename)  # absolute path
    # make it project-relative if possible
    try:
        rel_path = file_path.relative_to(Path.cwd())
    except ValueError:
        rel_path = file_path
    # rel_path.as_posix() -> e.g. "prefect/flows/debug/debug_flow.py"
    ...
2) From the deployment metadata (only when the run came from a deployment)
- If flow_run.deployment_id is set, fetch the deployment and read its entrypoint (typically something like prefect/flows/debug/debug_flow.py:debug_simple_flow).
- The Prefect 3 orchestration client can read the deployment; sketch:
import anyio
from prefect.client.orchestration import get_client

def post_to_slack(flow: Flow, flow_run: FlowRun, state: State):
    entrypoint = None
    if flow_run.deployment_id:
        async def _get_entrypoint():
            # get_client() builds a client from your current Prefect settings
            async with get_client() as client:
                dep = await client.read_deployment(flow_run.deployment_id)
                return getattr(dep, "entrypoint", None)
        entrypoint = anyio.run(_get_entrypoint)
    # Fallback to code object if no deployment or entrypoint
    file_path = Path(flow.fn.__code__.co_filename)
    ...
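Either way, a hook with this signature is attached when defining the flow; a minimal sketch, assuming a placeholder flow named debug_simple_flow and the post_to_slack function above:

from prefect import flow

@flow(on_failure=[post_to_slack])
def debug_simple_flow():
    ...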
Notes and caveats:
- In containers/remote environments, co_filename will be the path inside the image/worker. If you need a repo-relative path, normalize against your known project root (or use the deployment’s entrypoint, which is already repo-relative); see the sketch after these notes.
- There’s no flow.path or flow_run.path field in Prefect 3; this is by design.
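A minimal sketch of that normalization, assuming a hypothetical PROJECT_ROOT that matches where your code lives inside the image:

from pathlib import Path

PROJECT_ROOT = Path("/opt/prefect")  # assumption: adjust to your image layout

def repo_relative(path: Path) -> str:
    # return a repo-relative POSIX path when possible, otherwise the absolute path
    try:
        return path.relative_to(PROJECT_ROOT).as_posix()
    except ValueError:
        return path.as_posix()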
References:
- Prefect flow internals (no entrypoint attribute on Flow/FlowRun): prefect/src/prefect/flows.py
If you’d prefer one approach over the other (e.g., always use deployment entrypoint when available), say the word and I’ll tighten up the function.
Oleksandr Riabyi
08/20/2025, 12:11 PM
job_variables = {
"env": {
"STRESS_TEST": "passed",
"HOSTNAME": flow.name,
"PREFECT_LOGGING_HANDLERS_CONSOLE_LEVEL": "WARNING",
},
"namespace": DEFAULT_NAMESPACE,
"stream_output": True,
"labels": {
"team": "data-eng",
"flow": clean_name,
},
"finished_job_ttl": 60,
"job": {
"spec": {
"template": {
"spec": {
"containers": [
{
"name": "prefect-job",
"resources": {
"requests": {
"cpu": flow_config.cpu_request,
"memory": flow_config.memory_request,
},
"limits": {
"cpu": flow_config.cpu_limit,
"memory": flow_config.memory_limit,
},
},
}
]
}
}
}
}
}
flow.deploy(
    name=clean_name,
    work_queue_name="default",
    work_pool_name=os.getenv("WORK_POOL", "k8s-worker"),
    image=DEFAULT_IMAGE,
    push=False,
    build=False,
    paused=(schedule.is_schedule_active is False),
    print_next_steps=False,
    ignore_warnings=True,
    job_variables=job_variables,
    tags=get_flow_tags(flow_path),
    description=get_flow_description(flow, flow_config),
    schedule=cron_schedule,
    parameters=schedule.parameters,
)
Marvin
08/20/2025, 12:13 PM
job_variables = {
"namespace": DEFAULT_NAMESPACE,
"stream_output": True,
"labels": {"team": "data-eng", "flow": clean_name},
"finished_job_ttl": 60,
"env": {
"STRESS_TEST": "passed",
"HOSTNAME": flow.name,
"PREFECT_LOGGING_HANDLERS_CONSOLE_LEVEL": "WARNING",
},
"job": {
"spec": {
"template": {
"spec": {
"containers": [
{
"name": "prefect-job",
"resources": {
"requests": {
"cpu": flow_config.cpu_request, # e.g. "500m"
"memory": flow_config.memory_request, # e.g. "512Mi"
},
"limits": {
"cpu": flow_config.cpu_limit, # e.g. "1"
"memory": flow_config.memory_limit, # e.g. "1Gi"
},
},
}
]
}
}
}
}
}
Key points so your limits actually apply:
- Use valid Kubernetes units: CPU like "500m" or "1", memory like "512Mi" or "1Gi". Raw ints/floats can be rejected or silently ignored by the cluster.
- Container name must match the base template. Prefect’s default is "prefect-job"; if your pool template uses a different name, your resources won’t land.
- Deep merge behavior: Prefect merges your job_variables into the base job template. Dicts are merged; lists may be replaced. The above pattern is known to work with the default template. If you find it replaces the whole containers list (and breaks the job), set these defaults in the work pool’s base job template instead, or include the full container entry (name, image, command) in your override.
- Namespace-level LimitRange/ResourceQuota can cap or override your values. If the pod runs but with different resources, check those.
You can verify what landed with:
kubectl -n <ns> get pod <pod> -o jsonpath='{.spec.containers[?(@.name=="prefect-job")].resources}'
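You can also check what Prefect stored on the deployment itself; a rough sketch assuming Prefect 3's get_client, a placeholder "flow-name/deployment-name" identifier, and that the deployment response exposes job_variables:

import anyio
from prefect.client.orchestration import get_client

async def show_job_variables(name: str):
    # read the deployment and print the job_variables recorded at deploy time
    async with get_client() as client:
        dep = await client.read_deployment_by_name(name)
        print(dep.job_variables)

anyio.run(show_job_variables, "my-flow/my-deployment")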
Docs, for when you need something cheerier than YAML:
- Workers overview: Workers concept
- Kubernetes specifics: Kubernetes worker how-to
If you want, I can help tailor this to your pool’s exact base job template to avoid any merge surprises.
Oleksandr Riabyi
08/20/2025, 12:22 PM
Marvin
08/20/2025, 12:23 PM
Your job_variables aren’t being read where you think they are.
In Prefect 3’s Kubernetes worker, job_variables are validated against the worker’s variable schema and rendered into the pool’s Base Job Template. By default, that template does not deep-merge arbitrary job.spec.template.spec.containers overrides. Your top-level env, namespace, etc. work because the template exposes them. Your nested job block is ignored, so no resources land.
What to do instead
Option A: Use the resource keys the template actually exposes
Most default Kubernetes pool templates expect top-level variables for resources. Try:
job_variables = {
"namespace": DEFAULT_NAMESPACE,
"stream_output": True,
"labels": {"team": "data-eng", "flow": clean_name},
"finished_job_ttl": 60,
"env": {
"STRESS_TEST": "passed",
"HOSTNAME": flow.name,
"PREFECT_LOGGING_HANDLERS_CONSOLE_LEVEL": "WARNING",
},
# Use top-level resource variables the template renders
"cpu_request": str(flow_config.cpu_request), # e.g. "500m"
"memory_request": str(flow_config.memory_request), # e.g. "512Mi"
"cpu_limit": str(flow_config.cpu_limit), # e.g. "1"
"memory_limit": str(flow_config.memory_limit), # e.g. "1Gi"
}
Then redeploy and run. If the pool’s template wires these variables, you’ll see non-empty resources and the QoS class change to Burstable/Guaranteed.
Option B: Customize the pool’s Base Job Template
If your template doesn’t render those keys, edit the work pool’s Base Job Template (UI: Work Pools -> your pool -> Edit template) and add:
resources:
  requests:
    cpu: "{{ cpu_request }}"
    memory: "{{ memory_request }}"
  limits:
    cpu: "{{ cpu_limit }}"
    memory: "{{ memory_limit }}"
If you truly want to inject raw Kubernetes under job or pod, you must add corresponding Jinja hooks in the template for those fields; the default template won’t merge an arbitrary job dict.
Important clues in your pod
- work-pool-name is “default-agent-pool”, not “k8s-worker”. Make sure the deployment you ran actually targets the pool you think it does. If you configured resources for one pool but the run used another, you’ll get… nothing.
- qosClass: BestEffort confirms no requests/limits were applied.
- env vars are present, showing top-level variables were rendered; the resource keys just weren’t.
Sanity checks
- Use valid units: CPU like "500m" or "1", memory like "512Mi" or "1Gi".
- Container name in the template is usually "prefect-job"; if you customize the template to accept a container block, ensure names match or target the right index.
- Inspect the pool’s Base Job Template to see which variables it actually exposes.
Docs and template references
- Workers overview: Workers concept
- Kubernetes worker how-to: Kubernetes worker how-to
- Default K8s worker template (for variable names): Prefect Kubernetes job template
If you paste your pool’s Base Job Template, I’ll map exactly which keys you need to set so the limits actually stick. I live for YAML, sadly.
Oleksandr Riabyi
08/21/2025, 8:39 AM
from_file for creation.",
"secret_fields": [],
"block_type_slug": "kubernetes-cluster-config",
"block_schema_references": {}
}
},
"description": "Default variables for the Kubernetes worker.\n\nThe schema for this class is used to populate the variables
section of the default\nbase job template."
},
"job_configuration": {
"env": "{{ env }}",
"name": "{{ name }}",
"labels": "{{ labels }}",
"command": "{{ command }}",
"namespace": "{{ namespace }}",
"job_manifest": {
"kind": "Job",
"spec": {
"template": {
"spec": {
"containers": [
{
"env": "{{ env }}",
"args": "{{ command }}",
"name": "prefect-job",
"image": "{{ image }}",
"envFrom": [
{
"secretRef": {
"name": "secrets-vault"
}
},
{
"secretRef": {
"name": "secrets-vault-base64-tokens"
}
}
],
"resources": {
"limits": {
"cpu": "{{ cpu_limit | default('2') }}",
"memory": "{{ memory_limit | default('4Gi') }}"
},
"requests": {
"cpu": "{{ cpu_request | default('200m') }}",
"memory": "{{ memory_request | default('1Gi') }}"
}
},
"imagePullPolicy": "{{ image_pull_policy }}"
}
],
"completions": 1,
"parallelism": 1,
"restartPolicy": "Never",
"serviceAccountName": "{{ service_account_name }}"
}
},
"backoffLimit": 0,
"ttlSecondsAfterFinished": 60
},
"metadata": {
"labels": "{{ labels }}",
"namespace": "{{ namespace }}",
"generateName": "{{ name }}-"
},
"apiVersion": "batch/v1"
},
"stream_output": "{{ stream_output }}",
"cluster_config": "{{ cluster_config }}",
"job_watch_timeout_seconds": "{{ job_watch_timeout_seconds }}",
"pod_watch_timeout_seconds": "{{ pod_watch_timeout_seconds }}"
}
}
response_data = await self.request(
^^^^^^^^^^^^^^^^^^^
File "/usr/local/lib/python3.11/site-packages/kubernetes_asyncio/client/rest.py", line 249, in POST
return (await self.request("POST", url,
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/usr/local/lib/python3.11/site-packages/kubernetes_asyncio/client/rest.py", line 206, in request
raise ApiException(http_resp=r)
kubernetes_asyncio.client.exceptions.ApiException: (400)
Reason: Bad Request
HTTP response headers: <CIMultiDictProxy('Audit-Id': '7eae52ac-bd6d-47c6-ad43-ff72a75cabd4', 'Cache-Control': 'no-cache, private', 'Content-Type': 'application/json', 'X-Kubernetes-Pf-Flowschema-Uid': 'e92bf259-c16a-4007-b5de-f536895e9434', 'X-Kubernetes-Pf-Prioritylevel-Uid': '03359ab3-9994-4cfb-b068-e4b73a77e01c', 'Date': 'Thu, 21 Aug 2025 07:58:24 GMT', 'Content-Length': '256')>
HTTP response body: {"kind":"Status","apiVersion":"v1","metadata":{},"status":"Failure","message":"Job in version \"v1\" cannot be handled as a Job: quantities must match the regular expression '^([+-]?[0-9.]+)([eEinumkKMGTP]*[-+]?[0-9]*)$'","reason":"BadRequest","code":400}
The above exception was the direct cause of the following exception:
Traceback (most recent call last):
File "/usr/local/lib/python3.11/site-packages/prefect/workers/base.py", line 1263, in _submit_run_and_capture_errors
result = await self.run(
^^^^^^^^^^^^^^^
File "/usr/local/lib/python3.11/site-packages/prefect_kubernetes/worker.py", line 745, in run
job = await self._create_job(configuration, client)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/usr/local/lib/python3.11/site-packages/tenacity/asyncio/__init__.py", line 189, in async_wrapped
return await copy(fn, *args, **kwargs)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/usr/local/lib/python3.11/site-packages/tenacity/asyncio/__init__.py", line 111, in call
do = await self.iter(retry_state=retry_state)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/usr/local/lib/python3.11/site-packages/tenacity/asyncio/__init__.py", line 153, in iter
result = await action(retry_state)
^^^^^^^^^^^^^^^^^^^^^^^^^
File "/usr/local/lib/python3.11/site-packages/tenacity/_utils.py", line 99, in inner
return call(*args, **kwargs)
^^^^^^^^^^^^^^^^^^^^^
File "/usr/local/lib/python3.11/site-packages/tenacity/__init__.py", line 420, in exc_check
raise retry_exc.reraise()
^^^^^^^^^^^^^^^^^^^
File "/usr/local/lib/python3.11/site-packages/tenacity/__init__.py", line 187, in reraise
raise self.last_attempt.result()
^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/usr/local/lib/python3.11/concurrent/futures/_base.py", line 449, in result
return self.__get_result()
^^^^^^^^^^^^^^^^^^^
File "/usr/local/lib/python3.11/concurrent/futures/_base.py", line 401, in __get_result
raise self._exception
File "/usr/local/lib/python3.11/site-packages/tenacity/asyncio/__init__.py", line 114, in call
result = await fn(*args, **kwargs)
^^^^^^^^^^^^^^^^^^^^^^^^^
File "/usr/local/lib/python3.11/site-packages/prefect_kubernetes/worker.py", line 929, in _create_job
raise InfrastructureError(
prefect.exceptions.InfrastructureError: Unable to create Kubernetes job: Bad Request: Job in version "v1" cannot be handled as a Job: quantities must match the regular expression '^([+-]?[0-9.]+)([eEinumkKMGTP]*[-+]?[0-9]*)$'
10:58:24 AM
prefect.flow_runs.worker
Reported flow run '9626fb28-5cf8-48dd-8d69-e9a5dbab9444' as crashed: Flow run could not be submitted to infrastructure:
InfrastructureError('Unable to create Kubernetes job: Bad Request: Job in version "v1" cannot be handled as a Job: quantities must match the regular expression \'^([+-]?[0-9.]+)([eEinumkKMGTP]*[-+]?[0-9]*)$\'')
10:58:24 AM
prefect.flow_runs.worker
Marvin
08/21/2025, 8:39 AM
Oleksandr Riabyi
08/21/2025, 8:40 AM
Marvin
08/21/2025, 8:41 AM
Marvin
08/21/2025, 8:52 AM
That 400 means one of the quantities under resources.requests or resources.limits is invalid (missing a unit, has a trailing space, wrong unit, or is empty).
A few concrete things I see and how to fix/debug:
1) Add apiVersion to your Job manifest
- Your snippet shows "kind": "Job" but no "apiVersion". The default Prefect template uses batch/v1. Without it, the server may treat it as v1 and fail to decode the object cleanly.
- Update your base job template to include:
"apiVersion": "batch/v1",
"kind": "Job",
2) Double-check your resource quantities
- The values you posted look valid: cpu: "2000m", memory: "1Gi", cpu: "200m", memory: "1Gi".
- Common pitfalls (a quick regex check for these is sketched after the example manifest below):
  - Trailing or leading whitespace: e.g., "1Gi " or "200m\t" will fail.
  - Empty strings due to templating: if a var is unset and you don’t default it, it can render as "".
  - Wrong units: Gi, Mi, m, or plain integers like "2" are OK. GB or GiB are NOT.
  - Numbers without quotes that get coerced oddly by templating.
3) Make the worker print the rendered manifest (best next step)
- Bump worker logging to DEBUG so it logs the full manifest it’s sending:
- Set on the worker pod/deployment:
PREFECT_LOGGING_LEVEL=DEBUG
PREFECT_LOGGING_HANDLERS_CONSOLE_LEVEL=DEBUG
- Re-run the flow. The worker will log the final Job payload, letting you spot any stray spaces or empty strings.
4) Compare against the default Prefect template
- Pull the default template to verify shape/fields:
prefect work-pool get-default-base-job-template --type kubernetes
- Ensure yours matches the structure, especially apiVersion: batch/v1, and that resources sit under the container.
5) Server-side dry run with kubectl (quick validation)
- Once you have the rendered manifest (from DEBUG logs), validate it directly:
kubectl apply -n prefect2-dev --dry-run=server -f -
- Paste the manifest into stdin. This will surface the exact field that fails.
Example corrected portion of your job_manifest (JSON-style for clarity):
"job_manifest": {
"apiVersion": "batch/v1",
"kind": "Job",
"metadata": {
"generateName": "{{ name }}-",
"labels": "{{ labels }}",
"namespace": "{{ namespace }}"
},
"spec": {
"backoffLimit": 0,
"template": {
"spec": {
"containers": [
{
"name": "prefect-job",
"image": "{{ image }}",
"args": "{{ command }}",
"env": "{{ env }}",
"envFrom": [
{ "secretRef": { "name": "secrets-vault" } },
{ "secretRef": { "name": "secrets-vault-base64-tokens" } }
],
"resources": {
"limits": {
"cpu": "{{ cpu_limit | default('2') }}",
"memory": "{{ memory_limit | default('4Gi') }}"
},
"requests": {
"cpu": "{{ cpu_request | default('200m') }}",
"memory": "{{ memory_request | default('1Gi') }}"
}
},
"imagePullPolicy": "{{ image_pull_policy }}"
}
],
"restartPolicy": "Never",
"serviceAccountName": "{{ service_account_name }}"
}
},
"ttlSecondsAfterFinished": "{{ finished_job_ttl }}"
}
}
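For the whitespace/empty-string pitfalls called out above, a quick local sanity check (a sketch, not Prefect or Kubernetes code) is to test candidate values against the quantity pattern quoted in the API error before deploying:

import re

# the pattern quoted verbatim in the Kubernetes 400 response
QUANTITY_RE = re.compile(r"^([+-]?[0-9.]+)([eEinumkKMGTP]*[-+]?[0-9]*)$")

for value in ["200m", "1Gi", "4Gi", "1Gi ", ""]:
    print(repr(value), "ok" if QUANTITY_RE.fullmatch(value) else "INVALID")
# "1Gi " (trailing space) and "" fail, which is exactly what the 400 complains about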
Links for reference:
- Kubernetes worker docs: Prefect Kubernetes worker
- Kubernetes quantities (valid units/formats): K8s resource units
- Prefect K8s how-to: Prefect on Kubernetes
Marvin
08/21/2025, 8:52 AM
If you can paste the rendered resources section from the log, I can point out the exact offending value. My hunch is either a missing apiVersion: batch/v1 or a subtle whitespace/empty string in one of the quantity fields.
Oleksandr Riabyi
08/21/2025, 9:35 AM
Marvin
08/21/2025, 9:42 AM
The problem is that memory_limit and memory_request are referenced in your job template but are not defined in the template’s variables schema. In Prefect 3.x, any {{ variable }} used in job_configuration must be declared under the variables.properties section of the work pool’s base job template.
There are two parts to fix this:
1) Declare the variables in the work pool’s base job template
- Add entries for memory_limit and memory_request to the variables.properties section. For example:
{
  "job_configuration": {
    ...
    "job_manifest": {
      "apiVersion": "batch/v1",
      "kind": "Job",
      "spec": {
        "template": {
          "spec": {
            "containers": [
              {
                "name": "prefect-job",
                "image": "{{ image }}",
                "args": "{{ command }}",
                "env": "{{ env }}",
                "resources": {
                  "limits": {
                    "memory": "{{ memory_limit }}"
                  },
                  "requests": {
                    "memory": "{{ memory_request }}"
                  }
                }
              }
            ],
            "restartPolicy": "Never",
            "serviceAccountName": "{{ service_account_name }}"
          }
        },
        "ttlSecondsAfterFinished": "{{ finished_job_ttl }}"
      }
    }
  },
  "variables": {
    "description": "Variables for the Kubernetes worker.",
    "properties": {
      "memory_limit": {
        "title": "Memory limit",
        "type": "string",
        "default": "1Gi",
        "description": "K8s quantity, e.g. 512Mi or 1Gi"
      },
      "memory_request": {
        "title": "Memory request",
        "type": "string",
        "default": "512Mi",
        "description": "K8s quantity, e.g. 256Mi or 1Gi"
      },
      // keep existing properties like image, env, namespace, etc.
    }
  }
}
- Apply the updated base job template to your work pool:
- Export the default to start from a known-good shape (optional):
prefect work-pool get-default-base-job-template --type kubernetes --file base.json
- Edit base.json as above, then update your pool:
prefect work-pool update "<your-pool-name>" --base-job-template base.json
2) Provide values for those variables (or use defaults)
- If you set default values in the schema (as above), those will be used automatically.
- You can override per deployment via `prefect.yaml`:
deployments:
  - name: my-deployment
    entrypoint: flow.py:my_flow
    work_pool:
      name: <your-pool-name>
      job_variables:
        memory_limit: "1Gi"
        memory_request: "1Gi"
- Or set them in the UI when creating/editing the deployment (Work pool variables section).
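If you deploy from Python instead of prefect.yaml, the same overrides can be passed to flow.deploy; a minimal sketch with placeholder pool and image names:

my_flow.deploy(
    name="my-deployment",
    work_pool_name="k8s-worker",
    image="my-registry/my-image:latest",
    build=False,
    push=False,
    job_variables={
        "memory_limit": "1Gi",
        "memory_request": "512Mi",
    },
)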
Tips
- Valid K8s quantities are critical; use values like "512Mi" or "1Gi" and avoid trailing spaces. See K8s resource units.
- You can inspect your pool to confirm the variables schema:
prefect work-pool inspect "<your-pool-name>"
- Reference docs:
- Work pools and base job templates: Work pools
- Kubernetes worker: Prefect Kubernetes worker
- Prefect on Kubernetes: How-to: Kubernetes
If you share your current base job template’s variables section, I can suggest an exact patch.
Oleksandr Riabyi
08/21/2025, 1:58 PM
Marvin
08/21/2025, 1:59 PM
Oleksandr Riabyi
08/21/2025, 2:00 PM
Marvin
08/21/2025, 2:02 PM
- Docker Compose (environment on the server service):
services:
  server:
    image: prefecthq/prefect:3.10.0
    environment:
      PREFECT_SERVER_SERVICES_EVENT_LOGGER_ENABLED: "false"
      PREFECT_SERVER_SERVICES_EVENT_PERSISTER_ENABLED: "false"
      PREFECT_SERVER_EVENTS_STREAM_OUT_ENABLED: "false"
- Helm values (typical chart allows extra env on the server pod)
server:
  extraEnv:
    - name: PREFECT_SERVER_SERVICES_EVENT_LOGGER_ENABLED
      value: "false"
    - name: PREFECT_SERVER_SERVICES_EVENT_PERSISTER_ENABLED
      value: "false"
    - name: PREFECT_SERVER_EVENTS_STREAM_OUT_ENABLED
      value: "false"
- Generic Kubernetes manifest snippet for the server deployment
env:
  - name: PREFECT_SERVER_SERVICES_EVENT_LOGGER_ENABLED
    value: "false"
  - name: PREFECT_SERVER_SERVICES_EVENT_PERSISTER_ENABLED
    value: "false"
  - name: PREFECT_SERVER_EVENTS_STREAM_OUT_ENABLED
    value: "false"
What this does
- Event logger OFF: the API will not accept/forward incoming events.
- Event persister OFF: even if any events slip through, they will not be written to the DB.
- Stream out OFF: the UI will not receive any event stream (Event Feed will effectively be empty).
Notes
- Workers/flows can still attempt to emit events, but the server will drop them. There isn’t a documented client-side toggle to globally disable event emission; server-side disablement is the reliable approach.
- Disabling events will disable Automations and any features that rely on events.
Docs
- Events overview: Prefect events
- Settings reference (env vars): Settings reference
If you share how you’re running the server (Docker Compose vs. Helm vs. raw K8s), I can give you an exact patch for your setup.
Oleksandr Riabyi
08/25/2025, 8:21 AM
Marvin
08/25/2025, 8:22 AM
Oleksandr Riabyi
08/25/2025, 8:23 AM
Marvin
08/25/2025, 8:27 AM
This violation is on the task_run table’s composite key (flow_run_id, task_key, dynamic_key). In Prefect 3.x, the server “task run recorder” service inserts task runs from an event stream. If two server processes handle the same task-run creation event concurrently (e.g., due to out-of-order or duplicated messages, or multiple server replicas without proper causal ordering), both will try to insert the same unique triple and you’ll see exactly this error. The ON CONFLICT (id) in the SQL won’t help because the conflict is on the composite unique index, not the primary key.
What we’ve seen/are tracking
- This pattern shows up in multi-server/self-hosted deployments when multiple server replicas process the same messages without proper causal ordering. See:
- #17767: Deadlocks and task_run insert races
- #18654: Unique constraint violations and multi-server messaging notes
- Prefect 3.x added causal ordering improvements; for single-replica servers this is in-memory, and for multi-replica you should use the Redis-backed messaging so all replicas coordinate on event order.
What I suggest
1) Confirm your setup
- Are you self-hosted or using Prefect Cloud?
- Prefect version (prefect version), number of server replicas, and whether Redis messaging is enabled.
- Database engine/version (looks like Postgres from the error).
2) If you are running multiple server replicas, ensure Redis messaging is enabled and shared by all replicas
- Multi-replica setups should not use the default in-memory messaging. Configure the server to use Redis messaging so events are causally ordered across replicas.
- Docs for self-hosted + Redis setup: Self-hosted Redis setup
- Make sure every server replica points at the same Redis and that the prefect-redis integration is installed in the server image.
3) Upgrade to the latest 3.4.x
- There have been incremental fixes and improvements around task run recording/ordering in the 3.4.x series. Please upgrade both server and workers to the latest (currently 3.4.14) and retest.
- Release notes: 3.4.14 release
4) Quick isolation test
- Temporarily scale to a single server replica. If the errors stop, that strongly indicates a race between replicas and points back to enabling/validating Redis messaging.
5) If you still see the violations after the above
- Share a short window of server logs around the first unique violation including any “task_run_recorder” messages, your server config (messaging backend and Redis URL), and version info. I can help dig deeper.
A few clarifications
- You don’t need to (and shouldn’t) change `task_key`/`dynamic_key` in your code; this is not caused by your flow/task definitions. It’s a server-side concurrency/ordering issue.
- The error can be noisy in logs but is typically tied to the insert race; fixing ordering usually resolves it at the source.
If you can confirm your Prefect version, number of server replicas, and whether Redis messaging is enabled/configured, I can tailor the next steps.
Oleksandr Riabyi
08/25/2025, 8:34 AM
dump_mysql_table_batch_by_id.with_options(
    flow_run_name=f"dump sharded table w/o column. Table number {shard_id=}/{table_amount}",
)(...)
Got an error
Traceback (most recent call last):
File "/usr/local/lib/python3.11/site-packages/sqlalchemy/engine/base.py", line 1963, in _exec_single_context
self.dialect.do_execute(
File "/usr/local/lib/python3.11/site-packages/sqlalchemy/engine/default.py", line 943, in do_execute
cursor.execute(statement, parameters)
File "/usr/local/lib/python3.11/site-packages/sqlalchemy/dialects/postgresql/asyncpg.py", line 580, in execute
self._adapt_connection.await_(
File "/usr/local/lib/python3.11/site-packages/sqlalchemy/util/_concurrency_py3k.py", line 132, in await_only
return current.parent.switch(awaitable) # type: ignore[no-any-return,attr-defined] # noqa: E501
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/usr/local/lib/python3.11/site-packages/sqlalchemy/util/_concurrency_py3k.py", line 196, in greenlet_spawn
value = await result
^^^^^^^^^^^^
File "/usr/local/lib/python3.11/site-packages/sqlalchemy/dialects/postgresql/asyncpg.py", line 558, in _prepare_and_execute
self._handle_exception(error)
File "/usr/local/lib/python3.11/site-packages/sqlalchemy/dialects/postgresql/asyncpg.py", line 508, in _handle_exception
self._adapt_connection._handle_exception(error)
File "/usr/local/lib/python3.11/site-packages/sqlalchemy/dialects/postgresql/asyncpg.py", line 792, in _handle_exception
raise translated_error from error
sqlalchemy.dialects.postgresql.asyncpg.AsyncAdapt_asyncpg_dbapi.IntegrityError: <class 'asyncpg.exceptions.UniqueViolationError'>: duplicate key value violates unique constraint "uq_task_run__flow_run_id_task_key_dynamic_key"
DETAIL: Key (flow_run_id, task_key, dynamic_key)=(0198d8a0-079b-7c65-b751-aa2761ba8946, delete_flow_run_wo_deployments-d3b0e697, 94aedf5c-41c3-4276-af85-b81b31bb94b0) already exists.
Marvin
08/25/2025, 8:35 AM
No, this is not caused by setting flow_run_name. That name is not part of the unique key in your error, and Prefect does not enforce uniqueness on flow run names.
What the error means
- The unique constraint is on (flow_run_id, task_key, dynamic_key). That’s entirely within a single flow run and identifies a single logical task occurrence.
- You’ll see this when two “create task run” events for the same logical task occurrence are processed concurrently, racing to insert the same row.
Why you can see this on 3.4.10 even with one server
- There were known races in the task run recorder/event handling that could emit/process duplicate task-run-create events out of order, even on a single server under bursty load or after reconnects/retries.
- These were addressed with causal ordering fixes that landed after 3.4.10.
Recommended next steps
1) Upgrade your server to the latest 3.4.x (currently 3.4.14) — this includes ordering/recorder improvements that reduce or eliminate these unique violations.
- Release notes: 3.4.14
- Related background/issues: #17767 and #18654
2) Sanity checks
- Ensure you truly have a single API server instance (no leftover replica or second process). You said you do — great.
- If you’re seeing this tied to restarts or bursts, note the first occurrence time and grab server logs around it (look for “task_run_recorder” entries).
3) Not related to flow_run_name
- Specifying a flow_run_name with with_options(...) is fine and not connected to this constraint. The composite key uses flow_run_id (UUID), not name.
If you can upgrade and still observe violations, please share:
- Prefect version after upgrade
- A short server log snippet around the first violation
- Whether the violations appear for specific tasks or broadly
I can help dig into the recorder path with those details.