<@ULVA73B9P> what is the best way to get results f...
# ask-marvin
w
@Marvin what is the best way to get results from a subflow in the parent flow without specifically doing result persistence in S3? Why do errors persist with no issue?
@Marvin why would we be occasionally getting:
Copy code
Failed to submit flow run '06878931-6e1c-7a43-8000-7852286531b6' to infrastructure.
Traceback (most recent call last):
  File "/usr/local/lib/python3.11/site-packages/prefect/workers/base.py", line 863, in _submit_run_and_capture_errors
    configuration = await self._get_configuration(flow_run)
                    ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/prefect/workers/base.py", line 940, in _get_configuration
    else await self._client.read_deployment(flow_run.deployment_id)
         ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/prefect/client/orchestration.py", line 1787, in read_deployment
    response = await self._client.get(f"/deployments/{deployment_id}")
               ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/httpx/_client.py", line 1814, in get
    return await self.request(
           ^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/httpx/_client.py", line 1585, in request
    return await self.send(request, auth=auth, follow_redirects=follow_redirects)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/prefect/client/base.py", line 361, in send
    response.raise_for_status()
  File "/usr/local/lib/python3.11/site-packages/prefect/client/base.py", line 174, in raise_for_status
    raise PrefectHTTPStatusError.from_httpx_error(exc) from exc.__cause__
prefect.exceptions.PrefectHTTPStatusError: Server error '500 Internal Server Error' for url 'https://api.prefect.cloud/api/accounts/172fd80b-60d8-4f3c-a46b-f4fc4dfcc599/workspaces/a7fba5cf-26d2-4038-9fe8-1d40e83e0568/deployments/283c759b-3ae4-4ab7-952d-1572e8bc77a2'
Response: {'exception_message': 'Internal Server Error'}
For more information check: https://developer.mozilla.org/en-US/docs/Web/HTTP/Status/500
m
thought for 18.5 seconds
Based on the error message and research, this issue appears to be related to intermittent connectivity problems when trying to communicate with Prefect Cloud's API. The 500 Internal Server Error you're seeing specifically occurs when trying to read deployment information before submitting a flow run. Here are a few key points and recommendations:

1. Common Causes:
- Network connectivity issues between your worker and Prefect Cloud
- Temporary API service disruptions
- Rate limiting or timeout issues

2. Recommended Solutions:
a. Implement Retry Logic:
- Add retries so transient failures are retried automatically. In Prefect, retries are configured on the flow itself with the `retries` and `retry_delay_seconds` parameters:
Copy code
python
   from prefect import flow

   # Retries are configured on the flow, not on the deployment
   @flow(retries=3, retry_delay_seconds=60)
   def my_flow():
       pass
b. Network Configuration:
- Ensure stable network connectivity between your worker and Prefect Cloud
- Check if there are any firewall rules or proxy settings that might be interfering
- Verify your network can consistently reach `api.prefect.cloud`

c. Worker Configuration:
- Consider increasing worker timeout settings if you're seeing this with larger flows
- Monitor worker logs for any patterns in when these failures occur

3. Monitoring and Debugging:
- Keep an eye on your worker logs for patterns
- Set up monitoring for worker health and connectivity
- Consider implementing logging to track when these errors occur

4. Additional Context:
This issue has been reported in various forms, particularly with infrastructure provisioning. While the error you're seeing is specifically a 500 error from the Prefect Cloud API, the underlying cause is often related to network connectivity or temporary service issues.

If you're seeing this error frequently, it would be helpful to:
1. Track when these errors occur (time of day, frequency)
2. Note any patterns in the affected flows or deployments
3. Monitor your network connectivity to `api.prefect.cloud`
4. Consider implementing more robust error handling and retry logic in your deployments

Would you like me to provide more specific information about any of these aspects or help you implement a particular solution?
w
@Marvin ive been submitting flow runs from a parent flow like the following:
Copy code
    for j in range(0, len(requests)):
        for i in range(0, num_to_process, batch_size):
            flow_runs.append(
                asyncio.ensure_future(
                    run_deployment(
                     deployment_name=request.deployment_name,    
                    )
                )
            )

    start_time = time.time()
    logger.debug(f"Awaiting completion of flow run(s): num_flow_runs={len(flow_runs)}")
    finished_subflows: list[FlowRun] = await asyncio.gather(*flow_runs)
but we've been noticing that even though we set the max concurrency limit to 250, we never approach it even though we have a bunch of scheduled runs created. Also, we seem to be hitting the 10000-request rate limit on the creation of flow runs.
m
thought for 369.7 seconds
Thanks for the details; a few things are likely at play here.

Key points about `run_deployment` and why you're seeing rate limits:
- By default, `prefect.deployments.run_deployment` treats the child as a subflow and waits for it to finish. That means it polls the child run's state repeatedly (every `poll_interval`, default 5s); if you kick off hundreds or thousands at once, that's a lot of API GETs per run.
- Calling with a deployment name (e.g., `"flow/deployment"`) also incurs a lookup API call to resolve the deployment ID. Doing that per run adds even more calls.
- If your parent flow creates runs much faster than your workers can actually start them, you'll end up with many "Scheduled" runs while never approaching your target concurrency; the bottleneck is usually worker/infrastructure capacity or a lower effective concurrency cap elsewhere.

What I'd change:
1) Don't wait (no polling) if you don't need to
- If you just want to create runs and let them execute, call with `as_subflow=False`. This stops the status polling loop.
- Reuse a single client and, if possible, pass a deployment UUID to avoid a name-to-ID lookup per run.

Example:
Copy code
from prefect import flow
from prefect.deployments import run_deployment
from prefect.client.orchestration import get_client
import asyncio

@flow
async def parent(requests, num_to_process, batch_size):
    async with get_client() as client:
        # Optional: resolve the deployment ID once (saves a GET per run)
        # dep = await client.read_deployment_by_name("flow/deployment")
        # dep_id = dep.id

        # Limit how fast you create runs to avoid API bursts
        create_sem = asyncio.Semaphore(50)

        async def create_one(req):
            async with create_sem:
                return await run_deployment(
                    name=req.deployment_name,  # or name=dep_id
                    as_subflow=False,
                    client=client,             # reuse the same client
                )

        tasks = []
        for j in range(len(requests)):
            for i in range(0, num_to_process, batch_size):
                tasks.append(asyncio.create_task(create_one(requests[j])))

        flow_runs = await asyncio.gather(*tasks, return_exceptions=True)
        return flow_runs
2) If you do want to wait, poll less often
- Increase `poll_interval` significantly (e.g., 30-60s) to reduce GETs.
- Or wait on batches with a single bulk read (see "monitoring" below) instead of per-run polling.

3) Throttle creation explicitly
- A simple `asyncio.Semaphore` (as above) is often enough to keep you well below rate limits but still high throughput.
- You can also use Prefect's concurrency primitives, but local throttling is the most predictable way to avoid API spikes.

4) Monitor in bulk instead of per-run polling
- After you've created all runs (fire-and-forget), periodically query their status in batches using one API call per batch instead of one per run.
- For example, chunk your run IDs and call a bulk "read flow runs" endpoint to check progress every N seconds (see the sketch after this message).

Why you aren't hitting your 250 "max concurrency"
- Which limit is 250? Work pool concurrency? Global concurrency limit? Something else?
- Effective concurrency is bounded by the lowest of:
  - Work pool/queue concurrency settings
  - Number/size of workers and the backing infrastructure capacity (e.g., how many containers/pods you can spin up concurrently)
  - Any global concurrency limits/resources you've configured
- If runs are sitting in "Scheduled", you likely need more workers and/or higher worker-side parallelism (and ensure the pool limit is actually 250). Also make sure there aren't additional per-queue limits or resource-based global concurrency limits holding things back.

A few quality-of-life tweaks
- Prefer `asyncio.create_task` over `asyncio.ensure_future`.
- Use `return_exceptions=True` in `gather` so one failure doesn't cancel everything.
- If you must pass a name to `run_deployment`, at least look up the deployment ID once and reuse it.

Open questions to help tailor the fix
- Which "max concurrency limit" did you set to 250 (work pool concurrency, global concurrency/resource limits, something else)?
- How many workers are running and what infrastructure are they using (Docker, Kubernetes, etc.)?
- Do you actually need the parent flow to wait for the children, or is "fire-and-forget + external/batch monitoring" acceptable?

Docs for reference
- Running deployments: How to run deployments
- Concurrency limits: Global concurrency limits
- Rate limits overview: Rate limits (Cloud enforces rate limits; the SDK will retry 429s, but it's best to avoid spikes)
- REST API (flow runs): Flow runs API

If you can share which concurrency knob is set to 250 and a rough picture of your worker setup, I can help pinpoint why execution isn't reaching it and propose concrete worker/pool settings.
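For illustration, here is a minimal sketch of the bulk-monitoring idea from point 4, assuming you collect the flow run IDs when you create the runs (e.g., from `run_deployment(..., as_subflow=False)`). The `wait_for_runs` helper and its chunk size and interval are placeholders; `read_flow_runs` with a `FlowRunFilter` is the standard client call:
Copy code
import asyncio

from prefect.client.orchestration import get_client
from prefect.client.schemas.filters import FlowRunFilter, FlowRunFilterId


async def wait_for_runs(flow_run_ids, poll_seconds=60, chunk_size=200):
    """Poll flow run states in bulk until every run reaches a terminal state."""
    pending = set(flow_run_ids)
    async with get_client() as client:
        while pending:
            ids = list(pending)
            for i in range(0, len(ids), chunk_size):
                chunk = ids[i : i + chunk_size]
                # One GET per chunk of run IDs instead of one per run
                runs = await client.read_flow_runs(
                    flow_run_filter=FlowRunFilter(id=FlowRunFilterId(any_=chunk))
                )
                for run in runs:
                    if run.state and run.state.is_final():
                        pending.discard(run.id)
            if pending:
                await asyncio.sleep(poll_seconds)
This keeps API traffic proportional to the number of chunks per interval rather than the number of runs.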
w
@Marvin
- The "max concurrency limit" set to 250 is the work pool concurrency
- 1 Kubernetes worker
Also, the child runs should be related to the parent flow, so we must poll and just extend the interval, right?
m
thought for 164.4 seconds
Thanks, that context helps.

Short answer to your question: if you need the child runs to appear as subflows of the parent, keep `as_subflow=True`. Yes, `run_deployment` will wait and poll those runs; to reduce API traffic, increase `poll_interval` (e.g., 30–120s). Also throttle creation so you don't spike flow-run-creation requests.

Why you're not hitting 250 with one k8s worker
- Pool concurrency 250 is an upper bound. Actual concurrency is the min of:
  - Pool limit (250)
  - Worker/infrastructure throughput (one k8s worker can submit many jobs, but your cluster must have capacity/quotas for ~250 pods)
  - How fast you can create runs without hitting API limits
- Creating many runs plus per-run polling from the parent can flood the API before workers can start pods, so you end up with lots of Scheduled runs and lower effective concurrency. Throttling creation and increasing polling intervals usually fixes this.

Recommendations
1) Keep the parent-child relationship but poll less:
- Use `as_subflow=True` and set a higher `poll_interval` (30–120s) to cut GETs by 6–24x.
2) Throttle flow-run creation:
- Limit parallel `run_deployment` calls so you don't burst-create thousands of runs at once.
- Reuse a single `PrefectClient` and, if possible, resolve the deployment ID once to avoid repeated name-to-ID lookups.
3) Ensure infra can actually run ~250 pods:
- One k8s worker can submit jobs quickly, but your cluster must have node capacity and API quotas for 250 concurrent pods. Consider running 2–3 workers for submission throughput/redundancy (the pool limit still enforces the 250 cap).

Concrete example:
Copy code
from prefect import flow
from prefect.deployments import run_deployment
from prefect.client.orchestration import get_client
import asyncio

@flow
async def parent(requests, num_to_process, batch_size):
    async with get_client() as client:
        # Optional: resolve deployment once to avoid repeated lookups
        # dep = await client.read_deployment_by_name("flow/deployment")
        # dep_id = dep.id

        create_sem = asyncio.Semaphore(50)  # throttle creation concurrency

        async def launch(req):
            async with create_sem:
                return await run_deployment(
                    name=req.deployment_name,  # prefer dep_id if you have it
                    as_subflow=True,           # keep parent-child relationship
                    poll_interval=60,          # reduce polling frequency
                    client=client,             # reuse one client
                )

        tasks = []
        for j in range(len(requests)):
            for i in range(0, num_to_process, batch_size):
                tasks.append(asyncio.create_task(launch(requests[j])))

        results = await asyncio.gather(*tasks, return_exceptions=True)
        return results
Notes
- In Prefect 3.x the parameter is `name` (or a UUID), not `deployment_name`; just making sure that matches your code.
- If you do NOT need the parent to wait for completion, you could use `as_subflow=False` to avoid polling entirely (but you'll lose the subflow link in the UI). If you want "linked, but don't wait," we can also discuss a two-phase approach where you create subflows and monitor them later in batches.

References
- Running deployments: How to run deployments
- Concurrency limits: Global concurrency limits
- Rate limits: Rate limits
- Flow runs API: Flow runs API

Follow-ups to tune this precisely:
- Roughly how many child runs are you creating per minute?
- Is a 60–120s polling interval acceptable for your SLAs?
- How many nodes/pods can your cluster support concurrently right now? If needed, we can talk about worker replicas and cluster quotas to actually sustain ~250 concurrent runs.
w
@Marvin isn't the client already being reused since run_deployment is being run within a flow run? Or does that create a new one every time?
m
thought for 124.3 seconds
Great question. Short answer: not automatically.
- `run_deployment` accepts an optional `client` parameter. If you don't pass one, it will open and manage its own client for that call. So even inside a flow run, each `run_deployment(...)` call can create its own ephemeral client unless you provide one.
- To guarantee reuse (and reduce connection churn and request overhead), create one client in your parent flow (e.g., via `get_client()`) and pass it to every `run_deployment` call.

Example:
Copy code
from prefect import flow
from prefect.deployments import run_deployment
from prefect.client.orchestration import get_client
import asyncio

@flow
async def parent(requests):
    async with get_client() as client:
        sem = asyncio.Semaphore(50)  # throttle creation
        async def launch(req):
            async with sem:
                return await run_deployment(
                    name=req.deployment_name,
                    as_subflow=True,      # keep parent-child relationship
                    poll_interval=60,     # reduce polling frequency
                    client=client,        # reuse the same client
                )
        tasks = [asyncio.create_task(launch(r)) for r in requests]
        return await asyncio.gather(*tasks, return_exceptions=True)
Notes:
- Keeping `as_subflow=True` is what creates the parent-child link. Polling is what drives waiting; increasing `poll_interval` (e.g., 30–120s) reduces API load while preserving the relationship and blocking semantics.
- If you don't need to wait for completion, you could set `as_subflow=False` (no subflow link) to avoid polling entirely. There isn't a "linked but non-blocking" mode today via `run_deployment`.

Docs: Run deployments
w
@Marvin is run_deployment synchronous? like for the following:
Copy code
import asyncio
from asyncio import BoundedSemaphore
from typing import Union

from prefect import context, flow, get_run_logger
from prefect.client.orchestration import PrefectClient, SyncPrefectClient
from prefect.client.schemas.objects import FlowRun
from prefect.deployments import run_deployment


@flow
async def batch_deployment_runner(
    deployment_name: str,
    batch_items: list[dict],
    max_concurrent_api_calls: int = 10,
    batch_size: int = 5,
) -> dict:
    """
    Efficiently runs multiple deployment instances with controlled concurrency.

    Args:
        deployment_name: Name of the deployment to run
        batch_items: List of parameter dictionaries for each deployment run
        max_concurrent_api_calls: Max simultaneous API calls to Prefect (rate limiting)
        batch_size: Number of items to process per batch

    Returns:
        Metadata about the batch execution
    """
    logger = get_run_logger()

    # Create semaphore to control API call concurrency
    api_semaphore = BoundedSemaphore(max_concurrent_api_calls)

    # Get flow context for client and tags
    run_context = context.get_run_context()
    current_tags = run_context.flow_run.tags

    # Reuse single client connection for all operations
    async with run_context.client as client:
        # Resolve deployment ID once (optimization: avoids repeated API calls)
        deployment = await client.read_deployment_by_name(deployment_name)
        deployment_id = deployment.id if deployment else deployment_name

        # Create all deployment run futures immediately
        deployment_futures = []
        for i in range(0, len(batch_items), batch_size):
            batch = batch_items[i:i + batch_size]
            for j, item_params in enumerate(batch):
                deployment_futures.append(
                    asyncio.ensure_future(
                        run_single_deployment(
                            batch_num=i // batch_size,
                            item_index=j,
                            deployment_id=deployment_id,
                            parameters=item_params,
                            api_semaphore=api_semaphore,
                            tags=current_tags,
                            client=client,
                        )
                    )
                )

    logger.info(f"Starting {len(deployment_futures)} deployment runs...")

    # Execute all deployments concurrently (with API throttling via semaphore)
    finished_runs: list[FlowRun] = await asyncio.gather(*deployment_futures)

    logger.info(f"Completed all {len(finished_runs)} deployment runs")

    # Check for failures
    failed_runs = [
        run for run in finished_runs
        if run and run.state.is_final() and not run.state.is_completed()
    ]

    return {
        "total_runs": len(finished_runs),
        "successful_runs": len(finished_runs) - len(failed_runs),
        "failed_runs": len(failed_runs),
        "deployment_name": deployment_name,
    }


async def run_single_deployment(
    batch_num: int,
    item_index: int,
    deployment_id: str,
    parameters: dict,
    api_semaphore: BoundedSemaphore,
    tags: list[str],
    client: Union[PrefectClient, SyncPrefectClient],
) -> FlowRun | None:
    """
    Runs a single deployment with semaphore-controlled API access.

    Key insight: Semaphore only throttles the API call, not the total concurrency.
    All deployments will eventually run concurrently on Prefect infrastructure.
    """
    logger = get_run_logger()

    # Semaphore controls API submission rate (not total concurrency)
    async with api_semaphore:
        logger.debug(f"Submitting deployment run: batch={batch_num}, item={item_index}")

        result = await run_deployment(
            name=deployment_id,
            parameters=parameters,
            tags=tags,
            client=client,
            poll_interval=30,  # Optimized polling interval
        )

        logger.debug(f"Deployment submitted: batch={batch_num}, item={item_index}")
        return result
does the _api_semaphore_ limit the number of possible concurrent subflows being executed?
m
thought for 75.7 seconds
Short answer
- `run_deployment` is an async function.
- With the default `as_subflow=True`, it waits for the child flow to finish; your await will not return until the child run reaches a terminal state (or timeout).
- With `as_subflow=False`, it just creates the run and returns quickly without waiting.

In your code
- You acquire the semaphore and then await `run_deployment`. Because `run_deployment` waits for completion (`as_subflow` defaults to True), the semaphore is held for the entire lifetime of each child run. That means the semaphore value effectively caps your concurrent subflows to `max_concurrent_api_calls`.
- So yes: your `api_semaphore` is limiting the number of subflows that can run at once.

If your intent is to only rate-limit API calls (not cap subflow concurrency)
You have a trade-off because the "linked subflow" behavior is coupled to `run_deployment`'s blocking wait. Options:
1) Keep subflow linkage, accept waiting, and reduce API load
- Keep `as_subflow=True`, but:
  - Increase `poll_interval` (e.g., 30–120s) to cut polling traffic.
  - Raise the semaphore high enough so it doesn't become your concurrency ceiling (or remove it), and rely on `poll_interval` to keep API pressure manageable.
2) Split "submit" from "wait" to avoid holding the semaphore
- Submit runs under the semaphore, then wait outside it. The clean way is to use the client directly for submission and implement your own batch monitor:
  - Submit: `await client.create_flow_run_from_deployment(...)`
  - Monitor: periodically read run states in batches (`read_flow_runs` with filters) every N seconds.
- Caveat: creating runs directly via the client will not mark them as "subflows" of the parent in the UI. Today, the subflow linkage is what `run_deployment(as_subflow=True)` gives you.
3) If you're okay losing the subflow link
- Use `run_deployment(as_subflow=False)` to submit quickly under the semaphore, then implement your own batch monitor as above.

Practical tweaks to your snippet
- If you want to keep subflow linkage and not cap concurrency with the semaphore:
  - Move the semaphore out or increase it to the max you're comfortable with.
  - Keep `poll_interval=30` (or higher) to reduce API polling.
- If you want a submit-only throttle:
  - Replace `run_deployment` with a submit function under the semaphore: `await client.create_flow_run_from_deployment(deployment_id, parameters=..., tags=...)`
  - Store run IDs; outside the semaphore, run a batch monitor that polls every 30–60s for all IDs at once.

Docs for reference
- Running deployments: How to run deployments
- Rate limits: Rate limits
- Flow runs API: Flow runs API

If you want, I can sketch the "submit under semaphore + batch monitor" pattern. It's the best way to avoid API spikes while still hitting high actual concurrency.
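For reference, here is a minimal sketch of that submit-under-semaphore pattern, assuming the deployment ID has already been resolved. `submit_only` and its arguments are illustrative names, and the follow-up batch monitor would be a chunked `read_flow_runs` loop like the one sketched earlier:
Copy code
import asyncio

from prefect import flow
from prefect.client.orchestration import get_client


@flow
async def submit_only(deployment_id, param_sets, max_concurrent_api_calls: int = 10):
    """Create flow runs under a semaphore, releasing it as soon as each run is created."""
    async with get_client() as client:
        sem = asyncio.Semaphore(max_concurrent_api_calls)

        async def submit(params):
            async with sem:
                # Only the creation call is throttled; nothing waits on the child here
                run = await client.create_flow_run_from_deployment(
                    deployment_id, parameters=params
                )
                return run.id

        run_ids = await asyncio.gather(*(submit(p) for p in param_sets))
        # Note: runs created this way are not linked to the parent as subflows in the UI
        return run_ids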
w
@Marvin why do so many of my flow runs get stuck in a pending state?
m
🔄 Researching your answer... this may take a while 🔧 Tools used: check_cli_command (1x), research_prefect_topic (1x)
w
@Marvin why do so many of my flow runs get stuck in a pending state for like 2h?
m
thought for 186.8 seconds
Sorry you're hitting that; let's get it unblocked. In Prefect 3.x, a flow run that sits in Pending usually means a worker submitted the job to your infrastructure, but the job hasn't started (or can't start). Common causes:
- Kubernetes: pod Pending (insufficient CPU/memory, no nodes, ImagePullBackOff)
- Docker: slow/failed image pulls, container never starts
- AWS Batch/ECS: queue backlogged or no capacity
- Process/local: machine is overloaded or concurrency limits elsewhere
- Worker crash after submission (job was created but not monitored to Running)

Quick checks to pinpoint the issue
- Inspect the flow run's state message (it often includes the infra job ID/name and why it's waiting). CLI:
Copy code
prefect flow-run inspect <flow-run-id>
Then:
Copy code
prefect flow-run logs <flow-run-id>
- Verify work pool and worker health (pool status, concurrency, last worker heartbeats):
Copy code
prefect work-pool inspect <pool-name>
- Make sure at least one worker is running and polling the pool (and not crashing):
Copy code
prefect worker start --pool <pool-name>
- Check work queues/backlog
Copy code
prefect work-queue ls
  prefect work-queue preview --work-pool <pool-name>
If runs are stuck due to pool/queue concurrency limits, they'll usually remain Scheduled, not Pending, but it's worth confirming limits aren't set.

Infra-specific quick checks
- Kubernetes: find the pod name in the state message or worker logs, then:
Copy code
kubectl get pods -n <namespace> -w
    kubectl describe pod/<name> -n <namespace>
- Look for Unschedulable, insufficient resources, or ImagePullBackOff.
- Docker: on the worker host:
Copy code
docker ps -a
    docker logs <container-id>
- If image pulls are slow, pre-pull/cache the image or use a closer registry.
- AWS Batch/ECS:
  - Check queue/service events for capacity or placement failures.
  - Ensure the compute environment can scale.

If your runs are actually stuck in Scheduled (not Pending), that points to no workers, a paused pool/queue, or concurrency limits; a different root cause. But since you said Pending for ~2h, it strongly suggests infrastructure isn't starting the job.

Could you share:
- Your work pool type (Kubernetes, Docker, Process, ECS/Batch, etc.)
- One example flow run ID and pool name
- The state message from `prefect flow-run inspect <id>`

References
- Manage work pools and workers
- Worker CLI reference
- Flow and state concepts
w
@Marvin would scaling the workers help?
m
thought for 231.4 seconds
It depends on what's slowing you down.

Scaling workers helps when:
- You have a backlog of Scheduled/Pending flow runs in a work pool
- Each flow run is relatively independent, and you want higher throughput
- Your pool/queue concurrency is the bottleneck (not the flow's internal task execution)

Scaling workers will not speed up a single heavy flow run. For that, parallelize the flow's tasks (e.g., Dask/Ray) or give its infrastructure more CPU/memory.

Quick checks and actions:
- Inspect your pool and queues:
Copy code
prefect work-pool inspect <pool>
  prefect work-queue ls
- If you see “Blocked by concurrency limit” or a backlog, raise limits:
Copy code
prefect work-pool set-concurrency-limit "<pool>" 10
  prefect work-queue set-concurrency-limit "<queue>" 10
- Add more workers to the pool:
Copy code
prefect worker start -p "<pool>" -n "worker-2"
Docs:
- Workers and work pools: Workers & Work Pools
- Worker CLI: prefect worker CLI
- Global concurrency limits: Global Concurrency Limits
- Work queues: prefect work-queue CLI
- Work pools: prefect work-pool CLI

If you can share:
- Are you seeing a backlog of flow runs, or is a single flow run too slow?
- Which worker type and infrastructure are you using (process, Docker, Kubernetes, etc.)?
- Any pool/queue/global concurrency limits configured?

With that, I can recommend the most effective scaling approach.
w
@Marvin I am seeing a backlog of flow runs stuck in a pending state, using kubernetes
m
thought for 186.8 seconds
Got it: a backlog of Pending flow runs on Kubernetes usually means workers aren't picking them up or something is blocking scheduling. Scaling workers often helps, but let's confirm the bottleneck first.

Quick diagnostics:
1) Check the work pool status and limits:
Copy code
prefect work-pool inspect <pool>
- Ensure the pool isn't paused
- Look for "Open slots," active worker heartbeats, and any "blocked by concurrency limit" messages
2) Check work queues and limits:
Copy code
prefect work-queue ls
- Confirm your deployments target queues that your worker is polling
- If needed, raise limits:
Copy code
prefect work-pool set-concurrency-limit "<pool>" 10
prefect work-queue set-concurrency-limit "<queue>" 10
3) Check global/task-tag concurrency limits
Copy code
prefect concurrency-limit ls
- If your flows have tags with low limits, they'll stick in Pending even if workers are available
4) Inspect a specific flow run for a reason:
Copy code
prefect flow-run inspect <flow_run_id>
- The "state_details" often says if it's blocked by a limit or a paused pool

If it's just a lack of worker capacity:
- Scale your Kubernetes workers. If you run the Prefect worker as a Kubernetes Deployment:
Copy code
kubectl scale deployment/prefect-worker -n <namespace> --replicas=5
- Or start additional workers (wherever you run them):
Copy code
prefect worker start -p "<pool>" -n "k8s-worker-2"
Kubernetes-specific checks (common causes of "Pending" backlogs):
- Worker is not attached to the right pool/queues: verify your worker start command includes the correct `-p` and any `-q`
- Cluster capacity: pods stuck "Unschedulable" due to CPU/memory/quotas
- Image pull issues: private registry without imagePullSecrets
- RBAC/service account issues for the job template
- Namespace/resource limits in your job template

Docs for reference:
- Workers & Work Pools: Concepts: Workers & Work Pools
- Kubernetes how-to: Run on Kubernetes
- Worker CLI: prefect worker CLI
- Work queues: prefect work-queue CLI
- Global concurrency limits: Global concurrency limits

If you can share:
- The output of `prefect work-pool inspect <pool>`
- Whether your worker runs as a K8s Deployment (and its current replicas)
- Any messages in `flow-run inspect` for a Pending run

I can then confirm whether scaling workers will clear the backlog or if a limit/config is the real blocker.
Your message was too long, here's your message at the allowed limit: my work runs as a k8s deployment and here is the output of the command `base_job_template={ ‘variables’ { ‘type’ ‘object’ ‘properties’ { ‘env’ { ‘type’ ‘object’ ‘title’ ‘Environment Variables’ ‘default’ {‘ENVIRONMENT’ ‘prod’ ‘PREFECT_LOGGING_LEVEL’ ‘ERROR’} ‘description’ ‘Environment variables to set when starting a flow run ’ ‘additionalProperties’ {‘type’ ‘string’} } ‘name’ {‘type’ ‘string’ ‘title’ ‘Name’ ‘description’ ‘Name given to infrastructure created by a worker ’} ‘image’ { ‘type’ ‘string’ ‘title’ ‘Image’ ‘example’ ‘<http //docker io/prefecthq/prefect 2-latest|docker io/prefecthq/prefect 2-latest>’ ‘description’ ‘The image reference of a container image to use for created jobs If not set the latest Prefect image will be used ’ } ‘labels’ {‘type’ ‘object’ ‘title’ ‘Labels’ ‘description’ ‘Labels applied to infrastructure created by a worker ’ ‘additionalProperties’ {‘type’ ‘string’}} ‘command’ { ‘type’ ‘string’ ‘title’ ‘Command’ ‘description’ ‘The command to use when starting a flow run In most cases this should be left blank and the command will be automatically generated by the worker ’ } ‘namespace’ {‘type’ ‘string’ ‘title’ ‘Namespace’ ‘default’ ‘prefect’ ‘description’ ‘The Kubernetes namespace to create jobs within ’} ‘pod_labels’ {‘type’ ‘object’ ‘title’ ‘Labels’ ‘description’ ‘Labels applied to EKS pods created by a worker ’ ‘additionalProperties’ {‘type’ ‘string’}} ‘backoff_limit’ {‘type’ ‘integer’ ‘title’ ‘Backoff Limit’ ‘default’ 2 ‘description’ ‘The number of retries to attempt before giving up on a job ’} ‘stream_output’ {‘type’ ‘boolean’ ‘title’ ‘Stream Output’ ‘default’ True ‘description’ ‘If set output will be streamed from the job to local standard output ’} ‘cluster_config’ {‘allOf’ [{‘$ref’ ‘#/definitions/KubernetesClusterConfig’}] ‘title’ ‘Cluster Config’ ‘description’ ‘The Kubernetes cluster config to use for job creation ’} ‘pod_annotations’ {‘type’ ‘object’ ‘title’ ‘Annotations’ ‘description’ ‘Annotations applied to EKS pods created by a worker ’ ‘additionalProperties’ {‘type’ ‘string’}} ‘finished_job_ttl’ { ‘type’ ‘integer’ ‘title’ ‘Finished Job TTL’ ‘default’ 60 ‘description’ ‘The number of seconds to retain jobs after completion If set finished jobs will be cleaned up by Kubernetes after the given delay If not set jobs will be retained indefinitely ’ } ‘image_pull_policy’ { ‘enum’ [‘IfNotPresent’ ‘Always’ ‘Never’] ‘type’ ‘string’ ‘title’ ‘Image Pull Policy’ ‘default’ ‘IfNotPresent’ ‘description’ ‘The Kubernetes image pull policy to use for job containers ’ } ‘service_account_name’ {‘type’ ‘string’ ‘title’ ‘Service Account Name’ ‘default’ ’<service-account-name> ‘description’ ‘The Kubernetes service account to use for job creation ’} ‘job_watch_timeout_seconds’ { ‘type’ ‘integer’ ‘title’ ‘Job Watch Timeout Seconds’ ‘description’ ‘Number of seconds to wait for each event emitted by a job before timing out If not set the worker will wait for each event indefinitely ’ } ‘pod_watch_timeout_seconds’ {‘type’ ‘integer’ ‘title’ ‘Pod Watch Timeout Seconds’ ‘default’ 1000 ‘description’ ‘Number of seconds to watch for pod creation before timing out ’} ‘kubernetes_pod_maximum_memory’ {‘type’ ‘string’ ‘default’ ‘1024Mi’ ‘description’ ‘Kubernetes pod memory setting’} ‘kubernetes_pod_minimum_memory’ {‘type’ ‘string’ ‘default’ ‘1024Mi’ ‘description’ ‘Kubernetes pod memory setting’} } ‘definitions’ { ‘KubernetesClusterConfig’ { ‘type’ ‘object’ ‘title’ ‘KubernetesClusterConfig’ ‘required’ [‘config’ 
‘context_name’] ‘properties’ { ‘config’ {‘type’ ‘object’ ‘title’ ‘Config’ ‘description’ ‘The entire contents of a kubectl config file ’} ‘context_name’ {‘type’ ‘string’ ‘title’ ‘Context Name’ ‘description’ ‘The name of the kubectl context to use ’} } ‘description’ ‘Stores configuration for interaction with Kubernetes clusters \n\nSee
w
@Marvin my worker runs as a k8s deployment and the work pool state is ready:
status=WorkPoolStatus.READY
m
thought for 92.0 seconds
Thanks. If your Kubernetes work pool is READY and you still have a Pending backlog, it's usually one of:
- The worker isn't polling the right queues or isn't claiming runs
- Pool/queue/global concurrency limits are capping throughput
- K8s pods are failing to schedule/start (capacity, image pulls, RBAC, etc.)

Here's a quick path to confirm and fix:
1) Confirm the worker is actually claiming runs
- Stream worker logs:
Copy code
kubectl logs deploy/<your-prefect-worker-deployment> -n <namespace> -f
You should see logs like "Found X scheduled runs" and "Claiming run ...". If you only see "No work found", the worker may be pointed at the wrong queues.
2) Verify queues and limits
- Which queues is the worker polling? If you started it with `prefect worker start -p "<pool>" -q "<queue>"`, make sure your deployments target the same queue(s).
- Inspect the pool and queues:
Copy code
prefect work-pool inspect <pool>
  prefect work-queue ls
  prefect concurrency-limit ls
If you see “blocked by concurrency limit” or very low limits, raise them:
Copy code
prefect work-pool set-concurrency-limit "<pool>" 20
  prefect work-queue set-concurrency-limit "<queue>" 20
3) Preview scheduled work
- See what's expected to run vs open slots:
Copy code
prefect work-pool preview "<pool>" --hours 1
4) If runs are claimed but pods don't start, check Kubernetes
- Look for Prefect-created pods/jobs and their events:
Copy code
kubectl get pods -n <namespace> -l 'prefect.io/flow-run-id' -w
  kubectl describe pod <pod> -n <namespace>
  kubectl get events -n <namespace> --sort-by=.metadata.creationTimestamp
Common culprits:
- Unschedulable: insufficient CPU/memory or namespace quotas
- Image pull failures: add `imagePullSecrets` in the pool's base job template
- RBAC/serviceAccount issues: ensure the service account in the job template can create pods in the namespace
- Requests/limits too high in the job template
5) Scale for throughput
- If the above look good and you just need more throughput, scale the worker Deployment and ensure limits are raised to match:
Copy code
kubectl scale deployment/<your-prefect-worker-deployment> -n <namespace> --replicas=5
And keep pool/queue limits high enough (step 2).

Helpful docs:
- Workers & Work Pools concepts: Workers & Work Pools
- Kubernetes how-to (job template, imagePullSecrets, service accounts): Run on Kubernetes
- Worker CLI: prefect worker CLI
- Work Pools/Queues CLI: work-pool CLI, work-queue CLI
- Global concurrency limits: Global concurrency limits

If you can share:
- A snippet from `kubectl logs` for the worker during the backlog
- Output of `prefect work-pool inspect <pool>` and `prefect work-queue ls`
- One `kubectl describe pod` from a Prefect-run pod stuck Pending

...I can pinpoint whether scaling replicas will clear the backlog or if a limit/config in Prefect/K8s is the blocker.
w
@Marvin we scaled up the deployment but we’re still not seeing nodes scaling up
m
thought for 93.0 seconds
Thanks. Scaling the Prefect worker Deployment won't, by itself, trigger node autoscaling. Nodes only scale when there are flow-run pods pending and unschedulable due to insufficient capacity. Let's pinpoint where it's stalling:
1) Are flow-run jobs/pods actually being created?
- Worker logs should show claims:
Copy code
kubectl logs deploy/<prefect-worker> -n <ns> -f
Look for "Claiming run ..." and job creation logs.
- Do you see flow pods?
Copy code
kubectl get pods -n <ns> -l 'prefect.io/flow-run-id'
2) If pods exist but are Pending, what's the reason?
- Describe one:
Copy code
kubectl describe pod <pod> -n <ns>
- If you see "Insufficient cpu/memory": the cluster autoscaler should kick in if it's configured for that node group and not at max nodes.
- If you see image pull / RBAC / quota errors: fix those first (the autoscaler won't scale for these).
3) If no flow pods are being created:
- The worker may not be claiming runs or is blocked by limits. Check:
Copy code
prefect work-pool inspect <pool>
  prefect work-queue ls
  prefect concurrency-limit ls
If you see “blocked by concurrency limit,” raise them:
Copy code
prefect work-pool set-concurrency-limit "<pool>" 20
  prefect work-queue set-concurrency-limit "<queue>" 20
4) Ensure your flow pods request enough resources to trigger autoscaling
- Set requests/limits in the Kubernetes work pool's base job template (UI or CLI):
Copy code
prefect work-pool update "<pool>" --base-job-template ./k8s-job-template.json
In the template, add resources under the container:
Copy code
{
  "job": {
    "spec": {
      "template": {
        "spec": {
          "containers": [
            {
              "name": "prefect-job",
              "resources": {
                "requests": {
                  "cpu": "1",
                  "memory": "2Gi"
                },
                "limits": {
                  "cpu": "2",
                  "memory": "4Gi"
                }
              }
            }
          ]
        }
      }
    }
  }
}
Also set required `imagePullSecrets`, `serviceAccountName`, taints/tolerations, and nodeSelector/affinity to match your autoscaling node group.
5) If pods are Unschedulable but nodes still don't scale
- Check cluster autoscaler settings: min/max node count, taints/tolerations on the target node group, priority classes, and whether the pending pods match the ASG/NodePool's selector.
- Verify namespace/resource quotas aren't blocking scheduling.

Helpful references:
- Kubernetes how-to (job template, imagePullSecrets, service accounts): Run on Kubernetes
- Workers & Work Pools: Workers & Work Pools
- Work pool/queue CLI: work-pool CLI, work-queue CLI

If you can share:
- A few lines of worker logs around run claiming
- Output of `kubectl get pods -n <ns> -l 'prefect.io/flow-run-id'` and a `kubectl describe` of a Pending pod
- `prefect work-pool inspect <pool>` output

...I can tell you exactly whether we need to raise Prefect limits, fix job template/resources, or adjust your cluster autoscaler config.
w
@Marvin would old flow runs stuck in bad states like pending, running and cancelling impact the scheduling of pods and picking up of flow runs?
m
thought for 65.8 seconds
Short answer: yes, "zombie" flow runs can block new work. Ways they impact scheduling/pickup:
- Work pool/queue concurrency: RUNNING (and often CANCELLING) runs count against pool/queue concurrency; stuck runs keep slots occupied so workers won't claim new runs.
- Global/tag concurrency limits: old runs can hold tag slots, preventing new runs from starting even if workers are available.
- Kubernetes capacity: if the old runs still have Pods/Jobs alive or stuck Terminating/Pending, they consume resources and may prevent autoscaling or scheduling of new pods.

Quick cleanup checklist
1) Check Prefect limits and open slots:
Copy code
prefect work-pool inspect <pool>
prefect work-queue ls
prefect concurrency-limit ls
- If you see slots at 0 or “blocked by concurrency limit”, raise/clear limits:
Copy code
prefect work-pool set-concurrency-limit "<pool>" 20
prefect work-queue set-concurrency-limit "<queue>" 20
- For tag limits with orphaned slots:
Copy code
prefect concurrency-limit inspect <tag>
prefect concurrency-limit reset <tag>
2) Find and resolve stuck runs
Copy code
prefect flow-run ls --state RUNNING --limit 100
prefect flow-run ls --state CANCELLING --limit 100
prefect flow-run ls --state PENDING --limit 100
prefect flow-run inspect <flow_run_id>
Then cancel them (and re-run if needed):
Copy code
prefect flow-run cancel <flow_run_id>
If they won’t resolve and you need to unblock quickly:
Copy code
prefect flow-run delete <flow_run_id>
3) Clean up Kubernetes leftovers
Copy code
kubectl get pods -n <ns> -l 'prefect.io/flow-run-id'
kubectl get jobs -n <ns> -l 'prefect.io/flow-run-id'
kubectl describe pod <pod> -n <ns>
kubectl delete job <job> -n <ns>
Consider adding a TTL for finished Jobs in your work pool's base job template so completed/failed jobs are cleaned up automatically.

Refs:
- Workers & Work Pools: Concepts: Workers & Work Pools
- Global/tag concurrency limits: Global Concurrency Limits
- Kubernetes how-to (job template, TTL, imagePullSecrets, service accounts): Run on Kubernetes

If you share:
- `prefect work-pool inspect <pool>` output
- A `prefect concurrency-limit ls` snippet
- One `prefect flow-run inspect <id>` for a stuck run
- Whether you see leftover Pods/Jobs

...I can pinpoint exactly which blocker applies and the fastest remediation.
w
@Marvin how do i add ttl for finished jobs?
m
thought for 71.4 seconds
Yes, you can add a Kubernetes TTL for finished Jobs by customizing your Kubernetes work pool's base job template to include `ttlSecondsAfterFinished` on the Job spec. Two common ways:

A) Edit in the UI
- Prefect Cloud/Server > Work Pools > your K8s pool > Edit base job template
- Add this field under `job.spec`:
Copy code
"job": {
    "apiVersion": "batch/v1",
    "kind": "Job",
    "metadata": {},
    "spec": {
      "ttlSecondsAfterFinished": 3600,
      "template": {
        "spec": {
          "containers": [ ... ],
          "restartPolicy": "Never"
        }
      },
      "backoffLimit": 0
    }
  }
- Save. New flow runs will create Jobs that Kubernetes deletes 1 hour after they finish (and their pods will be GC'd).

B) Edit via CLI
1) Grab a starting template (default for K8s):
Copy code
prefect work-pool get-default-base-job-template --type kubernetes -f k8s-template.json
2) Edit `k8s-template.json` to add:
Copy code
"ttlSecondsAfterFinished": 3600
under `job.spec`.
3) Apply it to your pool:
Copy code
prefect work-pool update "<your-pool>" --base-job-template ./k8s-template.json
Notes:
- The field must be at `job.spec.ttlSecondsAfterFinished` (not under `template.spec`).
- Ensure your cluster supports the TTL controller (it's standard on modern K8s).
- TTL deletes the Job and GC's its pods; make sure your logs/artifacts are persisted elsewhere before they're cleaned up.

Helpful docs:
- Work pools (editing base job templates): Workers & Work Pools
- Kubernetes how-to (job templates, service accounts, imagePullSecrets): Run on Kubernetes
- Work pool CLI: prefect work-pool CLI

If you want, paste your current base job template snippet and I'll show exactly where to drop the TTL line.
w
@Marvin here’s the error causing the flow runs to be stuck in pending:
Copy code
Failed to submit flow run '068d57f4-7464-74c8-8000-58ce61839ca8' to infrastructure.
Traceback (most recent call last):
  File "/usr/local/lib/python3.11/site-packages/httpx/_transports/default.py", line 72, in map_httpcore_exceptions
    yield
  File "/usr/local/lib/python3.11/site-packages/httpx/_transports/default.py", line 377, in handle_async_request
    resp = await self._pool.handle_async_request(req)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/httpcore/_async/connection_pool.py", line 216, in handle_async_request
    raise exc from None
  File "/usr/local/lib/python3.11/site-packages/httpcore/_async/connection_pool.py", line 192, in handle_async_request
    connection = await pool_request.wait_for_connection(timeout=timeout)
                 ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/httpcore/_async/connection_pool.py", line 35, in wait_for_connection
    await self._connection_acquired.wait(timeout=timeout)
  File "/usr/local/lib/python3.11/site-packages/httpcore/_synchronization.py", line 148, in wait
    with map_exceptions(anyio_exc_map):
  File "/usr/local/lib/python3.11/contextlib.py", line 158, in __exit__
    self.gen.throw(typ, value, traceback)
  File "/usr/local/lib/python3.11/site-packages/httpcore/_exceptions.py", line 14, in map_exceptions
    raise to_exc(exc) from exc
httpcore.PoolTimeout

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "/usr/local/lib/python3.11/site-packages/prefect/workers/base.py", line 863, in _submit_run_and_capture_errors
    configuration = await self._get_configuration(flow_run)
                    ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/prefect/workers/base.py", line 942, in _get_configuration
    flow = await self._client.read_flow(flow_run.flow_id)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/prefect/client/orchestration.py", line 502, in read_flow
    response = await self._client.get(f"/flows/{flow_id}")
               ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/httpx/_client.py", line 1814, in get
    return await self.request(
           ^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/httpx/_client.py", line 1585, in request
    return await self.send(request, auth=auth, follow_redirects=follow_redirects)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/prefect/client/base.py", line 330, in send
    response = await self._send_with_retry(
               ^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/prefect/client/base.py", line 254, in _send_with_retry
    response = await send(request, *send_args, **send_kwargs)
               ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/httpx/_client.py", line 1674, in send
    response = await self._send_handling_auth(
               ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/httpx/_client.py", line 1702, in _send_handling_auth
    response = await self._send_handling_redirects(
               ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/httpx/_client.py", line 1739, in _send_handling_redirects
    response = await self._send_single_request(request)
               ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/httpx/_client.py", line 1776, in _send_single_request
    response = await transport.handle_async_request(request)
               ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/httpx/_transports/default.py", line 376, in handle_async_request
    with map_httpcore_exceptions():
  File "/usr/local/lib/python3.11/contextlib.py", line 158, in __exit__
    self.gen.throw(typ, value, traceback)
  File "/usr/local/lib/python3.11/site-packages/httpx/_transports/default.py", line 89, in map_httpcore_exceptions
    raise mapped_exc(message) from exc
httpx.PoolTimeout
m
thought for 383.8 seconds
Thanks for sharing the traceback. The key part is the final httpx.PoolTimeout raised while the worker is trying to read the flow from the API. That means the worker could not acquire a connection from the HTTP connection pool in time (typically due to API slowness, network/proxy/DNS issues, or too many concurrent requests). When this happens at submit time, runs will sit in Pending.

Quick questions to narrow this down:
- Are you using Prefect Cloud or a self-hosted Prefect Server?
- Where does the worker run (Kubernetes, VM, container) and what version of Prefect is it on?
- Roughly how many flow runs are being submitted concurrently when this occurs?
- Any HTTP(S) proxy in play? If self-hosted, is there a load balancer or reverse proxy in front of the API?
- Do you see elevated API latency or errors in server logs around the same time?

Immediate checks and fixes
1) Verify network and API config from the worker
- Ensure the worker has the correct API settings: `PREFECT_API_URL` and `PREFECT_API_KEY` (for Cloud)
- From inside the worker environment/container, curl the health endpoint:
Copy code
echo $PREFECT_API_URL
    curl -sv "${PREFECT_API_URL%/}/api/health"
- DNS and reachability:
Copy code
nslookup $(echo $PREFECT_API_URL | awk -F[/:] '{print $4}')
- If you're behind a proxy, ensure env vars are set appropriately and that your Prefect API host is in `NO_PROXY` to avoid needless proxying:
  - `HTTP_PROXY` / `HTTPS_PROXY` as needed
  - `NO_PROXY=<your-prefect-api-host>,localhost,127.0.0.1`

2) Reduce burstiness with a work-pool concurrency limit
If many runs are being submitted at once, the worker and/or API can get overwhelmed, leading to pool timeouts. Set a concurrency limit on the work pool to smooth load:
- Inspect the pool and current limits:
Copy code
prefect work-pool inspect <your-pool>
- Set a limit (e.g., 5):
Copy code
prefect work-pool set-concurrency-limit <your-pool> 5
Docs: Work pool CLI

3) Increase client-side resiliency (retries)
Increase the client retries to give the API more time during transient congestion:
- Temporarily via environment on the worker:
Copy code
export PREFECT_CLIENT_MAX_RETRIES=10
  export PREFECT_CLIENT_RETRY_JITTER_FACTOR=0.5
- Or persist in your Prefect profile:
Copy code
prefect config set client.max_retries=10 client.retry_jitter_factor=0.5
Then restart the worker.

4) Turn on debug logs on the worker
This will show each HTTP request/response and help pinpoint slow or failing requests:
Copy code
export PREFECT_LOGGING_LEVEL=DEBUG
prefect worker start -p <your-pool>
5) Self-hosted tuning (if applicable)
- Check API service logs and DB performance at the same time as the timeouts. If the API/DB is slow or connection-limited, the client pool will back up.
- Scale out API replicas and ensure your DB pool size and reverse proxy keepalive/connection limits are sufficient.
- If you're using Kubernetes and the Kubernetes observer, heavy event storms can spike API calls; consider isolating the observer or scaling the API to handle the load. Relevant issue for high event volume producing PoolTimeouts: Kubernetes worker PoolTimeout under heavy k8s events

6) Validate you're on a recent Prefect version
Pool timeout behavior has come up in various load scenarios; ensure both server and worker are on current releases so you have recent fixes and improvements.

Why this happens
- httpx.PoolTimeout means the HTTP client couldn't acquire a connection from its pool quickly enough. In practice this occurs when:
  - The API is slow or overloaded (server CPU/DB bottlenecks, rate limiting).
  - Network path issues (proxy misconfig, DNS resolution delays, load balancer connection limits).
  - Excessive concurrency from workers or auxiliary services (many runs submitted at once, the Kubernetes observer generating bursts of API calls).
- Prefect's client retries transient failures, but if all connections are queued up or the API is consistently slow, you'll still see PoolTimeouts.

Helpful docs
- Work pool CLI: Work pool CLI
- Self-hosting overview: Self-hosted Prefect
- Rate limits concept: Rate limits

If you can share:
- Cloud vs self-hosted and Prefect versions (server and worker)
- Worker environment details (K8s/VM, proxy)
- How many runs are queued/submitted at once
- Any API/DB logs around the incident

...I can suggest more targeted tuning steps.
w
@Marvin is doing the following no longer good for Prefect 3.4?
Copy code
deployments:
- name: dep-name
  entrypoint: entry  
  parameters: {}
  work_pool:
    name: kubernetes-pool
    work_queue_name:
    job_variables:
      env:
        - name: PREFECT_LOGGING_HANDLERS_CONSOLE_FORMATTER
          value: "json"
        - name: DD_AGENT_HOST
          valueFrom:
            fieldRef:
              fieldPath: status.hostIP
getting the following error:
Copy code
Failed to submit flow run '068e439a-08ab-7901-8000-0e196b204376' to infrastructure.
Traceback (most recent call last):
  File "/usr/local/lib/python3.11/site-packages/prefect/workers/base.py", line 1277, in _submit_run_and_capture_errors
    configuration = await self._get_configuration(flow_run)
                    ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/prefect/workers/base.py", line 1378, in _get_configuration
    configuration = await self.job_configuration.from_template_and_values(
                    ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/prefect/client/utilities.py", line 99, in with_injected_client
    return await fn(*args, **kwargs)
           ^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/prefect/workers/base.py", line 194, in from_template_and_values
    base_config["env"] = base_config.get("env") | deployment_env
    ~~~~~~~~~~~~~~~~~~~~~~~^~~~~~~~~~~~~~~~
TypeError: unsupported operand type(s) for |: 'dict' and 'list'
m
thought for 435.1 seconds
Short answer: In Prefect 3.4, `work_pool.job_variables.env` must be a dict (key -> value), not a Kubernetes-style list. Your list form is why you're getting:
Copy code
TypeError: unsupported operand type(s) for |: 'dict' and 'list'
Prefect is trying to merge the dict `env` from the base job template with your deployment's `env`, but yours is a list.

What to do instead
- For simple env vars, use a dict:
Copy code
deployments:
- name: dep-name
  entrypoint: entry
  parameters: {}
  work_pool:
    name: kubernetes-pool
    job_variables:
      env:
        PREFECT_LOGGING_HANDLERS_CONSOLE_FORMATTER: "json"
        # DD_AGENT_HOST can't use valueFrom here; needs a manifest override
- For Kubernetes features like valueFrom/fieldRef, override the Kubernetes job manifest and supply the container env list there. In Prefect 3.x, the Kubernetes worker's base job template has a `job_manifest` you can override. A minimal example that preserves the built-in templating:
Copy code
deployments:
- name: dep-name
  entrypoint: entry
  parameters: {}
  work_pool:
    name: kubernetes-pool
    job_variables:
      job_manifest:
        apiVersion: batch/v1
        kind: Job
        metadata:
          generateName: "{{ name }}-"
          labels: "{{ labels }}"
          namespace: "{{ namespace }}"
        spec:
          backoffLimit: 0
          ttlSecondsAfterFinished: "{{ finished_job_ttl }}"
          template:
            spec:
              restartPolicy: "Never"
              serviceAccountName: "{{ service_account_name }}"
              containers:
              - name: prefect-job
                image: "{{ image }}"
                imagePullPolicy: "{{ image_pull_policy }}"
                args: "{{ command }}"
                env:
                - name: PREFECT_LOGGING_HANDLERS_CONSOLE_FORMATTER
                  value: "json"
                - name: DD_AGENT_HOST
                  valueFrom:
                    fieldRef:
                      fieldPath: status.hostIP
Notes:
- Don't also set `job_variables.env` if you fully control the `job_manifest` env list; you'd be bypassing the dict-based `env`.
- If you want to see the default schema/template for your pool type, you can print it and use it as a starting point:
Copy code
prefect work-pool get-default-base-job-template --type kubernetes
Then copy it into your deployment's `job_variables.job_manifest` and edit just the bits you need (like the `env` list).

Docs:
- Customize job variables

If you share your current pool's base job template (`prefect work-pool inspect "kubernetes-pool"`), I can help you craft the exact `job_manifest` override needed. Are you specifically trying to set `DD_AGENT_HOST` to the node's host IP via the downward API?
w
Copy code
{
  "variables": {
    "type": "object",
    "properties": {
      "env": {
        "type": "object",
        "title": "Environment Variables",
        "default": {
          "ENVIRONMENT": "dev"
        },
        "description": "Environment variables to set when starting a flow run.",
        "additionalProperties": {
          "type": "string"
        }
      },
      "name": {
        "type": "string",
        "title": "Name",
        "description": "Name given to infrastructure created by a worker."
      },
      "image": {
        "type": "string",
        "title": "Image",
        "example": "<http://docker.io/prefecthq/prefect:2-latest|docker.io/prefecthq/prefect:2-latest>",
        "description": "The image reference of a container image to use for created jobs. If not set, the latest Prefect image will be used."
      },
      "labels": {
        "type": "object",
        "title": "Labels",
        "description": "Labels applied to infrastructure created by a worker.",
        "additionalProperties": {
          "type": "string"
        }
      },
      "command": {
        "type": "string",
        "title": "Command",
        "description": "The command to use when starting a flow run. In most cases, this should be left blank and the command will be automatically generated by the worker."
      },
      "namespace": {
        "type": "string",
        "title": "Namespace",
        "default": "prefect",
        "description": "The Kubernetes namespace to create jobs within."
      },
      "stream_output": {
        "type": "boolean",
        "title": "Stream Output",
        "default": true,
        "description": "If set, output will be streamed from the job to local standard output."
      },
      "cluster_config": {
        "allOf": [
          {
            "$ref": "#/definitions/KubernetesClusterConfig"
          }
        ],
        "title": "Cluster Config",
        "description": "The Kubernetes cluster config to use for job creation."
      },
      "finished_job_ttl": {
        "type": "integer",
        "title": "Finished Job TTL",
        "default": 60,
        "description": "The number of seconds to retain jobs after completion. If set, finished jobs will be cleaned up by Kubernetes after the given delay. If not set, jobs will be retained indefinitely."
      },
      "image_pull_policy": {
        "enum": [
          "IfNotPresent",
          "Always",
          "Never"
        ],
        "type": "string",
        "title": "Image Pull Policy",
        "default": "IfNotPresent",
        "description": "The Kubernetes image pull policy to use for job containers."
      },
      "service_account_name": {
        "type": "string",
        "title": "Service Account Name",
        "default": "d-maestro-k8s",
        "description": "The Kubernetes service account to use for job creation."
      },
      "job_watch_timeout_seconds": {
        "type": "integer",
        "title": "Job Watch Timeout Seconds",
        "description": "Number of seconds to wait for each event emitted by a job before timing out. If not set, the worker will wait for each event indefinitely."
      },
      "pod_watch_timeout_seconds": {
        "type": "integer",
        "title": "Pod Watch Timeout Seconds",
        "default": 1000,
        "description": "Number of seconds to watch for pod creation before timing out."
      },
      "kubernetes_pod_minimum_memory": {
        "type": "string",
        "default": "1024Mi",
        "description": "Kubernetes pod memory setting"
      },
      "kubernetes_pod_maximum_memory": {
        "type": "string",
        "default": "1024Mi",
        "description": "Kubernetes pod memory setting"
      },
      "kubernetes_pod_minimum_cpu": {
        "type": "string",
        "default": "2",
        "description": "Kubernetes pod CPU setting"
      },
      "pod_labels": {
        "type": "object",
        "title": "Labels",
        "description": "Labels applied to EKS pods created by a worker.",
        "additionalProperties": {
          "type": "string"
        }
      },
      "pod_annotations": {
        "type": "object",
        "title": "Annotations",
        "description": "Annotations applied to EKS pods created by a worker.",
        "additionalProperties": {
          "type": "string"
        }
      }
    },
    "definitions": {
      "KubernetesClusterConfig": {
        "type": "object",
        "title": "KubernetesClusterConfig",
        "required": [
          "config",
          "context_name"
        ],
        "properties": {
          "config": {
            "type": "object",
            "title": "Config",
            "description": "The entire contents of a kubectl config file."
          },
          "context_name": {
            "type": "string",
            "title": "Context Name",
            "description": "The name of the kubectl context to use."
          }
        },
        "description": "Stores configuration for interaction with Kubernetes clusters.\n\nSee `from_file` for creation.",
        "secret_fields": [],
        "block_type_slug": "kubernetes-cluster-config",
        "block_schema_references": {}
      }
    },
    "description": "Default variables for the Kubernetes worker.\n\nThe schema for this class is used to populate the `variables` section of the default\nbase job template."
  },
  "job_configuration": {
    "env": "{{ env }}",
    "name": "{{ name }}",
    "labels": "{{ labels }}",
    "command": "{{ command }}",
    "namespace": "{{ namespace }}",
    "job_manifest": {
      "kind": "Job",
      "spec": {
        "template": {
          "spec": {
            "containers": [
              {
                "env": "{{ env }}",
                "args": "{{ command }}",
                "name": "prefect-job",
                "image": "{{ image }}",
                "imagePullPolicy": "{{ image_pull_policy }}",
                "resources": {
                  "requests": {
                    "memory": "{{ kubernetes_pod_minimum_memory }}",
                    "cpu": "{{ kubernetes_pod_minimum_cpu }}"
                  },
                  "limits": {
                    "memory": "{{ kubernetes_pod_maximum_memory }}"
                  }
                }
              }
            ],
            "completions": 1,
            "parallelism": 1,
            "restartPolicy": "Never",
            "serviceAccountName": "{{ service_account_name }}"
          },
          "metadata": {
            "labels": "{{ pod_labels }}",
            "annotations": "{{ pod_annotations }}"
          }
        },
        "backoffLimit": 0,
        "ttlSecondsAfterFinished": "{{ finished_job_ttl }}"
      },
      "metadata": {
        "labels": "{{ labels }}",
        "namespace": "{{ namespace }}",
        "generateName": "{{ name }}-"
      },
      "apiVersion": "batch/v1"
    },
    "stream_output": "{{ stream_output }}",
    "cluster_config": "{{ cluster_config }}",
    "job_watch_timeout_seconds": "{{ job_watch_timeout_seconds }}",
    "pod_watch_timeout_seconds": "{{ pod_watch_timeout_seconds }}"
  }
}
m
thought for 17.8 seconds
I can’t open that Slack thread from here—could you paste the manifest YAML into this thread? In the meantime, here’s why you’re seeing that error and how to fix it: - In Prefect 3.4,
work_pool.job_variables.env
must be a dict of key -> value. Your current list-of-env-vars style is Kubernetes-native, but Prefect expects a dict there, so it fails when trying to merge dicts:
Copy code
TypeError: unsupported operand type(s) for |: 'dict' and 'list'
- If you only need simple env vars, use a dict under `job_variables.env`:
Copy code
deployments:
- name: dep-name
  entrypoint: entry
  parameters: {}
  work_pool:
    name: kubernetes-pool
    job_variables:
      env:
        PREFECT_LOGGING_HANDLERS_CONSOLE_FORMATTER: "json"
        # DD_AGENT_HOST requires valueFrom; see the next option
- If you need Kubernetes features like valueFrom/fieldRef (e.g., node host IP), override the Kubernetes Job manifest and set the container env list there. Keep the Prefect templating placeholders:
Copy code
deployments:
- name: dep-name
  entrypoint: entry
  parameters: {}
  work_pool:
    name: kubernetes-pool
    job_variables:
      job_manifest:
        apiVersion: batch/v1
        kind: Job
        metadata:
          generateName: "{{ name }}-"
          labels: "{{ labels }}"
          namespace: "{{ namespace }}"
        spec:
          backoffLimit: 0
          ttlSecondsAfterFinished: "{{ finished_job_ttl }}"
          template:
            spec:
              restartPolicy: "Never"
              serviceAccountName: "{{ service_account_name }}"
              containers:
              - name: prefect-job
                image: "{{ image }}"
                imagePullPolicy: "{{ image_pull_policy }}"
                args: "{{ command }}"
                env:
                - name: PREFECT_LOGGING_HANDLERS_CONSOLE_FORMATTER
                  value: "json"
                - name: DD_AGENT_HOST
                  valueFrom:
                    fieldRef:
                      fieldPath: status.hostIP
Tips:
- Don't also set `job_variables.env` if you're defining env in the `job_manifest`; otherwise you may override or confuse the merge.
- You can view the default Kubernetes worker template (handy to copy and edit) with:
Copy code
prefect work-pool get-default-base-job-template --type kubernetes
- And inspect your specific pool’s current template with:
Copy code
prefect work-pool inspect "kubernetes-pool"
Docs: Customize job variables
If you paste your manifest here, I'll tailor it to the correct Prefect 3.4 shape and ensure the valueFrom bit is wired up properly.
w
@Marvin cant i override only parts of the job manifest?
m
thought for 172.3 seconds
Yes, partially. Prefect 3.x deep-merges dictionaries in the job template, but list fields are replaced. That means:
- Dicts (e.g., metadata.labels, spec.template.spec) can be overridden piecemeal.
- Lists (e.g., spec.template.spec.containers, containers[0].env) are replaced as a whole when you provide them.
(For intuition on these merge semantics, see the short sketch at the end of this message.)
Practical options for your case (adding a valueFrom env var):
Option A: Concatenate to the default env via Jinja
This preserves the env values generated from `job_variables.env` (the dict) and adds your valueFrom entry. Because containers[0].env is a list, we replace it as a whole, but use Jinja to include the original env list plus your addition.
Copy code
deployments:
- name: dep-name
  entrypoint: entry
  work_pool:
    name: kubernetes-pool
    job_variables:
      env:
        PREFECT_LOGGING_HANDLERS_CONSOLE_FORMATTER: "json"
      job_manifest:
        apiVersion: batch/v1
        kind: Job
        metadata:
          generateName: "{{ name }}-"
          labels: "{{ labels }}"
          namespace: "{{ namespace }}"
        spec:
          backoffLimit: 0
          ttlSecondsAfterFinished: "{{ finished_job_ttl }}"
          template:
            spec:
              restartPolicy: "Never"
              serviceAccountName: "{{ service_account_name }}"
              containers:
              - name: prefect-job
                image: "{{ image }}"
                imagePullPolicy: "{{ image_pull_policy }}"
                args: "{{ command }}"
                env: "{{ env + [{'name': 'DD_AGENT_HOST', 'valueFrom': {'fieldRef': {'fieldPath': 'status.hostIP'}}}] }}"
Notes:
- env on the container is now a Jinja expression that takes the rendered default env list and appends your valueFrom item.
- Keep the templated placeholders ({{ image }}, {{ command }}, etc.) so you don't lose defaults.
Option B: Replace env entirely in the manifest
If you don't need `job_variables.env`, you can fully define the container's env list yourself. This is simpler but does not pull in the dict-based env at all.
Copy code
deployments:
- name: dep-name
  entrypoint: entry
  work_pool:
    name: kubernetes-pool
    job_variables:
      job_manifest:
        apiVersion: batch/v1
        kind: Job
        metadata:
          generateName: "{{ name }}-"
          labels: "{{ labels }}"
          namespace: "{{ namespace }}"
        spec:
          backoffLimit: 0
          ttlSecondsAfterFinished: "{{ finished_job_ttl }}"
          template:
            spec:
              restartPolicy: "Never"
              serviceAccountName: "{{ service_account_name }}"
              containers:
              - name: prefect-job
                image: "{{ image }}"
                imagePullPolicy: "{{ image_pull_policy }}"
                args: "{{ command }}"
                env:
                - name: PREFECT_LOGGING_HANDLERS_CONSOLE_FORMATTER
                  value: "json"
                - name: DD_AGENT_HOST
                  valueFrom:
                    fieldRef:
                      fieldPath: status.hostIP
Why your original failed
- In 3.4, `job_variables.env` must be a dict (key -> value). Supplying a list of Kubernetes EnvVar objects there causes the merge error you saw: TypeError: unsupported operand type(s) for |: 'dict' and 'list'
Helpful commands
- Print the default Kubernetes worker template (a good starting point to copy and modify):
Copy code
prefect work-pool get-default-base-job-template --type kubernetes
- Inspect your specific work pool’s base job template to see what you’re merging with:
Copy code
prefect work-pool inspect "your-pool-name"
Docs: Customize job variables
If you paste your current manifest snippet here, I can help rewrite it using Option A (Jinja concatenation) so you only "override" the env list while preserving the rest.
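For intuition on the dict-vs-list merge behavior above, here's a rough sketch of "deep-merge dicts, replace lists" semantics with made-up values (illustrative only, not Prefect's actual implementation):
Copy code
from typing import Any

def deep_merge(base: Any, override: Any) -> Any:
    """Merge nested dicts key by key; any non-dict value (including lists) replaces the base value."""
    if isinstance(base, dict) and isinstance(override, dict):
        merged = dict(base)
        for key, value in override.items():
            merged[key] = deep_merge(base[key], value) if key in base else value
        return merged
    return override  # lists and scalars are replaced wholesale

base = {
    "spec": {
        "template": {
            "spec": {
                "serviceAccountName": "default",
                "containers": [{"name": "prefect-job", "env": []}],
            }
        }
    }
}
override = {
    "spec": {
        "template": {
            "spec": {
                "containers": [{"name": "prefect-job", "env": [{"name": "DD_ENV", "value": "dev"}]}],
            }
        }
    }
}

merged = deep_merge(base, override)
# serviceAccountName survives because dict keys are merged, but the containers list
# comes entirely from the override -- which is why you must re-include defaults in it.
print(merged["spec"]["template"]["spec"]["serviceAccountName"])  # -> "default"
print(merged["spec"]["template"]["spec"]["containers"])          # -> only the override's list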
w
@Marvin would using volumes work like:
Copy code
volumes:
        - name: podinfo
          downwardAPI:
            items:
              - path: host_ip
                fieldRef:
                  fieldPath: status.hostIP
      volume_mounts:
        - name: podinfo
          mountPath: /etc/podinfo
          readOnly: true
m
thought for 30.4 seconds
Short answer: Yes, a downwardAPI volume works in Kubernetes, but make sure you put it in the right place and use the correct field names (camelCase). Also note: because containers/volumeMounts are lists, you’ll replace those lists when you provide them in the manifest—so re-include the default placeholders to preserve Prefect’s defaults. Example job_manifest override that adds the volume + mount while keeping Prefect’s defaults intact:
Copy code
deployments:
- name: dep-name
  entrypoint: entry
  work_pool:
    name: kubernetes-pool
    job_variables:
      job_manifest:
        apiVersion: batch/v1
        kind: Job
        metadata:
          generateName: "{{ name }}-"
          labels: "{{ labels }}"
          namespace: "{{ namespace }}"
        spec:
          backoffLimit: 0
          ttlSecondsAfterFinished: "{{ finished_job_ttl }}"
          template:
            spec:
              restartPolicy: "Never"
              serviceAccountName: "{{ service_account_name }}"
              volumes:
              - name: podinfo
                downwardAPI:
                  items:
                  - path: host_ip
                    fieldRef:
                      fieldPath: status.hostIP
              containers:
              - name: prefect-job
                image: "{{ image }}"
                imagePullPolicy: "{{ image_pull_policy }}"
                args: "{{ command }}"
                env: "{{ env }}"
                volumeMounts:
                - name: podinfo
                  mountPath: /etc/podinfo
                  readOnly: true
Notes:
- Use volumeMounts (camelCase), not volume_mounts.
- Place volumes under spec.template.spec.volumes and mounts under the container spec.
- This will write the node IP to /etc/podinfo/host_ip inside the container. If your goal is to set DD_AGENT_HOST, you'd still need to read that file and export it yourself (e.g., via a wrapper entrypoint; see the small sketch at the end of this message).
If you want an env var directly, it's simpler to add it via valueFrom:
Copy code
env: "{{ env + [{'name': 'DD_AGENT_HOST', 'valueFrom': {'fieldRef': {'fieldPath': 'status.hostIP'}}}] }}"
That appends the downward API env var to the existing env generated from job_variables.env.
Related docs: Customize job variables
If you share your current deployment/pool template, I can fold the volume/mount or the valueFrom env into it with the least disruption.
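And if you do go the volume route, here's a minimal sketch of reading the mounted file and exporting it at startup (assumes the mountPath above; call it before the Datadog client/tracer is configured):
Copy code
import os
from pathlib import Path

# Path written by the downwardAPI volumeMount shown above
HOST_IP_FILE = Path("/etc/podinfo/host_ip")

def export_dd_agent_host() -> None:
    """Read the node IP from the downward API file and expose it as DD_AGENT_HOST."""
    if HOST_IP_FILE.exists():
        os.environ["DD_AGENT_HOST"] = HOST_IP_FILE.read_text().strip()

# Call this at the top of your flow entrypoint (or in a wrapper script),
# before anything that reads DD_AGENT_HOST is initialized.
export_dd_agent_host()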
w
@Marvin
Copy code
Failed to submit flow run '068e44e0-d35e-7325-8000-43ee8c28b2a5' to infrastructure.
Traceback (most recent call last):
  File "/usr/local/lib/python3.11/site-packages/prefect_kubernetes/worker.py", line 917, in _create_job
    job = await batch_client.create_namespaced_job(
          ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/kubernetes_asyncio/client/api_client.py", line 192, in __call_api
    raise e
  File "/usr/local/lib/python3.11/site-packages/kubernetes_asyncio/client/api_client.py", line 185, in __call_api
    response_data = await self.request(
                    ^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/kubernetes_asyncio/client/rest.py", line 249, in POST
    return (await self.request("POST", url,
            ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/kubernetes_asyncio/client/rest.py", line 206, in request
    raise ApiException(http_resp=r)
kubernetes_asyncio.client.exceptions.ApiException: (422)
Reason: Unprocessable Entity
HTTP response headers: <CIMultiDictProxy('Audit-Id': 'dfbd34dd-0efd-4fbc-b845-e231e3cd6e9b', 'Cache-Control': 'no-cache, private', 'Content-Type': 'application/json', 'Warning': '299 - "unknown field \\"spec.template.spec.containers[0].env[16].DD_ENV\\""', 'Warning': '299 - "unknown field \\"spec.template.spec.containers[0].env[16].DD_SERVICE\\""', 'Warning': '299 - "unknown field \\"spec.template.spec.containers[0].env[16].DD_VERSION\\""', 'Warning': '299 - "unknown field \\"spec.template.spec.containers[0].env[16].DD_MAIN_PACKAGE\\""', 'Warning': '299 - "unknown field \\"spec.template.spec.containers[0].env[16].DD_TRACE_ENABLED\\""', 'Warning': '299 - "unknown field \\"spec.template.spec.containers[0].env[16].DD_PROFILING_ENABLED\\""', 'Warning': '299 - "unknown field \\"spec.template.spec.containers[0].env[16].DD_PROFILING_STACK_V2_ENABLED\\""', 'Warning': '299 - "unknown field \\"spec.template.spec.containers[0].env[16].DD_PROFILING_TIMELINE_ENABLED\\""', 'Warning': '299 - "unknown field \\"spec.template.spec.containers[0].env[16].PREFECT_LOGGING_HANDLERS_CONSOLE_FORMATTER\\""', 'Warning': '299 - "unknown field \\"spec.template.spec.completions\\""', 'Warning': '299 - "unknown field \\"spec.template.spec.parallelism\\""', 'X-Kubernetes-Pf-Flowschema-Uid': '0c64a2c2-6776-49b6-bcb7-8244c03aa61d', 'X-Kubernetes-Pf-Prioritylevel-Uid': 'ed1ab123-74e2-448b-8af9-4a37f73567d5', 'Date': 'Mon, 06 Oct 2025 23:17:48 GMT', 'Content-Length': '409')>
HTTP response body: {"kind":"Status","apiVersion":"v1","metadata":{},"status":"Failure","message":"Job.batch \"caped-wolf-ppw86\" is invalid: spec.template.spec.containers[0].env[16].name: Required value","reason":"Invalid","details":{"name":"caped-wolf-ppw86","group":"batch","kind":"Job","causes":[{"reason":"FieldValueRequired","message":"Required value","field":"spec.template.spec.containers[0].env[16].name"}]},"code":422}



The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "/usr/local/lib/python3.11/site-packages/prefect/workers/base.py", line 1281, in _submit_run_and_capture_errors
    result = await self.run(
             ^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/prefect_kubernetes/worker.py", line 745, in run
    job = await self._create_job(configuration, client)
          ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/tenacity/asyncio/__init__.py", line 189, in async_wrapped
    return await copy(fn, *args, **kwargs)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/tenacity/asyncio/__init__.py", line 111, in __call__
    do = await self.iter(retry_state=retry_state)
         ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/tenacity/asyncio/__init__.py", line 153, in iter
    result = await action(retry_state)
             ^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/tenacity/_utils.py", line 99, in inner
    return call(*args, **kwargs)
           ^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/tenacity/__init__.py", line 420, in exc_check
    raise retry_exc.reraise()
          ^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/tenacity/__init__.py", line 187, in reraise
    raise self.last_attempt.result()
          ^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/concurrent/futures/_base.py", line 449, in result
    return self.__get_result()
           ^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/concurrent/futures/_base.py", line 401, in __get_result
    raise self._exception
  File "/usr/local/lib/python3.11/site-packages/tenacity/asyncio/__init__.py", line 114, in __call__
    result = await fn(*args, **kwargs)
             ^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/prefect_kubernetes/worker.py", line 929, in _create_job
    raise InfrastructureError(
prefect.exceptions.InfrastructureError: Unable to create Kubernetes job: Unprocessable Entity: Job.batch "caped-wolf-ppw86" is invalid: spec.template.spec.containers[0].env[16].name: Required value
m
thought for 90.4 seconds
Thanks for the full error; that pinpoints the issue.
What Kubernetes is telling you:
- spec.template.spec.containers[0].env[16].name: Required value
- unknown field "…env[16].DD_ENV", "…DD_SERVICE", etc.
That happens when one item in the env list is a dict of many envs instead of a proper EnvVar object. For Kubernetes, containers[].env must be a list where each item is:
- either { name: SOME_NAME, value: SOME_VALUE }
- or { name: SOME_NAME, valueFrom: {...} }
If you accidentally did something like this in your job_manifest:
Copy code
env: "{{ env + [ { 'DD_ENV': 'prod', 'DD_SERVICE': 'svc', ... } ] }}"
Kubernetes sees a single env list item with keys DD_ENV, DD_SERVICE, … (none of which are valid EnvVar fields) and no required name field. Hence the 422.
How to fix it
- Keep simple variables in job_variables.env as a dict (Prefect will convert them to the proper list of {name, value}):
Copy code
work_pool:
  name: kubernetes-pool
  job_variables:
    env:
      DD_ENV: "prod"
      DD_SERVICE: "your-service"
      DD_VERSION: "1.2.3"
      DD_MAIN_PACKAGE: "your_pkg"
      DD_TRACE_ENABLED: "true"
      DD_PROFILING_ENABLED: "true"
      DD_PROFILING_STACK_V2_ENABLED: "true"
      DD_PROFILING_TIMELINE_ENABLED: "true"
      PREFECT_LOGGING_HANDLERS_CONSOLE_FORMATTER: "json"
- Only append the special valueFrom variable(s) in the container’s env via Jinja, making sure each appended item has name/valueFrom:
Copy code
job_variables:
  job_manifest:
    apiVersion: batch/v1
    kind: Job
    metadata:
      generateName: "{{ name }}-"
      labels: "{{ labels }}"
      namespace: "{{ namespace }}"
    spec:
      # parallelism/completions belong here on Job.spec (see note below)
      backoffLimit: 0
      ttlSecondsAfterFinished: "{{ finished_job_ttl }}"
      template:
        spec:
          restartPolicy: "Never"
          serviceAccountName: "{{ service_account_name }}"
          containers:
          - name: prefect-job
            image: "{{ image }}"
            imagePullPolicy: "{{ image_pull_policy }}"
            args: "{{ command }}"
            env: "{{ env + [ {'name': 'DD_AGENT_HOST', 'valueFrom': {'fieldRef': {'fieldPath': 'status.hostIP'}}} ] }}"
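For a concrete picture of the shape Kubernetes accepts, here's a small illustrative sketch (placeholder values) that builds a valid env list from a plain mapping and appends a valueFrom entry; Prefect does this dict-to-list conversion for you when job_variables.env is a dict:
Copy code
# Illustrative only: the EnvVar shapes Kubernetes accepts for containers[].env
simple_vars = {"DD_ENV": "prod", "DD_SERVICE": "your-service"}  # placeholder values

# Each simple var becomes its own {name, value} object...
env = [{"name": name, "value": value} for name, value in simple_vars.items()]

# ...and downward-API vars use {name, valueFrom} instead of a literal value.
env.append({
    "name": "DD_AGENT_HOST",
    "valueFrom": {"fieldRef": {"fieldPath": "status.hostIP"}},
})
print(env)

# Invalid: one dict holding many vars has no "name" field, which is what triggers the 422.
invalid_item = {"DD_ENV": "prod", "DD_SERVICE": "your-service"}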
Important: don't put multiple envs into a single dict item in the list. Each env var should be its own object with name/value or name/valueFrom.
About the other warnings (completions/parallelism)
Your error also shows:
- unknown field "spec.template.spec.completions"
- unknown field "spec.template.spec.parallelism"
Those fields belong under Job.spec, not under the Pod template spec. If you set them, place them at spec.parallelism and spec.completions (siblings of template), not under spec.template.spec. Example:
Copy code
spec:
  parallelism: 1
  completions: 1
  template:
    spec:
      containers:
      - ...
Docs: Customize job variables
If you paste your current job_manifest block (just the spec.template.spec and containers/env bits), I'll rewrite it so it validates and preserves Prefect's defaults.