<@ULVA73B9P> Can I run another deployed flow from ...
# ask-marvin
t
@Marvin Can I run another deployed flow from within a flow run and get its result for later use in the current flow run?
m
thought for 975.0 seconds
thought for 318.8 seconds
Yes. In Prefect 3.x you have two good options: 1) Call the other flow as a subflow (simplest, returns the value directly) - Just call the flow function from within your flow. Prefect will create a subflow run and return the child flow’s return value to your parent flow. - Best when both flows share the same code/env and you don’t need to hop to different infrastructure. Example:
Copy code
from prefect import flow

@flow
def child_flow(x: int) -> int:
    return x + 1

@flow
def parent_flow():
    y = child_flow(41)  # runs as a subflow, returns 42
    # use y later in this flow run
    return y
2) Trigger another deployment from your flow and fetch its result - Use
prefect.deployments.flow_runs.run_deployment(...)
to start a run of a deployed flow. By default, it links as a subflow in the UI (
as_subflow=True
). - Wait for it to complete (it returns a FlowRun with its final state), then extract the child flow’s return value from the state using
prefect.states.get_state_result
. Example:
Copy code
from prefect import flow
from prefect.deployments.flow_runs import run_deployment
from prefect.states import get_state_result

@flow
async def parent_flow():
    # "<flow-name>/<deployment-name>"
    child_run = await run_deployment(
        name="child-flow/production",
        parameters={"x": 41},
        as_subflow=True,   # default; shows up as a subflow in the UI
        timeout=60 * 30,   # optional: wait up to 30 minutes
        poll_interval=5,   # optional: how often to poll
    )

    # child_run.state is the final state; get the returned Python value
    result = get_state_result(child_run.state)  # raises if child failed
    # use result later in this flow run
    return result
Notes and caveats: - Results persistence: When you call a deployed flow (i.e., it runs in a separate process/machine), Prefect needs to read the child’s result from result storage. If the child runs on different infrastructure, configure result storage accessible from the parent (e.g., S3/GCS/Azure) to avoid “MissingResult” errors. Subflows (option 1) don’t require remote result storage. - Error handling:
get_state_result(...)
will raise if the child run’s final state is failed. You can use
get_state_result(child_run.state, raise_on_failure=False)
if you want to handle it yourself. - Linking in the UI:
as_subflow=True
links the run under the parent; set
as_subflow=False
if you don’t want a subflow relationship. Docs and references: - Run a deployment from code: How to: Run deployments - API (create flow run from a deployment): API reference - States and results:
prefect.states.get_state_result
(available via
from prefect.states import get_state_result
) If you can share how your two flows are deployed (same vs different infra) I can suggest the best result storage configuration to make this seamless.
Yes — in Prefect 3.x you can trigger another deployed flow from within a running flow, wait for it to finish, and retrieve its return value for use later in the current run. There are two common patterns: 1) Call the other flow function directly (subflow, same codebase) - Best when both flows live in the same code environment/repo and you don’t need a separate deployment/worker. - Runs inline as a subflow; you get the return value immediately.
Copy code
from prefect import flow, get_run_logger

@flow
def child_flow(x: int) -> int:
    return x * 2

@flow
def parent_flow():
    logger = get_run_logger()
    result = child_flow(21)  # subflow call, returns 42
    <http://logger.info|logger.info>(f"Child result: {result}")
    return result
2) Trigger a deployment (remote execution) and fetch the result later - Best when the child flow must run as its own deployment/worker (e.g., different infra, image, resources). - Use
prefect.deployments.run_deployment
to start it and
prefect.flow_runs.wait_for_flow_run
+
prefect.states.get_state_result
to fetch the returned value.
Copy code
from prefect import flow, get_run_logger
from prefect.deployments import run_deployment
from prefect.flow_runs import wait_for_flow_run
from prefect.states import get_state_result

@flow
def parent_flow():
    logger = get_run_logger()

    # Kick off the child deployment; returns a FlowRun object
    child_run = run_deployment(
        name="repo-name/child-flow-name/prod",     # "<workspace or repo>/<flow>/<deployment>"
        parameters={"x": 21},
        # as_subflow=True (default) ensures the UI shows a parent-child link
        # work_queue_name="...", tags=["..."], idempotency_key="..." are also available if needed
    )

    # Do other work here if you like, then wait for completion when you need the result:
    child_run = wait_for_flow_run(child_run.id, timeout=3600, log_states=True)

    # Get the child flow’s return value from its terminal state
    child_result = get_state_result(child_run.state)  # -> 42
    <http://logger.info|logger.info>(f"Child result: {child_result}")
    return child_result
Notes and pitfalls - Blocking behavior:
wait_for_flow_run
will block your parent flow until the child reaches a terminal state or the timeout is hit (default 3h; override with
timeout=
). You can delay the call to
wait_for_flow_run
so your parent can do other work first. - Passing parameters: Use the
parameters
dict shown above; keys must match the child deployment’s flow parameters. - Getting the result: Use
get_state_result
on the terminal
state
of the child flow run. Make sure the child flow’s return value is serializable; Prefect will persist and retrieve it via result storage/serializer. - Permissions: The worker running the parent must be able to create/observe the child run in your Prefect API and read results from your configured result storage. - Subflow vs deployment: Calling
child_flow(...)
runs inline in the same process (fast, simple, no separate infra). Using
run_deployment(...)
runs the child on its own infrastructure via a worker (isolated, scalable, can use different images/resources). If you share your deployment name and whether you want the parent to wait immediately or only later (after doing other work), I can tailor the snippet exactly to your setup.
1