ÖmerFİ
09/29/2025, 12:19 PM
Marvin
09/29/2025, 12:20 PM
Marvin
09/29/2025, 12:29 PM
from prefect import flow
from prefect.task_runners import ThreadPoolTaskRunner

@flow(task_runner=ThreadPoolTaskRunner(max_workers=1))
def my_flow():
    ...
How to set the limits
- Deployment-level: set concurrency_limit: 1 when defining the deployment (via UI or in your deployment definition). The RunnerDeployment supports a concurrency_limit field.
- Work pool limit (keep at 8 for your “normal” concurrency):
prefect work-pool set-concurrency-limit "my-pool" 8
- Work queue limit (optional if you use queue-level caps):
prefect work-queue set-concurrency-limit my-queue 8 --pool my-pool
When to use tag-based concurrency
- Only if you need to limit a specific task (or group of tasks) across many flows/deployments — e.g., a shared resource that must be accessed by one task at a time. Then:
prefect concurrency-limit create serialize-me 1
and tag those tasks:
from prefect import task

@task(tags=["serialize-me"])
def critical_section(*args, **kwargs):  # placeholder signature
    ...
Docs for reference
- Work pools: Work pools
- Task runners: Task runners
- Global/tag-based concurrency: Global concurrency limits
- Work pool CLI: Work pool CLI
Quick check: Do you also need tasks within that deployment’s flow run to be serialized, or is “one flow run at a time” sufficient? I can tailor the exact config based on that.
ÖmerFİ
09/29/2025, 12:38 PM
Marvin
09/29/2025, 12:38 PM
Marvin
09/29/2025, 12:39 PM
concurrency_limit only limits how many flow runs of that deployment can run at once. It does not limit task-level parallelism inside a single flow run.
With your setup:
- Tag A has a global limit of 8
- Deployment 1 has concurrency_limit: 8
- Deployment 2 has concurrency_limit: 1
A single run of Deployment 2 can still execute X.map(...) with up to 8 concurrent task runs, because of the tag A limit (and the flow’s task runner), even though only one flow run of Deployment 2 can be active at a time.
If you want task X to run with 1 concurrency for Deployment 2 only, you have two good options:
Option A: Add a deployment-specific tag with a limit of 1
- Create a new limit:
prefect concurrency-limit create dep2-serial 1
- Add the tag dep2-serial to Deployment 2 (via UI or deployment definition). Task runs inherit flow-run tags, so X will have tags ["A", "dep2-serial"] when run by Deployment 2.
- Because a task must acquire slots for each tagged limit it has, the effective concurrency becomes 1 for Deployment 2, while remaining 8 for other deployments using only tag A.
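To make the "must acquire a slot for each tagged limit" point concrete, here is a small standard-library simulation. It is only a toy model under my own assumptions: the semaphores stand in for the server-side tag limits, and names such as TAG_LIMITS and peak_concurrency are hypothetical, not Prefect APIs.

```python
import threading
import time
from concurrent.futures import ThreadPoolExecutor

# Stand-ins for the two tag limits (toy model, not Prefect objects).
TAG_LIMITS = {
    "A": threading.BoundedSemaphore(8),
    "dep2-serial": threading.BoundedSemaphore(1),
}


def peak_concurrency(tags, n_tasks=16):
    """Run toy tasks that hold one slot per tag; return the peak overlap seen."""
    lock = threading.Lock()
    state = {"running": 0, "peak": 0}

    def toy_task():
        # Acquire in a fixed (sorted) order so the toy model cannot deadlock.
        held = [TAG_LIMITS[t] for t in sorted(tags)]
        for sem in held:
            sem.acquire()
        try:
            with lock:
                state["running"] += 1
                state["peak"] = max(state["peak"], state["running"])
            time.sleep(0.01)  # simulated work
            with lock:
                state["running"] -= 1
        finally:
            for sem in reversed(held):
                sem.release()

    with ThreadPoolExecutor(max_workers=n_tasks) as pool:
        for future in [pool.submit(toy_task) for _ in range(n_tasks)]:
            future.result()
    return state["peak"]


peak_dep2 = peak_concurrency(["A", "dep2-serial"])  # both limits apply
peak_other = peak_concurrency(["A"])                # only the limit of 8 applies
```

In this model the doubly tagged tasks never overlap, while tasks tagged only with A can overlap up to 8 at a time, which is the "min of the limits" behavior described above.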
Option B: Make tasks run sequentially within the flow run
- Set the flow’s task runner to a single worker:
from prefect import flow
from prefect.task_runners import ThreadPoolTaskRunner

@flow(task_runner=ThreadPoolTaskRunner(max_workers=1))
def Z(items):
    ...
- Note: this applies to all deployments using that flow code, so Option A is usually better if you only want Deployment 2 serialized.
References:
- Global/tag-based concurrency: Global concurrency limits
- Task runners: Task runners
If you only want to serialize task X (and not other tasks in the same flow) for Deployment 2, say so and I can show how to tag just that task for the dep2 runs.
ÖmerFİ
09/29/2025, 1:10 PM
Marvin
09/29/2025, 1:10 PM
Marvin
09/29/2025, 1:12 PM
tags (via decorator or .with_options)
- Any active prefect.context.tags(...) context when the task is submitted
How to tag just task X for Deployment 2
Goal: Keep tag A’s global limit at 8, but force X to run at 1 when invoked by Deployment 2. Do this by adding a second tag (e.g., dep2-serial) only for Deployment 2 runs, then set a concurrency limit of 1 on that tag.
1) Create the additional concurrency limit
prefect concurrency-limit create dep2-serial 1
2) Conditionally add the tag only for Deployment 2 using the tags context
This keeps the task’s original tag A and adds dep2-serial only for Deployment 2 runs:
from prefect import flow, task
from prefect.context import tags as tag_context
from prefect.runtime import deployment as runtime_deployment

@task(tags=["A"])
def X(i):
    ...

@flow
def Z(items):
    if runtime_deployment.name == "Deployment 2":
        # Only Deployment 2 runs will add the extra tag
        with tag_context("dep2-serial"):
            X.map(items)
    else:
        X.map(items)
Notes:
- Each mapped task run will carry both tags ["A", "dep2-serial"] for Deployment 2 runs, so it must acquire both limits. Effective concurrency becomes 1 for those runs (min of 8 and 1).
- Other deployments will only have tag A and keep the 8 concurrency.
Alternative: Parameter-driven toggle (avoids relying on the deployment name)
Set a flow parameter in Deployment 2 (e.g., serialize_x=True), then:
from prefect import flow, task
from prefect.context import tags as tag_context

@task(tags=["A"])
def X(i):
    ...

@flow
def Z(items, serialize_x: bool = False):
    if serialize_x:
        with tag_context("dep2-serial"):
            X.map(items)
    else:
        X.map(items)
Why not .with_options(tags=[...]) here?
- .with_options(tags=...) replaces the task’s tags. If you use it, include both A and dep2-serial:
task_for_dep2 = X.with_options(tags=["A", "dep2-serial"])
task_for_dep2.map(items)
- The tags context is simpler when you want to add a tag without losing existing ones.
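The replace-versus-add distinction can be sketched with a toy model. This is only an illustration of the semantics described above; ToyTask, tag_scope, and run_tags are hypothetical names of mine and are not how Prefect is implemented.

```python
from contextlib import contextmanager
from contextvars import ContextVar

# Toy model only: tracks the tags added by enclosing "tags context" blocks.
_active_tags = ContextVar("_active_tags", default=frozenset())


@contextmanager
def tag_scope(*new_tags):
    """Add tags for the duration of the block, then restore the previous set."""
    token = _active_tags.set(_active_tags.get() | frozenset(new_tags))
    try:
        yield
    finally:
        _active_tags.reset(token)


class ToyTask:
    def __init__(self, tags):
        self.tags = frozenset(tags)

    def with_options(self, tags):
        # Replaces the task's own tags rather than merging with them.
        return ToyTask(tags)

    def run_tags(self):
        # Tags on a run = the task's own tags plus any active context tags.
        return self.tags | _active_tags.get()


X = ToyTask(["A"])

# Forgetting to repeat "A" in with_options drops it from the run.
replaced = X.with_options(tags=["dep2-serial"]).run_tags()

# The context adds a tag and leaves the task's own tags alone.
with tag_scope("dep2-serial"):
    added = X.run_tags()

outside = X.run_tags()  # back to just {"A"} once the block exits
```

In this sketch `replaced` loses tag A (so the limit of 8 would no longer apply to it), while `added` carries both tags.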
References
- Tag/global concurrency limits and examples using `with tags(...)`: Global concurrency limits
- Runtime values (for reading deployment info): Runtime
ÖmerFİ
09/29/2025, 1:26 PM
Marvin
09/29/2025, 1:26 PM
Marvin
09/29/2025, 1:27 PM
prefect.runtime.flow_run.tags (these include deployment tags).
- To actually limit task execution, acquire the slot inside the task (or add a task tag). Wrapping X.map(...) at the flow level won’t constrain the tasks themselves.
Example (sync tasks) — conditional manual slot acquisition inside the task:
from prefect import flow, task, unmapped
from prefect.runtime import flow_run as rt_flow_run
from prefect.concurrency.sync import concurrency

def work(i):
    # your business logic
    return i

@task
def X(i, serialize: bool = False):
    if serialize:
        with concurrency("dep2-serial"):
            return work(i)
    return work(i)

@flow
def Z(items):
    # True if this flow run (and therefore its deployment) has the tag
    dep2_serial = "dep2-serial" in rt_flow_run.tags
    return X.map(items, serialize=unmapped(dep2_serial))
If your task is async, use the asyncio version:
from prefect import task
from prefect.concurrency.asyncio import concurrency

async def do_work(i):
    # your async business logic (placeholder)
    return i

@task
async def X(i, serialize: bool = False):
    if serialize:
        async with concurrency("dep2-serial"):
            return await do_work(i)
    return await do_work(i)
Alternative (no manual context): inject a tag onto the task runs when the deployment tag is present, so the tag-based limit applies automatically:
from prefect import flow
from prefect.context import tags as tag_context
from prefect.runtime import flow_run as rt_flow_run

@flow
def Z(items):
    if "dep2-serial" in rt_flow_run.tags:
        with tag_context("dep2-serial"):
            return X.map(items)  # each mapped task gets the tag
    return X.map(items)
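Make sure the limit exists. For the tag-based alternative, create the tag concurrency limit:
prefect concurrency-limit create dep2-serial 1
For the manual concurrency("dep2-serial") examples, the name refers to a global concurrency limit, which is created separately:
prefect gcl create dep2-serial --limit 1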
Docs:
- Global/tag-based concurrency: Global concurrency limits
- Runtime (reading flow/task context): Runtime
Want me to adapt this to your exact flow/task signatures or deployment naming?
ÖmerFİ
09/29/2025, 8:07 PM
Marvin
09/29/2025, 8:07 PM
Marvin
09/29/2025, 8:08 PM
from prefect import flow, task
from prefect.context import tags as tag_context
from prefect.runtime import flow_run as rt_flow_run

@task
def X(i):
    ...

@flow
def Z(items):
    if "dep2-serial" in rt_flow_run.tags:
        # One place: all tasks created in this block get the tag
        with tag_context("dep2-serial"):
            return X.map(items)
    else:
        return X.map(items)
- Then create the concurrency limit once:
prefect concurrency-limit create dep2-serial 1
- Result: Tasks in deployments tagged dep2-serial are limited to 1; others (with only tag A) can run up to 8.
Alternative “one place” that doesn’t use tags at all
- If your goal is simply “no parallel tasks within this flow run,” set the task runner to a single worker for that flow:
from prefect import flow
from prefect.task_runners import ThreadPoolTaskRunner

@flow(task_runner=ThreadPoolTaskRunner(max_workers=1))
def Z(items):
    ...
- This serializes tasks for all deployments of this flow. Use this if you don’t need per-deployment differences.
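As a rough analogy for what a single worker does, here is a standard-library sketch with concurrent.futures.ThreadPoolExecutor(max_workers=1). It is not Prefect code and makes no claim about the task runner's internals; toy_task and the bookkeeping dict are illustrative assumptions.

```python
import threading
import time
from concurrent.futures import ThreadPoolExecutor

lock = threading.Lock()
state = {"running": 0, "peak": 0}
order = []


def toy_task(i):
    with lock:
        state["running"] += 1
        state["peak"] = max(state["peak"], state["running"])
    time.sleep(0.005)  # simulated work
    with lock:
        state["running"] -= 1
        order.append(i)
    return i


# With one worker, submitted tasks queue up and run one after another.
with ThreadPoolExecutor(max_workers=1) as pool:
    results = list(pool.map(toy_task, range(5)))
```

With a single worker the tasks never overlap and finish in submission order, which is why this setting affects every deployment that shares the flow code.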
Key docs
- Global/tag-based concurrency: Global concurrency limits
- Runtime (reading flow/deployment tags at run-time): Runtime
If you want this to also cover subflows or only certain tasks, I can tailor the pattern further.