Georg Raiser
08/19/2025, 1:12 PM
Marvin
08/19/2025, 1:13 PM
Marvin
08/19/2025, 1:16 PM
2) Call Flow B as a subflow from Flow A (same codebase, simplest coupling)
- Call flow_b directly inside flow_a; Prefect tracks it as its own subflow run.
from prefect import flow

@flow
def flow_b(x: int) -> int:
    return x * 2

@flow
def flow_a():
    # do your work...
    result = flow_b(21)  # runs as a subflow
    return result
- Docs: Subflows
3) Programmatically run a deployment from Flow A (loosely coupled, still in code)
- Use the Prefect client to kick off Flow B’s deployment when A finishes.
from prefect import flow, get_client

@flow
async def flow_a():
    # ...do work in A...
    async with get_client() as client:
        dep = await client.read_deployment_by_name("flow-b/prod")  # "flow-name/deployment-name"
        fr = await client.create_flow_run_from_deployment(
            deployment_id=dep.id,
            parameters={"x": 42},
        )
    # Optionally poll or just return the new flow run id
    return fr.id
- API ref: PrefectClient
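If Flow A needs to wait for Flow B rather than fire-and-forget, one option is to block on the new run with wait_for_flow_run (the same helper used in the orchestration examples later in this thread). A minimal sketch; "flow-b/prod" and the parameters are placeholders:
from prefect import flow, get_client
from prefect.flow_runs import wait_for_flow_run

@flow
async def flow_a_then_wait_for_b():
    # ...do work in A...
    async with get_client() as client:
        dep = await client.read_deployment_by_name("flow-b/prod")
        fr = await client.create_flow_run_from_deployment(deployment_id=dep.id, parameters={"x": 42})
    # Block until Flow B's run reaches a terminal state, then check that it succeeded
    finished = await wait_for_flow_run(fr.id, log_states=True)
    if not finished.state.is_completed():
        raise RuntimeError(f"flow-b run {fr.id} finished in state {finished.state.type}")
    return fr.id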
4) External trigger (if A lives elsewhere or you want integrations)
- Use Automations on the “A completed” event to run B (as in #1), or call the REST API/Webhook to create a flow run for B from any external system.
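For the REST route, here is a minimal sketch that creates a flow run for Flow B's deployment from any external system. Assumptions (all placeholders): PREFECT_API_URL points at your server or Cloud workspace API, PREFECT_API_KEY is set if you use Cloud, and you know the deployment's UUID.
import os
import httpx

API_URL = os.environ["PREFECT_API_URL"]        # e.g. your Cloud workspace API URL or self-hosted server URL
API_KEY = os.environ.get("PREFECT_API_KEY")    # required for Prefect Cloud
DEPLOYMENT_ID = "<flow-b-deployment-id>"       # placeholder: the UUID of Flow B's deployment

headers = {"Authorization": f"Bearer {API_KEY}"} if API_KEY else {}

# Ask the API to create a flow run for the deployment
resp = httpx.post(
    f"{API_URL}/deployments/{DEPLOYMENT_ID}/create_flow_run",
    headers=headers,
    json={"parameters": {"x": 42}},
)
resp.raise_for_status()
print("Created flow run:", resp.json()["id"])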
Which do you prefer?
- Cloud UI no-code vs. code
- Do you need A to synchronously wait for B, or just fire-and-forget?
- Are A and B in the same codebase (subflows are simpler), or separate with deployments?
If you share those constraints, I’ll sketch the exact setup.
Georg Raiser
08/29/2025, 2:22 PM
Marvin
08/29/2025, 2:22 PM
Marvin
08/29/2025, 2:27 PM
1) Attach state hook functions on the @flow decorator
from prefect import flow
from prefect.blocks.notifications import SlackWebhook
from prefect.artifacts import create_markdown_artifact

# A hook receives (flow, flow_run, state); attach it via on_completion, on_failure,
# on_cancellation, on_crashed, etc., depending on which states should trigger it.
def post_run_updates(flow, flow_run, state):
    # Attach a summary as a flow-run artifact (visible in the UI under Artifacts)
    create_markdown_artifact(
        markdown=f"# Run summary\n- Name: {flow_run.name}\n- State: {state.type}\n- Tags: {flow_run.tags}",
        key=f"summary-{flow_run.id}",
        description="Per-run summary",
    )
    # Notify on failures (the Slack block must be created beforehand in the UI or via code)
    if state.is_failed():
        slack = SlackWebhook.load("slack-alerts")
        slack.notify(f"Flow '{flow.name}' run '{flow_run.name}' failed. Run ID: {flow_run.id}")

@flow(
    name="example-flow",
    on_completion=[post_run_updates],  # hooks are passed as a list
    on_failure=[post_run_updates],     # attach to on_failure as well so the Slack branch can fire
)
def example_flow(x: int = 1):
    return x + 1

if __name__ == "__main__":
    example_flow.with_options(tags={"prod"})()
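The "slack-alerts" block loaded above can be created in code as well as in the UI. A one-time setup sketch, assuming you have a Slack incoming-webhook URL (the URL below is a placeholder):
from prefect.blocks.notifications import SlackWebhook

# Save the webhook under the name the hook loads ("slack-alerts").
SlackWebhook(url="https://hooks.slack.com/services/XXX/YYY/ZZZ").save(
    "slack-alerts", overwrite=True
)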
2) Use a reusable callable class as a hook
- Useful when you want to bundle configuration or complex logic and reuse it across flows. Any callable with the signature (flow, flow_run, state) works as a hook, so a plain class with __call__ is enough.
from prefect import flow
from prefect.blocks.notifications import SlackWebhook

class SlackOnFailure:
    def __init__(self, block_name: str):
        self.block_name = block_name

    def __call__(self, flow, flow_run, state):
        if state.is_failed():
            slack = SlackWebhook.load(self.block_name)
            slack.notify(f"[{flow_run.name}] failed in flow '{flow.name}'. Run ID: {flow_run.id}")

@flow(
    name="example-flow-with-hook",
    on_failure=[SlackOnFailure("slack-alerts")],
)
def example_flow_with_hook(x: int = 1):
    return x + 1
Notes and tips
- Flow-run specificity: in both approaches you get flow, flow_run, and state, so you can branch by tags, parameters, or anything on the run object. If you need even more context while the run is executing, prefect.context.get_run_context() exposes the current run (see the sketch after this list).
- Artifacts: use artifacts to attach structured, per-run output (links, markdown, tables, progress) that shows up on the flow-run page.
- Tags: add tags per run with flow.with_options(tags={...}), or inside your code using from prefect import tags and with tags("prod"): ...
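A small sketch tying those together: reading the run context inside the flow and applying a per-run tag (the "prod" tag is just an example):
from prefect import flow, tags
from prefect.context import get_run_context

@flow
def context_demo():
    # Inspect the current flow run from inside the flow
    ctx = get_run_context()
    print(f"Running {ctx.flow_run.name} with tags {ctx.flow_run.tags}")

if __name__ == "__main__":
    # Option 1: tag via with_options
    context_demo.with_options(tags={"prod"})()
    # Option 2: tag via the tags context manager
    with tags("prod"):
        context_demo()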
Related references for deeper dives
- Flow state hooks and Flow API (source): prefect.flows source
- Artifacts API (source): prefect.artifacts
- Notification blocks: prefect.blocks.notifications (Slack, Teams, PagerDuty, etc.)
Quick question so I can tailor this further: what per–flow-run behavior do you want to implement programmatically? Examples:
- Send Slack/Teams/email alerts on specific states
- Attach run artifacts (tables, markdown, links)
- Call an external webhook
- Pause/resume or change state based on conditions
- Something else?
If you share the exact behavior and any filters (e.g., only when tag=prod, or only for a specific flow), I’ll provide a drop-in snippet.
Georg Raiser
08/29/2025, 2:31 PM
Marvin
08/29/2025, 2:32 PM
Marvin
08/29/2025, 2:38 PM
A) Controller flow that launches the parents, waits for them, then triggers the child
- Use this when one flow should own the whole fan-out/fan-in.
from prefect import flow
from prefect.deployments import run_deployment
from prefect.flow_runs import wait_for_flow_run

@flow
def orchestrate_and_join(
    parent_deployments: list[str] = ["parent-a/main", "parent-b/main"],
    child_deployment: str = "child-flow/main",
    shared_parameters: dict | None = None,
):
    # 1) Kick off all parent runs (timeout=0 returns immediately instead of waiting)
    parent_run_ids = []
    for dep_name in parent_deployments:
        parent_fr = run_deployment(dep_name, parameters=shared_parameters, as_subflow=False, timeout=0)
        parent_run_ids.append(parent_fr.id)

    # 2) Wait for each to finish and ensure success
    for run_id in parent_run_ids:
        fr = wait_for_flow_run(run_id, log_states=True)
        if not fr.state.is_completed():
            raise RuntimeError(f"Parent run {run_id} did not complete successfully (state={fr.state.type}).")

    # 3) All successful, trigger the child
    child_fr = run_deployment(child_deployment, parameters=shared_parameters, as_subflow=False)
    return {"child_run_id": child_fr.id, "parent_run_ids": parent_run_ids}
- wait_for_flow_run docs (source): prefect.flow_runs
- run_deployment docs (source): prefect.deployments
B) Join-only flow for existing runs
- If parents are launched elsewhere (e.g., on schedules) and you just need a “join” step when certain run IDs are done.
from prefect import flow
from prefect.flow_runs import wait_for_flow_run
from prefect.deployments import run_deployment

@flow
def trigger_when_parents_done(
    parent_run_ids: list[str],
    child_deployment: str,
    child_parameters: dict | None = None,
):
    for run_id in parent_run_ids:
        fr = wait_for_flow_run(run_id, log_states=True)
        if not fr.state.is_completed():
            raise RuntimeError(f"Parent run {run_id} finished in state {fr.state.type}; aborting downstream trigger.")

    child_fr = run_deployment(child_deployment, parameters=child_parameters, as_subflow=False)
    return child_fr.id
Which version fits your setup better?
- Do you want this controller to also launch the parents (A), or just wait on known parent run IDs (B)?
- How do you identify the parents (explicit run IDs, deployment names, tags)? If you share that, I can tailor the example (e.g., query for “latest successful run of flows A, B” before triggering the child).
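As a starting point, here is a hedged sketch of that kind of query: it looks up a deployment by name ("parent-a/main" is a placeholder) and asks the API for its most recent Completed run. The filter and sort class names reflect my understanding of the Prefect client schemas; double-check them against your installed version.
import asyncio

from prefect import get_client
from prefect.client.schemas.filters import (
    FlowRunFilter,
    FlowRunFilterDeploymentId,
    FlowRunFilterState,
    FlowRunFilterStateType,
)
from prefect.client.schemas.objects import StateType
from prefect.client.schemas.sorting import FlowRunSort


async def latest_successful_run(deployment_name: str = "parent-a/main"):
    async with get_client() as client:
        dep = await client.read_deployment_by_name(deployment_name)
        runs = await client.read_flow_runs(
            flow_run_filter=FlowRunFilter(
                deployment_id=FlowRunFilterDeploymentId(any_=[dep.id]),
                state=FlowRunFilterState(
                    type=FlowRunFilterStateType(any_=[StateType.COMPLETED])
                ),
            ),
            sort=FlowRunSort.START_TIME_DESC,
            limit=1,
        )
        return runs[0] if runs else None


if __name__ == "__main__":
    run = asyncio.run(latest_successful_run())
    print(run.id if run else "no successful runs yet")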