Theom
08/22/2025, 12:23 PM
Marvin
08/22/2025, 12:23 PM
Marvin
08/22/2025, 12:24 PM
Marvin
08/22/2025, 12:29 PM
from prefect import flow

@flow
def child_flow(x: int) -> int:
    return x + 1

@flow
def parent_flow():
    y = child_flow(41)  # runs as a subflow, returns 42
    # use y later in this flow run
    return y
2) Trigger another deployment from your flow and fetch its result
- Use prefect.deployments.flow_runs.run_deployment(...) to start a run of a deployed flow. By default, it links as a subflow in the UI (as_subflow=True).
- Wait for it to complete (it returns a FlowRun with its final state), then extract the child flow’s return value from the state using prefect.states.get_state_result.
Example:
from prefect import flow
from prefect.deployments.flow_runs import run_deployment
from prefect.states import get_state_result

@flow
async def parent_flow():
    # name format: "<flow-name>/<deployment-name>"
    child_run = await run_deployment(
        name="child-flow/production",
        parameters={"x": 41},
        as_subflow=True,  # default; shows up as a subflow in the UI
        timeout=60 * 30,  # optional: wait up to 30 minutes
        poll_interval=5,  # optional: how often to poll
    )
    # child_run.state is the final state, provided the run finished within the timeout
    # get_state_result is awaited here because this is an async flow
    result = await get_state_result(child_run.state)  # raises if child failed
    # use result later in this flow run
    return result
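Since the parameters dict has to line up with the child flow's signature, it can help to check that locally before triggering the deployment. Here is a minimal sketch using only the standard library. parameter_mismatch is a hypothetical helper (not part of Prefect), and child_flow below is an undecorated stand-in; for a real flow you would probably pass the wrapped function, child_flow.fn.

```python
import inspect
from typing import Any, Callable, Optional


def parameter_mismatch(fn: Callable[..., Any], parameters: dict) -> Optional[str]:
    """Return an error message if `parameters` cannot be bound to `fn`, else None.

    Hypothetical helper for illustration; it only checks names and arity,
    not types.
    """
    try:
        inspect.signature(fn).bind(**parameters)
    except TypeError as exc:
        return str(exc)
    return None


def child_flow(x: int) -> int:
    # Stand-in for the undecorated child flow function.
    return x + 1


ok = parameter_mismatch(child_flow, {"x": 41})    # keys match the signature
bad = parameter_mismatch(child_flow, {"y": 41})   # wrong key name
print(ok)
print(bad)
```

This only catches wrong or missing keys; it does not validate value types against the deployment's parameter schema.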
Notes and caveats:
- Results persistence: When you call a deployed flow (i.e., it runs in a separate process/machine), Prefect needs to read the child’s result from result storage. If the child runs on different infrastructure, configure result storage accessible from the parent (e.g., S3/GCS/Azure) to avoid “MissingResult” errors. Subflows (option 1) don’t require remote result storage.
- Error handling: get_state_result(...) will raise if the child run’s final state is failed. You can use get_state_result(child_run.state, raise_on_failure=False) if you want to handle it yourself.
- Linking in the UI: as_subflow=True links the run under the parent; set as_subflow=False if you don’t want a subflow relationship.
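One more case worth guarding against: if the timeout passed to run_deployment elapses first, the returned FlowRun may not be in a final state yet, so there is no result to read. A rough sketch of a pre-check follows. child_outcome and FakeState are hypothetical names for illustration; the only Prefect behavior assumed is that a state exposes is_final() and is_completed(), and that FlowRun.state can be None.

```python
from typing import Any, Optional


def child_outcome(state: Optional[Any]) -> str:
    """Classify a flow run state before trying to read its result.

    `state` is assumed to offer `is_final()` and `is_completed()`,
    as Prefect's State objects do. Hypothetical helper.
    """
    if state is None or not state.is_final():
        return "not-finished"   # e.g. the timeout elapsed before the run ended
    if state.is_completed():
        return "completed"      # reasonable to fetch the result now
    return "unsuccessful"       # failed, crashed, or cancelled


class FakeState:
    """Stand-in for a Prefect State so the sketch runs without a server."""

    def __init__(self, final: bool, completed: bool) -> None:
        self._final = final
        self._completed = completed

    def is_final(self) -> bool:
        return self._final

    def is_completed(self) -> bool:
        return self._completed


print(child_outcome(FakeState(final=True, completed=True)))
print(child_outcome(FakeState(final=True, completed=False)))
print(child_outcome(FakeState(final=False, completed=False)))
print(child_outcome(None))
```

In the parent flow you would call child_outcome(child_run.state) and only fetch the result when it reports "completed".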
Docs and references:
- Run a deployment from code: How to: Run deployments
- API (create flow run from a deployment): API reference
- States and results: prefect.states.get_state_result (available via from prefect.states import get_state_result)
If you can share how your two flows are deployed (same vs different infra) I can suggest the best result storage configuration to make this seamless.
Marvin
08/22/2025, 12:40 PM
from prefect import flow, get_run_logger

@flow
def child_flow(x: int) -> int:
    return x * 2

@flow
def parent_flow():
    logger = get_run_logger()
    result = child_flow(21)  # subflow call, returns 42
    logger.info(f"Child result: {result}")
    return result
2) Trigger a deployment (remote execution) and fetch the result later
- Best when the child flow must run as its own deployment/worker (e.g., different infra, image, resources).
- Use prefect.deployments.run_deployment to start it and prefect.flow_runs.wait_for_flow_run + prefect.states.get_state_result to fetch the returned value.
from prefect import flow, get_run_logger
from prefect.deployments import run_deployment
from prefect.flow_runs import wait_for_flow_run
from prefect.states import get_state_result

# Written as an async flow because wait_for_flow_run is a coroutine
@flow
async def parent_flow():
    logger = get_run_logger()
    # Kick off the child deployment; returns a FlowRun object
    child_run = await run_deployment(
        name="child-flow-name/prod",  # "<flow-name>/<deployment-name>"
        parameters={"x": 21},
        timeout=0,  # return immediately; the default (None) waits for the run to finish
        # as_subflow=True (default) ensures the UI shows a parent-child link
        # work_queue_name="...", tags=["..."], idempotency_key="..." are also available if needed
    )
    # Do other work here if you like, then wait for completion when you need the result:
    child_run = await wait_for_flow_run(child_run.id, timeout=3600, log_states=True)
    # Get the child flow’s return value from its terminal state
    child_result = await get_state_result(child_run.state)  # -> 42
    logger.info(f"Child result: {child_result}")
    return child_result
Notes and pitfalls
- Blocking behavior: wait_for_flow_run will block your parent flow until the child reaches a terminal state or the timeout is hit (default 3h; override with timeout=). You can delay the call to wait_for_flow_run so your parent can do other work first.
- Passing parameters: Use the parameters dict shown above; keys must match the child deployment’s flow parameters.
- Getting the result: Use get_state_result on the terminal state of the child flow run. Make sure the child flow’s return value is serializable; Prefect will persist and retrieve it via result storage/serializer.
- Permissions: The worker running the parent must be able to create/observe the child run in your Prefect API and read results from your configured result storage.
- Subflow vs deployment: Calling child_flow(...) runs inline in the same process (fast, simple, no separate infra). Using run_deployment(...) runs the child on its own infrastructure via a worker (isolated, scalable, can use different images/resources).
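On the serializability point, a quick local round trip can flag return values that will not survive persistence. The sketch below assumes a pickle-based result serializer (which I believe is Prefect's default; a JSON serializer would be stricter) and uses the standard library's pickle as an approximation. survives_pickle is a hypothetical helper, not a Prefect API.

```python
import pickle
import threading
from typing import Any


def survives_pickle(value: Any) -> bool:
    """Return True if `value` can be pickled and unpickled without error.

    Approximation only: it uses stdlib pickle, which may differ from the
    serializer configured for your flow. Hypothetical helper.
    """
    try:
        pickle.loads(pickle.dumps(value))
    except Exception:
        return False
    return True


plain_result = {"rows": 42, "status": "ok"}             # plain data
awkward_result = {"rows": 42, "lock": threading.Lock()}  # holds an OS-level lock

print(survives_pickle(plain_result))
print(survives_pickle(awkward_result))
```

If the check fails, returning plain data (dicts, lists, numbers, strings) from the child flow is usually the simplest fix.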
If you share your deployment name and whether you want the parent to wait immediately or only later (after doing other work), I can tailor the snippet exactly to your setup.