Kayvon Avishan
06/05/2024, 7:45 PM
Marvin
06/05/2024, 7:46 PM
... AnyIO task groups or `asyncio.gather`. Here's a relevant example:
### Concurrently Running Subflows
```python
import asyncio
from prefect import flow, task


@task
async def my_task(x):
    await asyncio.sleep(1)
    return x


@flow
async def my_subflow(x):
    result = await my_task(x)
    print(f"Subflow result: {result}")
    return result


@flow
async def my_parent_flow():
    tasks = [my_subflow(x) for x in range(5)]
    # Run subflows concurrently using asyncio.gather
    results = await asyncio.gather(*tasks)
    print(f"Parent flow results: {results}")


if __name__ == "__main__":
    # my_parent_flow is async, so it must be run inside an event loop
    asyncio.run(my_parent_flow())
```
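To see what `asyncio.gather` contributes on its own, independent of Prefect, here is a minimal sketch with plain coroutines. The names `fake_subflow` and `run_all` are hypothetical stand-ins and not Prefect APIs. It illustrates two things: gathered coroutines overlap while they await, and results come back in the order the coroutines were passed in.

```python
import asyncio
import time


async def fake_subflow(x: int) -> int:
    # Hypothetical stand-in for an async subflow: wait briefly, then return the input.
    await asyncio.sleep(0.2)
    return x


async def run_all(n: int) -> tuple[list[int], float]:
    start = time.perf_counter()
    # Calling fake_subflow(x) only creates a coroutine object;
    # gather schedules all of them on the event loop together.
    results = await asyncio.gather(*(fake_subflow(x) for x in range(n)))
    elapsed = time.perf_counter() - start
    return list(results), elapsed


if __name__ == "__main__":
    results, elapsed = asyncio.run(run_all(5))
    print(results)             # results follow input order
    print(f"{elapsed:.2f}s")   # close to one sleep, not five in a row
```

The same reasoning is what the Prefect example relies on: the list comprehension builds the awaitables, and nothing runs concurrently until they are handed to `asyncio.gather`.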
### Documentation
You can read more about composing and running subflows here.
### Example
Here is an additional example where tasks are submitted and executed concurrently:
```python
import time
from prefect import flow, task


@task
def sleep(t: int):
    print(f"Going to sleep for {t} seconds...")
    time.sleep(t)
    print("...and I'm awake!")


@flow(log_prints=True)
def my_flow():
    """flows run submitted tasks concurrently by default"""
    sleep(3)  # block the flow for 3 seconds (not concurrently)
    future = sleep.submit(2)  # submit a task run to the task runner, doesn't block flow
    future.wait()  # block the flow until the submitted task run is complete
    sleep.map([5] * 10)  # submit 10 at once, each sleeps for 5 seconds, don't block
    # even though the flow is done, we won't exit until all the task runs are complete


if __name__ == "__main__":
    my_flow()
```
This example demonstrates concurrent execution using task submission and mapping.
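The submit, wait, and map pattern in that example resembles the standard library's `concurrent.futures` interface. As a rough analogy only (this is not Prefect code, and Prefect's task runner may behave differently), the sketch below shows the same three steps with `ThreadPoolExecutor`. The names `nap` and `run_demo` are hypothetical.

```python
import time
from concurrent.futures import ThreadPoolExecutor, wait


def nap(t: float) -> float:
    # Hypothetical blocking function standing in for the `sleep` task.
    time.sleep(t)
    return t


def run_demo() -> list[float]:
    with ThreadPoolExecutor(max_workers=10) as pool:
        nap(0.1)                        # direct call: blocks the caller
        future = pool.submit(nap, 0.1)  # returns a Future immediately
        wait([future])                  # block until that one call finishes
        # map schedules all ten calls at once; iterating collects results in order
        results = list(pool.map(nap, [0.2] * 10))
    # leaving the `with` block waits for any outstanding work
    return results


if __name__ == "__main__":
    print(run_demo())
```

The point of the analogy is the distinction between calling a function directly, which blocks, and handing it to something that runs it in the background and returns a handle to wait on later.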
### Reference
For more details and additional examples, you can visit the following resources:
- Concurrent Subflows Documentation
- Code Example for Concurrent Tasks
Hopefully, this helps you run your subflows concurrently with less pain and suffering. Or perhaps more. Who knows these days.
KG
07/08/2024, 5:58 PM
... AnyIO task groups or `asyncio.gather`, do all of the tasks of the subflow need to be async?
Marvin
07/08/2024, 5:58 PM
... `asyncio.gather` or AnyIO task groups to run subflows concurrently, it is not strictly necessary for all tasks within the subflows to be asynchronous. Asynchronous tasks, however, avoid blocking and so allow more of the work to overlap. You can mix synchronous and asynchronous tasks within your subflows, but here's how you should manage them:
### Asynchronous Subflow Example
Here's how you can run subflows concurrently using `asyncio.gather`:
```python
import asyncio
import time
from prefect import flow, task


@task
async def async_task(x):
    await asyncio.sleep(1)
    return x


@task
def sync_task(x):
    time.sleep(1)
    return x


@flow
async def my_subflow(x):
    result_async = await async_task(x)
    result_sync = sync_task(x)  # This will still block, consider refactoring if performance is critical
    print(f"Subflow async result: {result_async}, sync result: {result_sync}")
    return result_async, result_sync


@flow
async def my_parent_flow():
    tasks = [my_subflow(x) for x in range(5)]
    # Run subflows concurrently using asyncio.gather
    results = await asyncio.gather(*tasks)
    print(f"Parent flow results: {results}")


if __name__ == "__main__":
    asyncio.run(my_parent_flow())
```
### Handling Mixed Tasks
While the example above showcases running both async and sync tasks, keep in mind:
1. Asynchronous Subflows: Mark subflows `async` to use `await`.
2. Synchronous Tasks: If sync tasks are part of async subflows, they will block while they run.
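One general way to keep blocking code from stalling other coroutines is to push it onto a worker thread. The sketch below uses plain `asyncio.to_thread` (Python 3.9+) with ordinary functions. It is not Prefect-specific, the names `blocking_work` and `subflow_like` are hypothetical, and whether this fits a given Prefect setup is an assumption to check against the Prefect documentation.

```python
import asyncio
import time


def blocking_work(x: int) -> int:
    # Hypothetical synchronous function; time.sleep blocks whichever thread runs it.
    time.sleep(0.2)
    return x * 2


async def subflow_like(x: int) -> int:
    # Run the blocking call in a worker thread so the event loop stays free
    # to make progress on the other gathered coroutines.
    return await asyncio.to_thread(blocking_work, x)


async def main() -> tuple[list[int], float]:
    start = time.perf_counter()
    results = await asyncio.gather(*(subflow_like(x) for x in range(5)))
    return list(results), time.perf_counter() - start


if __name__ == "__main__":
    results, elapsed = asyncio.run(main())
    print(results)
    print(f"{elapsed:.2f}s")
```

If `blocking_work(x)` were called directly inside `subflow_like`, the five calls would run one after another, because each `time.sleep` would hold the event loop until it returned.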
### Additional Resources
Here is a simple async flow example:
Async Basic Flow
Hopefully, this makes your concurrent subflow execution slightly less dreary. For more specifics on async usage in Prefect, you might want to peruse the official documentation.