Theom
08/22/2025, 12:23 PMMarvin
08/22/2025, 12:23 PMMarvin
08/22/2025, 12:24 PMMarvin
08/22/2025, 12:29 PM
1) Call the child flow directly as a subflow
from prefect import flow
@flow
def child_flow(x: int) -> int:
    return x + 1
@flow
def parent_flow():
    y = child_flow(41)  # runs as a subflow, returns 42
    # use y later in this flow run
    return y
2) Trigger another deployment from your flow and fetch its result
- Use prefect.deployments.flow_runs.run_deployment(...) to start a run of a deployed flow. By default, it links as a subflow in the UI (as_subflow=True).
- Wait for it to complete (it returns a FlowRun with its final state), then extract the child flow’s return value from the state using prefect.states.get_state_result.
Example:
from prefect import flow
from prefect.deployments.flow_runs import run_deployment
from prefect.states import get_state_result
@flow
async def parent_flow():
    # "<flow-name>/<deployment-name>"
    child_run = await run_deployment(
        name="child-flow/production",
        parameters={"x": 41},
        as_subflow=True,   # default; shows up as a subflow in the UI
        timeout=60 * 30,   # optional: wait up to 30 minutes
        poll_interval=5,   # optional: how often to poll
    )
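    # child_run.state is the final state only if the run finished within the timeout
    # Depending on your Prefect version, get_state_result may be a coroutine function
    # that must be awaited here; child_run.state.result() is the accessor on State itself
    result = get_state_result(child_run.state)  # raises if child failed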
    # use result later in this flow run
    return result
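To make the timeout and poll_interval arguments above more concrete, here is a minimal, Prefect-free sketch of a poll-until-terminal loop. It is not Prefect's implementation: wait_for_terminal_state, read_state, and the set of state names are hypothetical stand-ins used only for illustration.

```python
import time
from typing import Callable

# Illustrative state names; treat these as assumptions, not an authoritative list.
TERMINAL_STATES = {"COMPLETED", "FAILED", "CRASHED", "CANCELLED"}


def wait_for_terminal_state(
    read_state: Callable[[], str],
    timeout: float,
    poll_interval: float,
    sleep: Callable[[float], None] = time.sleep,
    clock: Callable[[], float] = time.monotonic,
) -> str:
    """Poll read_state() until it reports a terminal state or the timeout expires."""
    deadline = clock() + timeout
    while True:
        state = read_state()
        if state in TERMINAL_STATES:
            return state
        if clock() >= deadline:
            raise TimeoutError(f"still {state!r} after {timeout} seconds")
        # Wait before asking again so the API is not hammered.
        sleep(poll_interval)


if __name__ == "__main__":
    # Fake child run that finishes on the third poll.
    states = iter(["PENDING", "RUNNING", "COMPLETED"])
    print(wait_for_terminal_state(lambda: next(states), timeout=60, poll_interval=0.01))
```

A shorter poll_interval notices completion sooner at the cost of more API calls; the timeout only bounds how long the parent waits and does not cancel the child run in this sketch.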
Notes and caveats:
- Results persistence: When you call a deployed flow (i.e., it runs in a separate process/machine), Prefect needs to read the child’s result from result storage. If the child runs on different infrastructure, configure result storage accessible from the parent (e.g., S3/GCS/Azure) to avoid “MissingResult” errors. Subflows (option 1) don’t require remote result storage.
- Error handling: get_state_result(...) will raise if the child run’s final state is failed. You can use get_state_result(child_run.state, raise_on_failure=False) if you want to handle it yourself.
- Linking in the UI: as_subflow=True links the run under the parent; set as_subflow=False if you don’t want a subflow relationship.
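Since the parent reads the child's return value back from result storage, that value has to survive a serialize/deserialize round trip. Here is a rough local check you could run on a sample return value before deploying. It assumes a pickle-based serializer, which may not match your configuration; survives_round_trip is a hypothetical helper, not part of Prefect.

```python
import pickle
from typing import Any


def survives_round_trip(value: Any) -> bool:
    """Return True if value can be pickled and restored to an equal object."""
    try:
        restored = pickle.loads(pickle.dumps(value))
    except Exception:
        # Anything that cannot be pickled (locks, open sockets, ...) ends up here.
        return False
    return restored == value


if __name__ == "__main__":
    import threading

    print(survives_round_trip({"rows": 42, "ok": True}))  # plain data
    print(survives_round_trip(threading.Lock()))          # not picklable
```

If your flows use a JSON or custom serializer instead, swap the pickle calls for that serializer's dump/load pair.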
Docs and references:
- Run a deployment from code: How to: Run deployments
- API (create flow run from a deployment): API reference
- States and results: prefect.states.get_state_result (available via from prefect.states import get_state_result)
If you can share how your two flows are deployed (same vs different infra), I can suggest the best result storage configuration to make this seamless.
Marvin
08/22/2025, 12:40 PM
1) Call the child flow directly as a subflow
from prefect import flow, get_run_logger
@flow
def child_flow(x: int) -> int:
    return x * 2
@flow
def parent_flow():
    logger = get_run_logger()
    result = child_flow(21)  # subflow call, returns 42
    logger.info(f"Child result: {result}")
    return result
2) Trigger a deployment (remote execution) and fetch the result later
- Best when the child flow must run as its own deployment/worker (e.g., different infra, image, resources).
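- Use prefect.deployments.run_deployment to start it. By default it waits for the run to finish; pass timeout=0 to get the FlowRun back immediately, then use prefect.flow_runs.wait_for_flow_run and prefect.states.get_state_result to fetch the returned value when you need it.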
from prefect import flow, get_run_logger
from prefect.deployments import run_deployment
from prefect.flow_runs import wait_for_flow_run
from prefect.states import get_state_result
@flow
async def parent_flow():
    logger = get_run_logger()
    # Kick off the child deployment; returns a FlowRun object
    child_run = await run_deployment(
        name="child-flow-name/prod",     # "<flow-name>/<deployment-name>"
        parameters={"x": 21},
        timeout=0,  # return immediately; without this, run_deployment waits for the run to finish
        # as_subflow=True (default) ensures the UI shows a parent-child link
        # work_queue_name="...", tags=["..."], idempotency_key="..." are also available if needed
    )
    # Do other work here if you like, then wait for completion when you need the result.
    # wait_for_flow_run is a coroutine function, which is why this parent flow is async.
    child_run = await wait_for_flow_run(child_run.id, timeout=3600, log_states=True)
    # Get the child flow’s return value from its terminal state.
    # Depending on your Prefect version, get_state_result may need to be awaited;
    # child_run.state.result() is the accessor on State itself.
    child_result = get_state_result(child_run.state)  # -> 42
    logger.info(f"Child result: {child_result}")
    return child_result
Notes and pitfalls
- Blocking behavior: wait_for_flow_run will block your parent flow until the child reaches a terminal state or the timeout is hit (default 3h; override with timeout=). You can delay the call to wait_for_flow_run so your parent can do other work first.
- Passing parameters: Use the parameters dict shown above; keys must match the child deployment’s flow parameters.
- Getting the result: Use get_state_result on the terminal state of the child flow run. Make sure the child flow’s return value is serializable; Prefect will persist and retrieve it via result storage/serializer.
- Permissions: The worker running the parent must be able to create/observe the child run in your Prefect API and read results from your configured result storage.
- Subflow vs deployment: Calling child_flow(...) runs inline in the same process (fast, simple, no separate infra). Using run_deployment(...) runs the child on its own infrastructure via a worker (isolated, scalable, can use different images/resources).
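On the parameter-passing point above: a mismatched key is easy to miss until the child run fails. Here is a minimal sketch of a local pre-flight check, assuming you can import the child flow's underlying function. check_parameters is a hypothetical helper, and the plain child_flow below stands in for the real flow; it checks parameter names only, not value types.

```python
import inspect
from typing import Any, Callable


def check_parameters(fn: Callable[..., Any], parameters: dict[str, Any]) -> None:
    """Raise TypeError if parameters has missing or unexpected keys for fn."""
    # Signature.bind applies the same name-matching rules as a real call.
    inspect.signature(fn).bind(**parameters)


def child_flow(x: int) -> int:  # plain function standing in for the child flow
    return x * 2


if __name__ == "__main__":
    check_parameters(child_flow, {"x": 21})  # passes silently
    try:
        check_parameters(child_flow, {"y": 21})
    except TypeError as exc:
        print(f"bad parameters: {exc}")
```

Running a check like this in the parent before calling run_deployment surfaces a typo in a key right away instead of after the child run has been scheduled.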
If you share your deployment name and whether you want the parent to wait immediately or only later (after doing other work), I can tailor the snippet exactly to your setup.