William Nono
05/01/2025, 7:06 PM

William Nono
07/17/2025, 1:21 PM

Marvin
07/17/2025, 1:22 PM

Marvin
07/17/2025, 1:22 PM
a. Retry Configuration: set retries and retry_delay_seconds on the flow itself; runs created from the deployment will use them:
from prefect import flow
from prefect.deployments import Deployment

@flow(retries=3, retry_delay_seconds=60)
def my_flow():
    pass

deployment = Deployment.build_from_flow(
    flow=my_flow,
    name="my-deployment",
)
b. Network Configuration:
- Ensure stable network connectivity between your worker and Prefect Cloud
- Check if there are any firewall rules or proxy settings that might be interfering
- Verify your network can consistently reach api.prefect.cloud
c. Worker Configuration:
- Consider increasing worker timeout settings if you're seeing this with larger flows
- Monitor worker logs for any patterns in when these failures occur
3. Monitoring and Debugging:
- Keep an eye on your worker logs for patterns
- Set up monitoring for worker health and connectivity (see the connectivity-check sketch below)
- Consider implementing logging to track when these errors occur
4. Additional Context:
This issue has been reported in various forms, particularly with infrastructure provisioning. While the error you're seeing is specifically a 500 error from the Prefect Cloud API, the underlying cause is often related to network connectivity or temporary service issues.
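As a minimal sketch of the monitoring idea from item 3 (assuming PrefectClient.api_healthcheck() and prefect.get_client() are available in your Prefect version; the interval and logger name are placeholders):
import asyncio
import logging
import time

from prefect import get_client

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger("prefect-api-health")


async def watch_api_health(interval_seconds: int = 60) -> None:
    # Reads PREFECT_API_URL / PREFECT_API_KEY from the environment, like any Prefect client
    async with get_client() as client:
        while True:
            started = time.monotonic()
            # api_healthcheck() returns None when healthy, otherwise the exception it hit
            error = await client.api_healthcheck()
            elapsed_ms = (time.monotonic() - started) * 1000
            if error is None:
                logger.info("Prefect API healthy (%.0f ms)", elapsed_ms)
            else:
                logger.warning("Prefect API check failed after %.0f ms: %r", elapsed_ms, error)
            await asyncio.sleep(interval_seconds)


if __name__ == "__main__":
    asyncio.run(watch_api_health())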
If you're seeing this error frequently, it would be helpful to:
1. Track when these errors occur (time of day, frequency)
2. Note any patterns in the affected flows or deployments
3. Monitor your network connectivity to api.prefect.cloud
4. Consider implementing more robust error handling and retry logic in your deployments
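A minimal sketch of that error handling, assuming Prefect's client errors subclass httpx.HTTPStatusError; the deployment name, attempt count, and backoff values below are placeholders to adapt:
import asyncio

import httpx
from prefect.deployments import run_deployment


async def trigger_with_retries(deployment_name: str, attempts: int = 3, backoff_seconds: float = 30.0):
    # Retry only transient 5xx responses from the API; re-raise everything else
    for attempt in range(1, attempts + 1):
        try:
            return await run_deployment(name=deployment_name)  # waits with default polling
        except httpx.HTTPStatusError as exc:
            if exc.response.status_code < 500 or attempt == attempts:
                raise
            await asyncio.sleep(backoff_seconds * attempt)  # simple linear backoff


# Example (hypothetical deployment name):
# asyncio.run(trigger_with_retries("my-flow/my-deployment"))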
Would you like me to provide more specific information about any of these aspects or help you implement a particular solution?

William Nono
09/19/2025, 12:00 PM
for j in range(0, len(requests)):
    for i in range(0, num_to_process, batch_size):
        flow_runs.append(
            asyncio.ensure_future(
                run_deployment(
                    deployment_name=request.deployment_name,
                )
            )
        )

start_time = time.time()
logger.debug(f"Awaiting completion of flow run(s): num_flow_runs={len(flow_runs)}")
finished_subflows: list[FlowRun] = await asyncio.gather(*flow_runs)
but we've been noticing that even though we set the max concurrency limit to be 250, we are never approaching it even though we have a bunch of scheduled runs created. Also we seem to be hitting the 10,000 req rate limit on the creation of flow runs.

Marvin
09/19/2025, 12:00 PM

Marvin
09/19/2025, 12:06 PM
- prefect.deployments.run_deployment treats the child as a subflow and waits for it to finish. That means it will poll the child run's state repeatedly (every poll_interval, default 5s) — if you kick off hundreds/thousands at once, that's a lot of API GETs per run.
- Calling with a deployment name (e.g., "flow/deployment") also incurs a lookup API call to resolve the deployment ID. Doing that per run adds even more calls.
- If your parent flow creates runs much faster than your workers can actually start them, you’ll end up with many “Scheduled” runs while never approaching your target concurrency; the bottleneck is usually worker/infrastructure capacity or a lower (effective) concurrency cap elsewhere.
What I’d change
1) Don’t wait (no polling) if you don’t need to
- If you just want to create runs and let them execute, call with as_subflow=False. This stops the status polling loop.
- Reuse a single client and, if possible, pass a deployment UUID to avoid a name→id lookup per run.
Example
from prefect import flow
from prefect.deployments import run_deployment
from prefect.client.orchestration import PrefectClient
import asyncio

@flow
async def parent(requests, num_to_process, batch_size):
    async with PrefectClient() as client:
        # Optional: resolve the deployment ID once (saves a GET per run)
        # dep = await client.read_deployment_by_name("flow/deployment")
        # dep_id = dep.id

        # Limit how fast you create runs to avoid API bursts
        create_sem = asyncio.Semaphore(50)

        async def create_one(req):
            async with create_sem:
                return await run_deployment(
                    name=req.deployment_name,  # or name=dep_id
                    as_subflow=False,
                    client=client,  # reuse the same client
                )

        tasks = []
        for j in range(len(requests)):
            for i in range(0, num_to_process, batch_size):
                tasks.append(asyncio.create_task(create_one(requests[j])))

        flow_runs = await asyncio.gather(*tasks, return_exceptions=True)
        return flow_runs
2) If you do want to wait, poll less often
- Increase poll_interval significantly (e.g., 30-60s) to reduce GETs.
- Or wait on batches with a single bulk read (see “monitoring” below) instead of per-run polling.
3) Throttle creation explicitly
- A simple asyncio.Semaphore (as above) is often enough to keep you well below rate limits but still high throughput.
- You can also use Prefect’s concurrency primitives, but local throttling is the most predictable way to avoid API spikes.
4) Monitor in bulk instead of per-run polling
- After you’ve created all runs (fire-and-forget), periodically query their status in batches using one API call per batch instead of one per run.
- For example, chunk your run IDs and call a bulk “read flow runs” endpoint to check progress every N seconds.
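A minimal sketch of that batched monitoring, assuming the FlowRunFilter/FlowRunFilterId import path below matches your Prefect version, with arbitrary batch size and poll interval:
import asyncio

from prefect import get_client
from prefect.client.schemas.filters import FlowRunFilter, FlowRunFilterId


async def wait_for_runs(flow_run_ids, batch_size: int = 200, interval_seconds: int = 30):
    # One read_flow_runs call per batch of IDs instead of one GET per run
    pending = set(flow_run_ids)
    async with get_client() as client:
        while pending:
            ids = list(pending)
            for start in range(0, len(ids), batch_size):
                chunk = ids[start:start + batch_size]
                runs = await client.read_flow_runs(
                    flow_run_filter=FlowRunFilter(id=FlowRunFilterId(any_=chunk))
                )
                for run in runs:
                    if run.state and run.state.is_final():
                        pending.discard(run.id)
            if pending:
                await asyncio.sleep(interval_seconds)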
Why you aren’t hitting your 250 “max concurrency”
- Which limit is 250? Work pool concurrency? Global concurrency limit? Something else?
- Effective concurrency is bounded by the lowest of:
- Work pool/queue concurrency settings
- Number/size of workers and the backing infrastructure capacity (e.g., how many containers/pods you can spin up concurrently)
- Any global concurrency limits/resources you’ve configured
- If runs are sitting in “Scheduled”, you likely need more workers and/or higher worker-side parallelism (and ensure the pool limit is actually 250). Also make sure there aren’t additional per-queue limits or resource-based global concurrency limits holding things back.
A few quality-of-life tweaks
- Prefer asyncio.create_task over asyncio.ensure_future.
- Use return_exceptions=True in gather so one failure doesn’t cancel everything.

Marvin
09/19/2025, 12:06 PM
If you keep calling run_deployment by name, at least look up the deployment ID once and reuse it.
Open questions to help tailor the fix
- Which “max concurrency limit” did you set to 250 (work pool concurrency, global concurrency/resource limits, something else)?
- How many workers are running and what infrastructure are they using (Docker, Kubernetes, etc.)?
- Do you actually need the parent flow to wait for the children, or is “fire-and-forget + external/batch monitoring” acceptable?
Docs for reference
- Running deployments: How to run deployments
- Concurrency limits: Global concurrency limits
- Rate limits overview: Rate limits (Cloud enforces rate limits; the SDK will retry 429s, but it’s best to avoid spikes)
- REST API (flow runs): Flow runs API
If you can share which concurrency knob is set to 250 and a rough picture of your worker setup, I can help pinpoint why execution isn’t reaching it and propose concrete worker/pool settings.

William Nono
09/19/2025, 12:28 PM

Marvin
09/19/2025, 12:28 PM

Marvin
09/19/2025, 12:30 PM
Keep as_subflow=True. Yes, run_deployment will wait and poll those runs; to reduce API traffic, increase poll_interval (e.g., 30–120s). Also throttle creation so you don’t spike flow-run-creation requests.
Why you’re not hitting 250 with one k8s worker
- Pool concurrency 250 is an upper bound. Actual concurrency is the min of:
- Pool limit (250)
- Worker/infrastructure throughput (one k8s worker can submit many jobs, but your cluster must have capacity/quotas for ~250 pods)
- How fast you can create runs without hitting API limits
- Creating many runs + per-run polling from the parent can flood the API before workers can start pods, so you end up with lots of Scheduled runs and lower effective concurrency. Throttling creation and increasing polling intervals usually fixes this.
Recommendations
1) Keep the parent-child relationship but poll less:
- Use as_subflow=True and set a higher poll_interval (30–120s) to cut GETs by 6–24x.
2) Throttle flow-run creation:
- Limit parallel run_deployment calls so you don’t burst-create thousands of runs at once.
- Reuse a single PrefectClient and, if possible, resolve the deployment ID once to avoid repeated name→ID lookups.
3) Ensure infra can actually run ~250 pods:
- One k8s worker can submit jobs quickly, but your cluster must have node capacity and API quotas for 250 concurrent pods. Consider running 2–3 workers for submission throughput/redundancy (pool limit still enforces the 250 cap).
Concrete example
from prefect import flow
from prefect.deployments import run_deployment
from prefect.client.orchestration import PrefectClient
import asyncio

@flow
async def parent(requests, num_to_process, batch_size):
    async with PrefectClient() as client:
        # Optional: resolve deployment once to avoid repeated lookups
        # dep = await client.read_deployment_by_name("flow/deployment")
        # dep_id = dep.id

        create_sem = asyncio.Semaphore(50)  # throttle creation concurrency

        async def launch(req):
            async with create_sem:
                return await run_deployment(
                    name=req.deployment_name,  # prefer dep_id if you have it
                    as_subflow=True,           # keep parent-child relationship
                    poll_interval=60,          # reduce polling frequency
                    client=client,             # reuse one client
                )

        tasks = []
        for j in range(len(requests)):
            for i in range(0, num_to_process, batch_size):
                tasks.append(asyncio.create_task(launch(requests[j])))

        results = await asyncio.gather(*tasks, return_exceptions=True)
        return results
Notes
- In Prefect 3.x the parameter is name (or a UUID), not deployment_name — just making sure that matches your code.
- If you do NOT need the parent to wait for completion, you could use as_subflow=False to avoid polling entirely (but you’ll lose the subflow link in the UI). If you want “linked, but don’t wait,” we can also discuss a two-phase approach where you create subflows and monitor them later in batches.
References
- Running deployments: How to run deployments
- Concurrency limits: Global concurrency limits
- Rate limits: Rate limits
- Flow runs API: Flow runs API
Follow-ups to tune this precisely:
- Roughly how many child runs are you creating per minute?
- Is a 60–120s polling interval acceptable for your SLAs?
- How many nodes/pods can your cluster support concurrently right now? If needed, we can talk about worker replicas and cluster quotas to actually sustain ~250 concurrent runs.

William Nono
09/19/2025, 12:34 PM

Marvin
09/19/2025, 12:34 PM

Marvin
09/19/2025, 12:36 PM
- run_deployment accepts an optional client parameter. If you don’t pass one, it will open/manage its own client for that call. So even inside a flow run, each run_deployment(...) can create its own ephemeral client unless you provide one.
- To guarantee reuse (and reduce connection churn + request overhead), create one PrefectClient in your parent flow and pass it to every run_deployment call.
Example:
from prefect import flow
from prefect.deployments import run_deployment
from prefect.client.orchestration import PrefectClient
import asyncio

@flow
async def parent(requests):
    async with PrefectClient() as client:
        sem = asyncio.Semaphore(50)  # throttle creation

        async def launch(req):
            async with sem:
                return await run_deployment(
                    name=req.deployment_name,
                    as_subflow=True,   # keep parent-child relationship
                    poll_interval=60,  # reduce polling frequency
                    client=client,     # reuse the same client
                )

        tasks = [asyncio.create_task(launch(r)) for r in requests]
        return await asyncio.gather(*tasks, return_exceptions=True)
Notes:
- Keeping as_subflow=True is what creates the parent-child link. Polling is what drives waiting; increasing poll_interval (e.g., 30–120s) reduces API load while preserving the relationship and blocking semantics.
- If you don’t need to wait for completion, you could set as_subflow=False (no subflow link) to avoid polling entirely. There isn’t a “linked but non-blocking” mode today via run_deployment.
Docs: Run deployments

William Nono
09/19/2025, 4:29 PM
import asyncio
from asyncio import BoundedSemaphore
from typing import Union

from prefect import context, flow, get_run_logger
from prefect.client.orchestration import PrefectClient, SyncPrefectClient
from prefect.client.schemas.objects import FlowRun
from prefect.deployments import run_deployment


@flow
async def batch_deployment_runner(
    deployment_name: str,
    batch_items: list[dict],
    max_concurrent_api_calls: int = 10,
    batch_size: int = 5,
) -> dict:
    """
    Efficiently runs multiple deployment instances with controlled concurrency.

    Args:
        deployment_name: Name of the deployment to run
        batch_items: List of parameter dictionaries for each deployment run
        max_concurrent_api_calls: Max simultaneous API calls to Prefect (rate limiting)
        batch_size: Number of items to process per batch

    Returns:
        Metadata about the batch execution
    """
    logger = get_run_logger()

    # Create semaphore to control API call concurrency
    api_semaphore = BoundedSemaphore(max_concurrent_api_calls)

    # Get flow context for client and tags
    run_context = context.get_run_context()
    current_tags = run_context.flow_run.tags

    # Reuse single client connection for all operations
    async with run_context.client as client:
        # Resolve deployment ID once (optimization: avoids repeated API calls)
        deployment = await client.read_deployment_by_name(deployment_name)
        deployment_id = deployment.id if deployment else deployment_name

        # Create all deployment run futures immediately
        deployment_futures = []
        for i in range(0, len(batch_items), batch_size):
            batch = batch_items[i:i + batch_size]
            for j, item_params in enumerate(batch):
                deployment_futures.append(
                    asyncio.ensure_future(
                        run_single_deployment(
                            batch_num=i // batch_size,
                            item_index=j,
                            deployment_id=deployment_id,
                            parameters=item_params,
                            api_semaphore=api_semaphore,
                            tags=current_tags,
                            client=client,
                        )
                    )
                )

        logger.info(f"Starting {len(deployment_futures)} deployment runs...")

        # Execute all deployments concurrently (with API throttling via semaphore)
        finished_runs: list[FlowRun] = await asyncio.gather(*deployment_futures)

        logger.info(f"Completed all {len(finished_runs)} deployment runs")

        # Check for failures
        failed_runs = [
            run for run in finished_runs
            if run and run.state.is_final() and not run.state.is_completed()
        ]

        return {
            "total_runs": len(finished_runs),
            "successful_runs": len(finished_runs) - len(failed_runs),
            "failed_runs": len(failed_runs),
            "deployment_name": deployment_name,
        }


async def run_single_deployment(
    batch_num: int,
    item_index: int,
    deployment_id: str,
    parameters: dict,
    api_semaphore: BoundedSemaphore,
    tags: list[str],
    client: Union[PrefectClient, SyncPrefectClient],
) -> FlowRun | None:
    """
    Runs a single deployment with semaphore-controlled API access.

    Key insight: Semaphore only throttles the API call, not the total concurrency.
    All deployments will eventually run concurrently on Prefect infrastructure.
    """
    logger = get_run_logger()

    # Semaphore controls API submission rate (not total concurrency)
    async with api_semaphore:
        logger.debug(f"Submitting deployment run: batch={batch_num}, item={item_index}")
        result = await run_deployment(
            name=deployment_id,
            parameters=parameters,
            tags=tags,
            client=client,
            poll_interval=30,  # Optimized polling interval
        )
        logger.debug(f"Deployment submitted: batch={batch_num}, item={item_index}")
        return result
does the api_semaphore limit the number of possible concurrent subflows being executed?

Marvin
09/19/2025, 4:29 PM

Marvin
09/19/2025, 4:30 PM

William Nono
09/19/2025, 9:07 PM

Marvin
09/19/2025, 9:07 PM

William Nono
09/23/2025, 1:27 AM

Marvin
09/23/2025, 1:28 AM

Marvin
09/23/2025, 1:31 AM
prefect flow-run inspect <flow-run-id>
Then:
prefect flow-run logs <flow-run-id>
- Verify work pool and worker health
- Pool status, concurrency, last worker heartbeats:
prefect work-pool inspect <pool-name>
- Make sure at least one worker is running and polling the pool (and not crashing):
prefect worker start --pool <pool-name>
- Check work queues/backlog
prefect work-queue ls
prefect work-queue preview --work-pool <pool-name>
If runs are stuck due to pool/queue concurrency limits, they’ll usually remain Scheduled, not Pending — but it’s worth confirming limits aren’t set.
Infra-specific quick checks
- Kubernetes
- Find pod name in the state message or worker logs, then:
kubectl get pods -n <namespace> -w
kubectl describe pod/<name> -n <namespace>
- Look for Unschedulable, insufficient resources, or ImagePullBackOff.
- Docker
- On the worker host:
docker ps -a
docker logs <container-id>
- If image pulls are slow, pre-pull/cache the image or use a closer registry.
- AWS Batch/ECS
- Check queue/service events for capacity or placement failures.
- Ensure the compute environment can scale.
If your runs are actually stuck in Scheduled (not Pending), that points to no workers, paused pool/queue, or concurrency limits — different root cause. But since you said Pending for ~2h, it strongly suggests infrastructure isn’t starting the job.
Could you share:
- Your work pool type (Kubernetes, Docker, Process, ECS/Batch, etc.)
- One example flow run ID and pool name
- The state message from prefect flow-run inspect <id>
References
- Manage work pools and workers
- Worker CLI reference
- Flow and state concepts

William Nono
09/23/2025, 2:20 PM

Marvin
09/23/2025, 2:20 PM

Marvin
09/23/2025, 2:24 PM
prefect work-pool inspect <pool>
prefect work-queue ls
- If you see “Blocked by concurrency limit” or a backlog, raise limits:
prefect work-pool set-concurrency-limit "<pool>" 10
prefect work-queue set-concurrency-limit "<queue>" 10
- Add more workers to the pool:
prefect worker start -p "<pool>" -n "worker-2"
Docs:
- Workers and work pools: Workers & Work Pools
- Worker CLI: prefect worker CLI
- Global concurrency limits: Global Concurrency Limits
- Work queues: prefect work-queue CLI
- Work pools: prefect work-pool CLI
If you can share:
- Are you seeing a backlog of flow runs, or is a single flow run too slow?
- Which worker type and infrastructure are you using (process, Docker, Kubernetes, etc.)?
- Any pool/queue/global concurrency limits configured?
With that, I can recommend the most effective scaling approach.

William Nono
09/23/2025, 2:25 PM

Marvin
09/23/2025, 2:25 PM

Marvin
09/23/2025, 2:28 PM
prefect work-pool inspect <pool>
- Ensure the pool isn’t paused
- Look for “Open slots,” active worker heartbeats, and any “blocked by concurrency limit” messages
2) Check work queues and limits
prefect work-queue ls
- Confirm your deployments target queues that your worker is polling
- If needed, raise limits:
prefect work-pool set-concurrency-limit "<pool>" 10
prefect work-queue set-concurrency-limit "<queue>" 10
3) Check global/task-tag concurrency limits
prefect concurrency-limit ls
- If your flows have tags with low limits, they’ll stick in Pending even if workers are available
4) Inspect a specific flow run for a reason
prefect flow-run inspect <flow_run_id>
- The “state_details” often says if it’s blocked by a limit or paused pool
If it’s just lack of worker capacity:
- Scale your Kubernetes workers. If you run the Prefect worker as a Kubernetes Deployment:
kubectl scale deployment/prefect-worker -n <namespace> --replicas=5
- Or start additional workers (wherever you run them):
prefect worker start -p "<pool>" -n "k8s-worker-2"
Kubernetes-specific checks (common causes of “Pending” backlogs):
- Worker is not attached to the right pool/queues: verify your worker start command includes the correct -p and any -q
- Cluster capacity: pods stuck “Unschedulable” due to CPU/memory/quotas
- Image pull issues: private registry without imagePullSecrets
- RBAC/service account issues for the job template
- Namespace/resource limits in your job template
Docs for reference:
- Workers & Work Pools: Concepts: Workers & Work Pools
- Kubernetes how-to: Run on Kubernetes
- Worker CLI: prefect worker CLI
- Work queues: prefect work-queue CLI
- Global concurrency limits: Global concurrency limits
If you can share:
- The output of prefect work-pool inspect <pool>
- Whether your worker runs as a K8s Deployment (and its current replicas)
- Any messages in flow-run inspect for a Pending run
I can then confirm whether scaling workers will clear the backlog or if a limit/config is the real blocker.

Marvin
09/23/2025, 2:44 PM

William Nono
09/23/2025, 2:45 PM
status=WorkPoolStatus.READY

Marvin
09/23/2025, 2:45 PM

Marvin
09/23/2025, 2:47 PM
kubectl logs deploy/<your-prefect-worker-deployment> -n <namespace> -f
You should see logs like “Found X scheduled runs” and “Claiming run …”. If you only see “No work found”, the worker may be pointed at the wrong queues.
2) Verify queues and limits
- Which queues is the worker polling? If you started it with prefect worker start -p "<pool>" -q "<queue>", make sure your deployments target the same queue(s).
- Inspect the pool and queues:
prefect work-pool inspect <pool>
prefect work-queue ls
prefect concurrency-limit ls
If you see “blocked by concurrency limit” or very low limits, raise them:
prefect work-pool set-concurrency-limit "<pool>" 20
prefect work-queue set-concurrency-limit "<queue>" 20
3) Preview scheduled work
- See what’s expected to run vs open slots:
prefect work-pool preview "<pool>" --hours 1
4) If runs are claimed but pods don’t start, check Kubernetes
- Look for Prefect-created pods/jobs and their events:
kubectl get pods -n <namespace> -l 'prefect.io/flow-run-id' -w
kubectl describe pod <pod> -n <namespace>
kubectl get events -n <namespace> --sort-by=.metadata.creationTimestamp
Common culprits:
- Unschedulable: Insufficient CPU/memory or namespace quotas
- Image pull failures: add imagePullSecrets in the pool’s base job template
- RBAC/serviceAccount issues: ensure the service account in the job template can create pods in the namespace
- Requests/limits too high in the job template
5) Scale for throughput
- If the above look good and you just need more throughput, scale the worker Deployment and ensure limits are raised to match:
kubectl scale deployment/<your-prefect-worker-deployment> -n <namespace> --replicas=5
And keep pool/queue limits high enough (step 2).
Helpful docs:
- Workers & Work Pools concepts: Workers & Work Pools
- Kubernetes how-to (job template, imagePullSecrets, service accounts): Run on Kubernetes
- Worker CLI: prefect worker CLI
- Work Pools/Queues CLI: work-pool CLI, work-queue CLI
- Global concurrency limits: Global concurrency limits
If you can share:
- A snippet from kubectl logs for the worker during the backlog
- Output of prefect work-pool inspect <pool> and prefect work-queue ls
- One kubectl describe pod from a Prefect-run pod stuck Pending
…I can pinpoint whether scaling replicas will clear the backlog or if a limit/config in Prefect/K8s is the blocker.

William Nono
09/23/2025, 3:37 PM

Marvin
09/23/2025, 3:37 PM

Marvin
09/23/2025, 3:38 PM
kubectl logs deploy/<prefect-worker> -n <ns> -f
Look for “Claiming run …” and job creation logs.
- Do you see flow pods?
kubectl get pods -n <ns> -l 'prefect.io/flow-run-id'
2) If pods exist but are Pending, what’s the reason?
- Describe one:
kubectl describe pod <pod> -n <ns>
- If you see “Insufficient cpu/memory” → cluster autoscaler should kick in if it’s configured for that node group and not at max nodes.
- If you see image pull / RBAC / quota errors → fix those first (autoscaler won’t scale for these).
3) If no flow pods are being created:
- The worker may not be claiming runs or is blocked by limits. Check:
prefect work-pool inspect <pool>
prefect work-queue ls
prefect concurrency-limit ls
If you see “blocked by concurrency limit,” raise them:
prefect work-pool set-concurrency-limit "<pool>" 20
prefect work-queue set-concurrency-limit "<queue>" 20
4) Ensure your flow pods request enough resources to trigger autoscaling
- Set requests/limits in the Kubernetes work pool’s base job template (UI or CLI):
prefect work-pool update "<pool>" --base-job-template ./k8s-job-template.json
In the template, add resources under the container:
{
"job": {
"spec": {
"template": {
"spec": {
"containers": [
{
"name": "prefect-job",
"resources": {
"requests": {
"cpu": "1",
"memory": "2Gi"
},
"limits": {
"cpu": "2",
"memory": "4Gi"
}
}
}
]
}
}
}
}
}
Also set required imagePullSecrets, serviceAccountName, taints/tolerations, and nodeSelector/affinity to match your autoscaling node group.
5) If pods are Unschedulable but nodes still don’t scale
- Check cluster autoscaler settings: min/max node count, taints/tolerations on the target node group, priority classes, and whether the pending pods match the ASG/NodePool’s selector.
- Verify namespace/resource quotas aren’t blocking scheduling.
Helpful references:
- Kubernetes how-to (job template, imagePullSecrets, service accounts): Run on Kubernetes
- Workers & Work Pools: Workers & Work Pools
- Work pool/queue CLI: work-pool CLI, work-queue CLI
If you can share:
- A few lines of worker logs around run claiming
- Output of kubectl get pods -n <ns> -l 'prefect.io/flow-run-id' and a kubectl describe of a Pending pod
- prefect work-pool inspect <pool> output
…I can tell you exactly whether we need to raise Prefect limits, fix job template/resources, or adjust your cluster autoscaler config.

William Nono
09/23/2025, 6:09 PM

Marvin
09/23/2025, 6:10 PM

Marvin
09/23/2025, 6:11 PM
prefect work-pool inspect <pool>
prefect work-queue ls
prefect concurrency-limit ls
- If you see slots at 0 or “blocked by concurrency limit”, raise/clear limits:
prefect work-pool set-concurrency-limit "<pool>" 20
prefect work-queue set-concurrency-limit "<queue>" 20
- For tag limits with orphaned slots:
prefect concurrency-limit inspect <tag>
prefect concurrency-limit reset <tag>
2) Find and resolve stuck runs
prefect flow-run ls --state RUNNING --limit 100
prefect flow-run ls --state CANCELLING --limit 100
prefect flow-run ls --state PENDING --limit 100
prefect flow-run inspect <flow_run_id>
Then cancel them (and re-run if needed):
prefect flow-run cancel <flow_run_id>
If they won’t resolve and you need to unblock quickly:
prefect flow-run delete <flow_run_id>
3) Clean up Kubernetes leftovers
kubectl get pods -n <ns> -l 'prefect.io/flow-run-id'
kubectl get jobs -n <ns> -l 'prefect.io/flow-run-id'
kubectl describe pod <pod> -n <ns>
kubectl delete job <job> -n <ns>
Consider adding TTL for finished Jobs in your work pool’s base job template so completed/failed jobs are cleaned automatically.
Refs:
- Workers & Work Pools: Concepts: Workers & Work Pools
- Global/tag concurrency limits: Global Concurrency Limits
- Kubernetes how-to (job template, TTL, imagePullSecrets, service accounts): Run on Kubernetes
If you share:
- prefect work-pool inspect <pool> output
- A prefect concurrency-limit ls snippet
- One prefect flow-run inspect <id> for a stuck run
- Whether you see leftover Pods/Jobs
…I can pinpoint exactly which blocker applies and the fastest remediation.

William Nono
09/23/2025, 6:12 PM

Marvin
09/23/2025, 6:12 PM

Marvin
09/23/2025, 6:13 PM
Set ttlSecondsAfterFinished on the Job spec.
Two common ways:
A) Edit in the UI
- Prefect Cloud/Server > Work Pools > your K8s pool > Edit base job template
- Add this field under `job.spec`:
"job": {
"apiVersion": "batch/v1",
"kind": "Job",
"metadata": {},
"spec": {
"ttlSecondsAfterFinished": 3600,
"template": {
"spec": {
"containers": [ ... ],
"restartPolicy": "Never"
}
},
"backoffLimit": 0
}
}
- Save. New flow runs will create Jobs that Kubernetes deletes 1 hour after they finish (and their pods will be GC’d).
B) Edit via CLI
1) Grab a starting template (default for K8s):
prefect work-pool get-default-base-job-template --type kubernetes -f k8s-template.json
2) Edit k8s-template.json to add:
"ttlSecondsAfterFinished": 3600
under job.spec.
3) Apply it to your pool:
prefect work-pool update "<your-pool>" --base-job-template ./k8s-template.json
Notes:
- The field must be at job.spec.ttlSecondsAfterFinished (not under template.spec).
- Ensure your cluster supports the TTL controller (it’s standard on modern K8s).
- TTL deletes the Job and GC’s its pods; make sure your logs/artifacts are persisted elsewhere before they’re cleaned up.
Helpful docs:
- Work pools (editing base job templates): Workers & Work Pools
- Kubernetes how-to (job templates, service accounts, imagePullSecrets): Run on Kubernetes
- Work pool CLI: prefect work-pool CLI
If you want, paste your current base job template snippet and I’ll show exactly where to drop the TTL line.

William Nono
09/25/2025, 6:58 PM
Failed to submit flow run '068d57f4-7464-74c8-8000-58ce61839ca8' to infrastructure.
Traceback (most recent call last):
File "/usr/local/lib/python3.11/site-packages/httpx/_transports/default.py", line 72, in map_httpcore_exceptions
yield
File "/usr/local/lib/python3.11/site-packages/httpx/_transports/default.py", line 377, in handle_async_request
resp = await self._pool.handle_async_request(req)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/usr/local/lib/python3.11/site-packages/httpcore/_async/connection_pool.py", line 216, in handle_async_request
raise exc from None
File "/usr/local/lib/python3.11/site-packages/httpcore/_async/connection_pool.py", line 192, in handle_async_request
connection = await pool_request.wait_for_connection(timeout=timeout)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/usr/local/lib/python3.11/site-packages/httpcore/_async/connection_pool.py", line 35, in wait_for_connection
await self._connection_acquired.wait(timeout=timeout)
File "/usr/local/lib/python3.11/site-packages/httpcore/_synchronization.py", line 148, in wait
with map_exceptions(anyio_exc_map):
File "/usr/local/lib/python3.11/contextlib.py", line 158, in __exit__
self.gen.throw(typ, value, traceback)
File "/usr/local/lib/python3.11/site-packages/httpcore/_exceptions.py", line 14, in map_exceptions
raise to_exc(exc) from exc
httpcore.PoolTimeout
The above exception was the direct cause of the following exception:
Traceback (most recent call last):
File "/usr/local/lib/python3.11/site-packages/prefect/workers/base.py", line 863, in _submit_run_and_capture_errors
configuration = await self._get_configuration(flow_run)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/usr/local/lib/python3.11/site-packages/prefect/workers/base.py", line 942, in _get_configuration
flow = await self._client.read_flow(flow_run.flow_id)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/usr/local/lib/python3.11/site-packages/prefect/client/orchestration.py", line 502, in read_flow
response = await self._client.get(f"/flows/{flow_id}")
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/usr/local/lib/python3.11/site-packages/httpx/_client.py", line 1814, in get
return await self.request(
^^^^^^^^^^^^^^^^^^^
File "/usr/local/lib/python3.11/site-packages/httpx/_client.py", line 1585, in request
return await self.send(request, auth=auth, follow_redirects=follow_redirects)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/usr/local/lib/python3.11/site-packages/prefect/client/base.py", line 330, in send
response = await self._send_with_retry(
^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/usr/local/lib/python3.11/site-packages/prefect/client/base.py", line 254, in _send_with_retry
response = await send(request, *send_args, **send_kwargs)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/usr/local/lib/python3.11/site-packages/httpx/_client.py", line 1674, in send
response = await self._send_handling_auth(
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/usr/local/lib/python3.11/site-packages/httpx/_client.py", line 1702, in _send_handling_auth
response = await self._send_handling_redirects(
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/usr/local/lib/python3.11/site-packages/httpx/_client.py", line 1739, in _send_handling_redirects
response = await self._send_single_request(request)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/usr/local/lib/python3.11/site-packages/httpx/_client.py", line 1776, in _send_single_request
response = await transport.handle_async_request(request)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/usr/local/lib/python3.11/site-packages/httpx/_transports/default.py", line 376, in handle_async_request
with map_httpcore_exceptions():
File "/usr/local/lib/python3.11/contextlib.py", line 158, in __exit__
self.gen.throw(typ, value, traceback)
File "/usr/local/lib/python3.11/site-packages/httpx/_transports/default.py", line 89, in map_httpcore_exceptions
raise mapped_exc(message) from exc
httpx.PoolTimeout

Marvin
09/25/2025, 6:59 PM

Marvin
09/25/2025, 7:05 PM
- Confirm PREFECT_API_URL and PREFECT_API_KEY (for Cloud)
- From inside the worker environment/container:
- curl health endpoint:
echo $PREFECT_API_URL
curl -sv "${PREFECT_API_URL%/}/api/health"
- DNS and reachability:
nslookup $(echo $PREFECT_API_URL | awk -F[/:] '{print $4}')
- If you’re behind a proxy, ensure env vars are set appropriately and that your Prefect API host is in NO_PROXY to avoid needless proxying:
- HTTP_PROXY / HTTPS_PROXY as needed
- NO_PROXY=<your-prefect-api-host>,localhost,127.0.0.1
2) Reduce burstiness with a work-pool concurrency limit
If many runs are being submitted at once, the worker and/or API can get overwhelmed, leading to pool timeouts. Set a concurrency limit on the work pool to smooth load:
- Inspect the pool and current limits:
prefect work-pool inspect <your-pool>
- Set a limit (e.g., 5):
prefect work-pool set-concurrency-limit <your-pool> 5
Docs: Work pool CLI
3) Increase client-side resiliency (retries)
Increase the client retries to give the API more time during transient congestion:
- Temporarily via environment on the worker:
export PREFECT_CLIENT_MAX_RETRIES=10
export PREFECT_CLIENT_RETRY_JITTER_FACTOR=0.5
- Or persist in your Prefect profile:
prefect config set client.max_retries=10 client.retry_jitter_factor=0.5
Then restart the worker.
4) Turn on debug logs on the worker
This will show each HTTP request/response and help pinpoint slow or failing requests:
export PREFECT_LOGGING_LEVEL=DEBUG
prefect worker start -p <your-pool>
5) Self-hosted tuning (if applicable)
- Check API service logs and DB performance at the same time as the timeouts. If the API/DB is slow or connection-limited, the client pool will back up.
- Scale out API replicas and ensure your DB pool size and reverse proxy keepalive/connection limits are sufficient.
- If you’re using Kubernetes and the Kubernetes observer, heavy event storms can spike API calls; consider isolating the observer or scaling the API to handle the load. Relevant issue for high event volume producing PoolTimeouts: Kubernetes worker PoolTimeout under heavy k8s events
6) Validate you’re on a recent Prefect version
Pool timeout behavior has come up in various load scenarios; ensure both server and worker are on current releases so you have recent fixes and improvements.
Why this happens
- httpx.PoolTimeout means the HTTP client couldn’t acquire a connection from its pool quickly enough. In practice this occurs when:
- The API is slow or overloaded (server CPU/DB bottlenecks, rate limiting).
- Network path issues (proxy misconfig, DNS resolution delays, load balancer connection limits).
- Excessive concurrency from workers or auxiliary services (many runs submitted at once, Kubernetes observer generating bursts of API calls).

Marvin
09/25/2025, 7:05 PM

William Nono
10/06/2025, 9:55 PM
deployments:
  - name: dep-name
    entrypoint: entry
    parameters: {}
    work_pool:
      name: kubernetes-pool
      work_queue_name:
      job_variables:
        env:
          - name: PREFECT_LOGGING_HANDLERS_CONSOLE_FORMATTER
            value: "json"
          - name: DD_AGENT_HOST
            valueFrom:
              fieldRef:
                fieldPath: status.hostIP
getting the following error: Failed to submit flow run '068e439a-08ab-7901-8000-0e196b204376' to infrastructure.
Traceback (most recent call last):
  File "/usr/local/lib/python3.11/site-packages/prefect/workers/base.py", line 1277, in _submit_run_and_capture_errors
    configuration = await self._get_configuration(flow_run)
                    ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/prefect/workers/base.py", line 1378, in _get_configuration
    configuration = await self.job_configuration.from_template_and_values(
                    ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/prefect/client/utilities.py", line 99, in with_injected_client
    return await fn(*args, **kwargs)
           ^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/prefect/workers/base.py", line 194, in from_template_and_values
    base_config["env"] = base_config.get("env") | deployment_env
                         ~~~~~~~~~~~~~~~~~~~~~~~^~~~~~~~~~~~~~~~
TypeError: unsupported operand type(s) for |: 'dict' and 'list'

Marvin
10/06/2025, 9:55 PM

Marvin
10/06/2025, 10:02 PM
work_pool.job_variables.env must be a dict (key -> value), not a Kubernetes-style list. Your list form is why you’re getting:
TypeError: unsupported operand type(s) for |: 'dict' and 'list'
Prefect is trying to merge dict env from the base job template with your deployment’s env, but it’s a list.
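A two-line illustration of the merge that fails (plain Python, not Prefect-specific):
# This is effectively what base_config.get("env") | deployment_env does
# when the deployment supplies a Kubernetes-style env list instead of a dict:
base_env = {"ENVIRONMENT": "dev"}             # dict env from the base job template
deployment_env = [{"name": "DD_AGENT_HOST"}]  # list env from the deployment YAML
base_env | deployment_env                     # TypeError: unsupported operand type(s) for |: 'dict' and 'list'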
What to do instead
- For simple env vars, use a dict:
deployments:
  - name: dep-name
    entrypoint: entry
    parameters: {}
    work_pool:
      name: kubernetes-pool
      job_variables:
        env:
          PREFECT_LOGGING_HANDLERS_CONSOLE_FORMATTER: "json"
          # DD_AGENT_HOST can't use valueFrom here; needs a manifest override
- For Kubernetes features like valueFrom/fieldRef, override the Kubernetes job manifest and supply the container env list there. In Prefect 3.x, the Kubernetes worker’s base job template has a job_manifest you can override. A minimal example that preserves the built-in templating:
deployments:
  - name: dep-name
    entrypoint: entry
    parameters: {}
    work_pool:
      name: kubernetes-pool
      job_variables:
        job_manifest:
          apiVersion: batch/v1
          kind: Job
          metadata:
            generateName: "{{ name }}-"
            labels: "{{ labels }}"
            namespace: "{{ namespace }}"
          spec:
            backoffLimit: 0
            ttlSecondsAfterFinished: "{{ finished_job_ttl }}"
            template:
              spec:
                restartPolicy: "Never"
                serviceAccountName: "{{ service_account_name }}"
                containers:
                  - name: prefect-job
                    image: "{{ image }}"
                    imagePullPolicy: "{{ image_pull_policy }}"
                    args: "{{ command }}"
                    env:
                      - name: PREFECT_LOGGING_HANDLERS_CONSOLE_FORMATTER
                        value: "json"
                      - name: DD_AGENT_HOST
                        valueFrom:
                          fieldRef:
                            fieldPath: status.hostIP
Notes:
- Don’t also set job_variables.env if you fully control the job_manifest.env list; you’d be bypassing the dict-based env.
- If you want to see the default schema/template for your pool type, you can print it and use it as a starting point:
prefect work-pool get-default-base-job-template --type kubernetes
Then copy it into your deployment’s job_variables.job_manifest and edit just the bits you need (like the env list).
Docs:
- Customize job variables
If you share your current pool’s base job template (prefect work-pool inspect "kubernetes-pool"), I can help you craft the exact job_manifest override needed. Are you specifically trying to set DD_AGENT_HOST to the node’s host IP via the downward API?

William Nono
10/06/2025, 10:05 PM
{
"variables": {
"type": "object",
"properties": {
"env": {
"type": "object",
"title": "Environment Variables",
"default": {
"ENVIRONMENT": "dev"
},
"description": "Environment variables to set when starting a flow run.",
"additionalProperties": {
"type": "string"
}
},
"name": {
"type": "string",
"title": "Name",
"description": "Name given to infrastructure created by a worker."
},
"image": {
"type": "string",
"title": "Image",
"example": "<http://docker.io/prefecthq/prefect:2-latest|docker.io/prefecthq/prefect:2-latest>",
"description": "The image reference of a container image to use for created jobs. If not set, the latest Prefect image will be used."
},
"labels": {
"type": "object",
"title": "Labels",
"description": "Labels applied to infrastructure created by a worker.",
"additionalProperties": {
"type": "string"
}
},
"command": {
"type": "string",
"title": "Command",
"description": "The command to use when starting a flow run. In most cases, this should be left blank and the command will be automatically generated by the worker."
},
"namespace": {
"type": "string",
"title": "Namespace",
"default": "prefect",
"description": "The Kubernetes namespace to create jobs within."
},
"stream_output": {
"type": "boolean",
"title": "Stream Output",
"default": true,
"description": "If set, output will be streamed from the job to local standard output."
},
"cluster_config": {
"allOf": [
{
"$ref": "#/definitions/KubernetesClusterConfig"
}
],
"title": "Cluster Config",
"description": "The Kubernetes cluster config to use for job creation."
},
"finished_job_ttl": {
"type": "integer",
"title": "Finished Job TTL",
"default": 60,
"description": "The number of seconds to retain jobs after completion. If set, finished jobs will be cleaned up by Kubernetes after the given delay. If not set, jobs will be retained indefinitely."
},
"image_pull_policy": {
"enum": [
"IfNotPresent",
"Always",
"Never"
],
"type": "string",
"title": "Image Pull Policy",
"default": "IfNotPresent",
"description": "The Kubernetes image pull policy to use for job containers."
},
"service_account_name": {
"type": "string",
"title": "Service Account Name",
"default": "d-maestro-k8s",
"description": "The Kubernetes service account to use for job creation."
},
"job_watch_timeout_seconds": {
"type": "integer",
"title": "Job Watch Timeout Seconds",
"description": "Number of seconds to wait for each event emitted by a job before timing out. If not set, the worker will wait for each event indefinitely."
},
"pod_watch_timeout_seconds": {
"type": "integer",
"title": "Pod Watch Timeout Seconds",
"default": 1000,
"description": "Number of seconds to watch for pod creation before timing out."
},
"kubernetes_pod_minimum_memory": {
"type": "string",
"default": "1024Mi",
"description": "Kubernetes pod memory setting"
},
"kubernetes_pod_maximum_memory": {
"type": "string",
"default": "1024Mi",
"description": "Kubernetes pod memory setting"
},
"kubernetes_pod_minimum_cpu": {
"type": "string",
"default": "2",
"description": "Kubernetes pod CPU setting"
},
"pod_labels": {
"type": "object",
"title": "Labels",
"description": "Labels applied to EKS pods created by a worker.",
"additionalProperties": {
"type": "string"
}
},
"pod_annotations": {
"type": "object",
"title": "Annotations",
"description": "Annotations applied to EKS pods created by a worker.",
"additionalProperties": {
"type": "string"
}
}
},
"definitions": {
"KubernetesClusterConfig": {
"type": "object",
"title": "KubernetesClusterConfig",
"required": [
"config",
"context_name"
],
"properties": {
"config": {
"type": "object",
"title": "Config",
"description": "The entire contents of a kubectl config file."
},
"context_name": {
"type": "string",
"title": "Context Name",
"description": "The name of the kubectl context to use."
}
},
"description": "Stores configuration for interaction with Kubernetes clusters.\n\nSee `from_file` for creation.",
"secret_fields": [],
"block_type_slug": "kubernetes-cluster-config",
"block_schema_references": {}
}
},
"description": "Default variables for the Kubernetes worker.\n\nThe schema for this class is used to populate the `variables` section of the default\nbase job template."
},
"job_configuration": {
"env": "{{ env }}",
"name": "{{ name }}",
"labels": "{{ labels }}",
"command": "{{ command }}",
"namespace": "{{ namespace }}",
"job_manifest": {
"kind": "Job",
"spec": {
"template": {
"spec": {
"containers": [
{
"env": "{{ env }}",
"args": "{{ command }}",
"name": "prefect-job",
"image": "{{ image }}",
"imagePullPolicy": "{{ image_pull_policy }}",
"resources": {
"requests": {
"memory": "{{ kubernetes_pod_minimum_memory }}",
"cpu": "{{ kubernetes_pod_minimum_cpu }}"
},
"limits": {
"memory": "{{ kubernetes_pod_maximum_memory }}"
}
}
}
],
"completions": 1,
"parallelism": 1,
"restartPolicy": "Never",
"serviceAccountName": "{{ service_account_name }}"
},
"metadata": {
"labels": "{{ pod_labels }}",
"annotations": "{{ pod_annotations }}"
}
},
"backoffLimit": 0,
"ttlSecondsAfterFinished": "{{ finished_job_ttl }}"
},
"metadata": {
"labels": "{{ labels }}",
"namespace": "{{ namespace }}",
"generateName": "{{ name }}-"
},
"apiVersion": "batch/v1"
},
"stream_output": "{{ stream_output }}",
"cluster_config": "{{ cluster_config }}",
"job_watch_timeout_seconds": "{{ job_watch_timeout_seconds }}",
"pod_watch_timeout_seconds": "{{ pod_watch_timeout_seconds }}"
}
}

William Nono
10/06/2025, 10:07 PM

Marvin
10/06/2025, 10:07 PM

Marvin
10/06/2025, 10:08 PM
work_pool.job_variables.env must be a dict of key -> value. Your current list-of-env-vars style is Kubernetes-native, but Prefect expects a dict there, so it fails when trying to merge dicts:
TypeError: unsupported operand type(s) for |: 'dict' and 'list'
- If you only need simple env vars, use a dict under `job_variables.env`:
deployments:
  - name: dep-name
    entrypoint: entry
    parameters: {}
    work_pool:
      name: kubernetes-pool
      job_variables:
        env:
          PREFECT_LOGGING_HANDLERS_CONSOLE_FORMATTER: "json"
          # DD_AGENT_HOST requires valueFrom; see the next option
- If you need Kubernetes features like valueFrom/fieldRef (e.g., node host IP), override the Kubernetes Job manifest and set the container env list there. Keep the Prefect templating placeholders:
deployments:
  - name: dep-name
    entrypoint: entry
    parameters: {}
    work_pool:
      name: kubernetes-pool
      job_variables:
        job_manifest:
          apiVersion: batch/v1
          kind: Job
          metadata:
            generateName: "{{ name }}-"
            labels: "{{ labels }}"
            namespace: "{{ namespace }}"
          spec:
            backoffLimit: 0
            ttlSecondsAfterFinished: "{{ finished_job_ttl }}"
            template:
              spec:
                restartPolicy: "Never"
                serviceAccountName: "{{ service_account_name }}"
                containers:
                  - name: prefect-job
                    image: "{{ image }}"
                    imagePullPolicy: "{{ image_pull_policy }}"
                    args: "{{ command }}"
                    env:
                      - name: PREFECT_LOGGING_HANDLERS_CONSOLE_FORMATTER
                        value: "json"
                      - name: DD_AGENT_HOST
                        valueFrom:
                          fieldRef:
                            fieldPath: status.hostIP
Tips:
- Don’t also set job_variables.env if you’re defining env in the `job_manifest`—otherwise you may override or confuse the merge.
- You can view the default Kubernetes worker template (handy to copy and edit) with:
prefect work-pool get-default-base-job-template --type kubernetes
- And inspect your specific pool’s current template with:
prefect work-pool inspect "kubernetes-pool"
Docs: Customize job variables
If you paste your manifest here, I’ll tailor it to the correct Prefect 3.4 shape and ensure the valueFrom bit is wired up properly.

William Nono
10/06/2025, 10:09 PM

Marvin
10/06/2025, 10:09 PM

Marvin
10/06/2025, 10:12 PM
Option A: Keep the dict-based env and append via Jinja. The manifest below preserves the env rendered from job_variables.env (dict) and adds your valueFrom entry. Because containers[0].env is a list, we replace it but use Jinja to include the original env list plus your addition.
deployments:
  - name: dep-name
    entrypoint: entry
    work_pool:
      name: kubernetes-pool
      job_variables:
        env:
          PREFECT_LOGGING_HANDLERS_CONSOLE_FORMATTER: "json"
        job_manifest:
          apiVersion: batch/v1
          kind: Job
          metadata:
            generateName: "{{ name }}-"
            labels: "{{ labels }}"
            namespace: "{{ namespace }}"
          spec:
            backoffLimit: 0
            ttlSecondsAfterFinished: "{{ finished_job_ttl }}"
            template:
              spec:
                restartPolicy: "Never"
                serviceAccountName: "{{ service_account_name }}"
                containers:
                  - name: prefect-job
                    image: "{{ image }}"
                    imagePullPolicy: "{{ image_pull_policy }}"
                    args: "{{ command }}"
                    env: "{{ env + [{'name': 'DD_AGENT_HOST', 'valueFrom': {'fieldRef': {'fieldPath': 'status.hostIP'}}}] }}"
Notes:
- env on the container is now a Jinja expression that takes the rendered default env list and appends your valueFrom item.
- Keep the templated placeholders ({{ image }}, {{ command }}, etc.) so you don’t lose defaults.
Option B: Replace env entirely in the manifest
If you don’t need job_variables.env, you can fully define the container’s env list yourself. This is simpler but does not pull in the dict-based env at all.
deployments:
  - name: dep-name
    entrypoint: entry
    work_pool:
      name: kubernetes-pool
      job_variables:
        job_manifest:
          apiVersion: batch/v1
          kind: Job
          metadata:
            generateName: "{{ name }}-"
            labels: "{{ labels }}"
            namespace: "{{ namespace }}"
          spec:
            backoffLimit: 0
            ttlSecondsAfterFinished: "{{ finished_job_ttl }}"
            template:
              spec:
                restartPolicy: "Never"
                serviceAccountName: "{{ service_account_name }}"
                containers:
                  - name: prefect-job
                    image: "{{ image }}"
                    imagePullPolicy: "{{ image_pull_policy }}"
                    args: "{{ command }}"
                    env:
                      - name: PREFECT_LOGGING_HANDLERS_CONSOLE_FORMATTER
                        value: "json"
                      - name: DD_AGENT_HOST
                        valueFrom:
                          fieldRef:
                            fieldPath: status.hostIP
Why your original failed
- In 3.4, job_variables.env must be a dict (key -> value). Supplying a list of Kubernetes EnvVar objects there causes the merge error you saw:
TypeError: unsupported operand type(s) for |: 'dict' and 'list'
Helpful commands
- Print the default Kubernetes worker template (good starting point to copy/modify):
prefect work-pool get-default-base-job-template --type kubernetes
- Inspect your specific work pool’s base job template to see what you’re merging with:
prefect work-pool inspect "your-pool-name"
Docs
- Customize job variables: Customize job variables
If you paste your current manifest snippet here, I can help rewrite it using Option A (Jinja concatenation) so you only "override" the env list while preserving the rest.

William Nono
10/06/2025, 10:58 PM
volumes:
  - name: podinfo
    downwardAPI:
      items:
        - path: host_ip
          fieldRef:
            fieldPath: status.hostIP
volume_mounts:
  - name: podinfo
    mountPath: /etc/podinfo
    readOnly: true

Marvin
10/06/2025, 10:58 PM

Marvin
10/06/2025, 10:59 PM
deployments:
  - name: dep-name
    entrypoint: entry
    work_pool:
      name: kubernetes-pool
      job_variables:
        job_manifest:
          apiVersion: batch/v1
          kind: Job
          metadata:
            generateName: "{{ name }}-"
            labels: "{{ labels }}"
            namespace: "{{ namespace }}"
          spec:
            backoffLimit: 0
            ttlSecondsAfterFinished: "{{ finished_job_ttl }}"
            template:
              spec:
                restartPolicy: "Never"
                serviceAccountName: "{{ service_account_name }}"
                volumes:
                  - name: podinfo
                    downwardAPI:
                      items:
                        - path: host_ip
                          fieldRef:
                            fieldPath: status.hostIP
                containers:
                  - name: prefect-job
                    image: "{{ image }}"
                    imagePullPolicy: "{{ image_pull_policy }}"
                    args: "{{ command }}"
                    env: "{{ env }}"
                    volumeMounts:
                      - name: podinfo
                        mountPath: /etc/podinfo
                        readOnly: true
Notes:
- Use volumeMounts (camelCase), not volume_mounts.
- Place volumes under spec.template.spec.volumes and mounts under the container spec.
- This will write the node IP to /etc/podinfo/host_ip inside the container. If your goal is to set DD_AGENT_HOST, you’d still need to read that file and export it (e.g., via a wrapper entrypoint; see the sketch after these notes). If you want an env var directly, it’s simpler to add it via valueFrom:
env: "{{ env + [{'name': 'DD_AGENT_HOST', 'valueFrom': {'fieldRef': {'fieldPath': 'status.hostIP'}}}] }}"
That appends the downward API env var to the existing env generated from job_variables.env.
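If you do go the file route, a minimal sketch of reading that file at flow startup (the mount path matches the example above; setting DD_AGENT_HOST this way before the Datadog tracer initializes is an assumption to verify for your setup):
import os
from pathlib import Path

from prefect import flow


def set_dd_agent_host(podinfo_path: str = "/etc/podinfo/host_ip") -> None:
    # Read the node IP written by the downward API volume and expose it as DD_AGENT_HOST
    host_ip_file = Path(podinfo_path)
    if host_ip_file.exists():
        os.environ.setdefault("DD_AGENT_HOST", host_ip_file.read_text().strip())


@flow
def my_flow():
    # Must run before any Datadog client/tracer is initialized in this process
    set_dd_agent_host()
    ...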
Related docs: Customize job variables
If you share your current deployment/pool template, I can fold the volume/mount or the valueFrom env into it with the least disruption.

William Nono
10/06/2025, 11:19 PM
Failed to submit flow run '068e44e0-d35e-7325-8000-43ee8c28b2a5' to infrastructure.
Traceback (most recent call last):
File "/usr/local/lib/python3.11/site-packages/prefect_kubernetes/worker.py", line 917, in _create_job
job = await batch_client.create_namespaced_job(
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/usr/local/lib/python3.11/site-packages/kubernetes_asyncio/client/api_client.py", line 192, in __call_api
raise e
File "/usr/local/lib/python3.11/site-packages/kubernetes_asyncio/client/api_client.py", line 185, in __call_api
response_data = await self.request(
^^^^^^^^^^^^^^^^^^^
File "/usr/local/lib/python3.11/site-packages/kubernetes_asyncio/client/rest.py", line 249, in POST
return (await self.request("POST", url,
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/usr/local/lib/python3.11/site-packages/kubernetes_asyncio/client/rest.py", line 206, in request
raise ApiException(http_resp=r)
kubernetes_asyncio.client.exceptions.ApiException: (422)
Reason: Unprocessable Entity
HTTP response headers: <CIMultiDictProxy('Audit-Id': 'dfbd34dd-0efd-4fbc-b845-e231e3cd6e9b', 'Cache-Control': 'no-cache, private', 'Content-Type': 'application/json', 'Warning': '299 - "unknown field \\"spec.template.spec.containers[0].env[16].DD_ENV\\""', 'Warning': '299 - "unknown field \\"spec.template.spec.containers[0].env[16].DD_SERVICE\\""', 'Warning': '299 - "unknown field \\"spec.template.spec.containers[0].env[16].DD_VERSION\\""', 'Warning': '299 - "unknown field \\"spec.template.spec.containers[0].env[16].DD_MAIN_PACKAGE\\""', 'Warning': '299 - "unknown field \\"spec.template.spec.containers[0].env[16].DD_TRACE_ENABLED\\""', 'Warning': '299 - "unknown field \\"spec.template.spec.containers[0].env[16].DD_PROFILING_ENABLED\\""', 'Warning': '299 - "unknown field \\"spec.template.spec.containers[0].env[16].DD_PROFILING_STACK_V2_ENABLED\\""', 'Warning': '299 - "unknown field \\"spec.template.spec.containers[0].env[16].DD_PROFILING_TIMELINE_ENABLED\\""', 'Warning': '299 - "unknown field \\"spec.template.spec.containers[0].env[16].PREFECT_LOGGING_HANDLERS_CONSOLE_FORMATTER\\""', 'Warning': '299 - "unknown field \\"spec.template.spec.completions\\""', 'Warning': '299 - "unknown field \\"spec.template.spec.parallelism\\""', 'X-Kubernetes-Pf-Flowschema-Uid': '0c64a2c2-6776-49b6-bcb7-8244c03aa61d', 'X-Kubernetes-Pf-Prioritylevel-Uid': 'ed1ab123-74e2-448b-8af9-4a37f73567d5', 'Date': 'Mon, 06 Oct 2025 23:17:48 GMT', 'Content-Length': '409')>
HTTP response body: {"kind":"Status","apiVersion":"v1","metadata":{},"status":"Failure","message":"Job.batch \"caped-wolf-ppw86\" is invalid: spec.template.spec.containers[0].env[16].name: Required value","reason":"Invalid","details":{"name":"caped-wolf-ppw86","group":"batch","kind":"Job","causes":[{"reason":"FieldValueRequired","message":"Required value","field":"spec.template.spec.containers[0].env[16].name"}]},"code":422}
The above exception was the direct cause of the following exception:
Traceback (most recent call last):
File "/usr/local/lib/python3.11/site-packages/prefect/workers/base.py", line 1281, in _submit_run_and_capture_errors
result = await self.run(
^^^^^^^^^^^^^^^
File "/usr/local/lib/python3.11/site-packages/prefect_kubernetes/worker.py", line 745, in run
job = await self._create_job(configuration, client)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/usr/local/lib/python3.11/site-packages/tenacity/asyncio/__init__.py", line 189, in async_wrapped
return await copy(fn, *args, **kwargs)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/usr/local/lib/python3.11/site-packages/tenacity/asyncio/__init__.py", line 111, in __call__
do = await self.iter(retry_state=retry_state)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/usr/local/lib/python3.11/site-packages/tenacity/asyncio/__init__.py", line 153, in iter
result = await action(retry_state)
^^^^^^^^^^^^^^^^^^^^^^^^^
File "/usr/local/lib/python3.11/site-packages/tenacity/_utils.py", line 99, in inner
return call(*args, **kwargs)
^^^^^^^^^^^^^^^^^^^^^
File "/usr/local/lib/python3.11/site-packages/tenacity/__init__.py", line 420, in exc_check
raise retry_exc.reraise()
^^^^^^^^^^^^^^^^^^^
File "/usr/local/lib/python3.11/site-packages/tenacity/__init__.py", line 187, in reraise
raise self.last_attempt.result()
^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/usr/local/lib/python3.11/concurrent/futures/_base.py", line 449, in result
return self.__get_result()
^^^^^^^^^^^^^^^^^^^
File "/usr/local/lib/python3.11/concurrent/futures/_base.py", line 401, in __get_result
raise self._exception
File "/usr/local/lib/python3.11/site-packages/tenacity/asyncio/__init__.py", line 114, in __call__
result = await fn(*args, **kwargs)
^^^^^^^^^^^^^^^^^^^^^^^^^
File "/usr/local/lib/python3.11/site-packages/prefect_kubernetes/worker.py", line 929, in _create_job
raise InfrastructureError(
prefect.exceptions.InfrastructureError: Unable to create Kubernetes job: Unprocessable Entity: Job.batch "caped-wolf-ppw86" is invalid: spec.template.spec.containers[0].env[16].name: Required value

Marvin
10/06/2025, 11:19 PM

Marvin
10/06/2025, 11:21 PM
It looks like the rendered manifest contains an env entry shaped like this:
env: "{{ env + [ { 'DD_ENV': 'prod', 'DD_SERVICE': 'svc', ... } ] }}"
Kubernetes sees a single env list item with keys DD_ENV, DD_SERVICE, … — none of which are valid fields — and there’s no required name field. Hence the 422.
How to fix it
- Keep simple variables in job_variables.env as a dict (Prefect will convert them to the proper list of {name,value}):
work_pool:
  name: kubernetes-pool
  job_variables:
    env:
      DD_ENV: "prod"
      DD_SERVICE: "your-service"
      DD_VERSION: "1.2.3"
      DD_MAIN_PACKAGE: "your_pkg"
      DD_TRACE_ENABLED: "true"
      DD_PROFILING_ENABLED: "true"
      DD_PROFILING_STACK_V2_ENABLED: "true"
      DD_PROFILING_TIMELINE_ENABLED: "true"
      PREFECT_LOGGING_HANDLERS_CONSOLE_FORMATTER: "json"
- Only append the special valueFrom variable(s) in the container’s env via Jinja, making sure each appended item has name/valueFrom:
job_variables:
  job_manifest:
    apiVersion: batch/v1
    kind: Job
    metadata:
      generateName: "{{ name }}-"
      labels: "{{ labels }}"
      namespace: "{{ namespace }}"
    spec:
      # parallelism/completions belong here on Job.spec (see note below)
      backoffLimit: 0
      ttlSecondsAfterFinished: "{{ finished_job_ttl }}"
      template:
        spec:
          restartPolicy: "Never"
          serviceAccountName: "{{ service_account_name }}"
          containers:
            - name: prefect-job
              image: "{{ image }}"
              imagePullPolicy: "{{ image_pull_policy }}"
              args: "{{ command }}"
              env: "{{ env + [ {'name': 'DD_AGENT_HOST', 'valueFrom': {'fieldRef': {'fieldPath': 'status.hostIP'}}} ] }}"
Important: Don’t put multiple envs into a single dict item in the list. Each env var should be its own object with name/value or name/valueFrom.
About the other warnings (completions/parallelism)
Your error also shows:
- unknown field "spec.template.spec.completions"
- unknown field "spec.template.spec.parallelism"
Those fields belong under Job.spec, not under Pod template spec. If you set them, place them at spec.parallelism and spec.completions (sibling to template), not under spec.template.spec. Example:
spec:
  parallelism: 1
  completions: 1
  template:
    spec:
      containers:
        - ...
Docs: Customize job variables
If you paste your current job_manifest block (just the spec.template.spec and containers/env bits), I’ll rewrite it so it validates and preserves Prefect’s defaults.