ASHIK
09/26/2025, 9:59 AM
Marvin
09/26/2025, 9:59 AM
Marvin
09/26/2025, 10:04 AM
- In code: use the prefect.context.tags context manager. Any runs created inside the context will include those tags.
- From a deployment: set tags=[...] on the deployment so all runs from that deployment inherit them.
Key nuance for subflows:
- To tag the subflow run itself, wrap the subflow call in a with tags(...): block in the parent flow. Putting with tags inside the subflow function will tag the tasks inside the subflow, but not the subflow run, because that run has already been created by the time the block is entered.
Examples
1) Tag subflow runs from the parent flow
from prefect import flow, get_run_logger, tags, runtime

@flow
def child_flow(x: int) -> int:
    logger = get_run_logger()
    logger.info(f"Child flow tags: {runtime.flow_run.tags}")
    return x + 1

@flow
def parent_flow():
    # Tags apply to runs created inside this block (the subflow run and its tasks),
    # not to the parent flow run, which already exists at this point
    with tags("team:ml", "env:prod"):
        result = child_flow(1)  # subflow run gets these tags

    # Tag a specific subflow differently
    with tags("subflow:payments", "priority:high"):
        child_flow(2)

if __name__ == "__main__":
    parent_flow()
2) Tag tasks inside a subflow (but NOT the subflow run itself)
from prefect import flow, task, tags

@task
def do_work():
    ...

@flow
def child_flow():
    # Tags here will apply to tasks created inside this block
    with tags("inside-subflow"):
        do_work()
3) Set tags on a deployment (all runs inherit these)
from prefect import flow

@flow
def my_flow():
    ...

# Deploy with default tags
my_flow.from_source(
    "https://github.com/owner/repo",
    entrypoint="flows/my_flow.py:my_flow",
).deploy(
    name="prod",
    work_pool_name="my-pool",
    tags=["env:prod", "team:ml"],
)
Reading tags at runtime (for logging/logic)
from prefect import runtime

current_tags = runtime.flow_run.tags  # a list[str]; empty when no flow run is active
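If you want to branch on those tags, here is a minimal sketch that assumes the key:value naming used in the examples above (team:ml, env:prod). The helper name parse_tags is hypothetical and not part of Prefect, and a literal list stands in for runtime.flow_run.tags so the snippet runs without a flow run.

```python
from typing import Iterable


def parse_tags(tags: Iterable[str]) -> dict[str, str]:
    """Split "key:value" tags into a dict; tags without a colon map to ""."""
    parsed: dict[str, str] = {}
    for tag in tags:
        # partition splits on the first colon only, so values may contain colons
        key, _, value = tag.partition(":")
        parsed[key] = value
    return parsed


# Inside a flow this would be parse_tags(runtime.flow_run.tags);
# the literal list below is a stand-in (assumption) for illustration.
current = parse_tags(["team:ml", "env:prod", "inside-subflow"])
is_prod = current.get("env") == "prod"
```

With the stand-in list, current.get("env") is "prod", so is_prod can gate production-only logic such as extra logging or alerts.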
Useful links
- Tags context manager: prefect.context.tags (source)
- Flow runtime fields (including tags): prefect.runtime.flow_run
- Deployments overview: Deployments in Prefect 3
- Concurrency by tag (why tags matter): Concurrency limits by tag API
If you share how you’re triggering subflows (direct calls vs. via run_deployment) and whether you need tags for filtering, concurrency limits, or UI organization, I can tailor the exact pattern.
ASHIK
09/26/2025, 10:05 AM
ASHIK
09/26/2025, 10:05 AM
Marvin
09/26/2025, 10:05 AM
Marvin
09/26/2025, 10:09 AM
Are you calling the subflow directly (child_flow()) or triggering it via a deployment (run_deployment("child-flow/prod"))? The best approach depends on that.
Options
1) If your subflow is its own deployment: set a per-deployment concurrency limit
- This caps the number of concurrent runs for that deployment.
from prefect import flow

@flow
def child_flow(x: int):
    ...

child_flow.from_source(
    "https://github.com/owner/repo",
    entrypoint="flows/child.py:child_flow",
).deploy(
    name="child-prod",
    work_pool_name="my-pool",
    concurrency_limit=2,  # limit concurrent child_flow runs from this deployment
)
- Docs: Deployments in Prefect 3
2) If you call the subflow directly: use a Global Concurrency Limit (GCL) inside the subflow
- Create a global limit once:
prefect global-concurrency-limit create child-subflow --limit 2
- Acquire it inside the subflow body:
from prefect import flow
from prefect.concurrency.sync import concurrency

@flow
def child_flow(x: int):
    # Blocks until one of the "child-subflow" slots is free
    with concurrency("child-subflow"):
        ...  # your subflow work
- Any concurrent runs of child_flow will wait until a slot is free.
- Docs: Global concurrency limits
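To picture what "wait until a slot is free" means with a limit of 2, here is a rough standard-library analogy. It is not Prefect's implementation and does not talk to a Prefect server; a threading.BoundedSemaphore stands in for the global limit, and the names LIMIT, child_work, and run_many are hypothetical.

```python
import threading
import time

LIMIT = 2  # mirrors --limit 2 on the "child-subflow" global limit
slots = threading.BoundedSemaphore(LIMIT)
lock = threading.Lock()
active = 0  # workers currently holding a slot
peak = 0    # highest number of simultaneous holders observed


def child_work(seconds: float = 0.05) -> None:
    """Stand-in for the subflow body; blocks until a slot is free."""
    global active, peak
    with slots:  # analogous to `with concurrency("child-subflow"):`
        with lock:
            active += 1
            peak = max(peak, active)
        time.sleep(seconds)  # simulated work while holding the slot
        with lock:
            active -= 1


def run_many(n: int) -> int:
    """Start n workers at once and return the highest concurrency observed."""
    threads = [threading.Thread(target=child_work) for _ in range(n)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    return peak
```

Calling run_many(5) starts five workers together, but the value it returns never exceeds LIMIT, since the remaining workers block on the semaphore until a slot is released.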
3) If you trigger the subflow via run_deployment: the deployment’s concurrency applies
- Trigger child as a deployment so its concurrency_limit (option 1) governs it:
from prefect import flow
from prefect.deployments.flow_runs import run_deployment

@flow
def parent_flow():
    run_deployment(
        name="child-flow/child-prod",
        parameters={"x": 1},
    )
- API ref: run_deployment
4) Pool-level cap (broad control)
- Limit total concurrent jobs for a work pool (affects any runs using that pool):
prefect work-pool set-concurrency-limit "my-pool" 5
- Docs: Work pools
Notes
- Tag-based concurrency limits are task-level (not flow-run level). You can use them to throttle tasks inside a subflow but they won’t cap the number of subflow runs:
prefect concurrency-limit create my-tag 3
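from prefect import flow, task, tags

@task
def heavy():
    ...

@flow
def child_flow():
    # Task runs created in this block carry "my-tag" and count against its limit
    with tags("my-tag"):
        heavy.submit().wait()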
- Docs: Tag-based concurrency limits
If you can share how you’re invoking the subflow (direct call vs run_deployment) and what limit you want (e.g., “only 2 child runs at a time”), I’ll tailor the exact snippet.