Mehdi Lallouache
06/30/2025, 1:41 PM
Marvin
06/30/2025, 1:41 PM
Marvin
06/30/2025, 1:42 PM
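from prefect import flow, get_client, task


@task
async def orchestrator_task():
    # Trigger a run of an already-deployed flow through the Prefect API client.
    # `async with` is only valid inside an async function, so the task is async.
    async with get_client() as client:
        # Create the flow run from the deployment; this returns as soon as
        # the run is created, not when it finishes
        flow_run = await client.create_flow_run_from_deployment(
            deployment_id="your-deployment-id",
            # Optional parameters if needed
            parameters={"key": "value"},
        )
        return flow_run.id


@flow
async def main_flow():
    # The task returns the flow run ID, which you can use to track the run's status
    flow_run_id = await orchestrator_task()
    return flow_run_id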
Some key points to consider:
1. The orchestrator task starts a flow run and returns its ID right away; it can optionally wait for the run to complete instead
2. This pattern is particularly useful when you need to:
- Run flows on different infrastructure
- Scale different parts of your workflow independently
- Maintain separation of concerns between different workflow components
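If you do want the orchestrator to wait for the triggered run, here is a minimal sketch using `run_deployment` from `prefect.deployments`. The helper name `trigger_deployment`, its `wait` flag, and the injectable `runner` argument are hypothetical, added only for illustration, and `"my-flow/my-deployment"` is a placeholder name. It relies on `run_deployment` polling until the run finishes when `timeout=None` and returning immediately when `timeout=0`:

```python
from typing import Any, Callable, Optional


def trigger_deployment(
    deployment_name: str,
    parameters: Optional[dict] = None,
    wait: bool = True,
    runner: Optional[Callable[..., Any]] = None,
) -> Any:
    """Start a run of a deployed flow, optionally blocking until it finishes.

    `runner` is a hypothetical injection point (not part of Prefect) so the
    helper can be exercised without a Prefect server; by default it is
    Prefect's own `run_deployment`.
    """
    if runner is None:
        # Imported lazily so this module loads even where Prefect is absent
        from prefect.deployments import run_deployment as runner

    # timeout=None: poll until the run reaches a final state
    # timeout=0:    return the flow run metadata immediately
    return runner(
        name=deployment_name,
        parameters=parameters or {},
        timeout=None if wait else 0,
    )
```

Inside a task you would call something like `trigger_deployment("my-flow/my-deployment", {"key": "value"})` and inspect the state of the returned flow run; pass `wait=False` for fire-and-forget.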
You can learn more about this and other workflow patterns in the Prefect blog post about workflow design patterns.
Remember that while this pattern is powerful, it's important to consider whether you actually need this level of orchestration. Sometimes, simply calling one flow from another using subflows might be a simpler and more appropriate solution.
Nate
06/30/2025, 2:52 PM