Scott Pedersen
09/18/2025, 6:30 AM

Marvin
09/18/2025, 6:37 AM
1) Check tag/global concurrency limits
- List the configured tag limits:
prefect concurrency-limit ls
- Inspect a specific tag’s limit to see active slots:
prefect concurrency-limit inspect <tag>
Docs: Global concurrency limits in Prefect 3
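If it's easier to script this check, here's a minimal sketch using the Prefect Python client (assuming a Prefect 3.x environment and that your client version exposes read_concurrency_limits; field names may vary slightly):
import asyncio
from prefect import get_client

async def list_limits() -> None:
    async with get_client() as client:
        # assumed client method; returns the configured tag concurrency limits
        limits = await client.read_concurrency_limits(limit=100, offset=0)
        for cl in limits:
            print(cl.tag, "limit:", cl.concurrency_limit, "active slots:", len(cl.active_slots))

asyncio.run(list_limits())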
2) Verify the task’s actual state and history
- In the UI task details, look for state type and message.
- Or via CLI:
prefect task-run inspect <task_run_id>
This will show the state history (e.g., AwaitingConcurrencySlot, Retrying, etc.)
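As a rough alternative to the CLI, this sketch reads one task run's current state with the Python client (the task run id is a placeholder you'd fill in):
import asyncio
from prefect import get_client

async def show_task_run(task_run_id: str) -> None:
    async with get_client() as client:
        task_run = await client.read_task_run(task_run_id)
        state = task_run.state
        if state is None:
            print(task_run.name, "has no state yet")
        else:
            print(task_run.name, state.type, state.name, state.message)

asyncio.run(show_task_run("<task_run_id>"))  # placeholder id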
3) Confirm it’s not pool/queue throttling (affects flow start, not tasks, but worth a peek)
prefect work-pool inspect <pool_name>
4) Check the flow-run pod logs (Kubernetes)
- Restarting the worker pod won’t affect already-launched flow run pods. The flow run executes in its own job/pod. Check that pod’s logs for:
- API connectivity issues (retries proposing state)
- Any exceptions before the first task executes
- Signs of an async/sync mismatch or a blocked event loop
5) Enable debug logging on the run to see state proposals
- Set PREFECT_LOGGING_LEVEL=DEBUG in the job environment for the deployment, then re-run. You'll see messages like "proposing Running" and whether the proposal is rejected due to concurrency.
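For example, if you deploy with flow.deploy and your work pool's base job template exposes an env job variable (the built-in Kubernetes template does; adjust if yours differs), a sketch like this would inject the setting — names and image below are placeholders:
from prefect import flow

@flow
def my_flow():
    ...

if __name__ == "__main__":
    my_flow.deploy(
        name="debug-logging",              # placeholder deployment name
        work_pool_name="my-k8s-pool",      # placeholder work pool
        image="my-registry/my-image:tag",  # placeholder, pre-built image
        build=False,
        push=False,
        job_variables={"env": {"PREFECT_LOGGING_LEVEL": "DEBUG"}},
    )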
6) Sanity check task runner configuration
- If you’re using a custom task runner or async flow, try explicitly using the default runner:
from prefect import flow
from prefect.task_runners import ThreadPoolTaskRunner

@flow(task_runner=ThreadPoolTaskRunner())
def my_flow():
    ...
- Rare misconfigurations can prevent tasks from being picked up.
If you can share:
- One affected flow_run_id and the first task_run_id
- The task's state type/message from the UI (or the output of prefect task-run inspect <id>)
- Output of prefect concurrency-limit ls
- Your work pool type and any job template concurrency settings
…I can help pinpoint the root cause quickly.

Scott Pedersen
09/18/2025, 6:58 AM

Marvin
09/18/2025, 6:59 AM
Most of the same checks apply on Prefect 2.x:
- Check tag concurrency limits:
prefect concurrency-limit ls
prefect concurrency-limit inspect <tag>
- Restarting an agent/worker does not affect already-launched flow run processes/pods; check the flow run’s own logs (e.g., the Kubernetes job/pod) for errors or API connectivity issues.
- The UI task details and state history are your best first clue (Pending vs AwaitingRetry vs AwaitingConcurrencySlot).
Key differences to keep in mind for 2.x:
- You may be using an “agent” (earlier 2.x) or “worker” (later 2.x). Ensure whichever you use is healthy and polling the right work queue.
- Task runners differ: in 2.x you'd typically use ConcurrentTaskRunner or DaskTaskRunner rather than the 3.x ThreadPoolTaskRunner. If you suspect a task-runner config issue, try explicitly setting a basic runner:
from prefect import flow
from prefect.task_runners import ConcurrentTaskRunner

@flow(task_runner=ConcurrentTaskRunner())
def my_flow():
    ...
If you can share your exact 2.x version and whether you're using an agent or worker (and Kubernetes vs Process), I can tailor the steps. If upgrading is an option, I'd recommend moving to Prefect 3.x, where task execution and concurrency controls have been simplified and improved.

Scott Pedersen
09/18/2025, 7:00 AM

Marvin
09/18/2025, 7:09 AM
1) Inspect the stuck task run's state and logs
prefect task-run inspect <task_run_id>
prefect task-run logs <task_run_id>
2) Look for exhausted tag/global concurrency limits
- If the state is AwaitingConcurrencySlot, a tag limit is likely the cause.
prefect concurrency-limit ls
prefect concurrency-limit inspect <tag>
- If you see "ghost" active slots (slots held by task runs that are no longer actually running), you can clear them:
prefect concurrency-limit reset <tag>
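If you want to see exactly which task runs are holding slots before resetting, a rough client-side sketch (assuming read_concurrency_limit_by_tag is available in your 2.x client; the tag is a placeholder):
import asyncio
from prefect import get_client

async def inspect_tag(tag: str) -> None:
    async with get_client() as client:
        cl = await client.read_concurrency_limit_by_tag(tag)  # assumed client method
        print("tag:", cl.tag, "limit:", cl.concurrency_limit)
        for task_run_id in cl.active_slots:
            print("slot held by task run:", task_run_id)

asyncio.run(inspect_tag("<tag>"))  # placeholder tag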
3) Check pool/queue throttling (affects flows submitted, but worth validating)
- Work pool concurrency (flow-run level):
prefect work-pool inspect <pool_name>
- Work queue concurrency:
prefect work-queue inspect <queue_name> -p <pool_name>
If limits are too low, bump them:
prefect work-pool set-concurrency-limit <pool_name> <limit>
prefect work-queue set-concurrency-limit <queue_name> <limit> -p <pool_name>
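To read both settings programmatically, a sketch along these lines may help (assuming read_work_pool and read_work_queue_by_name exist in your client version; pool and queue names are placeholders):
import asyncio
from prefect import get_client

async def show_limits(pool_name: str, queue_name: str) -> None:
    async with get_client() as client:
        pool = await client.read_work_pool(pool_name)
        print("work pool concurrency limit:", pool.concurrency_limit)
        queue = await client.read_work_queue_by_name(queue_name, work_pool_name=pool_name)
        print("work queue concurrency limit:", queue.concurrency_limit)

asyncio.run(show_limits("<pool_name>", "<queue_name>"))  # placeholders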
4) Task runner specifics
- If you’re using DaskTaskRunner, a cluster/scheduler hiccup can strand tasks “pending”.
- Test the same flow with a simple local runner to isolate:
from prefect import flow
from prefect.task_runners import ConcurrentTaskRunner

@flow(task_runner=ConcurrentTaskRunner())
def my_flow():
    ...
- If this runs fine, check your Dask scheduler/worker logs and connectivity from the flow pod.
5) Kubernetes specifics
- Restarting the worker pod doesn’t affect already-launched flow pods. Check the logs of the flow’s Kubernetes job/pod itself for:
- API connectivity errors (state proposals failing)
- Exceptions before the first task executes
- Consider enabling debug logs for a repro:
- Set PREFECT_LOGGING_LEVEL=DEBUG in the deployment/job environment and re-run to see state proposal details.
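A quick way to rule out API connectivity from inside the flow run pod (e.g., via kubectl exec) is a sketch like this; it assumes PREFECT_API_URL / PREFECT_API_KEY are set in the pod environment and that client.hello() is available in your version:
import asyncio
from prefect import get_client

async def check_api() -> None:
    async with get_client() as client:
        response = await client.hello()  # simple round-trip to the API
        print("API reachable, status:", response.status_code)

asyncio.run(check_api())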
6) Hidden tag limits
- Make sure tasks aren't inheriting tags (e.g., from the flow or the PREFECT_TAGS env var) that match an active concurrency limit.
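For context, tags applied at the flow level or via a tags() context are inherited by task runs, so they can collide with an active tag limit without being visible on the task itself. A minimal illustration:
from prefect import flow, task, tags

@task
def query_db():
    ...

@flow
def my_flow():
    # every task run created inside this block carries the "db" tag;
    # if a concurrency limit exists for "db", these tasks wait for a slot
    with tags("db"):
        query_db()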
If you can share:
- One affected flow_run_id and the first task_run_id
- The task state message (from the UI or prefect task-run inspect)
- Output of prefect concurrency-limit ls
- Whether you’re using DaskTaskRunner or ConcurrentTaskRunner
- Your work pool type (sounds like Kubernetes)
…I can help pinpoint the exact cause and next steps quickly.