<@ULVA73B9P> should i specify a concurrency for a ...
# ask-marvin
ö
@Marvin should i specify a concurrency for a task (using tags) if i already specified a concurrency for deployment, work pool, or work queue? I want a deployment to run every flow and task in 1 concurrency, meaning no concurrency i think. My other deployments and tasks run in 8 concurrency
m
thought for 582.1 seconds
Short answer: No. You don’t need task tag-based concurrency if your goal is just “one run of this deployment at a time.” Use a deployment-level concurrency limit of 1. Only add tag-based limits if you need to serialize specific tasks across multiple flows/deployments.

How Prefect 3.x limits combine (the most restrictive wins):
- Work pool concurrency limit: caps how many flow runs can execute across the entire pool.
- Work queue concurrency limit: caps runs started from that queue.
- Deployment concurrency limit: caps concurrent flow runs for that deployment only.
- Tag-based concurrency limits: cap task runs carrying that tag across your workspace, regardless of deployment/pool.

Recommended setup for your case:
- Keep your pool/queue at 8 (so other deployments can still run with higher throughput).
- For the one deployment you want serialized:
  - Set the deployment's `concurrency_limit` to 1 (so only one flow run of that deployment runs at a time).
  - If you also want tasks within that flow run to be strictly one-at-a-time, set the flow’s task runner to a single worker.

Example to serialize tasks within a flow run:
from prefect import flow
from prefect.task_runners import ThreadPoolTaskRunner

@flow(task_runner=ThreadPoolTaskRunner(max_workers=1))
def my_flow():
    ...
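As a rough illustration of the "most restrictive wins" rule for flow runs, the sketch below computes the effective cap for one deployment as the minimum of whichever limits are set. The function name and the use of `None` for "no limit set" are assumptions made for this example; this is not a Prefect API.

```python
from typing import Optional


def effective_flow_run_limit(*limits: Optional[int]) -> Optional[int]:
    """Return the tightest of the given limits; None means 'no limit set'."""
    configured = [limit for limit in limits if limit is not None]
    return min(configured) if configured else None


# Pool and queue at 8, the serialized deployment at 1.
serialized = effective_flow_run_limit(8, 8, 1)

# Pool at 8, no queue limit, a "normal" deployment at 8.
normal = effective_flow_run_limit(8, None, 8)

print(serialized, normal)
```

This only models the upper bound on concurrent flow runs; task-level parallelism inside a single flow run is a separate question.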
How to set the limits:
- Deployment-level: set `concurrency_limit: 1` when defining the deployment (via UI or in your deployment definition). The `RunnerDeployment` supports a `concurrency_limit` field.
- Work pool limit (keep at 8 for your “normal” concurrency):
prefect work-pool set-concurrency-limit "my-pool" 8
- Work queue limit (optional if you use queue-level caps):
prefect work-queue set-concurrency-limit my-queue 8 --pool my-pool
When to use tag-based concurrency:
- Only if you need to limit a specific task (or group of tasks) across many flows/deployments, e.g. a shared resource that must be accessed by one task at a time. Then:
prefect concurrency-limit create serialize-me 1
and tag those tasks:
from prefect import task

@task(tags=["serialize-me"])
def critical_section():
    ...
Docs for reference:
- Work pools
- Task runners
- Global/tag-based concurrency: Global concurrency limits
- Work pool CLI

Quick check: Do you also need tasks within that deployment’s flow run to be serialized, or is “one flow run at a time” sufficient? I can tailor the exact config based on that.
ö
@Marvin i have this sort of structure:
- there is task X, it has tags: A
- there is flow Z that runs task X, using X.map
- there is a concurrency limit of 8 for A
- there is Deployment 1, with concurrency_limit: 8
- and there is Deployment 2, with concurrency_limit: 1
my question is, will task X for deployment 2 run with 1 concurrency?
m
thought for 54.1 seconds
Short answer: No. A deployment’s `concurrency_limit` only limits how many flow runs of that deployment can run at once. It does not limit task-level parallelism inside a single flow run.

With your setup:
- Tag A has a global limit of 8
- Deployment 1 has `concurrency_limit: 8`
- Deployment 2 has `concurrency_limit: 1`

A single run of Deployment 2 can still execute `X.map(...)` with up to 8 concurrent task runs, because of the tag A limit (and the flow’s task runner), even though only one flow run of Deployment 2 can be active at a time.

If you want task X to run with 1 concurrency for Deployment 2 only, you have two good options:

Option A: Add a deployment-specific tag with a limit of 1
- Create a new limit:
prefect concurrency-limit create dep2-serial 1
- Add the tag `dep2-serial` to Deployment 2 (via UI or deployment definition). Task runs inherit flow-run tags, so `X` will have tags `["A", "dep2-serial"]` when run by Deployment 2.
- Because a task must acquire slots for each tagged limit it has, the effective concurrency becomes 1 for Deployment 2, while remaining 8 for other deployments using only tag A.

Option B: Make tasks run sequentially within the flow run
- Set the flow’s task runner to a single worker:
from prefect import flow
from prefect.task_runners import ThreadPoolTaskRunner

@flow(task_runner=ThreadPoolTaskRunner(max_workers=1))
def Z(items):
    ...
- Note: this applies to all deployments using that flow code, so Option A is usually better if you only want Deployment 2 serialized.

References:
- Global/tag-based concurrency: Global concurrency limits
- Task runners

If you only want to serialize task X (and not other tasks in the same flow) for Deployment 2, say so and I can show how to tag just that task for the dep2 runs.
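The rule that a task run must hold a slot for every tagged limit it carries can be modelled with plain semaphores. The sketch below uses only the standard library and is not how Prefect implements limits; `TAG_LIMITS` and `peak_concurrency` are hypothetical names. It assumes the tags really are present on the task runs, and it measures the peak number of simulated task runs that overlap.

```python
import threading
import time
from concurrent.futures import ThreadPoolExecutor
from contextlib import ExitStack

# Hypothetical stand-ins for the two tag limits discussed above.
TAG_LIMITS = {
    "A": threading.BoundedSemaphore(8),
    "dep2-serial": threading.BoundedSemaphore(1),
}


def peak_concurrency(tags, n_tasks=16):
    """Run n_tasks that each hold one slot per tag; return the peak overlap seen."""
    lock = threading.Lock()
    running = 0
    peak = 0

    def task_run():
        nonlocal running, peak
        with ExitStack() as stack:
            # Acquire slots in a fixed (sorted) order so tasks cannot deadlock.
            for tag in sorted(tags):
                stack.enter_context(TAG_LIMITS[tag])
            with lock:
                running += 1
                peak = max(peak, running)
            time.sleep(0.01)  # simulated work
            with lock:
                running -= 1

    with ThreadPoolExecutor(max_workers=n_tasks) as pool:
        futures = [pool.submit(task_run) for _ in range(n_tasks)]
        for future in futures:
            future.result()
    return peak


peak_a_only = peak_concurrency(["A"])
peak_both = peak_concurrency(["A", "dep2-serial"])
```

With both tags, the tighter limit decides, so `peak_both` is 1, while `peak_a_only` can reach but never exceed 8.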
ö
@Marvin "Task runs inherit flow-run tags" are you sure? please show how to tag just that task for the dep2 runs.
m
thought for 149.9 seconds
Great catch. My earlier phrasing was imprecise. In Prefect 3.x, task runs do not automatically inherit deployment or flow-run tags. Tasks get tags from:
- The task’s own `tags` (via decorator or `.with_options`)
- Any active `prefect.context.tags(...)` context when the task is submitted

How to tag just task X for Deployment 2

Goal: Keep tag A’s global limit at 8, but force X to run at 1 when invoked by Deployment 2. Do this by adding a second tag (e.g., `dep2-serial`) only for Deployment 2 runs, then set a concurrency limit of 1 on that tag.

1) Create the additional concurrency limit
prefect concurrency-limit create dep2-serial 1
2) Conditionally add the tag only for Deployment 2 using the tags context

This keeps the task’s original tag A and adds `dep2-serial` only for Deployment 2 runs:
from prefect import flow, task
from prefect.context import tags as tag_context
from prefect.runtime import deployment as runtime_deployment

@task(tags=["A"])
def X(i):
    ...

@flow
def Z(items):
    if runtime_deployment.name == "Deployment 2":
        # Only Deployment 2 runs will add the extra tag
        with tag_context("dep2-serial"):
            X.map(items)
    else:
        X.map(items)
Notes:
- Each mapped task run will carry both tags `["A", "dep2-serial"]` for Deployment 2 runs, so it must acquire both limits. Effective concurrency becomes 1 for those runs (min of 8 and 1).
- Other deployments will only have tag A and keep the 8 concurrency.

Alternative: Parameter-driven toggle (avoids relying on the deployment name)

Set a flow parameter in Deployment 2 (e.g., `serialize_x=True`), then:
from prefect import flow, task
from prefect.context import tags as tag_context

@task(tags=["A"])
def X(i):
    ...

@flow
def Z(items, serialize_x: bool = False):
    if serialize_x:
        with tag_context("dep2-serial"):
            X.map(items)
    else:
        X.map(items)
Why not `.with_options(tags=[...])` here?
- `.with_options(tags=...)` replaces the task’s tags. If you use it, include both A and dep2-serial:
task_for_dep2 = X.with_options(tags=["A", "dep2-serial"])
task_for_dep2.map(items)
- The tags context is simpler when you want to add a tag without losing existing ones.

References:
- Tag/global concurrency limits and examples using `with tags(...)`: Global concurrency limits
- Runtime values (for reading deployment info): Runtime
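The difference between replacing and adding tags can be written out as a toy model. The two helper functions below are hypothetical and only mirror the behaviour described in this message (override replaces, context adds); they do not call Prefect.

```python
def tags_after_override(task_tags, new_tags):
    """Model of an override: the supplied tags replace the task's own tags."""
    return set(new_tags)


def tags_after_context(task_tags, context_tags):
    """Model of a tags context: context tags are added to the task's own tags."""
    return set(task_tags) | set(context_tags)


own_tags = {"A"}

# Overriding with only the new tag silently drops "A" (and its limit of 8).
overridden = tags_after_override(own_tags, ["dep2-serial"])

# Overriding safely means listing every tag the run should carry.
overridden_both = tags_after_override(own_tags, ["A", "dep2-serial"])

# The context form keeps "A" without having to repeat it.
with_context = tags_after_context(own_tags, ["dep2-serial"])

print(sorted(overridden), sorted(overridden_both), sorted(with_context))
```

The practical risk with the override form is forgetting an existing tag, which would quietly remove the limit attached to it.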
ö
@Marvin in flow code, i want to check if the running deployment has tag dep2-serial and apply concurrency manually
m
thought for 77.0 seconds
Yes, you can check the running flow/deployment’s tags in code, then enforce a concurrency slot manually.

Key points:
- Read flow-run tags via `prefect.runtime.flow_run.tags` (these include deployment tags).
- To actually limit task execution, acquire the slot inside the task (or add a task tag). Wrapping `X.map(...)` in a `concurrency(...)` block at the flow level won’t constrain the tasks themselves.

Example (sync tasks), conditional manual slot acquisition inside the task:
from prefect import flow, task, unmapped
from prefect.runtime import flow_run as rt_flow_run
from prefect.concurrency.sync import concurrency

def work(i):
    # your business logic
    return i

@task
def X(i, serialize: bool = False):
    if serialize:
        with concurrency("dep2-serial"):
            return work(i)
    return work(i)

@flow
def Z(items):
    # True if this flow run (and therefore its deployment) has the tag
    dep2_serial = "dep2-serial" in rt_flow_run.tags
    return X.map(items, serialize=unmapped(dep2_serial))
If your task is async, use the asyncio version:
from prefect import task
from prefect.concurrency.asyncio import concurrency

async def do_work(i):
    # your async business logic
    return i

@task
async def X(i, serialize: bool = False):
    if serialize:
        async with concurrency("dep2-serial"):
            return await do_work(i)
    return await do_work(i)
Alternative (no manual context): inject a tag onto the task runs when the deployment tag is present, so the tag-based limit applies automatically:
from prefect.context import tags as tag_context

@flow
def Z(items):
    if "dep2-serial" in rt_flow_run.tags:
        with tag_context("dep2-serial"):
            return X.map(items)  # each mapped task gets the tag
    return X.map(items)
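Make sure the limit you rely on exists. The tag-injection alternative uses a tag-based limit:
prefect concurrency-limit create dep2-serial 1
The `concurrency("dep2-serial")` context manager instead acquires a slot from a global concurrency limit of that name, which is created separately:
prefect gcl create dep2-serial --limit 1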
Docs:
- Global/tag-based concurrency: Global concurrency limits
- Runtime (reading flow/task context): Runtime

Want me to adapt this to your exact flow/task signatures or deployment naming?
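The manual-slot examples in this message repeat the business call in both branches. One way to avoid that, sketched here with the standard library only, is to choose the guard first and enter it once. `dep2_slot` and `run_x` are hypothetical stand-ins: the semaphore plays the role of the concurrency slot, and nothing below calls Prefect.

```python
import threading
from contextlib import nullcontext

# Hypothetical stand-in for a limit of 1.
dep2_slot = threading.BoundedSemaphore(1)


def work(i):
    # Placeholder business logic.
    return i * 2


def run_x(i, serialize=False):
    # Pick the guard once, then write the business call a single time.
    guard = dep2_slot if serialize else nullcontext()
    with guard:
        return work(i)


results = [run_x(i, serialize=True) for i in range(3)] + [run_x(3)]
print(results)
```

`nullcontext()` does nothing on enter and exit, so the unserialized path behaves as if there were no guard at all.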
ö
@Marvin are deployment and flow tags same thing? And, does deployment/flow tags apply to background tasks for concurrency? or should i specify tags for background tasks too I dont want to specify every task a tag in the app. I want just one place to define concurrency
m
thought for 57.2 seconds
Great questions. Here’s the straight answer:

- Are deployment and flow tags the same?
  - Deployments have tags. When a flow run is created from a deployment, those deployment tags are copied onto the flow run (they become flow-run tags).
  - There is no automatic propagation of those tags to task runs.
- Do deployment/flow tags apply to tasks for concurrency?
  - Not automatically. Tag-based concurrency is enforced on the run being orchestrated. For tasks to be constrained by a tag, that tag must be present on the task run itself.
  - So if you only put a tag on the deployment (and thus the flow run), it won’t constrain task runs unless you add that tag to the tasks when they’re created.

You want one place to define concurrency without tagging every task
- Use a single concurrency tag (e.g., dep2-serial) and apply it to all task runs from one place in your flow code via the tags context. You can key this off the deployment’s tags so it only applies when that deployment is running.

Pattern: apply a tag to all tasks for “dep2-serial” deployments
from prefect import flow, task
from prefect.context import tags as tag_context
from prefect.runtime import flow_run as rt_flow_run

@task
def X(i):
    ...

@flow
def Z(items):
    if "dep2-serial" in rt_flow_run.tags:
        # One place: all tasks created in this block get the tag
        with tag_context("dep2-serial"):
            return X.map(items)
    else:
        return X.map(items)
- Then create the concurrency limit once:
prefect concurrency-limit create dep2-serial 1
- Result: Tasks in deployments tagged dep2-serial are limited to 1; others (with only tag A) can run up to 8.

Alternative “one place” that doesn’t use tags at all
- If your goal is simply “no parallel tasks within this flow run,” set the task runner to a single worker for that flow:
from prefect import flow
from prefect.task_runners import ThreadPoolTaskRunner

@flow(task_runner=ThreadPoolTaskRunner(max_workers=1))
def Z(items):
    ...
- This serializes tasks for all deployments of this flow. Use this if you don’t need per-deployment differences.

Key docs:
- Global/tag-based concurrency: Global concurrency limits
- Runtime (reading flow/deployment tags at run-time): Runtime

If you want this to also cover subflows or only certain tasks, I can tailor the pattern further.
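To show why a single-worker task runner serializes submitted work, here is a standard-library sketch that measures peak overlap in a thread pool. It assumes the task runner behaves like an ordinary thread pool in this respect; `peak_overlap` is a hypothetical helper and no Prefect code is involved.

```python
import threading
import time
from concurrent.futures import ThreadPoolExecutor


def peak_overlap(max_workers, n_tasks=6):
    """Run n_tasks in a pool and return (peak concurrent jobs, ordered results)."""
    lock = threading.Lock()
    running = 0
    peak = 0

    def job(i):
        nonlocal running, peak
        with lock:
            running += 1
            peak = max(peak, running)
        time.sleep(0.01)  # simulated work
        with lock:
            running -= 1
        return i

    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        results = list(pool.map(job, range(n_tasks)))
    return peak, results


serial_peak, serial_results = peak_overlap(max_workers=1)
parallel_peak, _ = peak_overlap(max_workers=4)
```

With one worker the jobs never overlap, and with four workers the overlap is capped at four. Because the cap lives in the flow definition, it applies to every deployment of that flow.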