Olve Heitmann
09/14/2025, 9:32 PMMarvin
09/14/2025, 9:32 PMMarvin
09/14/2025, 9:39 PMy:xw).
- Acquire that limit only when runs are executed by pools X or W (so other pools don’t consume it).
Example:
# 1) Create the shared global concurrency limit:
# (verify command structure with --help; this is supported)
prefect gcl create "y:xw" 5
# 2) In your task/flow code, acquire it only for X and W
import os
from prefect import flow, task
from prefect.concurrency.asyncio import concurrency
@task
async def do_work():
# Suggestion: set USE_XW_LIMIT=1 in the base job template ENV for pools X and W
if os.getenv("USE_XW_LIMIT") == "1":
async with concurrency(names="y:xw"):
# Critical section limited across pools X and W
...
else:
# Runs in other pools proceed without consuming y:xw
...
This pattern leverages a small ENV toggle you add at the work-pool job template level so only those pools acquire the shared limit. It’s simple and avoids needing to discover pool name at runtime.
Option B — Dynamic task tags per pool, then use tag-based limits
If you prefer tag-based limits, you can dynamically add a pool-specific tag (e.g., y:x or y:w) at runtime, then create tag-based limits for those tags.
Example:
import os
from prefect import flow, task, tags
@task
def do_work():
...
@flow
def my_flow():
pool_tag = os.getenv("PREFECT_POOL_TAG") # e.g., set this per work-pool in the job template
with tags(f"y:{pool_tag}"):
do_work()
Then in the UI/CLI, create tag-based limits for y:x and/or y:w. This gives you per-pool limits. If you want a single shared limit across X and W, you could instead tag y:xw in both pools.
Notes and references
- Work pool concurrency (independent of tags):
- prefect work-pool set-concurrency-limit "my-pool" 10
- Work pools overview
- Work queue concurrency (per-queue):
- prefect work-queue set-concurrency-limit "my-queue" 10 --pool "my-pool"
- Global concurrency limits (shared, name-based; acquire in code):
- prefect gcl create NAME LIMIT
- Global concurrency limits
- Tag-based concurrency limits (applies to tagged runs/tasks globally):
- Tag-based concurrency limits
To your question “can I set task tags dynamically based on worker pool?” — yes, via:
- An env var set in the work pool’s base job template (recommended, simple), or
- Looking up the deployment/work pool at runtime and branching your logic. For example, you can query the deployment’s work pool name using the API inside your run, then conditionally acquire a limit or set tags. This is more complex and usually unnecessary if you can set an env var per pool.
If you share a bit more about whether you want a single shared limit for X+W or distinct limits per pool, and whether code changes vs UI-only config is preferable, I can tailor an exact snippet for your setup.