<@ULVA73B9P> I know there is a function called `wa...
# ask-marvin
a
@Marvin I know there is a function called `wait_for_flow_run` that Prefect provides to invoke a flow and wait for it to complete. Is it possible to trigger multiple flows from within a flow and wait for all to complete before proceeding?
m
thought for 240.1 seconds
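Short answer: yes. In Prefect 3.x you can fan out to multiple flows and fan in after they finish. A quick note: the `wait_for_flow_run` task was a Prefect 1.x thing. Prefect 3.x has an async helper of the same name in `prefect.flow_runs` that waits on an existing flow run by ID, but for this use case subflows or `run_deployment` are simpler.
Option A: call subflows concurrently and wait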
import asyncio

from prefect import flow

@flow
async def child_flow(x: int) -> int:
    return x * 2

@flow
async def parent_flow() -> list[int]:
    # fan-out: flows have no .submit() (that is a task method), so start
    # the async subflows concurrently with asyncio.gather instead
    # fan-in: gather waits for all of them and returns results in call order
    results = await asyncio.gather(*(child_flow(i) for i in range(10)))
    return list(results)
- Flows do not have a `.submit()` method; that belongs to tasks. Calling an async subflow returns a coroutine, and `asyncio.gather` runs the subflows concurrently and blocks until all of them complete.
- Docs: Subflows
Option B: trigger deployments and wait
If you want to launch specific deployments (not just call the function directly), use `run_deployment`. It creates the flow run and waits for it to finish. To run many in parallel, wrap it in a task and submit those tasks.
from prefect import flow, task
from prefect.deployments.flow_runs import run_deployment

@task
def trigger(dep_name: str, params: dict):
    # Blocks until the run completes or times out
    return run_deployment(
        name=dep_name,
        parameters=params,
        as_subflow=True,   # keep lineage to this parent flow
        timeout=3600,      # optional
        poll_interval=5,   # optional
    )

@flow
def orchestrator():
    futs = [
        trigger.submit("child-flow/dev", {"x": i})
        for i in range(10)
    ]
    runs = [f.result() for f in futs]  # all completed here
    return [r.state for r in runs]
- API: run_deployment
If you’re actually on Prefect 1.x and meant the old `WaitForFlowRun` task, say so and I’ll tailor the example for that lovely antique. Otherwise, the above is the modern approach.
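As a rough illustration of what "fan-out, then fan-in" means, here is a minimal sketch in plain `asyncio` with no Prefect involved. The `run_children` and `child` names are hypothetical stand-ins for a parent flow and its subflows. It tracks how many children are in flight at once to show that they overlap, and that results come back in call order.

```python
import asyncio


async def run_children(n: int):
    """Start n stand-in children concurrently and wait for all of them."""
    active = 0  # children currently in flight
    peak = 0    # highest number of children in flight at the same time

    async def child(x: int) -> int:
        nonlocal active, peak
        active += 1
        peak = max(peak, active)
        await asyncio.sleep(0.01)  # stands in for real work
        active -= 1
        return x * 2

    # fan-out: schedule every child; fan-in: gather waits for all of them
    results = await asyncio.gather(*(child(i) for i in range(n)))
    return results, peak


results, peak = asyncio.run(run_children(5))
print(results, peak)
```

If the children ran one after another, the peak would stay at 1. With `asyncio.gather` every child starts before the first one finishes.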
a
@Marvin I am using Prefect 3. If I am using `run_deployment`, do these have to be the same flow, or could I run 3 different flows, for example in `futs` in your example?
m
thought for 53.2 seconds
Yes, they can be different flows. `run_deployment` targets a specific deployment (by name or ID), and each deployment is tied to a flow. So you can kick off N different deployments (each for a different flow) concurrently and wait for all of them to finish.
Pattern recap: wrap `run_deployment` in a task, submit those tasks, then wait/fan-in.
from prefect import flow, task
from prefect.futures import wait
from prefect.deployments.flow_runs import run_deployment

@task
def trigger(dep_name: str, params: dict):
    # Blocks until the deployment's flow run completes (or times out)
    return run_deployment(
        name=dep_name,            # e.g. "flow-a/dev"
        parameters=params,
        as_subflow=True,          # keep lineage in the UI
        timeout=3600,             # optional
        poll_interval=5,          # optional
    )

@flow
def orchestrator():
    futs = [
        trigger.submit("flow-a/dev", {"x": 1}),
        trigger.submit("flow-b/prod", {"y": "abc"}),
        trigger.submit("flow-c/main", {"z": True}),
    ]

    # Wait for all to complete
    wait(futs)

    # Fan-in: collect the finished FlowRun objects (states, IDs, etc.)
    runs = [f.result() for f in futs]
    return [r.state for r in runs]
Notes:
- Deployment names use the format `"<flow_name>/<deployment_name>"`, or you can pass a deployment UUID.
- `run_deployment` returns a FlowRun object; if you need the called flow’s return value, persist it (e.g., via artifacts or result storage) and fetch it separately.
Refs:
- run_deployment API: prefect.deployments.flow_runs.run_deployment
- Waiting on futures: prefect.futures.wait
a
@Marvin Thank you. Is there a maximum timeout limit if the orchestrator and subflows are running in a managed work pool?
m
thought for 124.1 seconds
Short version: there’s no hidden “max timeout” imposed by Prefect just because you’re using a Managed work pool.
Details:
- `run_deployment(timeout=...)` only controls how long the calling code waits for the child run to finish. If it hits the timeout, the call returns the flow run as last read (possibly still in a non-final state), and the child flow run keeps running in the work pool.
- The actual lifetime of your orchestrator or subflows is governed by:
  - Any timeouts you set on the flows themselves via `@flow(timeout_seconds=...)`.
  - Any infrastructure-level limits (e.g., if you were using your own k8s/containers).
Managed work pools don’t add an extra runtime cap beyond what you configure in your flows.
If you want to enforce hard caps, set them explicitly:
- Flow-level timeout: `@flow(timeout_seconds=...)` (docs: flow API)
- Waiting on child runs: `run_deployment(timeout=...)`, which only limits the wait, not the run itself (docs: run_deployment)
If you’re targeting very long runs (multi-day) on Managed, tell me your expected duration and I’ll double-check current service limits for your plan. Otherwise, you’re safe to choose whatever timeout makes sense for the parent’s waiting behavior.
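The "timeout limits the wait, not the run" distinction can be seen in a small standard-library analogy. This is not Prefect code: `child_run` and `wait_with_timeout` are hypothetical names, and a thread stands in for the child flow run. One difference to keep in mind is that `Future.result(timeout=...)` raises on timeout, whereas `run_deployment` returns the flow run it last read.

```python
import time
from concurrent.futures import ThreadPoolExecutor
from concurrent.futures import TimeoutError as FutureTimeout


def child_run(duration: float) -> str:
    """Stand-in for a child flow run that takes `duration` seconds."""
    time.sleep(duration)
    return "COMPLETED"


def wait_with_timeout(duration: float, timeout: float):
    """Wait up to `timeout` seconds for the child, then report what happened."""
    with ThreadPoolExecutor(max_workers=1) as pool:
        fut = pool.submit(child_run, duration)
        try:
            fut.result(timeout=timeout)
            timed_out = False
        except FutureTimeout:
            # The caller stopped waiting, but the child was not cancelled.
            timed_out = True
    # Leaving the with-block waits for the worker, so the result is ready now.
    return timed_out, fut.result()


timed_out, final = wait_with_timeout(duration=0.2, timeout=0.05)
print(timed_out, final)
```

The wait gives up early, yet the child still runs to completion, which mirrors a child flow run continuing in the work pool after the parent's wait expires.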
a
@Marvin I don’t expect multi day runs, but it could take a couple of hours for all flows to run
m
thought for 25.8 seconds
You should be fine. A few hours is well within the norm, even on Managed work pools. A couple of practical notes so nothing times out prematurely:
- Leave `run_deployment(timeout=None)` (the default) to wait indefinitely, or set a large enough timeout (e.g., a few hours + buffer). This only limits how long the parent waits; it does not kill the child run.
- If you set a flow-level timeout on the orchestrator, make sure it’s longer than the total fan-out + fan-in time: `@flow(timeout_seconds=...)`.
- Keep `as_subflow=True` for lineage, and consider a slightly higher `poll_interval` if you’re triggering lots of runs (e.g., 10–30s) to reduce chatter.
Example:
from prefect import flow, task
from prefect.deployments.flow_runs import run_deployment

@task
def trigger(dep_name: str, params: dict):
    return run_deployment(
        name=dep_name,
        parameters=params,
        as_subflow=True,
        timeout=None,       # wait as long as it takes
        poll_interval=10,   # optional
    )

@flow(timeout_seconds=4*60*60)  # e.g., 4 hours
def orchestrator():
    futs = [
        trigger.submit("flow-a/dev", {"x": 1}),
        trigger.submit("flow-b/prod", {"y": "abc"}),
        trigger.submit("flow-c/main", {"z": True}),
    ]
    runs = [f.result() for f in futs]
    return [r.state for r in runs]
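For picking the orchestrator's `timeout_seconds`, a back-of-the-envelope helper like the one below may be useful. It is only a sketch: `orchestrator_timeout`, the 25% buffer, and the example durations are all assumptions, not Prefect APIs or measured values. It assumes wall time is roughly the slowest child when all children run in parallel, and the sum of all children in the worst case where they end up running one at a time.

```python
import math


def orchestrator_timeout(child_seconds, buffer_fraction=0.25, parallel=True):
    """Estimate a parent timeout (in seconds) from expected child durations."""
    if not child_seconds:
        raise ValueError("need at least one expected child duration")
    # Parallel children: the slowest one dominates. Serialized: they add up.
    base = max(child_seconds) if parallel else sum(child_seconds)
    return math.ceil(base * (1 + buffer_fraction))


# Hypothetical estimates: 1 hour, 2 hours, and 30 minutes.
estimates = [3600, 7200, 1800]
print(orchestrator_timeout(estimates))
print(orchestrator_timeout(estimates, parallel=False))
```

If your work pool limits how many runs execute at once, the serialized figure is the safer bound.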
Refs:
- Flow decorator and timeouts: https://docs.prefect.io/latest/api-ref/flows/#prefect.flow
- run_deployment: https://docs.prefect.io/latest/api-ref/deployments/#prefect.deployments.flow_runs.run_deployment
If your runs start nudging toward “movie marathon” lengths, tell me your ballpark and I’ll sanity-check against any current plan limits.