Lester Cayabyab
08/26/2025, 6:50 PM
Marvin
08/26/2025, 6:50 PM
Nate
08/26/2025, 6:50 PM
Marvin
08/26/2025, 6:57 PM
prefect work-pool set-concurrency-limit <pool-name> --limit <N>
Docs: Work pool CLI, Global concurrency limits
- Inside a single flow run (tasks): By default, Prefect uses ThreadPoolTaskRunner. If you do not set a value, concurrency is whatever the underlying Python ThreadPoolExecutor defaults to for your Python version. You can pin it explicitly:
from prefect import flow
from prefect.task_runners import ThreadPoolTaskRunner

@flow(task_runner=ThreadPoolTaskRunner(max_workers=8))
def my_flow():
    ...
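For example, with that runner, tasks submitted via .submit() run at most 8 at a time (a minimal sketch; the sleepy task and the numbers are just illustrative):
import time
from prefect import flow, task
from prefect.task_runners import ThreadPoolTaskRunner

@task
def sleepy(n: int) -> int:
    # Simulated work; with max_workers=8, at most 8 of these execute concurrently
    time.sleep(1)
    return n

@flow(task_runner=ThreadPoolTaskRunner(max_workers=8))
def my_flow():
    futures = [sleepy.submit(i) for i in range(20)]
    return [f.result() for f in futures]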
If you want me to suggest a concrete setup (e.g., “no more than 5 flow runs per pool and 10 tasks per flow at once”), tell me your target numbers and I’ll provide exact commands/code.
Lester Cayabyab
08/26/2025, 7:09 PM
Marvin
08/26/2025, 7:09 PM
Marvin
08/26/2025, 7:20 PM
1) Give the deployment its own work pool and set the pool's concurrency limit to 1
prefect work-pool set-concurrency-limit <pool-name> --limit 1
- Docs: Work pool CLI
2) Use a Global Concurrency Limit (GCL) with a limit of 1 and acquire it in the flow
- Create a named GCL:
prefect gcl create my-deployment-serial --limit 1
- Acquire the slot at the beginning of your flow so a second run waits until the first releases it:
from prefect import flow
from prefect.concurrency.sync import concurrency

@flow
def my_flow():
    with concurrency("my-deployment-serial", timeout_seconds=None):
        # entire flow body runs under this lock
        ...
- Docs: Global concurrency limits
Notes and guidance:
- Option 1 is the simplest if you can give the deployment its own pool; it will queue additional runs automatically.
- Option 2 is more targeted when you can’t limit the entire pool. It ensures only one run of this flow proceeds at a time, regardless of which worker picks it up.
- If you’re on Prefect Cloud, there may be a UI control for deployment-level “max concurrent runs” depending on your plan; if you don’t see it, use one of the options above.
If you tell me whether you want “queue” or “skip/cancel” behavior for overlapping runs, and whether you’re using a shared or dedicated work pool, I can recommend the exact setup.
Lester Cayabyab
08/26/2025, 7:32 PM
Marvin
08/26/2025, 7:32 PM
Nate
08/26/2025, 7:33 PM
Marvin
08/26/2025, 7:33 PM
prefect gcl create my-deployment-serial --limit 1
# Useful tooling
prefect gcl ls
prefect gcl inspect my-deployment-serial
prefect gcl update my-deployment-serial --limit 1 --enable
Docs: Global concurrency limits
Example A: Serialize the entire flow (queue behavior)
- This blocks until the slot is free, so new runs wait for the current run to finish.
from prefect import flow, get_run_logger
from prefect.concurrency.sync import concurrency

@flow
def my_flow():
    log = get_run_logger()
    # Block until the global slot is acquired
    with concurrency("my-deployment-serial", timeout_seconds=None):
        log.info("Acquired slot; running exclusively")
        # ... all flow logic here ...
        log.info("Done")
Example B: “Skip” if already running (short timeout + early return)
- Instead of waiting, try a short timeout. If acquisition times out, return early.
from prefect import flow, get_run_logger
from prefect.concurrency.sync import concurrency
from contextlib import ExitStack, contextmanager

@contextmanager
def try_concurrency(name: str, timeout: float):
    # Helper to convert a failed acquisition into a clean skip without stacktraces.
    # ExitStack keeps exceptions raised in the flow body separate from
    # exceptions raised while acquiring the slot.
    with ExitStack() as stack:
        try:
            stack.enter_context(concurrency(name, timeout_seconds=timeout))
        except Exception:
            # Could be a timeout acquiring the slot
            yield False
            return
        yield True

@flow
def my_flow():
    log = get_run_logger()
    with try_concurrency("my-deployment-serial", timeout=1.0) as acquired:
        if not acquired:
            log.info("Another run is active; skipping this run.")
            return  # ends quickly; no work done
        log.info("Got slot; proceeding")
        # ... work ...
Notes:
- This pattern returns early, so the run completes quickly with no work. If you prefer to mark the run as failed/cancelled instead, raise an exception after logging.
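If you'd rather the skipped run not end as Completed, a variation is to return a manual state instead of a bare return (a sketch that reuses the try_concurrency helper above and assumes your Prefect version supports returning a state object from a flow):
from prefect import flow, get_run_logger
from prefect.states import Failed

@flow
def my_flow():
    log = get_run_logger()
    with try_concurrency("my-deployment-serial", timeout=1.0) as acquired:
        if not acquired:
            log.info("Another run is active; marking this run as failed.")
            # Returning a state object sets the flow run's final state.
            # prefect.states also exposes Cancelled if you prefer that status,
            # though behavior may vary by Prefect version.
            return Failed(message="Skipped: another run holds the concurrency slot")
        log.info("Got slot; proceeding")
        # ... work ...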
Example C: Serialize only a critical section (not the whole flow)
- Useful if most of the flow can run concurrently, but a shared resource must be exclusive.
from prefect import flow
from prefect.concurrency.sync import concurrency

@flow
def my_flow():
    # do concurrent work ...
    with concurrency("my-deployment-serial", timeout_seconds=None):
        # critical section (e.g., DB migration, writing to a shared file, etc.)
        perform_exclusive_update()
    # continue with concurrent steps...
Example D: Async flow (use the asyncio variant)
- If your flow is async, use the asyncio concurrency context.
import asyncio
from prefect import flow, get_run_logger
from prefect.concurrency.asyncio import concurrency

@flow
async def my_async_flow():
    log = get_run_logger()
    async with concurrency("my-deployment-serial", timeout_seconds=None):
        log.info("Acquired slot; running exclusively")
        await asyncio.sleep(1)
Example E: One GCL per deployment (name hygiene)
- Create a unique GCL per deployment (e.g., “deploy:my-deployment-name”) and use that name in code (or derive it from your deployment metadata if you prefer). Just be sure to create the matching GCL first.
# assume you've created: `prefect gcl create deploy:my-deployment --limit 1`
from prefect import flow
from prefect.concurrency.sync import concurrency

@flow
def my_flow():
    with concurrency("deploy:my-deployment", timeout_seconds=None):
        ...
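If you'd rather derive the name from deployment metadata at runtime, something like this can work (a sketch assuming prefect.runtime is available and that you created the matching "deploy:<deployment-name>" GCL ahead of time):
from prefect import flow
from prefect.concurrency.sync import concurrency
from prefect.runtime import deployment

@flow
def my_flow():
    # deployment.name is empty for local (non-deployment) runs, so fall back
    gcl_name = f"deploy:{deployment.name or 'local'}"
    with concurrency(gcl_name, timeout_seconds=None):
        ...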
Behavior summary
- Queue behavior: Use timeout_seconds=None to block until the slot is available.
- Skip behavior: Use a short timeout and return early if you can’t acquire the slot.
- Automatic release: The context manager releases the slot when it exits, even on exceptions.
Handy CLI references
- List/inspect limits:
prefect gcl ls
prefect gcl inspect my-deployment-serial
Marvin
08/26/2025, 7:33 PM
prefect gcl update my-deployment-serial --limit 1 --enable
Want me to tailor this to your deployment names and pooling setup? Share your deployment name(s) and whether you want queue or skip behavior, and I’ll give you the exact commands and code.
Nate
08/26/2025, 7:33 PM
Lester Cayabyab
08/26/2025, 8:15 PM
Nate
08/26/2025, 8:17 PM