Idan Elhalwani
07/02/2025, 9:38 AM
Marvin
07/02/2025, 9:38 AM
Marvin
07/02/2025, 9:39 AM
```python
from prefect import flow

@flow
def child_flow(x):
    return x * 2

@flow
def parent_flow():
    # Submit multiple child flows to run concurrently
    futures = [
        child_flow.submit(i) for i in range(5)
    ]
    # Wait for all results
    results = [future.result() for future in futures]
    return results
```
2. Using `asyncio` with async flows:
```python
from prefect import flow
import asyncio

@flow
async def async_child_flow(x):
    return x * 2

@flow
async def async_parent_flow():
    # Run child flows concurrently
    results = await asyncio.gather(
        *[async_child_flow(i) for i in range(5)]
    )
    return results
```
Key points to remember:
- The `submit()` method returns a future object that you can use to get results later
- Async flows provide native Python async/await syntax for concurrent execution
- Child flows inherit the same deployment context as their parent flow
- Concurrency limits can be managed through work pools and work queues if needed
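The async point above can be seen without Prefect at all. The following is a minimal standard-library sketch, not Prefect code: `fake_child` is a hypothetical stand-in for an async child flow, and the 0.2 second sleep is an arbitrary assumption that simulates I/O. It illustrates that `asyncio.gather` overlaps the waits and returns results in the order the coroutines were passed in.

```python
import asyncio
import time


async def fake_child(x: int) -> int:
    # Hypothetical stand-in for an async child flow; the sleep simulates I/O.
    await asyncio.sleep(0.2)
    return x * 2


async def run_children(n: int = 5) -> tuple[list[int], float]:
    start = time.perf_counter()
    # gather schedules all coroutines together and preserves argument order.
    results = await asyncio.gather(*(fake_child(i) for i in range(n)))
    elapsed = time.perf_counter() - start
    return list(results), elapsed


if __name__ == "__main__":
    results, elapsed = asyncio.run(run_children())
    print(results, f"{elapsed:.2f}s")
```

Run sequentially, five 0.2 second waits would take about a second; with `gather` the total should stay close to a single wait.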
For more details on flow orchestration and concurrency patterns, you can refer to:
- Workflow Design Patterns
- Flows Documentation
- Tag-based Concurrency Limits
Would you like me to elaborate on any particular aspect of concurrent flow execution?
Idan Elhalwani
07/02/2025, 9:41 AM
Marvin
07/02/2025, 9:41 AM
Marvin
07/02/2025, 9:42 AM
```python
from prefect import flow, task
from prefect.task_runners import ConcurrentTaskRunner

@task
def run_flow_as_task(x):
    # Your flow logic here
    return x * 2

@flow(task_runner=ConcurrentTaskRunner())
def parent_flow():
    futures = [run_flow_as_task.submit(i) for i in range(5)]
    results = [future.result() for future in futures]
    return results
```
2. Using Deployments:
If you need true parallel flow execution, you should create separate deployments for your flows and trigger them programmatically:
```python
from prefect import flow
from prefect.client.orchestration import get_client

@flow
async def trigger_parallel_flows():
    async with get_client() as client:
        flow_runs = []
        for i in range(5):
            # Create a flow run for an existing deployment; this schedules
            # the run and returns without waiting for it to finish
            flow_run = await client.create_flow_run_from_deployment(
                deployment_id="your-deployment-id",
                parameters={"x": i},
            )
            flow_runs.append(flow_run)
        return flow_runs
```
The key differences are:
- Subflows (child flows) run in the same process as the parent flow by default
- For true parallelism, you need either:
  - Tasks with a concurrent task runner
  - Separate deployments that can run independently
- Deployments can run on different workers/machines, offering true parallel execution
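As a rough analogy for the task-runner option above, here is a standard-library sketch that assumes the concurrent task runner behaves like a thread pool. `double_after_delay` is a hypothetical placeholder for task logic, and `ThreadPoolExecutor` is Python's own executor, not a Prefect class. It shows the same submit-then-collect pattern: `submit()` hands back a future immediately, and `result()` blocks until that piece of work is done.

```python
import time
from concurrent.futures import ThreadPoolExecutor


def double_after_delay(x: int) -> int:
    # Hypothetical stand-in for task logic; the sleep simulates blocking I/O.
    time.sleep(0.2)
    return x * 2


def run_in_threads(n: int = 5) -> tuple[list[int], float]:
    start = time.perf_counter()
    with ThreadPoolExecutor(max_workers=n) as pool:
        # submit() returns a Future right away; the work runs in the pool.
        futures = [pool.submit(double_after_delay, i) for i in range(n)]
        # result() blocks until that particular future has finished.
        results = [f.result() for f in futures]
    return results, time.perf_counter() - start


if __name__ == "__main__":
    results, elapsed = run_in_threads()
    print(results, f"{elapsed:.2f}s")
```

Threads overlap well when the work mostly waits on I/O. For CPU-bound work in standard CPython, the global interpreter lock limits how much threads can run at once, which is one reason separate processes or machines may be the better fit for heavy computation.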
For more details on concurrent execution, you can refer to:
- Running Work Concurrently
- Workflow Design Patterns
Would you like me to elaborate on either of these approaches?