Gabriel Lespérance
07/29/2024, 5:48 PM

Marvin
07/29/2024, 5:48 PM
```python
import asyncio

from prefect import flow


async def some_async_function():
    await asyncio.sleep(1)
    return "I'm an async function"


@flow(log_prints=True)  # send `print` statements as INFO logs
async def my_async_subflow(name):
    print(f"Subflow {name} started")
    await some_async_function()
    print(f"Subflow {name} completed")


@flow(log_prints=True)
async def parent_flow():
    subflows = [my_async_subflow(name) for name in ["A", "B", "C"]]
    await asyncio.gather(*subflows)


if __name__ == "__main__":
    asyncio.run(parent_flow())  # run it like a normal async python function
```
In this example, `parent_flow` runs three instances of the subflow `my_async_subflow` concurrently using `asyncio.gather`.

Gabriel Lespérance
07/29/2024, 5:54 PM
```python
import asyncio
import time

from prefect import flow, task


@task
async def shout(number):
    time.sleep(5)
    print(f"#{number}")


@flow()
async def count_to(highest_number):
    results = [shout.submit(number) for number in range(highest_number)]
    return results


@task
async def count_to_proxy(highest_number):
    return await count_to(highest_number)


@flow()
async def many_flows(highest_number: int, times: int):
    results = [count_to(highest_number) for _ in range(times)]
    return await asyncio.gather(*results)
    # return [count_to_proxy.submit(highest_number) for _ in range(times)]


if __name__ == "__main__":
    asyncio.run(many_flows(10, 3))
```
@Marvin

Marvin
07/29/2024, 5:54 PM
1. The `time.sleep(5)` call in your `shout` task is blocking. Even though the task function is `async`, the blocking call inside it prevents the other tasks from running in parallel.
2. Prefect's Task Management: When tasks submitted to Prefect's task runner encounter a synchronous blocking call, that call can disrupt concurrency.
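The blocking effect can be reproduced with plain `asyncio`, without Prefect. The sketch below is a minimal illustration, not Prefect code: the names `blocking_shout`, `nonblocking_shout`, `run_all`, and `DELAY` are hypothetical. It times three coroutines run through `asyncio.gather`, once with `time.sleep` and once with `asyncio.sleep`.

```python
import asyncio
import time

DELAY = 0.2  # seconds; kept short so the demo finishes quickly


async def blocking_shout(number: int) -> int:
    # time.sleep blocks the whole event loop, so no other coroutine runs meanwhile
    time.sleep(DELAY)
    return number


async def nonblocking_shout(number: int) -> int:
    # asyncio.sleep suspends this coroutine and hands control back to the event loop
    await asyncio.sleep(DELAY)
    return number


async def run_all(shout, count: int) -> float:
    """Run `count` copies of `shout` with asyncio.gather and return elapsed seconds."""
    start = time.perf_counter()
    await asyncio.gather(*(shout(n) for n in range(count)))
    return time.perf_counter() - start


if __name__ == "__main__":
    blocking = asyncio.run(run_all(blocking_shout, 3))
    nonblocking = asyncio.run(run_all(nonblocking_shout, 3))
    # Expect roughly 3 * DELAY for the blocking version and roughly DELAY for the other
    print(f"time.sleep:    {blocking:.2f}s")
    print(f"asyncio.sleep: {nonblocking:.2f}s")
```

With `time.sleep` the three calls run one after another, so the total is about three times the delay; with `asyncio.sleep` they overlap and the total is close to a single delay.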
Here’s the corrected version of your code:
```python
import asyncio

from prefect import flow, task


# Correct the blocking call
@task
async def shout(number):
    await asyncio.sleep(5)  # Change from time.sleep to asyncio.sleep
    print(f"#{number}")


@flow()
async def count_to(highest_number):
    results = [shout.submit(number) for number in range(highest_number)]
    return results


@task
async def count_to_proxy(highest_number):
    return await count_to(highest_number)


@flow()
async def many_flows(highest_number: int, times: int):
    results = [count_to(highest_number) for _ in range(times)]
    return await asyncio.gather(*results)  # This should work fine with corrected shout task


if __name__ == "__main__":
    asyncio.run(many_flows(10, 3))
```
You can find more relevant details in the documentation:
- Documentation on async subflows
- Documentation on using asyncio
By using `asyncio.sleep(5)` instead of `time.sleep(5)`, you ensure that the coroutine yields control back to the event loop, allowing other tasks to proceed concurrently.

Gabriel Lespérance
07/29/2024, 5:58 PM

Marvin
07/29/2024, 5:59 PM
```python
import asyncio

from prefect import flow, task
from prefect.context import FlowRunContext


# Correct the blocking call
@task
async def shout(number):
    await asyncio.sleep(5)  # Use asyncio.sleep instead of time.sleep
    print(f"#{number}")


@flow()
async def count_to(highest_number):
    # Tasks run on this subflow's task runner; Task.with_options has no
    # `task_runner` option, since the task runner is configured on the flow
    results = [shout.submit(number) for number in range(highest_number)]
    return results


@task
async def count_to_proxy(highest_number):
    return await count_to(highest_number)


@flow()
async def many_flows(highest_number: int, times: int):
    # Give each subflow run a new instance of the parent flow's task runner type
    task_runner_type = type(FlowRunContext.get().task_runner)
    results = [
        count_to.with_options(task_runner=task_runner_type())(highest_number)
        for _ in range(times)
    ]
    return await asyncio.gather(*results)  # Run the subflows concurrently


if __name__ == "__main__":
    asyncio.run(many_flows(10, 3))
```
### Key Changes:
1. Using `asyncio.sleep`: Ensures non-blocking sleep within tasks.
2. Separate Task Runners: Each `count_to` subflow run is given its own new instance of the parent flow's task runner type (via `with_options`), so the concurrently running subflows do not share a single task runner.
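The `type(FlowRunContext.get().task_runner)()` expression in `many_flows` builds a new object of the same class as the parent's task runner instead of reusing the parent's instance. The toy sketch below illustrates why that can matter for concurrent runs. `ToyRunner` and `run_subflows` are hypothetical names, not Prefect APIs, and `ToyRunner` only mimics a runner that refuses to start while it is already running; that behavior is an assumption for illustration, not a description of Prefect's internals.

```python
import asyncio


class ToyRunner:
    """Hypothetical stand-in for a task runner; not Prefect's API."""

    def __init__(self) -> None:
        self.started = False

    async def run(self, name: str) -> str:
        if self.started:
            raise RuntimeError("runner already started")
        self.started = True
        try:
            await asyncio.sleep(0.01)  # pretend to do work while "started"
            return name
        finally:
            self.started = False


async def run_subflows(runner: ToyRunner, fresh: bool) -> list:
    # With fresh=True each run gets type(runner)(): a new instance of the same class
    runners = [type(runner)() if fresh else runner for _ in range(3)]
    return await asyncio.gather(
        *(r.run(f"subflow-{i}") for i, r in enumerate(runners)),
        return_exceptions=True,
    )


if __name__ == "__main__":
    parent = ToyRunner()
    print(asyncio.run(run_subflows(parent, fresh=False)))  # one name, two RuntimeErrors
    print(asyncio.run(run_subflows(parent, fresh=True)))   # all three names
```

Sharing one stateful instance lets only the first concurrent run proceed, while giving each run its own instance avoids the conflict.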
References:
- Issue on running subflows concurrently
- Issue on parent flow status when running subflows
This adaptation should help in achieving parallel execution of subflows. If issues persist, consider tracking the ongoing discussion and resolution in the linked issues.