# ask-marvin
@Marvin Is it possible to have two flows scheduled for different times via cron, but have one always wait for the other to finish successfully before starting? For example, flow A runs at 1pm and flow B runs at 2pm, but if flow A takes more than an hour, flow B won't start until A is done. Also, flow B won't start until 2pm, even if flow A finishes early.
Yes, there are a couple of good ways to do this in Prefect 3.x. The right choice depends on whether you want a purely declarative setup (Automations) or you're comfortable adding a small guard to Flow B's code.

Key requirements recap:
- B should never start before its own schedule (2pm).
- If A overruns past 2pm, B must wait for A to finish.
- Optionally: if A fails, B should not run.

Option 1: Keep both crons and add a concurrency limit of 1 shared by A and B
- What it gives you:
  - If A is still running at 2pm, B's run waits until A finishes.
  - B still never starts before 2pm because it has its own schedule.
- How:
  - Put both deployments in a dedicated work queue with a concurrency limit of 1 (no code changes). Alternatively, have both flows hold the same global concurrency limit (e.g., name `a-and-b`, limit 1) around their work with `concurrency("a-and-b")`.
  - Tagging the two deployments alone won't do it: tag-based concurrency limits in Prefect 3.x apply to task runs, not flow runs.
  - Result: A and B never overlap, and B queues if A overruns.
- Handling A failures (optional):
  - Add an Automation that pauses B's deployment when A fails, so the next scheduled B run doesn't go ahead after a failed A.
- Docs: Global concurrency limits; Automations and triggers

Option 2: Keep both crons and add a simple "wait-for-A" gate at the top of Flow B
- What it gives you:
  - B only starts its work after confirming A's corresponding run completed successfully.
  - If A failed, B can exit early.
  - B never starts early because its run isn't created until 2pm.
- Minimal example for B:
```python
from uuid import UUID

from prefect import flow, get_run_logger
from prefect.flow_runs import wait_for_flow_run


@flow
async def flow_b(upstream_run_id: str | None = None):
    logger = get_run_logger()

    # If we were given A's run id, wait for it to reach a final state
    if upstream_run_id:
        # wait_for_flow_run is async, so flow_b is async too; 6h cap, adjust as needed
        upstream = await wait_for_flow_run(UUID(upstream_run_id), timeout=6 * 60 * 60)
        logger.info(f"Upstream Flow A finished with state: {upstream.state.name}")

        if not upstream.state.is_completed():
            raise RuntimeError("Upstream Flow A did not complete successfully; aborting Flow B.")

    # ...rest of B's work...
```
- How to pass `upstream_run_id`:
  - An Automation on "A completed" can run B's deployment and pass A's flow run id as a parameter. That action starts B right away, though; it doesn't hold B until 2pm. If you need the 2pm floor on a run created this way, call `run_deployment()` at the end of A instead, setting `scheduled_time` to 2pm when A finishes early and omitting it when A finishes after 2pm so B runs now.
  - Reference: Run deployments and Automations
  - Alternative: if you don't want to pass a run id, look up A's "expected" run (e.g., the 1pm run that corresponds to B's 2pm run) with the Prefect client, then call `wait_for_flow_run` on that id.
Option 3: Let A's success drive B (remove B's cron)
- What it gives you:
  - B only runs when A completes successfully.
  - If A finishes early, B is scheduled for 2pm. If A finishes late, B is scheduled immediately. If A fails, B doesn't run at all.
- How:
  - Simplest: an Automation on "A completed successfully" with a "Run deployment" action for B. This starts B as soon as A completes, so on its own it doesn't enforce the 2pm floor.
  - To enforce the floor, call `run_deployment()` for B at the end of A. Pass `scheduled_time` set to 2pm if A completes before 2pm, and omit it if A completes after 2pm so B runs now.
- Docs: Automations and Run deployments

Recommendation
- If you want minimal changes and you're okay with B's run being created at 2pm and waiting when A overruns, use Option 1 (shared concurrency limit). Add an Automation to stop B if A fails.
- If you want explicit "A must have completed successfully" logic in code, use Option 2 with the `wait_for_flow_run` gate in B.
- If you want the cleanest dependency without two separate crons, use Option 3.

A couple of links you'll likely need:
- Run deployments programmatically (run_deployment)
- Automations (event/sequence triggers and actions)
- Global concurrency limits

Quick clarifications to tailor guidance:
- Are you on Prefect Cloud or a self-hosted Prefect server?
- Should B be skipped entirely if A fails?
- Do you prefer a no-code/declarative setup (Automations + concurrency) or a small code change in B?
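For Option 3, the rule "2pm or as soon as A finishes, whichever is later" reduces to a `max()`. The following is a minimal sketch of that rule. It assumes B's not-before time is passed as a `datetime.time` in a timezone you choose. `b_scheduled_time` is a hypothetical helper, not a Prefect API; its result is the kind of value you would pass as `scheduled_time` to `run_deployment()`.

```python
from datetime import datetime, time, timezone, tzinfo


def b_scheduled_time(
    a_completed_at: datetime,
    not_before: time,
    tz: tzinfo = timezone.utc,
) -> datetime:
    """Hypothetical helper: when should Flow B start once Flow A has completed?

    Returns B's not-before time on the day A finished (evaluated in `tz`),
    or A's completion time if A finished after that gate.
    """
    done_local = a_completed_at.astimezone(tz)
    gate = datetime.combine(done_local.date(), not_before, tzinfo=tz)
    return max(done_local, gate)


if __name__ == "__main__":
    two_pm = time(14, 0)
    early = datetime(2025, 10, 6, 13, 40, tzinfo=timezone.utc)
    late = datetime(2025, 10, 6, 15, 5, tzinfo=timezone.utc)
    print(b_scheduled_time(early, two_pm))  # 2025-10-06 14:00:00+00:00
    print(b_scheduled_time(late, two_pm))   # 2025-10-06 15:05:00+00:00
```

At the end of A you could then call something like `run_deployment("<flow-name>/<deployment-name>", scheduled_time=b_scheduled_time(...), timeout=0)`. With `timeout=0`, the call returns as soon as B's run is created instead of waiting for it to finish.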
@Marvin we are using Prefect cloud, hybrid with AWS. The situation is more complicated than the original example, as in reality there is a graph of dependent flows. Flow B has multiple different dependencies, all of which trigger off of different crons and all of which must finish before B can start. Thus we can't use concurrency limits. To clarify, flow B should be skipped if the flows it depends on don't complete successfully. It seems like option 2 is our best bet, as we can have flow B wait on all of the dependent flows. However that would mean that flow B is "executing" and using compute while waiting for its dependencies to finish, correct? Is there no way to only have flow B start when its dependencies are complete? And how would retries work with a graph of dependencies like this, would we have to individually retry each flow?
Great question, and you're right to think carefully about compute usage with a dependency graph like this.

Short answer
- Yes: having Flow B "wait" inside the flow (e.g., with `wait_for_flow_run`) keeps B running and consuming compute.
- Use Prefect Cloud Automations to start B only after all upstream flows complete successfully. This is event-driven and holds no compute while waiting.
- Retries: set them on each flow (and/or task) independently. Because B triggers only on Completed upstreams, you never retry upstreams from B. You retry the upstream that failed, and B fires once everything it needs has completed.

Recommended pattern (no compute while waiting)
- Create a single Cloud Automation that runs Deployment B when all upstreams complete:
  1) Create an Automation (Reactive posture).
  2) Add a condition for each upstream deployment: "Flow run enters Completed", filtered to that deployment.
  3) Combine the conditions with "all" semantics (every one must occur). Choose a time window that matches how long you're willing to wait for the slowest upstream.
  4) Action: Run Deployment (select B), optionally passing parameters.
- This starts B only after your upstream runs have all succeeded; no running flow holds resources while it waits.
- Docs/reading: Event triggers and automations (Prefect 3); Automations overview blog; No flow is an island; Workflow design patterns

About retries with a dependency graph
- Configure retries on each flow independently. For example:
```python
from prefect import flow


@flow(retries=3, retry_delay_seconds=300)
def upstream_a1():
    ...


@flow(retries=3, retry_delay_seconds=300)
def upstream_a2():
    ...


@flow(retries=2, retry_delay_seconds=600)
def flow_b():
    ...
```
- Your automation should trigger B only on upstream "Completed". Upstream retries therefore run to completion (or final failure) before the downstream is ever triggered.
- If an upstream ultimately fails, B simply never runs. You can add another automation to send a notification or declare an incident if all upstreams haven't completed within your expected window.

Why not "wait inside B"?
- Programmatic waiting (e.g., `prefect.flow_runs.wait_for_flow_run`) works, but B stays Running and holds a worker slot/compute the entire time. That's typically not what you want in a hybrid AWS setup.

Optional: barrier/suspend pattern
- You can technically start B on its own schedule, check your gating conditions immediately, and, if they aren't met, suspend the flow run to release compute. Suspending tears down the run's infrastructure; pausing would keep the process running.
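```python
from prefect import flow
from prefect.flow_runs import suspend_flow_run


def dependencies_ready() -> bool:
    return False  # hypothetical check, e.g. query upstream runs with the client


@flow
def flow_b():
    if not dependencies_ready():
        # Tears down infrastructure (deployment runs only) until something resumes
        # the run, e.g. an automation or resume_flow_run(); the flow then re-runs
        # from the top, so this check is evaluated again.
        suspend_flow_run(timeout=6 * 60 * 60)
    # ...B's work, reached only once dependencies are ready...
```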
- However, resuming a specific suspended run requires targeted automations and extra coordination. In most cases, the "Run Deployment B when all upstreams complete" automation is simpler and cleaner.

One nuance to confirm
- Do you need B to run only when the "matching" upstream runs for the same logical period (e.g., same partition/date) have all completed? If yes:
  - You'll want a way to correlate runs (e.g., a date parameter/tag) and configure the automation window/filters accordingly.
  - We can sketch an approach using tags/parameters or an asset/materialization pattern if you share how you want to group runs (daily, hourly, etc.) and whether you pass a partition key/date to your deployments.

If you share:
- the list of upstream deployments
- how you want to align "cycles" (daily/hourly and any partition keys)
- the maximum wait time you're comfortable with

…I can propose an exact Automation setup (UI steps and/or code) that will gate B precisely the way you want, with no idle compute.
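With one "all upstreams Completed" automation per downstream flow, a failure anywhere turns into "never triggered" for everything downstream of it. This is also why retrying only the failed flow is enough. Below is a plain-Python sketch of that propagation. It doesn't call Prefect. It assumes you describe your graph as a hypothetical `{flow: [direct upstream flows]}` dict, and `blocked_by` is an illustrative helper name.

```python
from collections import deque


def blocked_by(graph: dict[str, list[str]], failed: str) -> set[str]:
    """Return every flow that will never be triggered if `failed` ends in Failed.

    `graph` maps each flow to its direct upstream flows (hypothetical names).
    A flow is blocked if any of its upstreams is the failed flow or is itself blocked.
    """
    # Invert the graph: upstream -> direct downstreams
    downstream: dict[str, list[str]] = {}
    for flow_name, upstreams in graph.items():
        for up in upstreams:
            downstream.setdefault(up, []).append(flow_name)

    # Breadth-first walk from the failed flow through its descendants
    blocked: set[str] = set()
    queue = deque([failed])
    while queue:
        current = queue.popleft()
        for child in downstream.get(current, []):
            if child not in blocked:
                blocked.add(child)
                queue.append(child)
    return blocked


if __name__ == "__main__":
    graph = {
        "flow-b": ["flow-a1", "flow-a2"],
        "flow-c": ["flow-b"],
        "flow-d": ["flow-a2"],
    }
    print(sorted(blocked_by(graph, "flow-a1")))  # ['flow-b', 'flow-c']
```

In that example, once `flow-a1` is retried and completes, `flow-b`'s trigger is satisfied, and `flow-c` follows from `flow-b`'s own completion. Only the flow that actually failed needs a manual retry.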
@Marvin Yes, flow B should only run when all of its matching upstream runs have completed. These are all daily runs, so tagging them with a date would suffice. What would an example of that look like so that flow B waits on all of its dependencies for the corresponding day period?
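Great, tagging by day is a solid way to coordinate this. Below are two patterns in Prefect 3.x to make sure Flow B only runs after all daily upstream runs (e.g., Flow A1, A2, …) for the same day have completed.

Option A: Event-driven deployment trigger (recommended)
Have Flow B's deployment trigger automatically when all upstream flows complete for the same day tag. This uses a compound trigger with per-tag grouping.

1) Tag your upstream flow runs with the day
Add a tag like `day:YYYY-MM-DD` to every upstream run. The tag has to be on the flow run itself. A `with tags(...)` block inside a flow only tags the tasks and subflows created inside it, so instead update the run's own tags at the start of the flow. For example:

```python
from datetime import date

from prefect import flow, get_client
from prefect.runtime import flow_run


@flow
async def flow_a1(day: str | None = None):
    # Prefer an explicit `day` parameter (easy backfills); fall back to today
    day_tag = f"day:{day or date.today().isoformat()}"

    # Add the tag to *this* flow run so its state-change events carry it
    async with get_client() as client:
        await client.update_flow_run(
            flow_run_id=flow_run.id, tags=sorted({*flow_run.tags, day_tag})
        )

    # ... flow logic ...
```

Repeat for each upstream flow (A2, A3, etc.). If your flows receive a `day` parameter, derive the tag from it rather than from the clock so backfills stay coordinated.

2) Configure Flow B's deployment with a compound trigger
When deploying Flow B, set triggers so it requires a completion event from each upstream flow for the same day tag, all within a 1-day window.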
```python
from datetime import timedelta

from prefect import flow
from prefect.events.schemas.deployment_triggers import DeploymentEventTrigger, DeploymentCompoundTrigger

# Flow names, not function names: Prefect names `flow_a1` "flow-a1" by default
UPSTREAM_FLOWS = ["flow-a1", "flow-a2"]  # add A3, A4, ... as needed


def completed_with_day_tag(flow_name: str) -> DeploymentEventTrigger:
    """One child trigger: a run of `flow_name` reached Completed and carries a day tag."""
    return DeploymentEventTrigger(
        expect={"prefect.flow-run.Completed"},
        match={"prefect.resource.id": "prefect.flow-run.*"},
        # The flow and the run's tags are *related* resources of a flow-run event;
        # a trailing `*` is a prefix wildcard
        match_related=[
            {"prefect.resource.role": "flow", "prefect.resource.name": flow_name},
            {"prefect.resource.role": "tag", "prefect.resource.id": "prefect.tag.day:*"},
        ],
        # Bucket events by the related tag's id so each day is tracked separately
        for_each=["related:tag:prefect.resource.id"],
    )


@flow
def flow_b():
    # ... your fan-in logic ...
    pass


# Build Flow B's deployment and attach triggers
flow_b.deploy(
    name="b",
    work_pool_name="your-pool",
    # Other deployment args as needed: image, parameters, etc.
    triggers=[
        DeploymentCompoundTrigger(
            require="all",  # wait for ALL the child triggers
            within=timedelta(days=1),  # all completions must land within one day
            triggers=[completed_with_day_tag(name) for name in UPSTREAM_FLOWS],
        )
    ],
)
```

How it works:
- Each child DeploymentEventTrigger fires when a run of its upstream flow reaches Completed and has a related tag whose id starts with `prefect.tag.day:` (e.g., `day:2025-10-06`).
- `for_each=["related:tag:prefect.resource.id"]` buckets events by the first related tag on each run, so each day tag is tracked separately. If your runs also carry other tags, check which tag comes first. Verify the per-day grouping with a couple of manual runs before relying on it.
- `require="all"` ensures Flow B only runs when all upstream flows have completed.

You can also configure this visually in Prefect Cloud's Automations UI:
- Create a Compound trigger.
- Make each child trigger a "Flow run Completed" filter for one upstream flow.
- Add a related-resource filter "Tag" with prefix `day:` to each.
- Set "For each related" to Tag.
- Set "Require" to All.

Docs: Creating deployment triggers; Event triggers concepts

Option B: In-flow coordination (Flow B waits programmatically)
If you prefer to keep this logic inside Flow B, you can:
- Tag upstream runs as above.
- Start Flow B daily (e.g., via schedule).
- Query for the upstream runs with today's tag and await them.

Skeleton example:
```python
import asyncio
from datetime import date

from prefect import flow, get_client
from prefect.client.schemas.filters import (
    FlowFilter,
    FlowFilterName,
    FlowRunFilter,
    FlowRunFilterTags,
)
from prefect.flow_runs import wait_for_flow_run

UPSTREAM_FLOW_NAMES = ["flow-a1", "flow-a2"]  # flow names; extend as needed


@flow
async def flow_b(day: str | None = None):
    day_tag = f"day:{day or date.today().isoformat()}"

    async with get_client() as client:
        # Find upstream flow runs for this day tag (filters must be filter objects)
        runs = await client.read_flow_runs(
            flow_filter=FlowFilter(name=FlowFilterName(any_=UPSTREAM_FLOW_NAMES)),
            flow_run_filter=FlowRunFilter(tags=FlowRunFilterTags(all_=[day_tag])),
            limit=200,
        )

        if len(runs) < len(UPSTREAM_FLOW_NAMES):
            # Some upstream runs haven't been created yet: poll here, or
            # fail/suspend and try again later, depending on your pattern
            raise RuntimeError(f"Only {len(runs)} upstream runs found for {day_tag}")

        # Wait for all found runs to reach a final state
        finished = await asyncio.gather(*[
            wait_for_flow_run(run.id, client=client, log_states=True)
            for run in runs
        ])

    # Skip B's work if any upstream did not complete successfully
    if not all(run.state.is_completed() for run in finished):
        raise RuntimeError(f"Not all upstream runs for {day_tag} completed; skipping Flow B")

    # Now all upstream daily runs have completed; proceed
    # ... fan-in logic ...
```
Notes and decisions
- Failure handling: in Option A, `expect={"prefect.flow-run.Completed"}` means failures never trigger B. If you want B to run after successes and/or failures, expand `expect` to include `prefect.flow-run.Failed`, or handle failures explicitly in Option B.
- Time window: adjust `within=timedelta(...)` to the window you consider the same "day" (e.g., 24h).
- Tag shape: a single day tag like `day:YYYY-MM-DD` is simple and works well with prefix matching on `day:` and per-tag grouping.

Would you like me to:
- Generate the exact triggers for your specific upstream flow names?
- Walk through the UI configuration step by step?
- Or adapt the in-flow pattern to backfills (passing an explicit `day` parameter)?
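The part of Option B that is easiest to get wrong is the decision itself. B should proceed only when every required upstream has a Completed run for the day, skip when an upstream has finished badly, and otherwise keep waiting. Below is a plain-Python sketch of that rule, which you could evaluate on each poll instead of blocking in `wait_for_flow_run`. It assumes you've already fetched `(flow_name, state_type)` pairs for the day tag, e.g. from `read_flow_runs`. The function name and the `"proceed"`/`"skip"`/`"wait"` strings are hypothetical.

```python
# State types treated as "finished badly" (Prefect's state-type names)
TERMINAL_FAILURES = {"FAILED", "CRASHED", "CANCELLED"}


def gate_decision(required: list[str], runs: list[tuple[str, str]]) -> str:
    """Hypothetical fan-in rule for one day tag.

    `runs` holds (flow_name, state_type) pairs, e.g. ("flow-a1", "COMPLETED").
    Returns "proceed", "skip", or "wait".
    """
    states: dict[str, set[str]] = {name: set() for name in required}
    for flow_name, state_type in runs:
        if flow_name in states:
            states[flow_name].add(state_type)

    # A flow counts as done if any of its runs for the day completed
    pending = [name for name, seen in states.items() if "COMPLETED" not in seen]
    if not pending:
        return "proceed"
    # Skip only when a pending flow has runs and every one of them ended badly
    if any(states[name] and states[name] <= TERMINAL_FAILURES for name in pending):
        return "skip"
    # Not created yet, or still scheduled/running: keep waiting
    return "wait"
```

Counting a flow as done when *any* of its runs for the day completed means that a failed run followed by a successful rerun still lets B proceed.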
@Marvin With the Event-driven Deployment Trigger (Option A), is it possible to also include a cron dependency so that flow B doesn't start until a certain time? Or would the best way to achieve that be having a separate upstream dependency of B that triggers off of a cron and immediately succeeds, so that B will not start until after that time?
Short answer: event triggers don't have a built-in "cron/not-before" gate. The two clean patterns are:
- Add a "time-gate" upstream flow that runs on a cron at the allowed time and immediately completes. Include it as another child trigger in Flow B's compound trigger (`require="all"`).
- Or schedule Flow B by cron and, in Flow B, wait for all upstream day-tag runs to complete before proceeding.

Since you're already using Option A, the time-gate flow is a great fit. Example:

1) A time-gate flow that runs at the allowed time and tags the day
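```python
from datetime import date

from prefect import flow, get_client
from prefect.runtime import flow_run


@flow
async def time_gate(day: str | None = None):
    # Use the explicit param for backfills; fall back to "today"
    day_tag = f"day:{day or date.today().isoformat()}"
    # No real work: just tag *this* run so its Completed event carries the day tag
    # (a `with tags(...)` block inside the flow would not tag the run itself)
    async with get_client() as client:
        await client.update_flow_run(
            flow_run_id=flow_run.id, tags=sorted({*flow_run.tags, day_tag})
        )


# Deploy with your desired cron (e.g., 05:00 daily; cron schedules default to UTC)
time_gate.deploy(
    name="time-gate-0500",
    work_pool_name="your-pool",
    cron="0 5 * * *",
)
```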
2) Add the gate as another trigger in Flow B’s compound trigger
```python
from datetime import timedelta

from prefect.events.schemas.deployment_triggers import (
    DeploymentEventTrigger,
    DeploymentCompoundTrigger,
)


def completed_with_day_tag(flow_name: str) -> DeploymentEventTrigger:
    # One child trigger: a Completed run of `flow_name` that carries a day tag
    return DeploymentEventTrigger(
        expect={"prefect.flow-run.Completed"},
        match={"prefect.resource.id": "prefect.flow-run.*"},
        match_related=[
            {"prefect.resource.role": "flow", "prefect.resource.name": flow_name},
            {"prefect.resource.role": "tag", "prefect.resource.id": "prefect.tag.day:*"},
        ],
        for_each=["related:tag:prefect.resource.id"],
    )


# flow_b is Flow B's @flow function, defined as before
flow_b.deploy(
    name="b",
    work_pool_name="your-pool",
    triggers=[
        DeploymentCompoundTrigger(
            require="all",
            within=timedelta(days=1),
            triggers=[
                completed_with_day_tag("flow-a1"),  # upstream A1
                completed_with_day_tag("flow-a2"),  # upstream A2
                completed_with_day_tag("time-gate"),  # the cron-driven time gate
            ],
        )
    ],
)
```
How this helps:
- Flow B won't run until all upstream flows for the day AND the time_gate run for that same day tag have completed.
- Use the same `day:YYYY-MM-DD` tag across all upstream flows and the gate so the trigger can group by that exact day via `for_each`.

Notes:
- Be mindful of time zones: cron schedules are evaluated in UTC unless you give the schedule a timezone. Align your tagging logic accordingly.
- Avoid giving Flow B its own schedule if you're using event triggers, or you may get duplicate runs.
- If late upstreams are possible, widen or remove `within=timedelta(days=1)`.

Refs: Event triggers concepts; Creating deployment triggers; Schedules

If you share your upstream flow names and desired time window/timezone, I can generate the exact trigger block for your setup.
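Since the gate's cron runs in UTC by default, the safest way to keep the gate and every upstream on the same tag is for all of them to share one helper. The helper uses the explicit `day` parameter when present (backfills); otherwise it uses today's date in one agreed timezone. This is a minimal sketch. `day_tag` is a hypothetical helper name, the `day:` prefix follows the convention used above, and the timezone you pass is your choice.

```python
from __future__ import annotations

from datetime import date, datetime, timedelta, timezone, tzinfo


def day_tag(day: str | None = None, tz: tzinfo = timezone.utc, now: datetime | None = None) -> str:
    """Build the shared `day:YYYY-MM-DD` tag.

    An explicit `day` wins (validated as an ISO date; bad input raises ValueError).
    Otherwise use the current date in `tz`, so every flow computes "today" the
    same way regardless of the worker's local clock.
    """
    if day is not None:
        return f"day:{date.fromisoformat(day).isoformat()}"
    current = now or datetime.now(timezone.utc)
    return f"day:{current.astimezone(tz).date().isoformat()}"


if __name__ == "__main__":
    us_east = timezone(timedelta(hours=-4))
    just_after_utc_midnight = datetime(2025, 10, 7, 2, 30, tzinfo=timezone.utc)
    print(day_tag("2025-10-06"))                               # day:2025-10-06
    print(day_tag(now=just_after_utc_midnight))                # day:2025-10-07
    print(day_tag(now=just_after_utc_midnight, tz=us_east))    # day:2025-10-06
```

The last two lines show the midnight-rollover problem: the same instant yields different tags depending on the timezone, so every flow in the graph should pass the same `tz`.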
@Marvin In the example above, if flow_a1 failed then flow_b wouldn't start, correct? If flow_a1 was then retried an hour later and succeeded, would flow_b then automatically trigger off of that without needing to do anything else?
Yes, that's correct. With the event-driven approach as shown (`expect={"prefect.flow-run.Completed"}`):
- Flow B will NOT start when flow_a1 fails.
- If flow_a1 is retried later and reaches Completed for the same day tag, that Completed event satisfies flow_a1's child trigger. As soon as all the other child triggers for that same day tag are satisfied too, the compound trigger fires and Flow B starts. No extra steps are needed.

A couple of practical notes:
- Time window: with `within=timedelta(days=1)`, all the required events for a given day tag must occur within that window. If the successful retry lands outside that window relative to the other events, the compound trigger may not fire. Widen or remove `within` if needed.
- Tag consistency: make sure the retried run carries the same `day:YYYY-MM-DD` tag as the other upstreams for that date. To avoid "midnight rollover" surprises, derive the tag from an explicit `day` parameter rather than `date.today()` at runtime.
- Time gate: if you've also added the cron-based time-gate child trigger, B won't start until the gate run for that same day tag has completed as well.

Refs: Event triggers concepts; Creating deployment triggers
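The retry behavior described above can be modeled in a few lines. This is a toy simulation of the semantics described in this thread: require all, group per day tag, and only count completions inside the window. It is not Prefect's trigger engine, and Prefect's exact window bookkeeping may differ. `DayGate` and its event arguments are hypothetical.

```python
from datetime import datetime, timedelta


class DayGate:
    """Toy model of a compound "all Completed" trigger bucketed by day tag."""

    def __init__(self, required: set[str], within: timedelta):
        self.required = required
        self.within = within
        # day tag -> {flow name: time its Completed event arrived}
        self.seen: dict[str, dict[str, datetime]] = {}
        self.fired: set[str] = set()

    def observe(self, flow: str, state: str, day: str, at: datetime) -> bool:
        """Record one flow-run state event; return True if the gate fires now."""
        if state != "Completed" or flow not in self.required or day in self.fired:
            return False  # failures, unrelated flows, and already-fired days are ignored
        bucket = self.seen.setdefault(day, {})
        bucket[flow] = at
        # Forget completions that fell out of the window relative to this event
        for name, ts in list(bucket.items()):
            if at - ts > self.within:
                del bucket[name]
        if set(bucket) == self.required:
            self.fired.add(day)
            return True
        return False


if __name__ == "__main__":
    t0 = datetime(2025, 10, 6, 13, 0)
    gate = DayGate({"flow-a1", "flow-a2"}, within=timedelta(days=1))
    print(gate.observe("flow-a2", "Completed", "day:2025-10-06", t0))
    print(gate.observe("flow-a1", "Failed", "day:2025-10-06", t0 + timedelta(minutes=10)))
    # The retried flow_a1 completes an hour later: the gate fires
    print(gate.observe("flow-a1", "Completed", "day:2025-10-06", t0 + timedelta(hours=1, minutes=10)))
```

The three prints show `False`, `False`, `True`: the failure is simply ignored, and the later success completes the set for that day without any other intervention.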
@Marvin how would I define two deployments in YAML such that one will run after the other completes successfully?
Great question. In Prefect 3.x, you chain deployments by adding an automation to your prefect.yaml that listens for the “upstream” deployment’s successful flow-run completion event and then runs the “downstream” deployment. Here’s a minimal, end-to-end example of prefect.yaml with two deployments and an automation to run B after A completes successfully:
```yaml
name: chained-deployments
prefect-version: ">=3.0.0"

# Define both deployments
deployments:
  - name: upstream
    entrypoint: flows/a.py:flow_a
    work_pool:
      name: my-pool

  - name: downstream
    entrypoint: flows/b.py:flow_b
    work_pool:
      name: my-pool

# Define the automation (event-driven chaining)
automations:
  - name: run-downstream-after-upstream
    description: Run downstream when upstream completes successfully
    enabled: true
    trigger:
      type: event
      # Only fire on successful flow-run completions
      match:
        - event: "prefect.flow-run.Completed"
      # And only when that completed run came from this specific deployment
      # Use the "<FLOW_NAME>/<DEPLOYMENT_NAME>" form here
      match_related:
        - prefect.resource.type: "prefect.deployment"
          prefect.resource.name: "flow_a/upstream"
    actions:
      - type: run-deployment
        # The deployment to kick off
        deployment: "flow_b/downstream"
```
How it works
- The trigger listens for the event name `prefect.flow-run.Completed` (successful completion).
- `match_related` scopes it to events related to the specific upstream deployment via its name `<FLOW_NAME>/<DEPLOYMENT_NAME>`.
- The `run-deployment` action then starts the downstream deployment by name `<FLOW_NAME>/<DEPLOYMENT_NAME>`.

Deploy and verify
- From your project root, register deployments and automations: `prefect deploy`
- List/inspect automations: `prefect automations ls`, `prefect automations inspect <automation_id>`

Notes and tips
- Make sure the deployment names you reference exist (the ones created by the `deployments:` section).
- Avoid creating cycles (e.g., B triggering A) unless that's intended.
- This pattern works in both Prefect Cloud and Prefect Server.
- If you prefer code-based chaining, you can also call `prefect.deployments.run_deployment()` at the end of flow A, but your question asked for YAML-based configuration.

Docs for reference: prefect.yaml overview; Automations and deployment triggers

If you share your flow names and deployment names, I can tailor the YAML exactly to your project.
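The two tips about existing deployment names and cycles can both be checked before you deploy. Below is a minimal sketch using the standard library's `graphlib`. It assumes you've reduced your chaining setup to a hypothetical `{downstream: [upstreams]}` mapping keyed by deployment name; parsing prefect.yaml into that shape is left out. `check_chain` is an illustrative name.

```python
from graphlib import CycleError, TopologicalSorter


def check_chain(deployments: set[str], edges: dict[str, list[str]]) -> list[str]:
    """Return problems found in a downstream -> upstreams mapping (empty if none)."""
    problems = []
    # Every deployment named in the chain must actually be defined
    for downstream, upstreams in edges.items():
        for name in [downstream, *upstreams]:
            if name not in deployments:
                problems.append(f"unknown deployment: {name}")
    try:
        # TopologicalSorter takes node -> predecessors; static_order() raises
        # CycleError if the chain loops back on itself
        list(TopologicalSorter(edges).static_order())
    except CycleError as exc:
        # exc.args[1] is the list of nodes forming the cycle
        problems.append(f"cycle: {' -> '.join(exc.args[1])}")
    return problems


if __name__ == "__main__":
    defined = {"upstream", "downstream"}
    print(check_chain(defined, {"downstream": ["upstream"]}))  # []
    print(check_chain(defined, {"downstream": ["upstrem"]}))   # ['unknown deployment: upstrem']
```

Running a check like this in CI catches a typo in an upstream name, which would otherwise fail silently: the trigger would simply never fire.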
@Marvin `prefect deploy` doesn't seem to be deploying the automation in the yaml. Do I need to deploy that differently somehow?
Short answer: yes, that's why it isn't showing up. In Prefect 3.x, `prefect deploy` does not apply a top-level `automations:` section in prefect.yaml. Automations are created in one of these ways:
- from a deployment's own `triggers:` section (linked automations created at deploy time)
- in the UI (Automations)
- via the Python API

There's no `prefect automations create` CLI command; the CLI can only list/inspect/pause/resume/delete existing automations.

What to do
- Move the chaining into a `triggers:` block on the downstream deployment in prefect.yaml and re-run `prefect deploy`. A deployment trigger always runs the deployment it's attached to, so it takes no `actions:`; it only describes the upstream event to wait for.
- Or create the automation in the UI (Automations -> New automation).

Example: run B after A completes successfully by attaching a trigger to B

```yaml
name: chained-deployments
prefect-version: ">=3.0.0"

deployments:
  - name: upstream
    entrypoint: flows/a.py:flow_a
    work_pool:
      name: my-pool

  - name: downstream
    entrypoint: flows/b.py:flow_b
    work_pool:
      name: my-pool
    # Runs *this* deployment (downstream) when the event below occurs
    triggers:
      - type: event
        enabled: true
        # Any flow run...
        match:
          prefect.resource.id: prefect.flow-run.*
        # ...that reaches Completed (successful completion only)...
        expect:
          - prefect.flow-run.Completed
        # ...and belongs to the deployment named "upstream"
        # (names are only unique per flow; match prefect.deployment.<id> to be strict)
        match_related:
          prefect.resource.role: deployment
          prefect.resource.name: upstream
```
Then:
- Run `prefect deploy` from the project root.
- Verify the automation was created: `prefect automations ls`

Helpful docs: prefect.yaml overview; Creating deployment triggers (automations)

If you share your current prefect.yaml, I can adapt it to the `triggers:` form for you.