Hi <@ULVA73B9P>, I am trying to trigger a flow run...
# ask-marvin
g
Hi @Marvin, I am trying to trigger a flow run based on the successful completion of another flow run. What are my options?
m
thought for 194.0 seconds
Short answer: you’ve got a few good paths, depending on whether you want no-code, orchestration inside code, or programmatic control. Assuming Prefect 3.x:
Options
1) Automations (UI, no-code)
- Create an Automation: Trigger = “Flow run entered Completed” filtered to your upstream flow (by name, tag, or deployment). Action = “Run a deployment” for the downstream flow.
- You can pass parameters to the downstream deployment, even templated from the event payload.
- Easiest if you want “run B after A succeeds” without changing code.
- Docs: Automations
2) Subflows (code-only, tightly coupled)
- Call one flow from another; the called flow runs as a subflow and is tracked in the UI.
- Put the call at the end of Flow A to ensure it only runs if A succeeds.
from prefect import flow

@flow
def flow_b(x: int) -> int:
    return x * 2

@flow
def flow_a():
    # do your work...
    result = flow_b(21)  # runs as a subflow
    return result
- Docs: Subflows
3) Programmatically run a deployment from Flow A (loosely coupled, still in code)
- Use the Prefect client to kick off Flow B’s deployment when A finishes.
from prefect import flow
from prefect.client.orchestration import get_client

@flow
async def flow_a():
    # ...do work in A...
    # get_client() builds a client from the active Prefect API settings
    async with get_client() as client:
        dep = await client.read_deployment_by_name("flow-b/prod")  # "flow-name/deployment-name"
        fr = await client.create_flow_run_from_deployment(
            deployment_id=dep.id,
            parameters={"x": 42},
        )
        # Optionally poll or just return the new flow run id
        return fr.id
- API ref: PrefectClient
4) External trigger (if A lives elsewhere or you want integrations)
- Use Automations on the “A completed” event to run B (as in #1), or call the REST API/Webhook to create a flow run for B from any external system.
Which do you prefer?
- Cloud UI no-code vs. code
- Do you need A to synchronously wait for B, or just fire-and-forget?
- Are A and B in the same codebase (subflows are simpler), or separate with deployments?
If you share those constraints, I’ll sketch the exact setup.
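For the REST route in option 4, here is a minimal sketch of the request an external system could send. It assumes the `POST /deployments/{id}/create_flow_run` endpoint of the Prefect API; the helper name `build_create_flow_run_request`, the example base URL, and the placeholder deployment ID are illustrative assumptions, not part of Prefect. The function only builds the request so you can inspect it before sending.

```python
import json
import urllib.request


def build_create_flow_run_request(api_url, deployment_id, parameters=None, api_key=None):
    """Build (but do not send) a POST request asking Prefect to create a flow run.

    Hypothetical helper: api_url is your Prefect API base URL, deployment_id is
    the downstream deployment's ID, api_key is only needed for authenticated APIs.
    """
    url = f"{api_url.rstrip('/')}/deployments/{deployment_id}/create_flow_run"
    body = json.dumps({"parameters": parameters or {}}).encode("utf-8")
    headers = {"Content-Type": "application/json"}
    if api_key:
        headers["Authorization"] = f"Bearer {api_key}"
    return urllib.request.Request(url, data=body, headers=headers, method="POST")


# Example values are placeholders, not real identifiers.
request = build_create_flow_run_request(
    "http://127.0.0.1:4200/api",
    "00000000-0000-0000-0000-000000000000",
    parameters={"x": 42},
)
```

To actually send it, pass `request` to `urllib.request.urlopen(...)` from the system that knows Flow A has finished.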
g
@Marvin thanks. I would like to use a programmatic approach, and I would also need to make it specific to flow runs
m
thought for 300.3 seconds
Great. If you want a code-first approach that targets specific flow runs, the most flexible pattern in Prefect 3.x is to use flow state hooks. Hooks run inside the flow’s process whenever the run’s state changes, so you can send notifications, create artifacts, call webhooks, etc., with full access to the current flow run.
Here are two lightweight, programmatic options:
1) Pass a simple hook function to the flow
- Works well when you just need one or two actions on completion/failure/running.
from prefect import flow, tags
from prefect.blocks.notifications import SlackWebhook
from prefect.artifacts import create_markdown_artifact

# Synchronous hook, registered below for both Completed and Failed states.
# Other hook points include on_running, on_crashed, and on_cancellation.
def post_run_updates(flow, flow_run, state):
    # Attach a summary as a flow-run artifact (visible in the UI under Artifacts)
    create_markdown_artifact(
        markdown=f"# Run summary\n- Name: {flow_run.name}\n- State: {state.type}\n- Tags: {flow_run.tags}",
        key=f"summary-{flow_run.id}",
        description="Per-run summary",
    )

    # Notify on failures (Slack block must be created beforehand in the UI or via code)
    if state.is_failed():
        slack = SlackWebhook.load("slack-alerts")
        slack.notify(f"Flow '{flow.name}' run '{flow_run.name}' failed. Run ID: {flow_run.id}")

@flow(
    name="example-flow",
    on_completion=[post_run_updates],  # hooks are passed as lists
    on_failure=[post_run_updates],     # needed for the is_failed() branch to be reachable
)
def example_flow(x: int = 1):
    return x + 1

if __name__ == "__main__":
    # Flows are called directly (they have no .submit()); tags are added per run
    # with the tags context manager.
    with tags("prod"):
        example_flow()
2) Use a reusable FlowStateHook class
- Useful when you want to bundle complex logic or reuse across flows.
from prefect import flow
from prefect.flows import FlowStateHook
from prefect.blocks.notifications import SlackWebhook

class SlackOnFailure(FlowStateHook):
    def __init__(self, block_name: str):
        self.block_name = block_name

    def __call__(self, flow, flow_run, state):
        # Synchronous on purpose: load() and notify() are called without await
        if state.is_failed():
            slack = SlackWebhook.load(self.block_name)
            slack.notify(f"[{flow_run.name}] failed in flow '{flow.name}'. Run ID: {flow_run.id}")

@flow(
    name="example-flow-with-hook",
    on_failure=[SlackOnFailure("slack-alerts")],  # hooks are passed as a list
)
def example_flow_with_hook(x: int = 1):
    return x + 1
Notes and tips
- Flow-run specificity: In both approaches you get `flow`, `flow_run`, and `state`, so you can branch by tags, parameters, or anything on the run object. If you need even more context in tasks, `prefect.context.get_run_context()` exposes the current flow run inside the run.
- Artifacts: Use artifacts to attach structured, per-run output (links, markdown, tables, progress) that shows up on the flow-run page.
- Tags: Add tags per run by wrapping the flow call in the `tags` context manager (`from prefect import tags`, then `with tags("prod"):`).
Related references for deeper dives
- Flow state hooks and Flow API (source): prefect.flows source
- Artifacts API (source): prefect.artifacts
- Notification blocks: prefect.blocks.notifications (Slack, Teams, PagerDuty, etc.)
Quick question so I can tailor this further: what per-flow-run behavior do you want to implement programmatically? Examples:
- Send Slack/Teams/email alerts on specific states
- Attach run artifacts (tables, markdown, links)
- Call an external webhook
- Pause/resume or change state based on conditions
- Something else?
If you share the exact behavior and any filters (e.g., only when tag=prod, or only for a specific flow), I’ll provide a drop-in snippet.
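To connect hooks back to the original goal of starting a downstream run, here is a minimal sketch of a hook factory that filters on the specific flow run before triggering. The names `make_trigger_hook`, `trigger`, and the `upstream_run_id` parameter are hypothetical. In a real setup `trigger` could be a thin wrapper around `prefect.deployments.run_deployment`, and the downstream flow would need to accept whatever parameters you pass. The demo uses stand-in objects instead of real Prefect runs.

```python
from types import SimpleNamespace


def make_trigger_hook(deployment_name, trigger, required_tag=None):
    """Return a (flow, flow_run, state) hook that triggers a downstream deployment.

    `trigger` is any callable taking (deployment_name, parameters=...); it is
    injected so the run-specific filtering can be exercised without a server.
    """
    def trigger_downstream(flow, flow_run, state):
        # Only react to successful runs
        if not state.is_completed():
            return None
        # Optional run-level filter, e.g. only runs tagged "prod"
        if required_tag is not None and required_tag not in flow_run.tags:
            return None
        return trigger(deployment_name, parameters={"upstream_run_id": str(flow_run.id)})

    return trigger_downstream


# Demo with stand-ins for the trigger, the flow run, and the state
calls = []

def fake_trigger(name, parameters):
    calls.append((name, parameters))
    return "created"

hook = make_trigger_hook("flow-b/prod", fake_trigger, required_tag="prod")
run = SimpleNamespace(id="1234", tags=["prod"])
completed = SimpleNamespace(is_completed=lambda: True)
hook(None, run, completed)
```

The returned function would be registered like any other hook, for example in the `on_completion` list of the upstream flow.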
g
@Marvin I would like to trigger a flow run when its (potentially multiple) parent flow runs have all terminated successfully. Can you provide me with an example?
m
thought for 312.0 seconds
Absolutely. There are two common programmatic patterns for “run child only after multiple parents succeed” in Prefect 3.x:
A) Orchestrate parents and then join
- A controller flow kicks off the parent deployments, waits for them to finish, and then triggers the child.
from prefect import flow
from prefect.deployments import run_deployment
from prefect.flow_runs import wait_for_flow_run

@flow
async def orchestrate_and_join(
    parent_deployments: list[str] = ["parent-a/main", "parent-b/main"],
    child_deployment: str = "child-flow/main",
    shared_parameters: dict | None = None,
):
    # 1) Kick off all parent runs. timeout=0 returns immediately instead of
    #    blocking until each run finishes, so the parents can run concurrently.
    parent_run_ids = []
    for dep_name in parent_deployments:
        parent_fr = await run_deployment(
            dep_name, parameters=shared_parameters, timeout=0, as_subflow=False
        )
        parent_run_ids.append(parent_fr.id)

    # 2) Wait for each to finish and ensure success
    for run_id in parent_run_ids:
        fr = await wait_for_flow_run(run_id, log_states=True)
        if not fr.state.is_completed():
            raise RuntimeError(f"Parent run {run_id} did not complete successfully (state={fr.state.type}).")

    # 3) All successful, trigger the child (this call waits for the child to finish)
    child_fr = await run_deployment(child_deployment, parameters=shared_parameters, as_subflow=False)
    return {"child_run_id": child_fr.id, "parent_run_ids": parent_run_ids}
- wait_for_flow_run docs (source): prefect.flow_runs
- run_deployment docs (source): prefect.deployments
B) Join-only flow for existing runs
- If parents are launched elsewhere (e.g., on schedules) and you just need a “join” step when certain run IDs are done.
from uuid import UUID

from prefect import flow
from prefect.flow_runs import wait_for_flow_run
from prefect.deployments import run_deployment

@flow
async def trigger_when_parents_done(
    parent_run_ids: list[UUID],
    child_deployment: str,
    child_parameters: dict | None = None,
):
    # Wait on each known parent run and stop at the first one that did not succeed
    for run_id in parent_run_ids:
        fr = await wait_for_flow_run(run_id, log_states=True)
        if not fr.state.is_completed():
            raise RuntimeError(f"Parent run {run_id} finished in state {fr.state.type}; aborting downstream trigger.")

    # All parents completed, so start the child
    child_fr = await run_deployment(child_deployment, parameters=child_parameters, as_subflow=False)
    return child_fr.id
Which version fits your setup better?
- Do you want this controller to also launch the parents (A), or just wait on known parent run IDs (B)?
- How do you identify the parents (explicit run IDs, deployment names, tags)?
If you share that, I can tailor the example (e.g., query for “latest successful run of flows A, B” before triggering the child).
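Both patterns stop at the first parent that did not complete. If you would rather see every failed parent at once, here is a minimal sketch of the join decision on its own. It assumes you have already collected each parent run's final state type as a string such as "COMPLETED" or "FAILED"; the helper names `failed_parents` and `should_trigger_child` are hypothetical.

```python
def failed_parents(final_states):
    """Return the sorted IDs of parent runs whose final state is not COMPLETED.

    final_states maps a parent run ID to its final state type name.
    """
    return sorted(
        run_id
        for run_id, state_type in final_states.items()
        if state_type.upper() != "COMPLETED"
    )


def should_trigger_child(final_states):
    """Trigger only when there is at least one parent and none of them failed."""
    return bool(final_states) and not failed_parents(final_states)


# Example: one parent succeeded, two did not, so the child must not run
states = {"run-a": "COMPLETED", "run-b": "FAILED", "run-c": "CRASHED"}
blocked_by = failed_parents(states)
ok_to_run = should_trigger_child(states)
```

Inside a join flow, the loop over parent runs would fill the mapping first, and the error message could then list everything in `blocked_by`.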