<@ULVA73B9P> how do I run another flow from curren...
# ask-marvin
a
@Marvin how do I run another flow from current flow
m
thought for 467.3 seconds
Great question! In Prefect 3.x you have two common options:
1) Call another flow as a subflow (same codebase/process)
- Just call the other flow like a normal function. It runs synchronously, and its run is visible under the parent in the UI.
from prefect import flow

@flow
def child_flow(a: int) -> int:
    return a + 1

@flow
def parent_flow():
    result = child_flow(41)  # subflow call; blocks until done
    print(f"Child result: {result}")

if __name__ == "__main__":
    parent_flow()
2) Trigger a deployment programmatically (separate run via a worker)
- Use prefect.deployments.run_deployment to start another deployment from your flow. This is useful when the child flow is deployed and may need different infrastructure or a different environment.
- By default as_subflow=True, so the triggered run is shown as a subflow under the parent in the UI. You can pass parameters, a custom flow run name, etc.
from prefect import flow
from prefect.deployments import run_deployment

@flow
def parent_flow():
    flow_run = run_deployment(
        name="child-flow/prod",          # "<flow-name>/<deployment-name>"
        parameters={"a": 41},
        flow_run_name="child-from-parent",
        as_subflow=True,                 # default; shows as subflow in UI
        timeout=600,                     # optional: how long to wait
        poll_interval=5,                 # optional: status check interval
    )
    # flow_run contains the triggered run info (including state)
    return flow_run
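The name argument above uses the "<flow-name>/<deployment-name>" format. A minimal sketch of building that string safely before passing it to run_deployment; build_deployment_name is a hypothetical helper, not part of Prefect, and rejecting "/" inside either part is an assumption made for illustration:

```python
def build_deployment_name(flow_name: str, deployment_name: str) -> str:
    """Join the two parts into "<flow-name>/<deployment-name>"."""
    parts = (("flow_name", flow_name), ("deployment_name", deployment_name))
    for label, part in parts:
        # Reject empty or whitespace-only parts early, with a clear message.
        if not part or not part.strip():
            raise ValueError(f"{label} must be a non-empty string")
        # Assumption: a slash inside a part would make the joined name ambiguous.
        if "/" in part:
            raise ValueError(f"{label} must not contain '/': {part!r}")
    return f"{flow_name.strip()}/{deployment_name.strip()}"
```

For example, build_deployment_name("child-flow", "prod") produces the "child-flow/prod" string used in the call above.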
Advanced: fire-and-forget via the client (non-blocking)
- If you don’t want to wait for completion, you can create the flow run directly via the client. This triggers the run and returns immediately with the ID.
from prefect import flow
from prefect.client.orchestration import get_client

@flow
async def parent_flow():
    async with get_client() as client:
        dep = await client.read_deployment_by_name("child-flow/prod")
        fr = await client.create_flow_run_from_deployment(
            deployment_id=dep.id,
            parameters={"a": 41},
            name="child-from-parent",  # this client method takes name, not flow_run_name
        )
        # Return the new flow run ID and let it proceed independently
        return fr.id
Notes and caveats
- For option 2, make sure a worker is running for the deployment’s work pool so the triggered run can start.
- Parameters are passed as a plain dict in both approaches.
- Visibility in the UI: subflows (and run_deployment(..., as_subflow=True)) appear nested under the parent. With as_subflow=False, the run won’t be nested under the parent.
Docs and references
- Triggering deployments from a flow: Run deployments (how-to)
- Background on patterns: Workflow design patterns
Would you like to run the other flow inline (subflow) or trigger a deployment (possibly on different infra)?
a
@Marvin if I use run_deployment, how do I wait for it to finish before starting another task
m
thought for 163.4 seconds
Use run_deployment’s built-in waiting. With the default timeout=None it blocks until the triggered run reaches a terminal state; pass a number of seconds instead to cap the wait.
Example: wait until completion, then continue
from prefect import flow
from prefect.deployments import run_deployment

@flow
def parent_flow():
    # Wait indefinitely for the child deployment to finish
    child = run_deployment(
        name="child-flow/prod",
        parameters={"a": 41},
        timeout=None,        # wait until the run finishes (Completed/Failed/Cancelled)
        poll_interval=5,     # how often to check status
        as_subflow=True      # keep it nested under this flow in the UI
    )

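    # Optional: enforce success before continuing.
    # is_completed() checks the state type, so it also handles renamed Completed states.
    if child.state is None or not child.state.is_completed():
        state_name = child.state.name if child.state else "unknown"
        raise RuntimeError(f"Child ended in state: {state_name}")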

    # Now start the next task/step
    do_next_thing()

def do_next_thing():
    ...
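The success check in the flow above can be factored into a small helper that also separates "still running" from "finished unsuccessfully". This is a sketch, not Prefect code: ensure_completed and fake_run are hypothetical names, and it assumes only that the returned run has a state attribute exposing is_final(), is_completed(), and name, as Prefect state objects do.

```python
from types import SimpleNamespace


def ensure_completed(flow_run):
    """Return flow_run if it finished successfully, otherwise raise."""
    state = flow_run.state
    if state is None or not state.is_final():
        # The wait ended (for example, a timeout elapsed) before the run finished.
        name = state.name if state is not None else "unknown"
        raise TimeoutError(f"Child run has not finished; current state: {name}")
    if not state.is_completed():
        # Terminal but unsuccessful, such as Failed or Cancelled.
        raise RuntimeError(f"Child ended in state: {state.name}")
    return flow_run


def fake_run(name, final, completed):
    """Hypothetical stand-in for a flow run, used only to exercise the helper."""
    state = SimpleNamespace(
        name=name,
        is_final=lambda: final,
        is_completed=lambda: completed,
    )
    return SimpleNamespace(state=state)
```

Inside parent_flow this would be called as ensure_completed(child) right before do_next_thing().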
Alternative: wait up to N seconds
child = run_deployment("child-flow/prod", timeout=600)  # wait up to 10 minutes
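Notes
- timeout=None waits indefinitely; timeout=0 returns immediately (fire-and-forget).
- If a positive timeout elapses first, run_deployment still returns the flow run, which may not be in a terminal state yet, so check the state before relying on the result.
- After it returns, you can inspect child.state.name (e.g., "Completed", "Failed").
- Make sure a worker is running for the child’s work pool; otherwise the run stays scheduled and you’ll wait until a worker picks it up.
Docs
- How-to: Run deployments
- API: prefect.deployments.run_deployment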
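The timeout and poll_interval behaviour described in the notes can be pictured as a plain polling loop. The sketch below is an illustration of those semantics only, not Prefect's implementation: wait_for_terminal, read_state, and the simplified TERMINAL_STATES set of state names are all assumptions introduced here.

```python
import time
from typing import Callable, Optional

# Assumption: a simplified set of terminal state names for illustration.
TERMINAL_STATES = {"Completed", "Failed", "Cancelled", "Crashed"}


def wait_for_terminal(
    read_state: Callable[[], str],
    timeout: Optional[float] = None,
    poll_interval: float = 5.0,
) -> str:
    """Poll read_state() until it is terminal or the timeout elapses."""
    deadline = None if timeout is None else time.monotonic() + timeout
    while True:
        state = read_state()
        if state in TERMINAL_STATES:
            return state
        if deadline is None:
            # timeout=None: keep polling indefinitely.
            time.sleep(poll_interval)
            continue
        remaining = deadline - time.monotonic()
        if remaining <= 0:
            # Timed out: hand back the last observed, non-terminal state.
            return state
        # Never sleep past the deadline.
        time.sleep(min(poll_interval, remaining))
```

With timeout=0 the loop reads the state once and returns straight away, which matches the fire-and-forget case; with timeout=None it only returns once a terminal state is seen.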