# ask-marvin
@Marvin I have a few hundred to a few thousand different resources that can start running on a schedule, from events, or manually. I need a mechanism to limit running a named resource to exactly one at a time across flows but within the same workspace.
thought for 207.1 seconds
You can do this in Prefect 3 with workspace-scoped global concurrency limits keyed by your resource name. Set each resource's limit to 1, then acquire that limit in code anywhere the resource is used. This guarantees exactly one run at a time across all flows in the same workspace.

Two ways to set limits
- Global concurrency limits (recommended for cross-flow resource locks)
  - Create one limit per resource name (limit=1)
  - In code, wrap the critical section with `prefect.concurrency.*.concurrency(...)`
  - Docs: Global concurrency limits
- Tag-based concurrency limits (task-level, managed by tags)
  - Tag tasks with e.g. `resource:my-db` and set the tag limit to 1
  - Applies only to tasks with the tag
  - Docs: Tag-based concurrency limits

Quick setup: global concurrency limits

1) Create the limit (CLI)
prefect global-concurrency-limit create resource:my-db --limit 1
2) Use it in code at the point of resource use (wrap the critical section)
- Sync code
from prefect import flow, task
from prefect.concurrency.sync import concurrency

@task
def write_to_db(resource_name: str):
    with concurrency(f"resource:{resource_name}", occupy=1, timeout_seconds=600):
        # Only one run across the workspace can enter here at a time
        ...  # do your resource-specific work

@flow
def my_flow(resource_name: str):
    write_to_db(resource_name)
- Async code
from prefect import flow, task
from prefect.concurrency.asyncio import concurrency

@task
async def write_to_db_async(resource_name: str):
    async with concurrency(f"resource:{resource_name}", occupy=1, timeout_seconds=600):
        ...  # do your resource-specific work

@flow
async def my_flow_async(resource_name: str):
    await write_to_db_async(resource_name)
Notes and tips
- Workspace scope: global concurrency limits are per workspace, so this enforces exclusivity across all flows in that workspace.
- Thousands of resources: if you have many resource names, you can programmatically ensure the limit exists before use:
from prefect.client.orchestration import get_client

async def ensure_limit(resource_name: str):
    async with get_client() as client:
        # Create if missing; the broad `except` keeps the example short.
        # In production, catch only the "already exists" error.
        try:
            await client.create_global_concurrency_limit(name=f"resource:{resource_name}", limit=1)
        except Exception:
            pass
- Waiting/timeout behavior: `concurrency(...)` supports `timeout_seconds` and `max_retries` to control how long a run should wait for the slot before raising (a timeout-handling sketch follows the next example).
- Multiple resources: `concurrency` accepts a list of names if you need to lock more than one resource at once:
with concurrency([f"resource:{a}", f"resource:{b}"]):
    ...
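As noted above, here is a minimal sketch of handling the timeout case. It assumes the slot-acquisition timeout surfaces as a `TimeoutError` subclass; verify the exact exception class against your Prefect 3.x version:

from prefect import task
from prefect.concurrency.sync import concurrency

@task
def write_with_deadline(resource_name: str):
    try:
        # Wait up to 30s for the slot, then give up instead of queueing forever
        with concurrency(f"resource:{resource_name}", occupy=1, timeout_seconds=30):
            ...  # resource-specific work
    except TimeoutError:
        # Assumption: acquisition timeouts derive from TimeoutError in 3.x.
        # Decide whether to retry, reschedule, or fail the run here.
        raise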
CLI references we verified
- `prefect global-concurrency-limit --help`
- `prefect global-concurrency-limit create --help`
- `prefect concurrency-limit --help` (for tag-based limits)

Questions to tailor this for you
- Do you want to limit the entire flow, or just the tasks that actually touch the resource?
- Do your resource names come from parameters (dynamic), and should we auto-create the limits on first use?
- Would you prefer a no-code approach via tags, or are code changes acceptable?
@Marvin Resources and names come from parameters to the flow, or are retrieved dynamically when running. The entire task should be covered by concurrency limits, and everything needs to be managed by code automatically.
thought for 57.0 seconds
Great — since names are dynamic and you want code-only management that covers the entire task, use Prefect 3 global concurrency limits that are created on-demand, then acquired around the task body.

Pattern
- Derive a stable limit name from your resource (e.g., `f"resource:{resource_id}"`).
- Ensure a global concurrency limit with limit=1 exists for that name (create if missing).
- Acquire the concurrency slot for the entire task body so only one run across the workspace can execute it at a time.

Docs
- Global concurrency limits: Global concurrency limits
- Concurrency context managers: Using in code

Reusable helpers
- Async
from prefect import flow, task
from prefect.client.orchestration import get_client
from prefect.concurrency.asyncio import concurrency
from prefect.exceptions import ObjectAlreadyExists

async def ensure_gcl(name: str, limit: int = 1):
    async with get_client() as client:
        try:
            await client.create_global_concurrency_limit(name=name, limit=limit)
        except ObjectAlreadyExists:
            pass  # already exists; good to go

@task
async def do_work_with_resource_async(resource_name: str, timeout_s: float = 600):
    lock_name = f"resource:{resource_name}"
    await ensure_gcl(lock_name, limit=1)
    async with concurrency(lock_name, occupy=1, timeout_seconds=timeout_s):
        # Entire task is now single-threaded across the workspace for this resource
        ...  # your work here

@flow
async def my_flow_async(resource_name: str):
    await do_work_with_resource_async(resource_name)
- Sync
from prefect import flow, task
from prefect.client.orchestration import get_client
from prefect.concurrency.sync import concurrency
from prefect.exceptions import ObjectAlreadyExists

def ensure_gcl_sync(name: str, limit: int = 1):
    with get_client(sync_client=True) as client:
        try:
            client.create_global_concurrency_limit(name=name, limit=limit)
        except ObjectAlreadyExists:
            pass

@task
def do_work_with_resource(resource_name: str, timeout_s: float = 600):
    lock_name = f"resource:{resource_name}"
    ensure_gcl_sync(lock_name, limit=1)
    with concurrency(lock_name, occupy=1, timeout_seconds=timeout_s):
        # Entire task is now single-threaded across the workspace for this resource
        ...  # your work here

@flow
def my_flow(resource_name: str):
    do_work_with_resource(resource_name)
Dynamic names discovered at runtime
- If you discover the resource name inside the task (e.g., via an API), compute `lock_name` and use the same pattern above.
- If you need to lock multiple resources at once (e.g., a cross-resource operation), acquire all locks together. To minimize deadlocks, use a deterministic ordering:
from prefect import task
from prefect.concurrency.sync import concurrency

@task
def cross_resource_task(resource_names: list[str]):
    # Ensure all GCLs exist
    for rn in resource_names:
        ensure_gcl_sync(f"resource:{rn}", limit=1)

    # Acquire all at once in a stable order
    lock_names = [f"resource:{rn}" for rn in sorted(resource_names)]
    with concurrency(lock_names, occupy=1, timeout_seconds=600):
        ...  # work that needs all resources
Why this meets your requirements
- Exactly one at a time across flows: enforced by the global concurrency limit per resource name at the workspace scope.
- Works for schedules, events, and manual runs: orchestration will queue at the concurrency boundary.
- Code-managed end-to-end: limits are created on-demand in your code; no manual UI/CLI setup required.
- Entire task covered: you acquire the slot before any work and release it on exit.

Optional tweaks
- Set `timeout_seconds` and/or `max_retries` on `concurrency(...)` to control waiting behavior.
- Use a consistent naming scheme (e.g., `resource:{id}`) across all flows that touch the same resource.

If you share a quick sketch of how you compute the resource names (single vs. multiple, sync vs. async), I can tailor the wrapper to your exact pattern. In the meantime, a generic sketch follows.
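One way to package the above so it stays fully code-managed is a small decorator. This is a sketch, not a Prefect API: it reuses the `ensure_gcl_sync` helper defined above, and `get_name` is a hypothetical callable you supply to map a task's arguments to its resource name.

import functools

from prefect import flow, task
from prefect.concurrency.sync import concurrency

def with_resource_lock(get_name, timeout_s: float = 600):
    # Sketch: run the wrapped function while holding a per-resource global limit.
    def decorator(fn):
        @functools.wraps(fn)
        def wrapper(*args, **kwargs):
            lock_name = f"resource:{get_name(*args, **kwargs)}"
            ensure_gcl_sync(lock_name, limit=1)  # helper from the sync example above
            with concurrency(lock_name, occupy=1, timeout_seconds=timeout_s):
                return fn(*args, **kwargs)
        return wrapper
    return decorator

@task
@with_resource_lock(get_name=lambda resource_name: resource_name)
def do_resource_work(resource_name: str):
    ...  # the entire body runs while holding the lock

@flow
def locked_flow(resource_name: str):
    do_resource_work(resource_name)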
@Marvin How about tag-based? Will that not work?
thought for 101.5 seconds
Yes, tag-based concurrency can work for your case—if you tag each task run with the resource name and ensure a tag limit of 1 exists for that tag. That will guarantee only one task with that tag is Running across the workspace at a time.

Key points
- Scope: tag limits apply to tasks (not flows). Since you want the entire task covered, that's fine.
- Dynamic names: you can add tags at runtime and create the tag limit on-demand in code.
- Behavior: the task will sit in "AwaitingConcurrencySlot" until a slot is free; there's no built-in timeout for this wait.

Code-only pattern (auto-create the tag limit, then tag the task call)
- Async flow
from prefect import flow, task, tags
from prefect.client.orchestration import get_client
from prefect.client.schemas.actions import ConcurrencyLimitCreate
from prefect.exceptions import ObjectAlreadyExists

async def ensure_tag_limit(tag: str, limit: int = 1):
    async with get_client() as client:
        try:
            await client.create_concurrency_limit(
                ConcurrencyLimitCreate(tag=tag, concurrency_limit=limit)
            )
        except ObjectAlreadyExists:
            pass  # already exists

@task
async def do_work(resource_name: str):
    ...  # your task body

@flow
async def my_flow(resource_name: str):
    tag = f"resource:{resource_name}"
    await ensure_tag_limit(tag, 1)
    with tags(tag):
        await do_work(resource_name)
- Sync flow
from prefect import flow, task, tags
from prefect.client.orchestration import get_client
from prefect.client.schemas.actions import ConcurrencyLimitCreate
from prefect.exceptions import ObjectAlreadyExists

def ensure_tag_limit_sync(tag: str, limit: int = 1):
    with get_client(sync_client=True) as client:
        try:
            client.create_concurrency_limit(
                ConcurrencyLimitCreate(tag=tag, concurrency_limit=limit)
            )
        except ObjectAlreadyExists:
            pass

@task
def do_work(resource_name: str):
    ...  # your task body

@flow
def my_flow(resource_name: str):
    tag = f"resource:{resource_name}"
    ensure_tag_limit_sync(tag, 1)
    with tags(tag):
        do_work(resource_name)
When tag-based is a great fit
- You only need to serialize a single task per resource across the workspace.
- You're okay with "wait until free" semantics (no timeout).
- You don't need to atomically lock multiple resources at once.

When to prefer global concurrency in code
- You need a timeout or retry policy while waiting (use `concurrency(..., timeout_seconds=..., max_retries=...)`).
- You need to atomically lock multiple resources at once (pass a list of names to `concurrency([...])`).
- You want per-run occupancy (e.g., occupy more than 1 slot; see the sketch at the end of this message).

Docs
- Tag-based concurrency limits: Tag-based concurrency limits
- Global concurrency limits (for comparison): Global concurrency limits

If you'll ever need multi-resource atomic locking or time-bounded waits, go with global concurrency. Otherwise, tag-based limits as shown above will meet your "exactly one at a time" requirement for tasks, fully managed in code.
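For the per-run occupancy point, a quick sketch; the pool name and sizes here are made up, and it assumes you already created a global limit (e.g., `resource:db-pool`) with `limit=5`:

from prefect.concurrency.sync import concurrency

# Each run occupies 2 of the pool's 5 slots, so at most two of these runs
# can hold the pool at the same time.
with concurrency("resource:db-pool", occupy=2, timeout_seconds=600):
    ...  # work that needs two connections from the pool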
@Marvin What is ConcurrencyLimitV2Create?
thought for 202.5 seconds
Short answer: it's a low-level, "v2" request schema used by Prefect's API for creating concurrency limits. Most users don't need to use it directly.

Details
- ConcurrencyLimitV2Create is a Pydantic model in `prefect.client.schemas.actions` that represents the payload for the "v2" concurrency limits endpoints. It's paired with client methods like `PrefectClient.create_concurrency_limit_v2(...)` and returns a `ConcurrencyLimitV2`.
- It exists alongside the simpler, user-facing options:
  - Tag-based concurrency: use `ConcurrencyLimitCreate` with `PrefectClient.create_concurrency_limit(...)` or the CLI `prefect concurrency-limit create`.
  - Global concurrency: use `PrefectClient.create_global_concurrency_limit(...)` or the CLI `prefect global-concurrency-limit create`. This supports features like seeding `active_slots` and `slot_decay_per_second`.

When to use it
- Generally, don't. It's part of the newer/advanced API surface and not required for typical workflows. Prefer the documented helpers and CLI above, which are stable and simpler.

References
- Tag-based: Tag-based concurrency limits
- Global: Global concurrency limits
- REST API (v2) reference: Create concurrency limit v2

If you're auto-managing tag limits in code, stick with `ConcurrencyLimitCreate`:
from prefect.client.orchestration import get_client
from prefect.client.schemas.actions import ConcurrencyLimitCreate
from prefect.exceptions import ObjectAlreadyExists

async def ensure_tag_limit(tag: str, limit: int = 1):
    async with get_client() as client:
        try:
            await client.create_concurrency_limit(
                ConcurrencyLimitCreate(tag=tag, concurrency_limit=limit)
            )
        except ObjectAlreadyExists:
            pass
And for code-managed global limits, use `create_global_concurrency_limit(...)`:
from prefect.client.orchestration import get_client

async def ensure_gcl(name: str, limit: int = 1):
    async with get_client() as client:
        try:
            await client.create_global_concurrency_limit(name=name, limit=limit)
        except Exception:
            # Tolerate "already exists" errors; catch a narrower type in production
            pass
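For reference only, here is a hedged sketch of constructing the v2 payload directly. The field names follow the description above, but this surface can change between releases, so verify against your installed Prefect before relying on it:

from prefect.client.schemas.actions import ConcurrencyLimitV2Create

# Low-level v2 payload; normally you would not build this yourself.
payload = ConcurrencyLimitV2Create(
    name="resource:my-db",  # same naming scheme as the examples above
    limit=1,                # one concurrent slot
    # Advanced fields mentioned above (optional; defaults vary by version):
    # active_slots=0,
    # slot_decay_per_second=0.0,
)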
@Marvin But this code: `client.create_concurrency_limit(ConcurrencyLimitCreate(tag=tag, concurrency_limit=limit))` does not seem to be correct. It takes parameters, not an object. Also, it will not raise the error?
thought for 123.0 seconds
Good callout. Here's the accurate picture for Prefect 3.x:
- Signature: `PrefectClient.create_concurrency_limit` expects a `ConcurrencyLimitCreate` model, not separate `tag`/`limit` parameters. Import the Pydantic model and pass it in.
- Duplicates: creating a tag limit that already exists typically returns a 409 Conflict from the API. The Python client raises `PrefectHTTPStatusError` for that; it does not raise `ObjectAlreadyExists`. The safe pattern is "read-or-create", or catch the 409.
- Missing limits: depending on version, reading a tag with no limit may raise `ObjectNotFound` rather than returning `None`, so treat that as "not found" too (handled below).

Working examples
- Async "ensure" helper (read first, then create; tolerate races)
from prefect.client.orchestration import get_client
from prefect.client.schemas.actions import ConcurrencyLimitCreate
from prefect.exceptions import ObjectNotFound, PrefectHTTPStatusError

async def ensure_tag_limit(tag: str, limit: int = 1):
    async with get_client() as client:
        try:
            existing = await client.read_concurrency_limit_by_tag(tag)
            if existing:
                return existing
        except ObjectNotFound:
            pass  # no limit exists for this tag yet; create it below
        try:
            return await client.create_concurrency_limit(
                ConcurrencyLimitCreate(tag=tag, concurrency_limit=limit)
            )
        except PrefectHTTPStatusError as exc:
            # Another run may have created it concurrently (409 Conflict)
            if exc.response.status_code == 409:
                return await client.read_concurrency_limit_by_tag(tag)
            raise
- Sync variant
from prefect.client.orchestration import get_client
from prefect.client.schemas.actions import ConcurrencyLimitCreate
from prefect.exceptions import ObjectNotFound, PrefectHTTPStatusError

def ensure_tag_limit_sync(tag: str, limit: int = 1):
    with get_client(sync_client=True) as client:
        try:
            existing = client.read_concurrency_limit_by_tag(tag)
            if existing:
                return existing
        except ObjectNotFound:
            pass  # no limit exists for this tag yet; create it below
        try:
            return client.create_concurrency_limit(
                ConcurrencyLimitCreate(tag=tag, concurrency_limit=limit)
            )
        except PrefectHTTPStatusError as exc:
            if exc.response.status_code == 409:
                return client.read_concurrency_limit_by_tag(tag)
            raise
Then wrap your task invocation in a tag context:
from prefect import tags

with tags(f"resource:{resource_name}"):
    do_work.submit(resource_name)  # or await do_work(...)
Docs
- Tag-based limits: Tag-based concurrency limits
- REST reference: Create concurrency limit

If your method signature looks different (accepting parameters instead of a model), can you confirm your Prefect version? In 3.x, the client method takes a `ConcurrencyLimitCreate`. In older versions, signatures may differ.
@Marvin I do not see the tag show up in the UI when using the `with tags()` context manager.
thought for 53.9 seconds
It should work, but there are a few gotchas. Tags only apply to runs created while the context is active, and they show up on the run (flow or task) that was created inside the `with tags(...)` block.

Quick checklist
- Place the tags context around the thing you want to tag:
  - To tag a task run: wrap the task invocation inside the flow.
  - To tag a flow run: wrap the flow invocation at the call site.
- Make sure you're inside a running flow when tagging a task; using `with tags(...)` at import time won't affect later runs.
- Look in the right place in the UI:
  - Flow run tags: on the flow run's details page (Tags).
  - Task run tags: click the task run in the flow run's timeline; tags are shown in the task run details.
- Tags must be strings; pass them as separate args or a sequence of strings.

Minimal examples
- Tag a task run (recommended for tag-based concurrency)
from prefect import flow, task, tags

@task
def do_work():
    return 1

@flow
def my_flow(resource_name: str):
    with tags(f"resource:{resource_name}"):
        do_work()   # this task run will have the tag
- Tag a flow run (if you need to)
from prefect import flow, tags

@flow
def my_flow():
    ...

if __name__ == "__main__":
    with tags("my-flow-tag"):
        my_flow()   # the flow run will have the tag
Common reasons tags don't appear
- The `with tags(...)` block is not around the actual task invocation.
- The block executes before a flow is running (e.g., at module import time).
- You're checking the flow run UI for a tag that was only applied to the task run (or vice versa).
- The tag string is empty or not a string.

For tag-based concurrency to take effect, the tag must be on the task run. You can verify with:
- Create the limit: `prefect concurrency-limit create resource:xyz 1`
- Run the flow that wraps a task call with `with tags("resource:xyz")`
- In the flow run UI, click the task; confirm the tag is present. If another task with the same tag is already Running, the new one should sit in "AwaitingConcurrencySlot". (A small programmatic check is sketched at the end of this message.)

If you're still not seeing the tag, can you share a small code snippet of where you placed `with tags(...)` and where you're checking in the UI? Also confirm your Prefect version so I can match behavior exactly.
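As mentioned, here is a small programmatic check you can drop into a debug flow. It assumes the `prefect.runtime.task_run` accessor in recent 3.x releases exposes `tags`; verify against your version:

from prefect import flow, get_run_logger, task, tags
from prefect.runtime import task_run  # runtime accessors; assumed to expose `tags`

@task
def show_my_tags():
    # Log the tags Prefect attached to this task run so you can compare
    # them with what you see (or don't see) in the UI.
    get_run_logger().info("task run tags: %s", task_run.tags)

@flow
def debug_tags_flow(resource_name: str):
    with tags(f"resource:{resource_name}"):
        show_my_tags()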
@Marvin I have a @materialize task inside of my task with a slot size of 1. This task is never started because it is within a `with tags()` context manager.
thought for 50.9 seconds
You've likely run into a self-deadlock with tag-based concurrency.

What's happening
- You set a tag limit of 1.
- Your outer task is running inside a `with tags(...)` context, so it gets the tag and occupies the 1 available slot.
- Inside that outer task, you start a second task (`@materialize`), which is created while the tag context is still active, so it also gets the same tag.
- The inner task now waits for a slot that will never free until the outer task finishes… but the outer task is waiting for the inner task. Deadlock.

How to fix it
- Only tag the unit you want to serialize. Don't tag both parent and child.
- Specifically, move the `with tags(...)` around the call to the materialize task at the flow level, not inside the parent task.

Example: tag only the materialize task run
from prefect import flow, task, tags

@task
def parent_task():
    # do non-resource work
    ...

@task  # or @materialize
def materialize_resource(resource_name: str):
    # resource-consuming work
    ...

@flow
def my_flow(resource_name: str):
    parent_task()
    with tags(f"resource:{resource_name}"):
        materialize_resource(resource_name)  # only this task run carries the tag
If you must keep the current structure
- Convert the "outer task" into a flow (or subflow) and call the `@materialize` task inside it with the tag context as above.
- Or switch to global concurrency locks: acquire the lock once in the outer task and ensure the inner operation does not apply the same tag/lock. Example:
from prefect import task
from prefect.concurrency.sync import concurrency

@task
def outer(resource_name: str):
    with concurrency(f"resource:{resource_name}", timeout_seconds=600):
        # call a plain function (not a Prefect task) that does the materialization
        do_materialization()  # no tag, no extra lock here
- As a last resort, you could increase the tag limit to 2 to allow parent + child to run, but that breaks the "exactly one at a time" guarantee across flows.

Related docs
- Tag-based limits: Tag-based concurrency limits
- Global locks: Global concurrency limits

If you can share a small snippet of your current structure (outer task + `@materialize` call), I can show the minimal change to avoid the deadlock.
@Marvin Is there a way to remove the tag from the context?
thought for 89.1 seconds
Short answer: not in a supported way. Prefect's `with tags(...)` is additive and scoped. There's no public API to "remove" or subtract a tag from the current context; the only way to drop it is to exit the context.

Recommended fixes
- Only wrap the specific task you want to serialize: put `with tags("resource:...")` around the materialize task call (in the flow), not around a parent task that calls it.
- Or switch to global concurrency locks for the inner section instead of tags, which avoids this parent/child deadlock entirely:
from prefect import task
from prefect.concurrency.sync import concurrency

@task
def parent(resource):
    with concurrency(f"resource:{resource}", timeout_seconds=600):
        do_materialization()  # plain function or a task without the tag
If you absolutely must clear tags in a nested block
- There isn't a documented "clear tags" context. You could reach for internal constructs (e.g., `prefect.context.TagsContext`) to replace the tag set, but that's not a stable public API and may break with upgrades. I don't recommend it.

Why this matters
- With tag limits of 1, tagging both the outer task and the inner `@materialize` task deadlocks: the outer task holds the only slot while the inner waits for it.

Docs
- Tag-based concurrency limits: Tag-based concurrency limits
- Global concurrency limits: Global concurrency limits

If you share a small snippet of your current structure, I can show the minimal refactor to ensure only the materialize task carries the tag.