Kayvon Avishan
06/05/2024, 7:45 PM
Marvin
06/05/2024, 7:46 PM
You can run subflows concurrently using AnyIO task groups or `asyncio.gather`. Here's a relevant example:
### Concurrently Running Subflows
```python
import asyncio

from prefect import flow, task


@task
async def my_task(x):
    await asyncio.sleep(1)
    return x


@flow
async def my_subflow(x):
    result = await my_task(x)
    print(f"Subflow result: {result}")
    return result


@flow
async def my_parent_flow():
    tasks = [my_subflow(x) for x in range(5)]
    # Run subflows concurrently using asyncio.gather
    results = await asyncio.gather(*tasks)
    print(f"Parent flow results: {results}")


if __name__ == "__main__":
    # my_parent_flow is async, so it has to be driven by an event loop
    asyncio.run(my_parent_flow())
```
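To show why `asyncio.gather` helps, here is a minimal plain-`asyncio` sketch with no Prefect involved. `fake_subflow`, `run_concurrently`, and `timed_run` are hypothetical stand-ins for the subflows above, and the 0.2-second delay is arbitrary. Each coroutine awaits `asyncio.sleep` instead of blocking, so five of them overlap and finish in roughly one delay rather than five.

```python
import asyncio
import time


async def fake_subflow(x: int, delay: float = 0.2) -> int:
    # Hypothetical stand-in for an async subflow: waits without blocking the loop.
    await asyncio.sleep(delay)
    return x


async def run_concurrently(n: int, delay: float = 0.2) -> list:
    # gather schedules every coroutine at once and returns results in input order.
    return await asyncio.gather(*(fake_subflow(x, delay) for x in range(n)))


def timed_run(n: int = 5, delay: float = 0.2):
    # Returns (results, elapsed_seconds) for one concurrent run.
    start = time.perf_counter()
    results = asyncio.run(run_concurrently(n, delay))
    return results, time.perf_counter() - start
```

For example, `timed_run(5, 0.2)` should return `[0, 1, 2, 3, 4]` with an elapsed time close to 0.2 seconds. Exact timing depends on the machine.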
### Documentation
You can read more about composing and running subflows in the Prefect documentation.
### Example
Here is an additional example where tasks are submitted and executed concurrently:
```python
import time

from prefect import flow, task


@task
def sleep(t: int):
    print(f"Going to sleep for {t} seconds...")
    time.sleep(t)
    print("...and I'm awake!")


@flow(log_prints=True)
def my_flow():
    """Flows run submitted tasks concurrently by default."""
    sleep(3)  # block the flow for 3 seconds (not concurrently)

    future = sleep.submit(2)  # submit a task run to the task runner; doesn't block the flow
    future.wait()  # block the flow until the submitted task run is complete

    sleep.map([5] * 10)  # submit 10 runs at once, each sleeping 5 seconds; doesn't block

    # Even though the flow body is done, the flow won't exit until all task runs are complete.


if __name__ == "__main__":
    my_flow()
```
This example demonstrates concurrent execution using task submission and mapping.
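You can mimic the submit/wait/map pattern above with the standard library's `concurrent.futures` to see where the caller blocks and where it doesn't. This is only an analogy and assumes a 10-worker thread pool. It is not how Prefect's task runner is implemented. `sleep_task` and `my_flow_analog` are hypothetical names, and the delays are shortened so the example runs quickly.

```python
import time
from concurrent.futures import ThreadPoolExecutor, wait


def sleep_task(t: float) -> float:
    # Plain-Python stand-in for the `sleep` task above (no Prefect involved).
    time.sleep(t)
    return t


def my_flow_analog(pool: ThreadPoolExecutor) -> float:
    start = time.perf_counter()
    sleep_task(0.1)                          # direct call: blocks the caller
    future = pool.submit(sleep_task, 0.1)    # like .submit(): returns a future immediately
    future.result()                          # like future.wait(): block until it finishes
    # Like .map(): fan out ten calls at once without waiting on each one.
    futures = [pool.submit(sleep_task, 0.2) for _ in range(10)]
    wait(futures)                            # like the flow waiting on outstanding runs
    return time.perf_counter() - start
```

With 10 workers, the ten 0.2-second calls overlap. The whole run takes about 0.1 + 0.1 + 0.2 = 0.4 seconds instead of roughly 2.2 seconds sequentially.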
### Reference
For more details and additional examples, you can visit the following resources:
- Concurrent Subflows Documentation
- Code Example for Concurrent Tasks
Hopefully, this helps you run your subflows concurrently with less pain and suffering. Or perhaps more. Who knows these days.
KG
07/08/2024, 5:58 PM
With AnyIO task groups or `asyncio.gather`, do all of the tasks of the subflow need to be async?
Marvin
07/08/2024, 5:58 PM
When you use `asyncio.gather` or AnyIO task groups to run subflows concurrently, the tasks inside the subflows don't all have to be asynchronous. Async tasks still help: they yield control to the event loop while they wait, so they don't hold up the other subflows. You can mix synchronous and asynchronous tasks within your subflows; here's how to manage them:
### Asynchronous Subflow Example
Here's how you can run subflows concurrently using `asyncio.gather`:
```python
import asyncio
import time

from prefect import flow, task


@task
async def async_task(x):
    await asyncio.sleep(1)
    return x


@task
def sync_task(x):
    time.sleep(1)
    return x


@flow
async def my_subflow(x):
    result_async = await async_task(x)
    result_sync = sync_task(x)  # This call still blocks; consider refactoring if performance is critical
    print(f"Subflow async result: {result_async}, sync result: {result_sync}")
    return result_async, result_sync


@flow
async def my_parent_flow():
    tasks = [my_subflow(x) for x in range(5)]
    # Run subflows concurrently using asyncio.gather
    results = await asyncio.gather(*tasks)
    print(f"Parent flow results: {results}")


if __name__ == "__main__":
    asyncio.run(my_parent_flow())
```
### Handling Mixed Tasks
The example above runs both async and sync tasks. Keep the following in mind:
1. Asynchronous subflows: mark subflows `async` so they can `await` async tasks.
2. Synchronous tasks: sync tasks called inside async subflows block while they run, which can reduce the benefit of running the subflows concurrently.
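Point 2 is easier to see in a plain-`asyncio` sketch with no Prefect involved. The sketch compares a coroutine that calls blocking code directly with one that offloads that code via `asyncio.to_thread` (Python 3.9+). The helper names are hypothetical. Whether Prefect itself offloads sync tasks called from async flows depends on the Prefect version, so treat this as an illustration of event-loop blocking, not of Prefect's behavior.

```python
import asyncio
import time


def blocking_work(x: int, delay: float) -> int:
    # Synchronous code: time.sleep holds the thread it runs on.
    time.sleep(delay)
    return x


async def subflow_blocking(x: int, delay: float) -> int:
    # Calling sync work directly inside a coroutine stalls every other coroutine.
    return blocking_work(x, delay)


async def subflow_offloaded(x: int, delay: float) -> int:
    # asyncio.to_thread runs the sync call in a worker thread,
    # so the event loop can keep driving the other coroutines.
    return await asyncio.to_thread(blocking_work, x, delay)


async def run_all(subflow, n: int, delay: float) -> list:
    return await asyncio.gather(*(subflow(x, delay) for x in range(n)))


def elapsed(subflow, n: int = 5, delay: float = 0.2) -> float:
    # Wall-clock seconds to gather n calls of the given coroutine function.
    start = time.perf_counter()
    asyncio.run(run_all(subflow, n, delay))
    return time.perf_counter() - start
```

Gathering five `subflow_blocking` calls takes about 5 × 0.2 = 1.0 seconds because each `time.sleep` stalls the event loop. The `subflow_offloaded` version overlaps the calls and finishes in about 0.2 seconds.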
### Additional Resources
Here is a simple async flow example:
Async Basic Flow
Hopefully, this makes your concurrent subflow execution slightly less dreary. For more specifics on async usage in Prefect, you might want to peruse the official documentation.