Ashley Kent
09/16/2025, 12:45 PMMarvin
09/16/2025, 12:45 PMMarvin
09/16/2025, 12:53 PMfrom prefect import flow
@flow
def child_flow(a: int) -> int:
return a + 1
@flow
def parent_flow():
result = child_flow(41) # subflow call; blocks until done
print(f"Child result: {result}")
if __name__ == "__main__":
parent_flow()
2) Trigger a deployment programmatically (separate run via a worker)
- Use prefect.deployments.run_deployment
to start another deployment from your flow. This is useful when the child flow is deployed and may need different infra or environment.
- By default as_subflow=True
so the triggered run is shown as a subflow under the parent in the UI. You can pass parameters, a custom flow run name, etc.
from prefect import flow
from prefect.deployments import run_deployment
@flow
def parent_flow():
flow_run = run_deployment(
name="child-flow/prod", # "<flow-name>/<deployment-name>"
parameters={"a": 41},
flow_run_name="child-from-parent",
as_subflow=True, # default; shows as subflow in UI
timeout=600, # optional: how long to wait
poll_interval=5, # optional: status check interval
)
# flow_run contains the triggered run info (including state)
return flow_run
Advanced: fire-and-forget via the client (non-blocking)
- If you don’t want to wait for completion, you can create the flow run directly via the client. This triggers the run and returns immediately with the ID.
from prefect import flow
from prefect.client.orchestration import get_client
@flow
async def parent_flow():
async with get_client() as client:
dep = await client.read_deployment_by_name("child-flow/prod")
fr = await client.create_flow_run_from_deployment(
deployment_id=dep.id,
parameters={"a": 41},
flow_run_name="child-from-parent",
)
# Return the new flow run ID and let it proceed independently
return fr.id
Notes and caveats
- For option 2, make sure a worker is running for the deployment’s work pool so the triggered run can start.
- Parameters are passed as a plain dict in both approaches.
- Visibility in the UI: subflows (and run_deployment(..., as_subflow=True)
) appear nested under the parent. With as_subflow=False
, the run won’t be nested under the parent.
Docs and references
- Triggering deployments from a flow: Run deployments (how-to)
- Background on patterns: Workflow design patterns
Would you like to run the other flow inline (subflow) or trigger a deployment (possibly on different infra)?Ashley Kent
09/16/2025, 12:54 PMMarvin
09/16/2025, 12:54 PMMarvin
09/16/2025, 12:57 PMfrom prefect import flow
from prefect.deployments import run_deployment
@flow
def parent_flow():
# Wait indefinitely for the child deployment to finish
child = run_deployment(
name="child-flow/prod",
parameters={"a": 41},
timeout=None, # wait until the run finishes (Completed/Failed/Cancelled)
poll_interval=5, # how often to check status
as_subflow=True # keep it nested under this flow in the UI
)
# Optional: enforce success before continuing
if child.state and child.state.name != "Completed":
raise RuntimeError(f"Child ended in state: {child.state.name}")
# Now start the next task/step
do_next_thing()
def do_next_thing():
...
Alternative: wait up to N seconds
child = run_deployment("child-flow/prod", timeout=600) # wait up to 10 minutes
Notes
- timeout=None waits indefinitely; timeout=0 returns immediately (fire-and-forget).
- After it returns, you can inspect child.state.name (e.g., "Completed", "Failed").
- Make sure a worker is running for the child’s work pool, or you’ll wait until it can be picked up.
Docs
- How-to: Run deployments
- API: prefect.deployments.run_deployment