Fredrik Hoem Grelland
09/29/2025, 6:45 AMMarvin
09/29/2025, 6:45 AMMarvin
09/29/2025, 6:48 AMprefect.concurrency.*.concurrency(...)
- Docs: Global concurrency limits
- Tag-based concurrency limits (task-level, managed by tags)
- Tag tasks with e.g. resource:my-db
and set the tag limit to 1
- Applies only to tasks with the tag
- Docs: Tag-based concurrency limits
Quick setup: global concurrency limits
1) Create the limit (CLI)
prefect global-concurrency-limit create resource:my-db --limit 1
2) Use it in code at the point of resource use (wrap the critical section)
- Sync code
from prefect import flow, task
from prefect.concurrency.sync import concurrency
@task
def write_to_db(resource_name: str):
with concurrency(f"resource:{resource_name}", occupy=1, timeout_seconds=600):
# Only one run across the workspace can enter here at a time
... # do your resource-specific work
@flow
def my_flow(resource_name: str):
write_to_db(resource_name)
- Async code
from prefect import flow, task
from prefect.concurrency.asyncio import concurrency
@task
async def write_to_db_async(resource_name: str):
async with concurrency(f"resource:{resource_name}", occupy=1, timeout_seconds=600):
... # do your resource-specific work
@flow
async def my_flow_async(resource_name: str):
await write_to_db_async(resource_name)
Notes and tips
- Workspace scope: global concurrency limits are per workspace, so this enforces exclusivity across all flows in that workspace.
- Thousands of resources: if you have many resource names, you can programmatically ensure the limit exists before use:
from prefect.client.orchestration import get_client
async def ensure_limit(resource_name: str):
async with get_client() as client:
# create if missing; ignore if it already exists
try:
await client.create_global_concurrency_limit(name=f"resource:{resource_name}", limit=1)
except Exception:
pass
- Waiting/timeout behavior: concurrency(...)
supports timeout_seconds
and max_retries
to control how long a run should wait for the slot before raising.
- Multiple resources: concurrency
accepts a list of names if you need to lock more than one resource at once:
with concurrency([f"resource:{a}", f"resource:{b}"]):
...
CLI references we verified
- prefect global-concurrency-limit --help
- prefect global-concurrency-limit create --help
- prefect concurrency-limit --help
(for tag-based limits)
Questions to tailor this for you
- Do you want to limit the entire flow, or just the tasks that actually touch the resource?
- Do your resource names come from parameters (dynamic), and should we auto‑create the limits on first use?
- Would you prefer a no-code approach via tags, or are code changes acceptable?Fredrik Hoem Grelland
09/29/2025, 7:13 AMMarvin
09/29/2025, 7:13 AMMarvin
09/29/2025, 7:14 AMf"resource:{resource_id}"
).
- Ensure a global concurrency limit with limit=1 exists for that name (create if missing).
- Acquire the concurrency slot for the entire task body so only one run across the workspace can execute it at a time.
Docs:
- Global concurrency limits: Global concurrency limits
- Concurrency context managers: Using in code
Reusable helpers
- Async
from prefect import flow, task
from prefect.client.orchestration import get_client
from prefect.concurrency.asyncio import concurrency
from prefect.exceptions import ObjectAlreadyExists
async def ensure_gcl(name: str, limit: int = 1):
async with get_client() as client:
try:
await client.create_global_concurrency_limit(name=name, limit=limit)
except ObjectAlreadyExists:
pass # already exists; good to go
@task
async def do_work_with_resource_async(resource_name: str, timeout_s: float = 600):
lock_name = f"resource:{resource_name}"
await ensure_gcl(lock_name, limit=1)
async with concurrency(lock_name, occupy=1, timeout_seconds=timeout_s):
# Entire task is now single-threaded across the workspace for this resource
... # your work here
@flow
async def my_flow_async(resource_name: str):
await do_work_with_resource_async(resource_name)
- Sync
from prefect import flow, task
from prefect.client.orchestration import get_client
from prefect.concurrency.sync import concurrency
from prefect.exceptions import ObjectAlreadyExists
def ensure_gcl_sync(name: str, limit: int = 1):
with get_client(sync_client=True) as client:
try:
client.create_global_concurrency_limit(name=name, limit=limit)
except ObjectAlreadyExists:
pass
@task
def do_work_with_resource(resource_name: str, timeout_s: float = 600):
lock_name = f"resource:{resource_name}"
ensure_gcl_sync(lock_name, limit=1)
with concurrency(lock_name, occupy=1, timeout_seconds=timeout_s):
# Entire task is now single-threaded across the workspace for this resource
... # your work here
@flow
def my_flow(resource_name: str):
do_work_with_resource(resource_name)
Dynamic names discovered at runtime
- If you discover the resource name inside the task (e.g., via an API), compute lock_name
and use the same pattern above.
- If you need to lock multiple resources at once (e.g., cross-resource operation), acquire all locks together. To minimize deadlocks, use a deterministic ordering:
from prefect.concurrency.sync import concurrency
def cross_resource_task(resource_names: list[str]):
# Ensure all GCLs exist
for rn in resource_names:
ensure_gcl_sync(f"resource:{rn}", limit=1)
# Acquire all at once in a stable order
lock_names = [f"resource:{rn}" for rn in sorted(resource_names)]
with concurrency(lock_names, occupy=1, timeout_seconds=600):
... # work that needs all resources
Why this meets your requirements
- Exactly one at a time across flows: enforced by the global concurrency limit per resource name at the workspace scope.
- Works for schedules, events, manual runs: orchestration will queue at the concurrency boundary.
- Code-managed end-to-end: limits are created on-demand in your code; no manual UI/CLI setup required.
- Entire task covered: you acquire the slot before any work and release on exit.
Optional tweaks
- Set timeout_seconds
and/or max_retries
on concurrency(...)
to control waiting behavior.Marvin
09/29/2025, 7:14 AMresource:{id}
) across all flows that touch the same resource.
If you share a quick sketch of how you compute the resource names (single vs multiple, sync vs async), I can tailor the wrapper to your exact pattern.Fredrik Hoem Grelland
09/29/2025, 7:17 AMMarvin
09/29/2025, 7:17 AMMarvin
09/29/2025, 7:18 AMfrom prefect import flow, task, tags
from prefect.client.orchestration import get_client
from prefect.client.schemas.actions import ConcurrencyLimitCreate
from prefect.exceptions import ObjectAlreadyExists
async def ensure_tag_limit(tag: str, limit: int = 1):
async with get_client() as client:
try:
await client.create_concurrency_limit(
ConcurrencyLimitCreate(tag=tag, concurrency_limit=limit)
)
except ObjectAlreadyExists:
pass # already exists
@task
async def do_work(resource_name: str):
... # your task body
@flow
async def my_flow(resource_name: str):
tag = f"resource:{resource_name}"
await ensure_tag_limit(tag, 1)
with tags(tag):
await do_work(resource_name)
- Sync flow
from prefect import flow, task, tags
from prefect.client.orchestration import get_client
from prefect.client.schemas.actions import ConcurrencyLimitCreate
from prefect.exceptions import ObjectAlreadyExists
def ensure_tag_limit_sync(tag: str, limit: int = 1):
with get_client(sync_client=True) as client:
try:
client.create_concurrency_limit(
ConcurrencyLimitCreate(tag=tag, concurrency_limit=limit)
)
except ObjectAlreadyExists:
pass
@task
def do_work(resource_name: str):
... # your task body
@flow
def my_flow(resource_name: str):
tag = f"resource:{resource_name}"
ensure_tag_limit_sync(tag, 1)
with tags(tag):
do_work(resource_name)
When tag-based is a great fit
- You only need to serialize a single task per resource across the workspace.
- You’re okay with “wait until free” semantics (no timeout).
- You don’t need to atomically lock multiple resources at once.
When to prefer global concurrency in code
- You need a timeout or retry policy while waiting (use concurrency(..., timeout_seconds=..., max_retries=...)
).
- You need to atomically lock multiple resources at once (pass a list of names to concurrency([...])
).
- You want per-run occupancy (e.g., occupy more than 1 slot).
Docs
- Tag-based concurrency limits: Tag-based concurrency limits
- Global concurrency limits (for comparison): Global concurrency limits
If you’ll ever need multi-resource atomic locking or time-bounded waits, go with global concurrency. Otherwise, tag-based limits as shown above will meet your “exactly one at a time” requirement for tasks, fully managed in code.Fredrik Hoem Grelland
09/29/2025, 7:47 AMMarvin
09/29/2025, 7:47 AMMarvin
09/29/2025, 7:50 AMprefect.client.schemas.actions
that represents the payload for the “v2” concurrency limits endpoints. It’s paired with client methods like PrefectClient.create_concurrency_limit_v2(...)
and returns a ConcurrencyLimitV2
.
- It exists alongside the simpler, user-facing options:
- Tag-based concurrency: use ConcurrencyLimitCreate
with PrefectClient.create_concurrency_limit(...)
or the CLI prefect concurrency-limit create
.
- Global concurrency: use PrefectClient.create_global_concurrency_limit(...)
or the CLI prefect global-concurrency-limit create
. This supports features like seeding active_slots
and slot_decay_per_second
.
When to use it
- Generally, don’t. It’s part of the newer/advanced API surface and not required for typical workflows. Prefer the documented helpers and CLI above, which are stable and simpler.
References
- Tag-based: Tag-based concurrency limits
- Global: Global concurrency limits
- REST API (v2) reference: Create concurrency limit v2
If you’re auto-managing tag limits in code, stick with `ConcurrencyLimitCreate`:
from prefect.client.orchestration import get_client
from prefect.client.schemas.actions import ConcurrencyLimitCreate
from prefect.exceptions import ObjectAlreadyExists
async def ensure_tag_limit(tag: str, limit: int = 1):
async with get_client() as client:
try:
await client.create_concurrency_limit(
ConcurrencyLimitCreate(tag=tag, concurrency_limit=limit)
)
except ObjectAlreadyExists:
pass
And for code-managed global limits, use `create_global_concurrency_limit(...)`:
from prefect.client.orchestration import get_client
async def ensure_gcl(name: str, limit: int = 1):
async with get_client() as client:
try:
await client.create_global_concurrency_limit(name=name, limit=limit)
except Exception:
pass
Fredrik Hoem Grelland
09/29/2025, 7:52 AMMarvin
09/29/2025, 7:52 AMMarvin
09/29/2025, 7:54 AMPrefectClient.create_concurrency_limit
expects a ConcurrencyLimitCreate
model, not separate `tag`/`limit` parameters. Import the Pydantic model and pass it in.
- Duplicates: creating a tag limit that already exists typically returns a 409 Conflict from the API. The Python client raises PrefectHTTPStatusError
for that; it does not raise ObjectAlreadyExists
. The safe pattern is “read-or-create,” or catch the 409.
Working examples
- Async “ensure” helper (read first, then create; tolerate races)
from prefect.client.orchestration import get_client
from prefect.client.schemas.actions import ConcurrencyLimitCreate
from prefect.exceptions import PrefectHTTPStatusError
async def ensure_tag_limit(tag: str, limit: int = 1):
async with get_client() as client:
existing = await client.read_concurrency_limit_by_tag(tag)
if existing:
return existing
try:
return await client.create_concurrency_limit(
ConcurrencyLimitCreate(tag=tag, concurrency_limit=limit)
)
except PrefectHTTPStatusError as exc:
# Another run may have created it concurrently
if getattr(exc, "status_code", None) == 409:
return await client.read_concurrency_limit_by_tag(tag)
raise
- Sync variant
from prefect.client.orchestration import get_client
from prefect.client.schemas.actions import ConcurrencyLimitCreate
from prefect.exceptions import PrefectHTTPStatusError
def ensure_tag_limit_sync(tag: str, limit: int = 1):
with get_client(sync_client=True) as client:
existing = client.read_concurrency_limit_by_tag(tag)
if existing:
return existing
try:
return client.create_concurrency_limit(
ConcurrencyLimitCreate(tag=tag, concurrency_limit=limit)
)
except PrefectHTTPStatusError as exc:
if getattr(exc, "status_code", None) == 409:
return client.read_concurrency_limit_by_tag(tag)
raise
Then wrap your task invocation in a tag context:
from prefect import tags
with tags(f"resource:{resource_name}"):
do_work.submit(resource_name) # or await do_work(...)
Docs
- Tag-based limits: Tag-based concurrency limits
- REST reference: Create concurrency limit
If your method signature looks different (accepting parameters instead of a model), can you confirm your Prefect version? In 3.x, the client method takes a ConcurrencyLimitCreate
. In older versions, signatures may differ.Fredrik Hoem Grelland
09/29/2025, 8:32 AMMarvin
09/29/2025, 8:32 AMMarvin
09/29/2025, 8:33 AMwith tags(...)
block.
Quick checklist
- Place the tags context around the thing you want to tag:
- To tag a task run: wrap the task invocation inside the flow.
- To tag a flow run: wrap the flow invocation at the call site.
- Make sure you’re inside a running flow when tagging a task; using with tags(...)
at import time won’t affect later runs.
- Look in the right place in the UI:
- Flow run tags: in the flow run’s details page (Tags).
- Task run tags: click the task run in the flow run’s timeline; tags are shown in the task run details.
- Tags must be strings; pass them as separate args or a sequence of strings.
Minimal examples
- Tag a task run (recommended for tag-based concurrency)
from prefect import flow, task, tags
@task
def do_work():
return 1
@flow
def my_flow(resource_name: str):
with tags(f"resource:{resource_name}"):
do_work() # this task run will have the tag
- Tag a flow run (if you need to)
from prefect import flow, tags
@flow
def my_flow():
...
if __name__ == "__main__":
with tags("my-flow-tag"):
my_flow() # the flow run will have the tag
Common reasons tags don’t appear
- The with tags(...)
block is not around the actual task invocation.
- The block executes before a flow is running (e.g., at module import time).
- You’re checking the flow run UI for a tag that was only applied to the task run (or vice versa).
- The tag string is empty or not a string.
For tag-based concurrency to take effect, the tag must be on the task run. You can verify with:
- Create the limit: prefect concurrency-limit create resource:xyz 1
- Run the flow that wraps a task call with with tags("resource:xyz")
- In the flow run UI, click the task; confirm the tag is present. If another task with the same tag is already Running, the new one should sit in “AwaitingConcurrencySlot”.
If you’re still not seeing the tag, can you share a small code snippet of where you placed with tags(...)
and where you’re checking in the UI? Also confirm your Prefect version so I can match behavior exactly.Fredrik Hoem Grelland
09/29/2025, 9:41 AMMarvin
09/29/2025, 9:41 AMMarvin
09/29/2025, 9:42 AMwith tags(...)
context, so it gets the tag and occupies the 1 available slot.
- Inside that outer task, you start a second task (@materialize
), which is created while the tag context is still active, so it also gets the same tag.
- The inner task now waits for a slot that will never free until the outer task finishes… but the outer task is waiting for the inner task. Deadlock.
How to fix it
- Only tag the unit you want to serialize. Don’t tag both parent and child.
- Specifically, move the with tags(...)
around the call to the materialize task at the flow level, not inside the parent task.
Example: tag only the materialize task run
from prefect import flow, task, tags
@task
def parent_task(...):
# do non-resource work
...
@task # or @materialize
def materialize_resource(...):
# resource-consuming work
...
@flow
def my_flow(resource_name: str):
parent_task(...)
with tags(f"resource:{resource_name}"):
materialize_resource(...) # only this task run carries the tag
If you must keep the current structure
- Convert the “outer task” into a flow (or subflow) and call the @materialize
task inside it with the tag context as above.
- Or switch to global concurrency locks: acquire the lock once in the outer task and ensure the inner operation does not apply the same tag/lock.
- Example:
from prefect import task
from prefect.concurrency.sync import concurrency
@task
def outer(resource_name: str):
with concurrency(f"resource:{resource_name}", timeout_seconds=600):
# call a plain function (not a Prefect task) that does the materialization
do_materialization() # no tag, no extra lock here
- As a last resort, you could increase the tag limit to 2 to allow parent+child to run, but that breaks the “exactly one at a time” guarantee across flows.
Related docs
- Tag-based limits: Tag-based concurrency limits
- Global locks: Global concurrency limits
If you can share a small snippet of your current structure (outer task + @materialize
call), I can show the minimal change to avoid the deadlock.Fredrik Hoem Grelland
09/29/2025, 10:37 AMMarvin
09/29/2025, 10:37 AMMarvin
09/29/2025, 10:39 AMwith tags(...)
is additive and scoped. There’s no public API to “remove” or subtract a tag from the current context; the only way to drop it is to exit the context.
Recommended fixes
- Only wrap the specific task you want to serialize
- Put with tags("resource:...")
around the materialize task call (in the flow), not around a parent task that calls it.
- Or switch to global concurrency locks for the inner section instead of tags, which avoids this parent/child deadlock entirely:
from prefect.concurrency.sync import concurrency
@task
def parent(resource):
with concurrency(f"resource:{resource}", timeout_seconds=600):
do_materialization() # plain function or a task without the tag
If you absolutely must clear tags in a nested block
- There isn’t a documented “clear tags” context. You could reach for internal constructs (e.g., prefect.context.TagsContext
) to replace the tag set, but that’s not a stable public API and may break with upgrades. I don’t recommend it.
Why this matters
- With tag limits of 1, tagging both the outer task and the inner @materialize
task deadlocks: the outer task holds the only slot while the inner waits for it.
Docs
- Tag-based concurrency limits: Tag-based concurrency limits
- Global concurrency limits: Global concurrency limits
If you share a small snippet of your current structure, I can show the minimal refactor to ensure only the materialize task carries the tag.