Santiago Rodriguez
06/20/2024, 6:01 PM@flow
async def run_sleep_multi_flow():
flows = [
wait_flow_1(),
wait_flow_2(),
]
await asyncio.gather(*flows)
@flow(task_runner=ConcurrentTaskRunner)
async def wait_flow_1():
await sleep_task_async(5)
await sleep_task_async(5)
await sleep_task_async(5)
@flow(task_runner=ConcurrentTaskRunner)
async def wait_flow_2():
await sleep_task_async(5)
await sleep_task_async(5)
await sleep_task_async(5)
@task
async def sleep_task_async(time_in_seconds):
await asyncio.sleep(time_in_seconds)
return "Done sleep"
Marvin
06/20/2024, 6:01 PMConcurrentTaskRunner
. According to the gathered information, here are some considerations and potential solutions:
1. Subflow Execution: Each subflow will create its task runner. If you use ConcurrentTaskRunner
in parent and subflows, Prefect might handle them independently, impacting your concurrency.
2. Dependency Management: Ensure there's no hidden dependency between tasks that might cause sequential execution.
3. Task Runner Configuration: Verify that ConcurrentTaskRunner
works as expected and isn't overridden by any local settings.
To maintain concurrent execution, consider the following structure adjustments and checking:
1. Explicit Task Runner for Subflows: Ensure each subflow explicitly defines the ConcurrentTaskRunner
.
2. Utilize asyncio.gather
in Tasks: Ensure your async tasks themselves are leveraging asyncio.gather
if they involve multiple awaits internally.
3. Review Task Runner Docs:
- Review Prefect Task Runners Documentation for nuances in usage.
Here’s a refined example to ensure concurrency in subflows and tasks using `asyncio.gather`:
python
from prefect import flow, task
from prefect.task_runners import ConcurrentTaskRunner
import asyncio
@flow
async def run_sleep_multi_flow():
flows = [
wait_flow_1(),
wait_flow_2(),
]
await asyncio.gather(*flows)
@flow(task_runner=ConcurrentTaskRunner())
async def wait_flow_1():
await asyncio.gather(
sleep_task_async(5),
sleep_task_async(5),
sleep_task_async(5)
)
@flow(task_runner=ConcurrentTaskRunner())
async def wait_flow_2():
await asyncio.gather(
sleep_task_async(5),
sleep_task_async(5),
sleep_task_async(5)
)
@task
async def sleep_task_async(time_in_seconds):
await asyncio.sleep(time_in_seconds)
return "Done sleep"
if __name__ == "__main__":
await run_sleep_multi_flow()
### References:
- Prefect Task Runners Documentation
- Relevant GitHub Issues: Issue #12351, Issue #12804
Ensure your environment setup supports concurrent execution and no other flows/settings override the expected behavior.