Andrew Said
08/11/2025, 9:00 PM
wait_for_flow_run: which Prefect function should I use to invoke a flow and wait for it to complete? Is it possible to trigger multiple flows from within a flow and wait for all of them to complete before proceeding?
Marvin
08/11/2025, 9:05 PM
wait_for_flow_run was a Prefect 1.x thing. In 3.x, use subflows or deployments plus futures.
Option A — Call subflows concurrently and wait
import asyncio
from prefect import flow

@flow
async def child_flow(x: int) -> int:
    return x * 2

@flow
async def parent_flow():
    # fan-out: start the child flows concurrently
    # fan-in: gather waits for all of them and collects their results
    results = await asyncio.gather(*(child_flow(i) for i in range(10)))
    return list(results)

- Plain flows do not have a .submit(...) method (that belongs to tasks), so the child flows are async and run concurrently through asyncio.gather, which blocks until all of them complete and returns their results in order.
- Docs: Subflows, prefect.futures.wait
Option B — Trigger deployments and wait
If you want to launch specific deployments (not just call the function directly), use run_deployment. Each call creates a flow run and waits for it to finish. To run many in parallel, wrap it in a task and submit those tasks.
from prefect import flow, task
from prefect.deployments.flow_runs import run_deployment

@task
def trigger(dep_name: str, params: dict):
    # Blocks until the run completes or times out
    return run_deployment(
        name=dep_name,
        parameters=params,
        as_subflow=True,  # keep lineage to this parent flow
        timeout=3600,  # optional
        poll_interval=5,  # optional
    )

@flow
def orchestrator():
    futs = [
        trigger.submit("child-flow/dev", {"x": i})
        for i in range(10)
    ]
    runs = [f.result() for f in futs]  # all completed here
    return [r.state for r in runs]

- API: run_deployment
If you’re actually on Prefect 1.x and meant the old WaitForFlowRun task, say so and I’ll tailor the example for that lovely antique. Otherwise, the above is the modern approach.
Andrew Said
08/11/2025, 9:09 PM
run_deployment: do these have to be the same flow, or could I run 3 different flows in futs in your example?
Marvin
08/11/2025, 9:10 PM
run_deployment targets a specific deployment (by name or ID), and each deployment is tied to a flow. So you can kick off N different deployments (each for a different flow) concurrently and wait for all of them to finish.
Pattern recap: wrap run_deployment in a task, submit those tasks, then wait/fan-in.
from prefect import flow, task
from prefect.futures import wait
from prefect.deployments.flow_runs import run_deployment

@task
def trigger(dep_name: str, params: dict):
    # Blocks until the deployment's flow run completes (or times out)
    return run_deployment(
        name=dep_name,  # e.g. "flow-a/dev"
        parameters=params,
        as_subflow=True,  # keep lineage in the UI
        timeout=3600,  # optional
        poll_interval=5,  # optional
    )

@flow
def orchestrator():
    futs = [
        trigger.submit("flow-a/dev", {"x": 1}),
        trigger.submit("flow-b/prod", {"y": "abc"}),
        trigger.submit("flow-c/main", {"z": True}),
    ]
    # Wait for all to complete
    wait(futs)
    # Fan-in: collect the finished FlowRun objects (states, IDs, etc.)
    runs = [f.result() for f in futs]
    return [r.state for r in runs]

Notes:
- Deployment names use the format "<flow_name>/<deployment_name>", or you can pass a deployment UUID.
- run_deployment returns a FlowRun object; if you need the called flow’s return value, persist it (e.g., via artifacts or result storage) and fetch it separately.
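If it helps, the name format from the notes above can be checked before anything is submitted. This is a minimal sketch in plain Python, not part of Prefect: check_deployment_ref is a hypothetical helper, and it only checks the shape of the string, not whether the deployment exists.

```python
from uuid import UUID


def check_deployment_ref(ref: str) -> str:
    """Return "uuid" or "name" for a well-formed reference; raise ValueError otherwise.

    Hypothetical helper: it validates the string shape only and never talks to the API.
    """
    try:
        UUID(ref)  # raises ValueError for anything that is not a UUID string
        return "uuid"
    except ValueError:
        pass
    # Expect "<flow_name>/<deployment_name>", split on the first slash
    flow_name, sep, deployment_name = ref.partition("/")
    if sep and flow_name and deployment_name:
        return "name"
    raise ValueError(
        f"expected '<flow_name>/<deployment_name>' or a UUID, got {ref!r}"
    )
```

Calling it on each reference at the top of the orchestrator would surface a typo like "flow-a" (no deployment part) before any run is created.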
Refs:
- run_deployment API: prefect.deployments.flow_runs.run_deployment
- Waiting on futures: prefect.futures.wait
Andrew Said
08/11/2025, 9:11 PM
Marvin
08/11/2025, 9:13 PM
@flow(timeout_seconds=...).
- Any infrastructure-level limits (e.g., if you were using your own k8s/containers). Managed work pools don’t add an extra runtime cap beyond what you configure in your flows.
If you want to enforce hard caps, set them explicitly:
- Flow-level timeout: @flow(timeout_seconds=...) (docs: flow API)
- Waiting on child runs: run_deployment(timeout=...), which only limits the wait, not the run itself (docs: run_deployment)
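Because the timeout on run_deployment only limits the wait, the object you get back may describe a run that is still going. A rough sketch of how the parent could tell, assuming each item looks like a Prefect FlowRun (a name plus a state that has is_final()); unfinished_runs is a hypothetical helper and is duck-typed so it does not import Prefect:

```python
from typing import Any, Iterable


def unfinished_runs(runs: Iterable[Any]) -> list[str]:
    """Names of runs whose state is missing or not final yet.

    Assumes each item exposes `.name` and `.state`, and that the state
    has an `is_final()` method, as Prefect FlowRun/State objects do.
    """
    pending = []
    for run in runs:
        state = getattr(run, "state", None)
        if state is None or not state.is_final():
            pending.append(run.name)
    return pending
```

In the orchestrator this would be called on the list collected from the task futures; a non-empty result means the parent stopped waiting before those children finished.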
If you’re targeting very long runs (multi-day) on Managed, tell me your expected duration and I’ll double-check current service limits for your plan. Otherwise, you’re safe to choose whatever timeout makes sense for the parent’s waiting behavior.
Andrew Said
08/11/2025, 9:14 PM
Marvin
08/11/2025, 9:15 PM
- Use run_deployment(timeout=None) (the default) to wait indefinitely, or set a large enough timeout (e.g., a few hours + buffer). This only limits how long the parent waits; it does not kill the child run.
- If you set a flow-level timeout on the orchestrator, make sure it’s longer than the total fan-out + fan-in time: @flow(timeout_seconds=...).
- Keep as_subflow=True for lineage, and consider a slightly higher poll_interval if you’re triggering lots of runs (e.g., 10–30s) to reduce chatter.
Example:
from prefect import flow, task
from prefect.deployments.flow_runs import run_deployment

@task
def trigger(dep_name: str, params: dict):
    return run_deployment(
        name=dep_name,
        parameters=params,
        as_subflow=True,
        timeout=None,  # wait as long as it takes
        poll_interval=10,  # optional
    )

@flow(timeout_seconds=4 * 60 * 60)  # e.g., 4 hours
def orchestrator():
    futs = [
        trigger.submit("flow-a/dev", {"x": 1}),
        trigger.submit("flow-b/prod", {"y": "abc"}),
        trigger.submit("flow-c/main", {"z": True}),
    ]
    runs = [f.result() for f in futs]
    return [r.state for r in runs]

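One way to pick the orchestrator's timeout_seconds is to derive it from the expected child durations instead of hard-coding it. A back-of-the-envelope sketch, assuming all children really do start at roughly the same time (enough workers and concurrency slots); orchestrator_timeout and the 15-minute default buffer are illustrative, not Prefect settings:

```python
def orchestrator_timeout(
    expected_child_seconds: list[float],
    buffer_seconds: float = 900.0,
) -> int:
    """Timeout for a parent that waits on children running concurrently."""
    if not expected_child_seconds:
        raise ValueError("need at least one expected duration")
    # Children run in parallel, so the slowest one dominates the wait
    return int(max(expected_child_seconds) + buffer_seconds)


# Children expected to take 1h, 2h and 3h: 3h plus a 15-minute buffer
timeout = orchestrator_timeout([3600, 7200, 10800])  # 11700
```

If the children end up queued behind each other instead of running side by side, the sum of the durations is the safer estimate.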
Refs:
- Flow decorator and timeouts: https://docs.prefect.io/latest/api-ref/flows/#prefect.flow
- run_deployment: https://docs.prefect.io/latest/api-ref/deployments/#prefect.deployments.flow_runs.run_deployment
If your runs start nudging toward “movie marathon” lengths, tell me your ballpark and I’ll sanity-check against any current plan limits.