Josh Stone
09/19/2025, 4:43 AM
Marvin
09/19/2025, 4:43 AM
Marvin
09/19/2025, 4:46 AM
- Task-level by tag (CLI: prefect concurrency-limit ...): applies to TaskRuns only
- Global by name (CLI: prefect global-concurrency-limit ...): requires you to wrap code in a concurrency(...) context
- Work pool/queue concurrency limits: cap how many FlowRuns a pool/queue dispatches
If you “set a concurrency limit on a deployment”, it won’t automatically reserve slots for the flow; you must use one of the mechanisms above.
- Task-level limits only apply if your running task carries the matching tag. If your tasks/flows don’t have that tag, no slot is acquired and inspect won’t show an active slot. Note: prefect concurrency-limit inspect lists TaskRun IDs, not FlowRun IDs.
- Global concurrency limits only apply if your code enters a concurrency context:
  - If you created a global limit but didn’t wrap the code with prefect.concurrency.sync.concurrency(...) or prefect.concurrency.asyncio.concurrency(...), no slot will be acquired.
- Lease renewal failures
  - Network/API reachability issues or timeouts during the run prevent heartbeats from renewing the lease
  - Long blocking operations on the event loop can stall renewal (e.g., CPU-bound work in the main thread of an async flow)
  - Significant clock skew between your worker and the API can cause leases to be considered expired
  - Newer Prefect releases include several lease-related fixes; upgrading can help
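To make the event-loop point above concrete, here is a minimal stdlib-only sketch. It does not use Prefect: heartbeat is a hypothetical stand-in for a periodic lease renewal, and cpu_bound_work is a hypothetical stand-in for blocking work. It compares running the blocking work directly in the coroutine with offloading it via asyncio.to_thread (Python 3.9+).

```python
import asyncio
import time


def cpu_bound_work(seconds: float) -> int:
    """Hypothetical blocking work: spins until `seconds` have elapsed."""
    deadline = time.monotonic() + seconds
    count = 0
    while time.monotonic() < deadline:
        count += 1
    return count


async def heartbeat(ticks: list, interval: float) -> None:
    """Hypothetical stand-in for lease renewal: records a tick every `interval` seconds."""
    while True:
        ticks.append(time.monotonic())
        await asyncio.sleep(interval)


async def run(offload: bool) -> int:
    ticks: list = []
    hb = asyncio.create_task(heartbeat(ticks, 0.05))
    await asyncio.sleep(0)  # give the heartbeat a chance to start
    if offload:
        # The event loop stays free, so the heartbeat keeps ticking.
        await asyncio.to_thread(cpu_bound_work, 0.5)
    else:
        # Blocks the event loop; the heartbeat cannot run until this returns.
        cpu_bound_work(0.5)
    hb.cancel()
    try:
        await hb
    except asyncio.CancelledError:
        pass
    return len(ticks)


blocked_ticks = asyncio.run(run(offload=False))
offloaded_ticks = asyncio.run(run(offload=True))
print(f"heartbeats while blocking the loop: {blocked_ticks}")
print(f"heartbeats with asyncio.to_thread:  {offloaded_ticks}")
```

If your async flow does heavy synchronous work while holding a slot, moving that work off the event loop in a similar way may give the renewal a chance to run. Whether this resolves the warning in your setup is an assumption to verify.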
How to set it up correctly and verify
1) If you want to cap a specific task across deployments (task-level limit by tag)
- Tag your task (or use a tags context):
from prefect import task, flow, tags

@task(tags={"db-write"})
def write_db():
    ...

@flow
def my_flow():
    with tags("db-write"):
        write_db()
- Create and inspect the limit:
prefect concurrency-limit create db-write 2
prefect concurrency-limit inspect db-write
- You’ll see TaskRun IDs in active_slots while those tasks run.
- Docs: Global and task-level concurrency overview
2) If you want to cap any code section or entire flow body (global limit)
- Create a global concurrency limit:
prefect global-concurrency-limit create my-limit --limit 2
prefect global-concurrency-limit inspect my-limit
- Acquire it in code:
from prefect import flow
from prefect.concurrency.asyncio import concurrency

@flow
async def limited_flow():
    async with concurrency("my-limit"):
        # do work while holding a slot
        ...
- While inside the context, the engine acquires and periodically renews the lease. If renewals fail, you’ll see the warning you mentioned.
- Docs: Global concurrency limits
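As a rough mental model of what a limit of 2 means, here is a stdlib-only analogy using asyncio.Semaphore. This is not how Prefect implements global limits (those are server-side and lease-based). The worker function and the state dict are hypothetical names used only for illustration.

```python
import asyncio

LIMIT = 2  # mirrors --limit 2 in the CLI example above


async def worker(slots: asyncio.Semaphore, state: dict) -> None:
    async with slots:  # analogous to entering concurrency("my-limit")
        state["active"] += 1
        state["peak"] = max(state["peak"], state["active"])
        await asyncio.sleep(0.05)  # do work while holding a slot
        state["active"] -= 1


async def main() -> int:
    slots = asyncio.Semaphore(LIMIT)
    state = {"active": 0, "peak": 0}
    # Five callers compete for two slots; the rest wait until a slot frees up.
    await asyncio.gather(*(worker(slots, state) for _ in range(5)))
    return state["peak"]


peak = asyncio.run(main())
print(f"peak concurrent holders: {peak}")
```

Only code inside the context counts against the limit. Anything that never enters it never takes a slot, which matches the point above that a global limit does nothing until your code acquires it.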
3) If you want to cap concurrent FlowRuns of a deployment (scheduling-level control)
- Use work pool or work queue concurrency limits (these control dispatching, not leases inside the run):
prefect work-pool set-concurrency-limit "<pool-name>" 2
prefect work-pool inspect "<pool-name>"
# or per-queue
prefect work-queue set-concurrency-limit "<queue-name>" 2
prefect work-queue inspect "<queue-name>"
Troubleshooting the “lease renewal failed” warning
- Check you’re using the right mechanism for the goal (task tag vs global concurrency vs pool/queue limits).
- Confirm you actually acquired a slot:
- Task-level: prefect concurrency-limit inspect <tag>
- Global: prefect global-concurrency-limit inspect <name>
- Ensure the run can reach your Prefect API throughout execution (no intermittent network issues).
Marvin
09/19/2025, 4:46 AM
concurrency("name") context?
- Are you seeing the warning in task logs or in the flow log?
- Prefect Cloud or self-hosted Server?