# ask-marvin
@Marvin I'm using `run_deployment` to run subflows within a flow. How am I able to get the results of the executed subflows? Can I run `run_deployment` in an async way? I want to block execution while the subflow hasn't finished. Is the blocking an async blocking? Does that mean I can use `asyncio.gather` to run and wait for multiple subflows to be terminated?
Marvin thought for 210.2 seconds
Short answer:
- Yes: in Prefect 3.x, `run_deployment` is async.
- When you `await run_deployment(...)`, it waits until the triggered flow run reaches a terminal state (Completed/Failed/Cancelled) or until the optional `timeout` elapses.
- It returns a `FlowRun` object. To get the subflow's returned value, read the final state and resolve its result.
- Because the blocking is async, you can run many deployments concurrently with `asyncio.gather` and await them all.

Code examples

1) Run a single deployment and get its result
```python
from prefect import flow
from prefect.deployments import run_deployment
from prefect.states import get_state_result, raise_state_exception

@flow
async def parent():
    # as_subflow defaults to True so this appears as a subflow in the UI
    flow_run = await run_deployment(
        name="my-subflow/my-deployment",
        parameters={"x": 1},
        timeout=3600,          # optional; wait up to 1 hour
        poll_interval=5,       # optional; poll every 5 seconds
    )

    # flow_run.state is a Prefect State with the subflow's final status
    if flow_run.state.is_completed():
        result = get_state_result(flow_run.state)  # materialize the return value
        return result
    else:
        # Raise the underlying exception if it failed/crashed/cancelled
        raise_state_exception(flow_run.state)
```
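A quick way to exercise this locally (a minimal sketch; it reuses the `parent` flow defined just above, and the deployment name in it is a placeholder that must already exist in your workspace):

```python
import asyncio

if __name__ == "__main__":
    # An async flow is called like any coroutine; asyncio.run drives it to completion
    subflow_result = asyncio.run(parent())
    print(subflow_result)
```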
2) Run multiple deployments concurrently and wait for all
```python
import asyncio
from prefect import flow
from prefect.deployments import run_deployment
from prefect.states import get_state_result

@flow
async def parent():
    runs = await asyncio.gather(
        run_deployment(name="flow-a/dep-a", parameters={"p": 1}),
        run_deployment(name="flow-b/dep-b", parameters={"p": 2}),
        run_deployment(name="flow-c/dep-c", parameters={"p": 3}),
    )
    # Each item is a FlowRun with a final state
    results = [get_state_result(r.state) for r in runs]  # will raise on failures
    return results
```
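3) Trigger now, wait later (sketch)

If you prefer the more explicit route mentioned in the notes below, you can trigger the run without blocking and then wait for it yourself. This is a minimal sketch, assuming Prefect 3.x behavior where `timeout=0` makes `run_deployment` return the `FlowRun` immediately and `prefect.flow_runs.wait_for_flow_run` polls until the run reaches a terminal state; the deployment name is the same placeholder as in example 1:

```python
from prefect import flow
from prefect.deployments import run_deployment
from prefect.flow_runs import wait_for_flow_run
from prefect.states import get_state_result

@flow
async def parent():
    # timeout=0: create the flow run and return immediately instead of waiting
    flow_run = await run_deployment(
        name="my-subflow/my-deployment",
        parameters={"x": 1},
        timeout=0,
    )

    # ... do other work here while the subflow runs ...

    # Block explicitly until the run finishes, polling every 5 seconds
    finished = await wait_for_flow_run(flow_run.id, poll_interval=5)
    return get_state_result(finished.state)
```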
Notes and tips
- `as_subflow=True` (the default) ensures the triggered run is tracked as a subflow of the parent in the UI and inherits context/tags. Set `as_subflow=False` if you want a decoupled run.
- `timeout=None` (the default) means "wait indefinitely" for the subflow to finish. Tune `poll_interval` to control how often the state is checked.
- If you prefer more explicit control, you can also call `prefect.flow_runs.wait_for_flow_run(flow_run_id)` yourself; `run_deployment` already does this waiting for you under the hood.

References
- API source for `run_deployment`: prefect/deployments/flow_runs.py
- API source for `wait_for_flow_run`: prefect/flow_runs.py
- Docs: Run deployments programmatically

If you share your parent/subflow signatures, I can tailor the snippet to your parameters and return types.