<@ULVA73B9P> what is the default concurrency behav...
# ask-marvin
l
@Marvin what is the default concurrency behavior for flow runs?
m
thought for 454.4 seconds
n
short answer: none - ie no concurrency limits by default
m
Short answer: Unlimited, unless you set a limit.
Details (Prefect 3.x):
- Flow runs across your workspace: There is no default global limit. Any available worker will start flow runs as it can. You can add limits with Global Concurrency Limits (GCL) or per work pool; neither is enabled by default.
- Work pool concurrency: New pools have no limit by default. You can set one:
prefect work-pool set-concurrency-limit <pool-name> --limit <N>
Docs: Work pool CLI, Global concurrency limits
- Inside a single flow run (tasks): By default, Prefect uses `ThreadPoolTaskRunner`. If you do not set a value, concurrency is whatever the underlying Python ThreadPoolExecutor defaults to for your Python version. You can pin it explicitly:
from prefect import flow
from prefect.task_runners import ThreadPoolTaskRunner

@flow(task_runner=ThreadPoolTaskRunner(max_workers=8))
def my_flow():
    ...
If you want me to suggest a concrete setup (e.g., “no more than 5 flow runs per pool and 10 tasks per flow at once”), tell me your target numbers and I’ll provide exact commands/code.
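To make the effect of pinning `max_workers` concrete, here is a minimal standard-library sketch. It is not Prefect code: it uses Python's own `ThreadPoolExecutor` to show that a capped pool never runs more tasks at once than the cap. `PeakTracker` and `run_capped` are hypothetical names introduced only for this illustration.

```python
import threading
import time
from concurrent.futures import ThreadPoolExecutor


class PeakTracker:
    """Context manager that records the highest number of concurrent entries."""

    def __init__(self) -> None:
        self._lock = threading.Lock()
        self.running = 0
        self.peak = 0

    def __enter__(self) -> "PeakTracker":
        with self._lock:
            self.running += 1
            self.peak = max(self.peak, self.running)
        return self

    def __exit__(self, *exc_info) -> None:
        with self._lock:
            self.running -= 1


def run_capped(n_tasks: int, max_workers: int):
    """Run n_tasks on a capped thread pool; return (results, peak concurrency)."""
    tracker = PeakTracker()

    def work(i: int) -> int:
        with tracker:
            time.sleep(0.05)  # stand-in for I/O-bound task work
            return i * i

    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        results = list(pool.map(work, range(n_tasks)))
    return results, tracker.peak


if __name__ == "__main__":
    results, peak = run_capped(n_tasks=6, max_workers=2)
    print(results, "peak concurrency:", peak)
```

Six tasks are submitted, but the peak never exceeds the cap of 2; the remaining tasks wait for a free thread.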
l
@Marvin how about deployments, can you control the concurrency for it? For example if the current deployment flow is still running will the scheduled flow run wait for this running flow?
m
thought for 610.5 seconds
Short answer: By default, deployments do not serialize their own runs. If a deployment’s schedule fires while a previous run is still active, Prefect will create another flow run; it won’t automatically wait.
Ways to control/serialize a deployment’s runs (Prefect 3.x):
1) Put the deployment in a work pool with a concurrency limit of 1
- If you place this deployment in a dedicated work pool and set the pool limit to 1, only one flow run from that pool can execute at a time. Additional scheduled runs will remain pending and start when the current run finishes.
- Command:
prefect work-pool set-concurrency-limit <pool-name> --limit 1
- Docs: Work pool CLI
2) Use a Global Concurrency Limit (GCL) with a limit of 1 and acquire it in the flow
- Create a named GCL:
prefect gcl create my-deployment-serial --limit 1
- Acquire the slot at the beginning of your flow so a second run waits until the first releases it:
from prefect import flow
from prefect.concurrency.sync import concurrency

@flow
def my_flow():
    with concurrency("my-deployment-serial", timeout_seconds=None):
        # entire flow body runs under this lock
        ...
- Docs: Global concurrency limits
Notes and guidance:
- Option 1 is the simplest if you can give the deployment its own pool; it will queue additional runs automatically.
- Option 2 is more targeted when you can’t limit the entire pool. It ensures only one run of this flow proceeds at a time, regardless of which worker picks it up.
- If you’re on Prefect Cloud, there may be a UI control for deployment-level “max concurrent runs” depending on your plan; if you don’t see it, use one of the options above.
If you tell me whether you want “queue” or “skip/cancel” behavior for overlapping runs, and whether you’re using a shared or dedicated work pool, I can recommend the exact setup.
l
Can you give more example on the Option 2 @Marvin
m
thought for 86.6 seconds
n
i think marvin is not showing you the simplest path here, there is deployment level concurrency
m
Absolutely, here are a few concrete patterns for Option 2 (Global Concurrency Limits) to serialize a deployment’s runs.
Prereq: create a global concurrency limit (GCL) with limit=1
prefect gcl create my-deployment-serial --limit 1
# Useful tooling
prefect gcl ls
prefect gcl inspect my-deployment-serial
prefect gcl update my-deployment-serial --limit 1 --enable
Docs: Global concurrency limits
Example A: Serialize the entire flow (queue behavior)
- This blocks until the slot is free, so new runs wait for the current run to finish.
from prefect import flow, get_run_logger
from prefect.concurrency.sync import concurrency

@flow
def my_flow():
    log = get_run_logger()
    # Block until the global slot is acquired
    with concurrency("my-deployment-serial", timeout_seconds=None):
        log.info("Acquired slot; running exclusively")
        # ... all flow logic here ...
        log.info("Done")
Example B: “Skip” if already running (short timeout + early return)
- Instead of waiting, try a short timeout. If acquisition times out, return early.
from contextlib import contextmanager

from prefect import flow, get_run_logger
from prefect.concurrency.sync import concurrency

@contextmanager
def try_concurrency(name: str, timeout: float):
    # Yield True if the slot was acquired, False if acquisition timed out.
    # Only TimeoutError is handled; errors from the flow body propagate,
    # so the generator never yields a second time after the body fails.
    acquired = False
    try:
        with concurrency(name, timeout_seconds=timeout):
            acquired = True
            yield True
    except TimeoutError:
        if acquired:
            raise  # a TimeoutError raised by the body, not by acquisition
        yield False

@flow
def my_flow():
    log = get_run_logger()
    with try_concurrency("my-deployment-serial", timeout=1.0) as acquired:
        if not acquired:
            log.info("Another run is active; skipping this run.")
            return  # ends quickly; no work done
        log.info("Got slot; proceeding")
        # ... work ...
Notes:
- This pattern returns early, so the run completes quickly with no work. If you prefer to mark the run as failed/cancelled instead, raise an exception after logging.
Example C: Serialize only a critical section (not the whole flow)
- Useful if most of the flow can run concurrently, but a shared resource must be exclusive.
from prefect import flow
from prefect.concurrency.sync import concurrency

def perform_exclusive_update():
    # placeholder for your own exclusive work (hypothetical helper)
    ...

@flow
def my_flow():
    # do concurrent work ...
    with concurrency("my-deployment-serial", timeout_seconds=None):
        # critical section (e.g., DB migration, writing to a shared file, etc.)
        perform_exclusive_update()
    # continue with concurrent steps...
Example D: Async flow (use the asyncio variant)
- If your flow is async, use the asyncio concurrency context.
import asyncio
from prefect import flow, get_run_logger
from prefect.concurrency.asyncio import concurrency

@flow
async def my_async_flow():
    log = get_run_logger()
    async with concurrency("my-deployment-serial", timeout_seconds=None):
        log.info("Acquired slot; running exclusively")
        await asyncio.sleep(1)
Example E: One GCL per deployment (name hygiene)
- Create a unique GCL per deployment (e.g., “deploy:my-deployment-name”) and use that name in code (or derive it from your deployment metadata if you prefer). Just be sure to create the matching GCL first.
# assume you've created: `prefect gcl create deploy:my-deployment --limit 1`
from prefect import flow
from prefect.concurrency.sync import concurrency

@flow
def my_flow():
    with concurrency("deploy:my-deployment", timeout_seconds=None):
        ...
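If you would rather derive the limit name than hard-code it, a small pure-Python helper is one way to keep names consistent. This is only a sketch: `gcl_name_for` is a hypothetical helper, the `deploy:` prefix follows the example above, and the slug rule is an assumption made for tidiness rather than a Prefect requirement. How you obtain the deployment name at runtime is left out, and the matching GCL still has to be created before the flow runs.

```python
import re


def gcl_name_for(deployment_name: str, prefix: str = "deploy") -> str:
    """Build a limit name such as 'deploy:my-deployment' from a deployment name."""
    # Lowercase and collapse anything that is not a letter or digit into "-".
    slug = re.sub(r"[^a-z0-9]+", "-", deployment_name.lower()).strip("-")
    if not slug:
        raise ValueError("deployment name has no usable characters")
    return f"{prefix}:{slug}"


if __name__ == "__main__":
    print(gcl_name_for("My Deployment"))
```

The returned string can then be passed as the name argument to `concurrency(...)` in place of the literal.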
Behavior summary
- Queue behavior: Use `timeout_seconds=None` to block until the slot is available.
- Skip behavior: Use a short timeout and return early if you can’t acquire the slot.
- Automatic release: The context manager releases the slot when it exits, even on exceptions.
Handy CLI references
- List/inspect limits:
prefect gcl ls
prefect gcl inspect my-deployment-serial
- Update limit:
prefect gcl update my-deployment-serial --limit 1 --enable
Want me to tailor this to your deployment names and pooling setup? Share your deployment name(s) and whether you want queue or skip behavior, and I’ll give you the exact commands and code.
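The skip and automatic-release behaviors summarized above can be modeled without a Prefect server. The sketch below is a standard-library analogy, not the Prefect API: a `threading.Semaphore(1)` stands in for a limit-1 GCL, and `slot` and `SlotTimeout` are hypothetical names made up for this illustration. Passing no timeout corresponds to the queue behavior (wait until the slot is free).

```python
import threading
from contextlib import contextmanager
from typing import Iterator, List, Optional


class SlotTimeout(Exception):
    """Raised when the slot cannot be acquired within the timeout."""


@contextmanager
def slot(sem: threading.Semaphore, timeout_seconds: Optional[float] = None) -> Iterator[None]:
    """Hold one slot of `sem`; wait indefinitely if timeout_seconds is None."""
    if timeout_seconds is None:
        acquired = sem.acquire()
    else:
        acquired = sem.acquire(timeout=timeout_seconds)
    if not acquired:
        raise SlotTimeout("slot is busy")
    try:
        yield
    finally:
        sem.release()  # released on normal exit and when the body raises


def demo() -> List[str]:
    sem = threading.Semaphore(1)  # limit of 1, standing in for the GCL
    events: List[str] = []

    with slot(sem):  # first "run" holds the slot
        try:
            with slot(sem, timeout_seconds=0.01):  # second "run" tries briefly
                events.append("second ran")
        except SlotTimeout:
            events.append("second skipped")

    with slot(sem, timeout_seconds=0.01):  # slot is free again
        events.append("third ran")

    try:
        with slot(sem):
            raise RuntimeError("boom")
    except RuntimeError:
        events.append("fourth failed")

    with slot(sem, timeout_seconds=0.01):  # still free after the failure
        events.append("fifth ran")
    return events


if __name__ == "__main__":
    print(demo())
```

The second attempt is skipped because the slot is held, while the attempt after the failing run succeeds because the slot was released on the way out.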
l
Thanks @Nate! This is what I needed 🙂
n
catjam
😸 1