how to implement tags for a flow and its subflows
# ask-marvin
a
@Marvin how to implement tags for a flow and its subflows
m
Are you looking to tag the parent flow, all subflows, or just specific subflows? And are you running from deployments or ad hoc?

Here’s how to add tags in Prefect 3.x (for both flows and subflows):
- At runtime (recommended): use the `prefect.context.tags` context manager. Any runs created inside the context will include those tags.
- From a deployment: set `tags=[...]` on the deployment so all runs from that deployment inherit them.

Key nuance for subflows:
- To tag the subflow run itself, wrap the subflow call in a `with tags(...):` block in the parent flow. Putting `with tags` inside the subflow function will tag the tasks inside the subflow, but not the subflow run if the tag is set after the run is created.

Examples

1) Tag the parent flow and a subflow run
from prefect import flow, get_run_logger, tags, runtime

@flow
def child_flow(x: int) -> int:
    logger = get_run_logger()
    logger.info(f"Child flow tags: {runtime.flow_run.tags}")
    return x + 1

@flow
def parent_flow():
    # Tags apply to runs created inside this block (the subflow run and its tasks),
    # not to the parent flow run, which already exists at this point
    with tags("team:ml", "env:prod"):
        result = child_flow(1)  # this subflow run is created with both tags

    # Tag a specific subflow differently
    with tags("subflow:payments", "priority:high"):
        child_flow(2)

if __name__ == "__main__":
    # Wrap the call itself so the parent flow run is created with this tag
    with tags("team:ml"):
        parent_flow()
2) Tag tasks inside a subflow (but NOT the subflow run itself)
from prefect import flow, task, tags

@task
def do_work():
    ...

@flow
def child_flow():
    # Tags here will apply to tasks created inside this block
    with tags("inside-subflow"):
        do_work()
3) Set tags on a deployment (all runs inherit these)
from prefect import flow

@flow
def my_flow():
    ...

# Deploy with default tags
my_flow.from_source(
    "https://github.com/owner/repo",
    entrypoint="flows/my_flow.py:my_flow",
).deploy(
    name="prod",
    work_pool_name="my-pool",
    tags=["env:prod", "team:ml"]
)
Reading tags at runtime (for logging/logic)
from prefect import runtime

current_tags = runtime.flow_run.tags  # a list[str]
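If you use the `key:value` convention from the examples above (`env:prod`, `team:ml`), a small helper can turn the tag list into something easier to branch on. This is only a sketch: `parse_tags` is a hypothetical helper, and the hard-coded list stands in for what `runtime.flow_run.tags` would return inside a flow.

```python
def parse_tags(tags):
    """Split "key:value" tags into a dict; bare tags map to True."""
    parsed = {}
    for tag in tags:
        key, sep, value = tag.partition(":")
        parsed[key] = value if sep else True
    return parsed


# Stand-in for runtime.flow_run.tags (assumed values for illustration).
example_tags = ["env:prod", "team:ml", "nightly"]

parsed = parse_tags(example_tags)
is_prod = parsed.get("env") == "prod"
```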
Useful links
- Tags context manager: prefect.context.tags (source)
- Flow runtime fields (including tags): prefect.runtime.flow_run
- Deployments overview: Deployments in Prefect 3
- Concurrency by tag (why tags matter): Concurrency limits by tag API

If you share how you’re triggering subflows (direct calls vs. via `run_deployment`) and whether you need tags for filtering, concurrency limits, or UI organization, I can tailor the exact pattern.
a
how can i limit concurrency in subflow
@Marvin, how can i limit runs in subflow
m
Great question. There are a few ways to limit “subflow runs” in Prefect 3.x, depending on how you trigger the subflow.

Quick clarifier: Are you calling the subflow directly as a Python function (`child_flow()`) or triggering it via a deployment (`run_deployment("child-flow/prod")`)? The best approach depends on that.

Options

1) If your subflow is its own deployment: set a per-deployment concurrency limit
- This caps the number of concurrent runs for that deployment.
from prefect import flow

@flow
def child_flow():
    ...

child_flow.from_source(
    "https://github.com/owner/repo",
    entrypoint="flows/child.py:child_flow",
).deploy(
    name="child-prod",
    work_pool_name="my-pool",
    concurrency_limit=2,  # limit concurrent child_flow runs from this deployment
)
- Docs: Deployments in Prefect 3

2) If you call the subflow directly: use a Global Concurrency Limit (GCL) inside the subflow
- Create a global limit once:
prefect global-concurrency-limit create child-subflow --limit 2
- Acquire it inside the subflow body:
from prefect import flow
from prefect.concurrency.sync import concurrency

@flow
def child_flow():
    with concurrency("child-subflow"):
        ...  # your subflow work
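As a rough mental model of what a slot-based limit does, here is a plain-Python sketch that uses a semaphore in place of the global concurrency limit. It does not call Prefect; `LIMIT`, `child_flow_body`, and the counters are hypothetical names for illustration, and the real limit is coordinated by the Prefect server rather than an in-process semaphore.

```python
import threading
import time

LIMIT = 2  # plays the role of `--limit 2` on the global concurrency limit
slots = threading.BoundedSemaphore(LIMIT)
counter_lock = threading.Lock()
active = 0  # bodies currently holding a slot
peak = 0    # highest number of simultaneous holders observed


def child_flow_body(i):
    """Simulated subflow body that must hold a slot while it works."""
    global active, peak
    with slots:  # analogous to `with concurrency("child-subflow"):`
        with counter_lock:
            active += 1
            peak = max(peak, active)
        time.sleep(0.05)  # pretend to do the subflow's work
        with counter_lock:
            active -= 1


# Start six "subflow runs" at once; only LIMIT of them hold a slot at a time.
threads = [threading.Thread(target=child_flow_body, args=(i,)) for i in range(6)]
for t in threads:
    t.start()
for t in threads:
    t.join()
```

Callers beyond the limit simply block until a slot is released, so `peak` never exceeds `LIMIT`.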
- Any concurrent runs of `child_flow` will wait until a slot is free.
- Docs: Global concurrency limits

3) If you trigger the subflow via run_deployment: the deployment’s concurrency applies
- Trigger child as a deployment so its `concurrency_limit` (option 1) governs it:
from prefect import flow
from prefect.deployments.flow_runs import run_deployment

@flow
def parent_flow():
    run_deployment(
        name="child-flow/child-prod",
        parameters={"x": 1},
    )
- API ref: run_deployment

4) Pool-level cap (broad control)
- Limit total concurrent jobs for a work pool (affects any runs using that pool):
prefect work-pool set-concurrency-limit "my-pool" 5
- Docs: Work pools

Notes
- Tag-based concurrency limits are task-level (not flow-run level). You can use them to throttle tasks inside a subflow, but they won’t cap the number of subflow runs:
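# Shell: allow at most 3 concurrent task runs tagged "my-tag"
prefect concurrency-limit create my-tag 3

# Python: tasks submitted inside the block carry the tag
from prefect import flow, task, tags

@task
def heavy():
    ...

@flow
def child_flow():
    with tags("my-tag"):
        heavy.submit().wait()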
- Docs: Tag-based concurrency limits

If you can share how you’re invoking the subflow (direct call vs. `run_deployment`) and what limit you want (e.g., “only 2 child runs at a time”), I’ll tailor the exact snippet.