# ask-marvin
**User:**
@Marvin we have several flow runs that start and don't move past the Pending state on the first task. This isn't happening to all flow runs, which is making it hard to determine the root cause. I have cancelled and restarted one of the affected flows, as well as restarting the worker pod, which has made no difference. Any ideas?
**Marvin:**
thought for 421.4 seconds
Sorry you're hitting this. A few things can cause a first task stuck in Pending in Prefect 3.x, so let's quickly narrow it down.

First, a couple of clarifying questions:
- Which work pool/worker type are you using (Kubernetes, Process, etc.)? You mentioned a worker pod restart, so I'm guessing Kubernetes.
- In the UI, if you click the pending task, what is the exact state type and message? (e.g., "AwaitingConcurrencySlot for tag X", "AwaitingRetry", or just "Pending" with no message)
- Do your tasks use tags or have any global/tag-based concurrency limits?

Fast checks and actions:

1) Check for tag/global concurrency limits
- If a limit is exhausted, tasks will sit waiting for a slot and look like "pending" from a distance.
- List limits:
```
prefect concurrency-limit ls
```
- Inspect a specific tag's limit to see active slots:
```
prefect concurrency-limit inspect <tag>
```
- Docs: Global concurrency limits in Prefect 3
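
To make the tag-limit failure mode concrete, here is a minimal sketch (not from this thread; the tag name "database" and the limit value are made up) of how a tag-based limit binds to tagged tasks:

```python
# Minimal illustrative sketch (tag name and limit are made up, not from this
# thread). Create the matching limit once with:
#   prefect concurrency-limit create database 1
from prefect import flow, task


@task(tags=["database"])  # each run of this task needs a "database" slot
def query(n: int) -> int:
    return n * 2


@flow
def my_flow() -> list[int]:
    # With a limit of 1, only one "database" task run holds a slot at a time;
    # the others wait for a slot, which looks like a task stuck in Pending.
    futures = [query.submit(n) for n in range(3)]
    return [f.result() for f in futures]


if __name__ == "__main__":
    my_flow()
```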
2) Verify the task's actual state and history
- In the UI task details, look for the state type and message.
- Or via CLI:
```
prefect task-run inspect <task_run_id>
```
This will show the state history (e.g., AwaitingConcurrencySlot, Retrying, etc.).
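
If you prefer scripting this check, here is a minimal sketch using the Python client (assuming `PREFECT_API_URL` is configured in the environment; the task run ID below is a placeholder):

```python
# Minimal sketch: read a task run's current state with the Python client.
# Assumes PREFECT_API_URL points at your Prefect API; the UUID is a placeholder.
import asyncio
from uuid import UUID

from prefect.client.orchestration import get_client


async def show_task_run_state(task_run_id: str) -> None:
    async with get_client() as client:
        task_run = await client.read_task_run(UUID(task_run_id))
        state = task_run.state
        # state.name/state.message usually carry the interesting detail,
        # e.g. "AwaitingConcurrencySlot" plus the tag being waited on.
        print(state.type, state.name, state.message)


asyncio.run(show_task_run_state("00000000-0000-0000-0000-000000000000"))
```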
3) Confirm it's not pool/queue throttling (this affects flow starts, not tasks, but worth a peek):
```
prefect work-pool inspect <pool_name>
```
4) Check the flow-run pod logs (Kubernetes)
- Restarting the worker pod won't affect already-launched flow run pods; the flow run executes in its own job/pod. Check that pod's logs for:
  - API connectivity issues (retries while proposing state)
  - Any exceptions before the first task executes
  - An async/sync mismatch or a blocked event loop

5) Enable debug logging on the run to see state proposals
- Set `PREFECT_LOGGING_LEVEL=DEBUG` in the job environment for the deployment, then re-run. You'll see messages like "proposing Running" and whether the transition is rejected due to concurrency.
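
As a sketch of one way to set that variable, assuming the deployment is defined in Python with `flow.from_source(...).deploy(...)` against a Kubernetes work pool (the repo URL, entrypoint, deployment name, and pool name below are placeholders, not values from this thread):

```python
# Hypothetical deployment definition; all names below are placeholders.
from prefect import flow

if __name__ == "__main__":
    flow.from_source(
        source="https://github.com/<org>/<repo>",
        entrypoint="flows/my_flow.py:my_flow",
    ).deploy(
        name="my-flow-debug",
        work_pool_name="<kubernetes-pool>",
        # job_variables override values in the work pool's base job template,
        # so this ends up in the flow-run pod's environment.
        job_variables={"env": {"PREFECT_LOGGING_LEVEL": "DEBUG"}},
    )
```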
6) Sanity check the task runner configuration
- If you're using a custom task runner or an async flow, try explicitly using the default runner:
```python
from prefect import flow
from prefect.task_runners import ThreadPoolTaskRunner  # the default runner in Prefect 3.x


@flow(task_runner=ThreadPoolTaskRunner())
def my_flow():
    ...
```
- Rare misconfigurations can prevent tasks from being picked up.

If you can share:
- One affected flow_run_id and the first task_run_id
- The task's state type/message from the UI (or the output of `prefect task-run inspect <id>`)
- The output of `prefect concurrency-limit ls`
- Your work pool type and any job template concurrency settings

…I can help pinpoint the root cause quickly.
**User:**
@Marvin does this apply to Prefect v2.x as well?
**Marvin:**
thought for 65.1 seconds
Short answer: yes. Most of the debugging steps apply to Prefect 2.x as well, with a few terminology differences.

What's the same in 2.x:
- Concurrency limits can keep tasks in a waiting/pending state. Check the task's state message in the UI for "AwaitingConcurrencySlot".
- You can inspect concurrency limits by tag:
```
prefect concurrency-limit ls
prefect concurrency-limit inspect <tag>
```
- Restarting an agent/worker does not affect already-launched flow run processes/pods; check the flow run's own logs (e.g., the Kubernetes job/pod) for errors or API connectivity issues.
- The UI task details and state history are your best first clue (Pending vs. AwaitingRetry vs. AwaitingConcurrencySlot).

Key differences to keep in mind for 2.x:
- You may be using an "agent" (earlier 2.x) or a "worker" (later 2.x). Ensure whichever you use is healthy and polling the right work queue.
- Task runners differ: in 2.x you'd typically use `ConcurrentTaskRunner` or `DaskTaskRunner` instead of 3.x's `ThreadPoolTaskRunner`. If you suspect a task-runner config issue, try explicitly setting a basic runner:
```python
from prefect import flow
from prefect.task_runners import ConcurrentTaskRunner


@flow(task_runner=ConcurrentTaskRunner())
def my_flow():
    ...
```
If you can share your exact 2.x version and whether you’re using an agent or worker (and Kubernetes vs Process), I can tailor the steps. If upgrading is an option, I’d recommend moving to Prefect 3.x where task execution and concurrency controls have been simplified and improved.
**User:**
@Marvin we are on version 2.20.16
**Marvin:**
thought for 530.3 seconds
Got it, thanks for the version. Here's a focused checklist for Prefect 2.20.16.

What to check first:

1) Inspect the exact task state and message
- In the UI, open the first task and check whether it's Pending, AwaitingConcurrencySlot, AwaitingRetry, etc.
- Or via CLI:
```
prefect task-run inspect <task_run_id>
prefect task-run logs <task_run_id>
```
2) Look for exhausted tag/global concurrency limits
- If the state is AwaitingConcurrencySlot, a tag limit is likely the cause.
```
prefect concurrency-limit ls
prefect concurrency-limit inspect <tag>
```
- If you see “ghost” active slots, you can clear them:
```
prefect concurrency-limit reset <tag>
```
3) Check pool/queue throttling (this affects how flow runs are submitted, but worth validating)
- Work pool concurrency (flow-run level):
```
prefect work-pool inspect <pool_name>
```
- Work queue concurrency:
```
prefect work-queue inspect <queue_name> -p <pool_name>
```
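
If it's easier to script, here is a minimal sketch that reads the same pool-level settings via the Python client (the pool name is a placeholder):

```python
# Minimal sketch: read pool-level throttling settings with the Python client.
# "<pool_name>" is a placeholder; concurrency_limit is None when unlimited.
import asyncio

from prefect.client.orchestration import get_client


async def show_pool_throttling(pool_name: str) -> None:
    async with get_client() as client:
        pool = await client.read_work_pool(work_pool_name=pool_name)
        print(f"paused={pool.is_paused} concurrency_limit={pool.concurrency_limit}")


asyncio.run(show_pool_throttling("<pool_name>"))
```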
If limits are too low, bump them:
```
prefect work-pool set-concurrency-limit <pool_name> <limit>
prefect work-queue set-concurrency-limit <queue_name> <limit> -p <pool_name>
```
4) Task runner specifics
- If you're using DaskTaskRunner, a cluster/scheduler hiccup can strand tasks in "pending".
- Test the same flow with a simple local runner to isolate the issue:
```python
from prefect import flow
from prefect.task_runners import ConcurrentTaskRunner


@flow(task_runner=ConcurrentTaskRunner())
def my_flow():
    ...
```
- If this runs fine, check your Dask scheduler/worker logs and connectivity from the flow pod; see the sketch below for the two common DaskTaskRunner setups.
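
For reference, a hedged sketch of the two common ways `DaskTaskRunner` is wired up in 2.x via the `prefect-dask` collection (the scheduler address below is a placeholder); a flow pod that can't reach an external scheduler is a classic way for submitted tasks to sit without progressing:

```python
# Requires the prefect-dask collection: pip install prefect-dask
# The scheduler address below is a placeholder.
from prefect import flow, task
from prefect_dask import DaskTaskRunner


@task
def double(n: int) -> int:
    return n * 2


# Option A: a temporary local Dask cluster per flow run (fewer moving parts).
@flow(task_runner=DaskTaskRunner())
def local_dask_flow() -> list[int]:
    return [double.submit(n).result() for n in range(3)]


# Option B: an existing cluster. If the flow pod can't reach this scheduler,
# submitted tasks make no progress and look stuck.
@flow(task_runner=DaskTaskRunner(address="tcp://dask-scheduler:8786"))
def external_dask_flow() -> list[int]:
    return [double.submit(n).result() for n in range(3)]
```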
5) Kubernetes specifics
- Restarting the worker pod doesn't affect already-launched flow pods. Check the logs of the flow's Kubernetes job/pod itself for:
  - API connectivity errors (state proposals failing)
  - Exceptions before the first task executes
- Consider enabling debug logs for a repro: set `PREFECT_LOGGING_LEVEL=DEBUG` in the deployment/job environment and re-run to see state proposal details.

6) Hidden tag limits
- Make sure tasks aren't inheriting tags (e.g., from the flow or the `PREFECT_TAGS` env var) that match an active concurrency limit.
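
To make the hidden-tag case concrete, here is a minimal sketch (the tag name and limit are made up): tasks called inside a `tags(...)` context pick up that tag, so they count against a matching concurrency limit even though the task itself declares none:

```python
# Minimal sketch (tag name and limit are made up). If a limit exists for the
# "etl" tag, e.g. `prefect concurrency-limit create etl 1`, these task runs
# consume its slots even though extract() declares no tags itself.
from prefect import flow, task, tags


@task
def extract(n: int) -> int:
    return n


@flow
def my_flow() -> list[int]:
    with tags("etl"):  # applied to every task run created inside this block
        futures = [extract.submit(n) for n in range(3)]
    return [f.result() for f in futures]
```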
If you can share:
- One affected flow_run_id and the first task_run_id
- The task state message (from the UI or `prefect task-run inspect`)
- The output of `prefect concurrency-limit ls`
- Whether you're using DaskTaskRunner or ConcurrentTaskRunner
- Your work pool type (sounds like Kubernetes)

…I can help pinpoint the exact cause and next steps quickly.