Matthew McKay
10/03/2025, 11:59 PM
Marvin
10/03/2025, 11:59 PM
Marvin
10/04/2025, 12:03 AM
```python
from uuid import UUID

from prefect import flow, get_run_logger
from prefect.flow_runs import wait_for_flow_run


@flow
async def flow_b(upstream_run_id: str | None = None):
    logger = get_run_logger()
    # If we were given A's run id, wait for it to finish.
    # wait_for_flow_run is async, so flow_b is async and awaits it; it raises
    # FlowRunWaitTimeout if A has not finished within `timeout` seconds.
    if upstream_run_id:
        fr = await wait_for_flow_run(UUID(upstream_run_id), timeout=6 * 60 * 60)  # 6h cap; adjust as needed
        state_name = fr.state.name  # "Completed", "Failed", etc.
        logger.info(f"Upstream Flow A finished with state: {state_name}")
        if not fr.state.is_completed():
            raise RuntimeError("Upstream Flow A did not complete successfully; aborting Flow B.")
    # ...rest of B's work...
```
- How to pass `upstream_run_id`:
- If you use Prefect Cloud Automations, you can trigger/run B when A completes and include A’s `flow_run_id` as a parameter to B. If A finishes before 2pm, set `scheduled_time` for B to 2pm so B won’t start early. If A finishes after 2pm, schedule B “now”.
- Reference: Run deployments and Automations
- Alternate: If you don’t want to pass a run id, you can query for A’s “expected” run (e.g., the 1pm slot that corresponds to B’s 2pm slot) via the Prefect client, then use `wait_for_flow_run` on that id.
Option 3: Let A’s success drive B (remove B’s cron)
- What it gives you:
- B only runs when A completes successfully.
- If A finishes early, schedule B for 2pm; if A finishes late, schedule B immediately; if A fails, don’t run B at all.
- How:
- Set an Automation on “A completed successfully” with an action to “RunDeployment” for B.
- If A completes before 2pm, set B’s `scheduled_time` to 2pm; if after 2pm, omit `scheduled_time` so B runs now.
- Docs: Automations and Run deployments
Recommendation
- If you want minimal changes and you’re okay with B being created at 2pm and waiting when A overruns: use Option 1 (global concurrency limit). Add an Automation to cancel B if A fails.
- If you want explicit “A must have completed successfully” logic in code: use Option 2 with the `wait_for_flow_run` gate in B.
Marvin
10/04/2025, 12:03 AM
Matthew McKay
10/05/2025, 1:50 AM
Marvin
10/05/2025, 1:50 AM
Marvin
10/05/2025, 1:53 AM
`wait_for_flow_run`) will keep B running and consume compute.
- Use Prefect Cloud Automations to start B only after all upstream flows complete successfully. This is event-driven and does not hold compute.
- Retries: set them on each flow (and/or tasks) independently; B should trigger only on Completed upstreams so you don’t have to manually retry upstreams from B.
Recommended pattern (no compute while waiting)
- Create a single Cloud Automation that runs Deployment B when all upstreams complete:
1) Create an Automation (Reactive posture).
2) Add a condition for each upstream deployment: “Flow run enters Completed” filtered to that deployment.
3) Combine conditions with “all” semantics (i.e., all must occur). Choose a time window that matches how long you’re willing to wait for the slowest upstream.
4) Action: Run Deployment (select B), optionally pass parameters/tags.
This starts B only after your upstream runs are all successful; there is no running flow holding resources while it “waits.”
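To make the “all must occur within the window” semantics concrete, here is a simplified pure-Python model of the decision. It is not Prefect’s implementation: it assumes one Completed event per upstream, and ignores posture, thresholds, and event ordering.

```python
from datetime import datetime, timedelta


def should_run_b(
    completions: dict[str, datetime],
    upstreams: set[str],
    window: timedelta,
) -> bool:
    """Model of a "require all" trigger.

    `completions` maps upstream deployment name -> when its Completed event arrived.
    B runs only if every upstream has completed and the earliest and latest
    completions fall within `window` of each other.
    """
    if not upstreams.issubset(completions):
        return False  # at least one upstream has not completed (yet)
    times = [completions[name] for name in upstreams]
    return max(times) - min(times) <= window
```

In this model, a one-day window tolerates a slow upstream finishing hours after the others, but an upstream that lands two days later never releases B.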
- Docs/reading:
- Event triggers and automations (Prefect 3)
- Automations overview blog
- No flow is an island
- Workflow design patterns
About retries with a dependency graph
- Configure retries on each flow independently. For example:
```python
from prefect import flow


@flow(retries=3, retry_delay_seconds=300)
def upstream_A():
    ...  # parameters and body omitted


@flow(retries=3, retry_delay_seconds=300)
def upstream_B():
    ...


@flow(retries=2, retry_delay_seconds=600)
def flow_B():
    ...
```
- Your automation should trigger B only on upstream “Completed.” This means upstream retries will run to completion (or final failure) before the downstream is ever triggered.
- If an upstream ultimately fails, B will simply never run. You can add another automation to send a notification or declare an incident if all upstreams haven’t completed within your expected window.
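Retries also decide how long the automation’s window has to be. A rough upper bound, assuming every attempt runs for the full (hypothetical) runtime and ignoring queueing and infrastructure start-up:

```python
from datetime import timedelta


def worst_case_final_state(runtime: timedelta, retries: int, retry_delay: timedelta) -> timedelta:
    """Upper bound on time from an upstream's first attempt to its final state.

    Assumes each of the (retries + 1) attempts runs for the full `runtime`,
    with `retry_delay` between consecutive attempts.
    """
    if retries < 0:
        raise ValueError("retries must be >= 0")
    return (retries + 1) * runtime + retries * retry_delay


# upstream_A above: retries=3, retry_delay_seconds=300; assume a 30-minute runtime
print(worst_case_final_state(timedelta(minutes=30), 3, timedelta(seconds=300)))  # 2:15:00
```

Taking the maximum of this bound across upstreams (relative to when the earliest one can finish) gives a starting point for the window: if it is shorter, an upstream that succeeds on its last retry can fall outside it.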
Why not “wait inside B”?
- Programmatic waiting (e.g., `prefect.flow_runs.wait_for_flow_run`) works, but the B flow stays Running and holds a worker slot/compute the entire time. This is typically not what you want in a hybrid AWS setup.
Optional: barrier/pause pattern
- You can technically start B on its own schedule, immediately check your gating conditions, and if unmet, pause/suspend the flow run to release compute:
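```python
from prefect import flow
from prefect.flow_runs import suspend_flow_run


def dependencies_ready() -> bool:
    return False  # hypothetical check; replace with your own gating logic


@flow
def flow_B():
    if not dependencies_ready():
        suspend_flow_run()  # releases compute; an automation can resume later
    # On resume the flow restarts from the top, so this point is reached only once ready
```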
- However, resuming a specific paused run requires targeted automations and extra coordination. In most cases, the “Run Deployment B when all upstreams complete” automation is simpler and cleaner.
One nuance to confirm
- Do you need B to run only when the “matching” upstream runs for the same logical period (e.g., same partition/date) have all completed? If yes:
- You’ll want a way to correlate runs (e.g., a date parameter/tag) and configure the automation window/filters accordingly.
- We can sketch an approach using tags/parameters or an asset/materialization pattern if you share how you want to group runs (daily, hourly, etc.) and whether you pass a partition key/date to your deployments.
If you share:
- the list of upstream deployments
- how you want to align “cycles” (daily/hourly and any partition keys)
- the maximum wait time you’re comfortable with
Marvin
10/05/2025, 1:53 AM
Matthew McKay
10/06/2025, 5:24 PM
Marvin
10/06/2025, 5:24 PM
Marvin
10/06/2025, 5:29 PM
`day:YYYY-MM-DD` to every upstream run. For example:
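```python
from datetime import date

from prefect import flow, get_client
from prefect.runtime import flow_run


@flow
async def flow_a1():
    day_tag = f"day:{date.today().isoformat()}"
    # `with tags(...)` only tags runs created inside the block (tasks, subflows),
    # not the run that is already executing, so add the tag to this run directly.
    async with get_client() as client:
        await client.update_flow_run(flow_run.id, tags=[*flow_run.tags, day_tag])
    # ... flow logic ...
```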
Repeat for each upstream flow (A2, A3, etc.). If your flows receive a `day` parameter, derive the tag from that parameter to coordinate backfills easily.
2) Configure Flow B’s deployment with a compound trigger
When deploying Flow B, set triggers so it requires completion events from each upstream flow for the same day tag, all within a 1-day window.
```python
from datetime import timedelta

from prefect import flow
from prefect.events.schemas.deployment_triggers import (
    DeploymentCompoundTrigger,
    DeploymentEventTrigger,
)


@flow
def flow_b():
    # ... your fan-in logic ...
    pass


# Build Flow B's deployment and attach triggers
flow_b.deploy(
    name="b",
    work_pool_name="your-pool",
    # Other deployment args as needed: image, parameters, etc.
    triggers=[
        DeploymentCompoundTrigger(
            require="all",  # wait for ALL the child triggers
            within=timedelta(days=1),  # group events within the same day window
            triggers=[
                # One event trigger per upstream flow (A1)
                DeploymentEventTrigger(
                    expect={"prefect.flow-run.Completed"},
                    # The event's primary resource is the flow run (named e.g. "clever-otter"),
                    # so match any flow run here...
                    match={"prefect.resource.id": "prefect.flow-run.*"},
                    match_related=[
                        # ...and match the flow by name as a related resource.
                        # Default flow names replace underscores with hyphens.
                        {"prefect.resource.role": "flow", "prefect.resource.name": "flow-a1"},
                        # Only runs tagged day:... (a trailing * is a prefix match)
                        {"prefect.resource.role": "tag", "prefect.resource.id": "prefect.tag.day:*"},
                    ],
                    # Group by the tag value so all triggers must refer to the same day.
                    # This uses the run's first related tag, so keep day:... as its only tag.
                    for_each=["related:tag:prefect.resource.id"],
                ),
                # A2
                DeploymentEventTrigger(
                    expect={"prefect.flow-run.Completed"},
                    match={"prefect.resource.id": "prefect.flow-run.*"},
                    match_related=[
                        {"prefect.resource.role": "flow", "prefect.resource.name": "flow-a2"},
                        {"prefect.resource.role": "tag", "prefect.resource.id": "prefect.tag.day:*"},
                    ],
                    for_each=["related:tag:prefect.resource.id"],
                ),
                # Add more triggers (A3, A4, ...) as needed
            ],
        )
    ],
)
```
How it works:
- Each DeploymentEventTrigger fires when its upstream flow completes and the run has a related tag that begins with `day:` (e.g., `day:2025-10-06`).
- `for_each` groups events by the exact tag value, so the compound trigger only fires when all upstream completions share the same day tag.
- require="all" ensures Flow B only runs when all upstream flows for that tag have completed.
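The intent of `for_each` plus `require="all"` can be pictured as one bucket per day tag that releases B once every upstream has landed in it. This toy model illustrates that idea only; it is not Prefect’s evaluation logic and ignores the `within` window.

```python
from collections import defaultdict


def days_ready(events: list[tuple[str, str]], upstreams: set[str]) -> list[str]:
    """Toy model of require="all" with per-day grouping.

    `events` holds (flow_name, tag) pairs taken from upstream Completed events.
    Returns, oldest first, the day tags for which every upstream has completed.
    """
    flows_by_day: defaultdict[str, set[str]] = defaultdict(set)
    for flow_name, tag in events:
        if tag.startswith("day:"):  # mirrors the day:* related-resource filter
            flows_by_day[tag].add(flow_name)
    # ISO dates sort chronologically, so a plain sort gives oldest first
    return sorted(tag for tag, flows in flows_by_day.items() if upstreams <= flows)
```

With completions `flow-a1@day:2025-10-06`, `flow-a2@day:2025-10-05`, and `flow-a2@day:2025-10-06`, only `day:2025-10-06` is ready; the 10-05 bucket stays open until `flow-a1` completes for that day.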
You can also configure this visually in Prefect Cloud’s Automations UI:
- Create a Compound trigger
- Child triggers are “Flow run Completed” filters for each upstream flow
- Add Related resource filter “Tag” with prefix `day:` for each
- Set “For each related” to Tag
- Set “Require” to All
Docs: Creating deployment triggers and Event triggers concepts
Option B: In-flow coordination (Flow B waits programmatically)
If you prefer to keep this logic inside Flow B, you can:
- Tag upstream runs as above
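- Start Flow B daily (e.g., via schedule)
Marvin
10/06/2025, 5:29 PM
```python
import asyncio
from datetime import date

from prefect import flow, get_client
from prefect.client.schemas.filters import (
    FlowFilter,
    FlowFilterName,
    FlowRunFilter,
    FlowRunFilterTags,
)
from prefect.flow_runs import wait_for_flow_run

# Flow names as Prefect registers them (underscores become hyphens by default)
UPSTREAM_FLOW_NAMES = ["flow-a1", "flow-a2"]  # extend as needed


@flow(retries=6, retry_delay_seconds=600)  # example values; retries re-poll for missing runs
async def flow_b():
    day_tag = f"day:{date.today().isoformat()}"
    async with get_client() as client:
        # Find upstream flow runs for this tag/day (filters must be filter objects, not dicts)
        runs = await client.read_flow_runs(
            flow_filter=FlowFilter(name=FlowFilterName(any_=UPSTREAM_FLOW_NAMES)),
            flow_run_filter=FlowRunFilter(tags=FlowRunFilterTags(all_=[day_tag])),
            limit=200,
        )
        if len(runs) < len(UPSTREAM_FLOW_NAMES):
            # Some upstream runs haven't been created yet; failing here lets the
            # flow-level retries above check again later.
            raise RuntimeError(f"Only {len(runs)} upstream runs found for {day_tag}")

        # Wait for all found runs to reach a final state
        finished = await asyncio.gather(*[
            wait_for_flow_run(run.id, client=client, log_states=True)
            for run in runs
        ])

    if not all(fr.state.is_completed() for fr in finished):
        raise RuntimeError(f"Not all upstream runs for {day_tag} completed successfully")
    # Now all upstream daily runs have completed; proceed
    # ... fan-in logic ...
```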
Notes and decisions
- Failure handling: In Option A, expect={"prefect.flow-run.Completed"} means failures will not trigger B. If you want B to run after successes and/or failures, expand expect to include Failed or handle failures explicitly in Option B.
- Time window: Adjust within=timedelta(...) to the window you consider the same “day” (e.g., 24h).
- Tag shape: Using a single day tag like `day:YYYY-MM-DD` is simple and works well with “prefix: day:” matching and for_each by tag.
Would you like me to:
- Generate the exact triggers for your specific upstream flow names?
- Show the UI configuration with screenshots/step-by-step?
- Or adapt the in-flow pattern to backfills (passing an explicit `day` parameter)?
Matthew McKay
10/06/2025, 6:07 PM
Marvin
10/06/2025, 6:07 PM
Marvin
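10/06/2025, 6:09 PM
```python
from datetime import date

from prefect import flow, get_client
from prefect.runtime import flow_run


@flow
async def time_gate(day: str | None = None):
    # Use explicit param for backfills; fallback to "today"
    day_str = day or date.today().isoformat()
    # Tag this run itself: `with tags(...)` would only tag runs created inside the block
    async with get_client() as client:
        await client.update_flow_run(flow_run.id, tags=[*flow_run.tags, f"day:{day_str}"])
    # No other work; completes immediately


# Deploy with your desired cron (e.g., 05:00 daily; evaluated in UTC unless a timezone is set)
time_gate.deploy(
    name="time-gate-0500",
    work_pool_name="your-pool",
    cron="0 5 * * *",
)
```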
2) Add the gate as another trigger in Flow B’s compound trigger
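```python
from datetime import timedelta

from prefect.events.schemas.deployment_triggers import (
    DeploymentCompoundTrigger,
    DeploymentEventTrigger,
)

# flow_b is the Flow B @flow defined earlier
flow_b.deploy(
    name="b",
    work_pool_name="your-pool",
    triggers=[
        DeploymentCompoundTrigger(
            require="all",
            within=timedelta(days=1),
            triggers=[
                # Upstream A1 (flow matched as a related resource; default names use hyphens)
                DeploymentEventTrigger(
                    expect={"prefect.flow-run.Completed"},
                    match={"prefect.resource.id": "prefect.flow-run.*"},
                    match_related=[
                        {"prefect.resource.role": "flow", "prefect.resource.name": "flow-a1"},
                        {"prefect.resource.role": "tag", "prefect.resource.id": "prefect.tag.day:*"},
                    ],
                    for_each=["related:tag:prefect.resource.id"],
                ),
                # Upstream A2
                DeploymentEventTrigger(
                    expect={"prefect.flow-run.Completed"},
                    match={"prefect.resource.id": "prefect.flow-run.*"},
                    match_related=[
                        {"prefect.resource.role": "flow", "prefect.resource.name": "flow-a2"},
                        {"prefect.resource.role": "tag", "prefect.resource.id": "prefect.tag.day:*"},
                    ],
                    for_each=["related:tag:prefect.resource.id"],
                ),
                # Time gate
                DeploymentEventTrigger(
                    expect={"prefect.flow-run.Completed"},
                    match={"prefect.resource.id": "prefect.flow-run.*"},
                    match_related=[
                        {"prefect.resource.role": "flow", "prefect.resource.name": "time-gate"},
                        {"prefect.resource.role": "tag", "prefect.resource.id": "prefect.tag.day:*"},
                    ],
                    for_each=["related:tag:prefect.resource.id"],
                ),
            ],
        )
    ],
)
```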
How this helps:
- Flow B will not run until all upstream flows for the day AND the time_gate run for that same day tag have completed.
- Use the same `day:YYYY-MM-DD` tag across all upstream flows and the gate so the compound trigger groups by that exact day via `for_each`.
Notes:
- Be mindful of time zones: deployment cron schedules are evaluated in UTC unless you give the schedule a timezone. Align your tagging logic accordingly.
- Avoid giving Flow B its own schedule if you’re using event triggers, or you may get duplicate runs.
- If late upstreams are possible, adjust or remove `within=timedelta(days=1)`.
Refs:
- Event triggers: Event triggers concepts
- Creating deployment triggers: Creating deployment triggers
- Schedules: Schedules
If you share your upstream flow names and desired time window/timezone, I can generate the exact trigger block for your setup.
Matthew McKay
10/06/2025, 8:24 PM
Marvin
10/06/2025, 8:24 PM
Marvin
10/06/2025, 8:24 PM
`within=timedelta(days=1)`, all the required events for a given day tag must occur within that window. If the successful retry lands outside that window relative to the other events, the compound trigger may not fire; adjust or remove `within` if needed.
- Tag consistency: Make sure the retry run emits the same `day:YYYY-MM-DD` tag as the other upstreams for that date. To avoid “midnight rollover” surprises, it’s best to derive the tag from an explicit `day` parameter rather than `date.today()` at runtime.
- Time-gate: If you’ve also added the cron-based time-gate child trigger, B won’t start until the gate run for that same day tag has completed as well.
Refs:
- Event triggers concepts: Event triggers concepts
- Creating deployment triggers: Creating deployment triggers
Matthew McKay
10/10/2025, 1:43 AM
Marvin
10/10/2025, 1:44 AM
Marvin
10/10/2025, 1:56 AM
```yaml
name: chained-deployments
prefect-version: ">=3.0.0"

# Define both deployments
deployments:
  - name: upstream
    entrypoint: flows/a.py:flow_a
    work_pool:
      name: my-pool
  - name: downstream
    entrypoint: flows/b.py:flow_b
    work_pool:
      name: my-pool

# Define the automation (event-driven chaining)
automations:
  - name: run-downstream-after-upstream
    description: Run downstream when upstream completes successfully
    enabled: true
    trigger:
      type: event
      # Only fire on successful flow-run completions
      expect:
        - "prefect.flow-run.Completed"
      match:
        prefect.resource.id: "prefect.flow-run.*"
      # And only when that completed run came from this specific deployment
      # (the related deployment resource carries the deployment name alone)
      match_related:
        prefect.resource.role: "deployment"
        prefect.resource.name: "upstream"
    actions:
      - type: run-deployment
        # The deployment to kick off
        deployment: "flow_b/downstream"
```
How it works
- The trigger’s `expect` listens for the event name `prefect.flow-run.Completed` (successful completion).
- `match_related` scopes it to events whose related deployment resource (role `deployment`) is named `upstream`.
- The action `run-deployment` then starts the downstream deployment by name `<FLOW_NAME>/<DEPLOYMENT_NAME>`.
Deploy and verify
- From your project root, register deployments and automations:
- prefect deploy
- List/inspect automations:
- prefect automations ls
- prefect automations inspect <automation_id>
Notes and tips
- Make sure the deployment names you reference exist (the ones created by the `deployments:` section).
- Avoid creating cycles (e.g., B triggering A) unless that’s intended.
- This pattern works in both Prefect Cloud and Prefect Server.
- If you prefer code-based chaining, you can also call `prefect.deployments.run_deployment()` at the end of flow A, but your question asked for YAML-based configuration.
Docs for reference
- prefect.yaml overview
- Automations and deployment triggers
If you share your flow names and deployment names, I can tailor the YAML exactly to your project.
Matthew McKay
10/10/2025, 2:08 AM
`prefect deploy` doesn’t seem to be deploying the automation in the yaml. Do I need to deploy that differently somehow?
Marvin
10/10/2025, 2:08 AM
Marvin
10/10/2025, 2:12 AM
`prefect deploy` does not apply a top-level `automations:` section in prefect.yaml. Automations are created either:
- from a deployment’s own `triggers:` section (linked automations created at deploy time), or
- in the UI (Automations), or
- via the Python API.
There’s no `prefect automations create` CLI command; the CLI can only list/inspect/pause/resume/delete existing automations.
What to do
- Move your automation into a `triggers:` block under a deployment in prefect.yaml and re-run `prefect deploy`.
- Or create the automation in the UI (Automations -> New automation).
Example: chain B after A completes successfully by attaching a trigger to B that watches for A’s successful runs
```yaml
name: chained-deployments
prefect-version: ">=3.0.0"

deployments:
  - name: upstream
    entrypoint: flows/a.py:flow_a
    work_pool:
      name: my-pool
  - name: downstream
    entrypoint: flows/b.py:flow_b
    work_pool:
      name: my-pool
    # A deployment trigger always runs the deployment it is attached to,
    # so it lives on downstream and watches for upstream completions
    triggers:
      - type: event
        enabled: true
        # Only fire on successful flow-run completions
        expect:
          - "prefect.flow-run.Completed"
        match:
          prefect.resource.id: "prefect.flow-run.*"
        # Scope to runs of the upstream deployment (related resource, by deployment name)
        match_related:
          prefect.resource.role: "deployment"
          prefect.resource.name: "upstream"
```
Then:
- `prefect deploy` (from the project root)
- Verify the automation was created: `prefect automations ls`
Helpful docs
- prefect.yaml overview
- Creating deployment triggers (automations)
If you share your current prefect.yaml, I can adapt it to the `triggers:` form for you.