<@ULVA73B9P> Is it possible to create concurrency ...
# ask-marvin
o
@Marvin Is it possible to create concurrency rules based on the combination of task tagging and worker pool? Eg only Z amounts of concurrency for task tag Y across work pool X and W? Or, if not, any other way of achieving the same, such as setting task tags dynamically based on worker pool?
m
thought for 403.9 seconds
Short answer: there isn’t a built-in way to define a single concurrency rule that is scoped to “tag Y AND only work pools X and W.” In Prefect 3.x, limits are enforced independently: - Tag-based concurrency limits are global (they apply regardless of the work pool). - Work pool and work queue concurrency limits apply to all runs in that pool/queue, regardless of tags. That said, you can achieve your goal in a couple of practical ways: Option A — Use a single Global Concurrency Limit shared by pools X and W - Create one GCL name to represent the “Y across X and W” limiter (e.g.,
y:xw
). - Acquire that limit only when runs are executed by pools X or W (so other pools don’t consume it). Example:
Copy code
# 1) Create the shared global concurrency limit:
#    (verify command structure with --help; this is supported)
prefect gcl create "y:xw" 5

# 2) In your task/flow code, acquire it only for X and W
import os
from prefect import flow, task
from prefect.concurrency.asyncio import concurrency

@task
async def do_work():
    # Suggestion: set USE_XW_LIMIT=1 in the base job template ENV for pools X and W
    if os.getenv("USE_XW_LIMIT") == "1":
        async with concurrency(names="y:xw"):
            # Critical section limited across pools X and W
            ...
    else:
        # Runs in other pools proceed without consuming y:xw
        ...
This pattern leverages a small ENV toggle you add at the work-pool job template level so only those pools acquire the shared limit. It’s simple and avoids needing to discover pool name at runtime. Option B — Dynamic task tags per pool, then use tag-based limits If you prefer tag-based limits, you can dynamically add a pool-specific tag (e.g.,
y:x
or
y:w
) at runtime, then create tag-based limits for those tags. Example:
Copy code
import os
from prefect import flow, task, tags

@task
def do_work():
    ...

@flow
def my_flow():
    pool_tag = os.getenv("PREFECT_POOL_TAG")  # e.g., set this per work-pool in the job template
    with tags(f"y:{pool_tag}"):
        do_work()
Then in the UI/CLI, create tag-based limits for
y:x
and/or
y:w
. This gives you per-pool limits. If you want a single shared limit across X and W, you could instead tag
y:xw
in both pools. Notes and references - Work pool concurrency (independent of tags): -
prefect work-pool set-concurrency-limit "my-pool" 10
- Work pools overview - Work queue concurrency (per-queue): -
prefect work-queue set-concurrency-limit "my-queue" 10 --pool "my-pool"
- Global concurrency limits (shared, name-based; acquire in code): -
prefect gcl create NAME LIMIT
- Global concurrency limits - Tag-based concurrency limits (applies to tagged runs/tasks globally): - Tag-based concurrency limits To your question “can I set task tags dynamically based on worker pool?” — yes, via: - An env var set in the work pool’s base job template (recommended, simple), or - Looking up the deployment/work pool at runtime and branching your logic. For example, you can query the deployment’s work pool name using the API inside your run, then conditionally acquire a limit or set tags. This is more complex and usually unnecessary if you can set an env var per pool. If you share a bit more about whether you want a single shared limit for X+W or distinct limits per pool, and whether code changes vs UI-only config is preferable, I can tailor an exact snippet for your setup.