Romain Lefebvre
10/09/2025, 9:52 AMrun_deployment to run subflows within a flow. How am I able to get the result of the subflows executed ?
Can I run run_deployment in an async way ?
i want to block execution while the subflow hasn't finished. Is the blocking, an async blocking ? does that mean that I can use asyncio.gather to run and wait for multiple subflows to be terminated ?Marvin
10/09/2025, 9:52 AMMarvin
10/09/2025, 9:55 AMrun_deployment is async.
- When you await run_deployment(...), it will wait until the triggered flow run reaches a terminal state (Completed/Failed/Cancelled) or until the optional timeout elapses.
- It returns a FlowRun object. To get the subflow’s returned value, read the final state and resolve its result.
- Because it’s async “blocking,” you can run many concurrently with asyncio.gather and await them all.
Code examples
1) Run a single deployment and get its result
from prefect import flow
from prefect.deployments import run_deployment
from prefect.states import get_state_result
@flow
async def parent():
# as_subflow defaults to True so this appears as a subflow in the UI
flow_run = await run_deployment(
name="my-subflow/my-deployment",
parameters={"x": 1},
timeout=3600, # optional; wait up to 1 hour
poll_interval=5, # optional; poll every 5 seconds
)
# flow_run.state is a Prefect State with the subflow's final status
if flow_run.state.is_completed():
result = get_state_result(flow_run.state) # materialize the return value
return result
else:
# Raise the underlying exception if it failed/crashed/cancelled
from prefect.states import raise_state_exception
raise_state_exception(flow_run.state)
2) Run multiple deployments concurrently and wait for all
import asyncio
from prefect import flow
from prefect.deployments import run_deployment
from prefect.states import get_state_result
@flow
async def parent():
runs = await asyncio.gather(
run_deployment(name="flow-a/dep-a", parameters={"p": 1}),
run_deployment(name="flow-b/dep-b", parameters={"p": 2}),
run_deployment(name="flow-c/dep-c", parameters={"p": 3}),
)
# Each item is a FlowRun with a final state
results = [get_state_result(r.state) for r in runs] # will raise on failures
return results
Notes and tips
- as_subflow=True (the default) ensures the triggered run is tracked as a subflow of the parent in the UI and inherits context/tags. Set as_subflow=False if you want a decoupled run.
- timeout=None (default) means “wait indefinitely” for the subflow to finish. Tune poll_interval to control how often the state is checked.
- If you prefer more explicit control, you can also call prefect.flow_runs.wait_for_flow_run(flow_run_id) yourself; run_deployment already does this waiting for you under the hood.
References
- API source for `run_deployment`: prefect/deployments/flow_runs.py
- API source for `wait_for_flow_run`: prefect/flow_runs.py
- Docs: Run deployments programmatically
If you share your parent/subflow signatures, I can tailor the snippet to your parameters and return types.