Pradnyesh Choudhari
09/10/2025, 2:02 PMMarvin
09/10/2025, 2:02 PMMarvin
09/10/2025, 2:16 PMrun_deployment
. This creates/schedules the downstream flow runs and returns immediately, so your orchestrator can keep going (or even finish) without waiting.
Here are practical patterns you can copy/paste.
Fire-and-forget a single flow run (now)
from prefect import flow
from prefect.deployments.flow_runs import run_deployment
@flow
async def orchestrator():
# Triggers immediately and returns a FlowRun object; does NOT wait for completion
child_run = await run_deployment(
name="my-child-flow/production", # "flow_name/deployment_name" or a deployment UUID
parameters={"x": 1},
flow_run_name="child-1",
as_subflow=False # optional: decouple from parent for full independence
)
return {"child_run_id": child_run.id}
Schedule a run for the future
from datetime import datetime, timedelta, timezone
from prefect import flow
from prefect.deployments.flow_runs import run_deployment
@flow
async def orchestrator():
scheduled_at = datetime.now(timezone.utc) + timedelta(minutes=10)
child_run = await run_deployment(
name="my-child-flow/backfill",
parameters={"date": "2025-09-10"},
scheduled_time=scheduled_at,
as_subflow=False
)
return {"scheduled_child_run_id": child_run.id}
Trigger many runs concurrently (still non-blocking for run completion)
import asyncio
from prefect import flow
from prefect.deployments.flow_runs import run_deployment
@flow
async def orchestrator(partitions: list[int]):
# Schedule many at once; this only waits for creation, not completion
tasks = [
run_deployment(
name="my-child-flow/etl",
parameters={"partition": p},
as_subflow=False
)
for p in partitions
]
child_runs = await asyncio.gather(*tasks)
return [fr.id for fr in child_runs]
Notes and tips
- as_subflow=False
fully decouples the child runs from the parent; they will not be cancelled if the parent is cancelled and will not be displayed as subflows in the UI. If you want them shown as subflows in the UI but still not wait for completion, you can leave the default (True
); run_deployment
does not wait by default.
- You can also pass tags
, work_queue_name
, and job_variables
to influence routing and infrastructure:
await run_deployment(
name="flow/deployment",
parameters={"foo": "bar"},
tags=["priority:high"],
work_queue_name="my-queue",
job_variables={"env": {"MY_VAR": "123"}},
as_subflow=False
)
Related docs
- Creating and using deployments: How to create deployments
- Event-driven alternatives with Automations (if you want events to kick off flows instead of Python):
- Event triggers concept
- Creating deployment triggers
If you share how you’re identifying the target deployments and whether you want them linked as subflows in the UI, I can tailor the example further.