<@ULVA73B9P> How can I run multiple subflows concu...
# ask-marvin
k
@Marvin How can I run multiple subflows concurrently?
m
To run multiple subflows concurrently in Prefect 2.x, you can leverage asynchronous subflows with `AnyIO` task groups or `asyncio.gather`. Here's a relevant example:

### Concurrently Running Subflows
```python
import asyncio
from prefect import flow, task

@task
async def my_task(x):
    await asyncio.sleep(1)
    return x

@flow
async def my_subflow(x):
    result = await my_task(x)
    print(f"Subflow result: {result}")
    return result

@flow
async def my_parent_flow():
    # Calling an async flow returns a coroutine; nothing runs until it is awaited
    tasks = [my_subflow(x) for x in range(5)]
    # Run subflows concurrently using asyncio.gather
    results = await asyncio.gather(*tasks)
    print(f"Parent flow results: {results}")

if __name__ == "__main__":
    # The parent flow is async, so it needs an event loop to run
    asyncio.run(my_parent_flow())
```
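To see what `asyncio.gather` buys you without needing a Prefect install, here is a minimal sketch in plain `asyncio`. `fake_subflow` and `run_all` are hypothetical stand-ins (not Prefect APIs) for the async subflows above. The point is only that the total wall time tracks the slowest coroutine rather than the sum of all of them, and that results come back in input order.

```python
import asyncio
import time


async def fake_subflow(x: int, delay: float = 0.2) -> int:
    """Hypothetical stand-in for an async subflow: waits, then returns its input."""
    await asyncio.sleep(delay)  # non-blocking wait; yields control to the event loop
    return x


async def run_all(n: int = 5):
    """Run n stand-in subflows concurrently and report (results, elapsed seconds)."""
    start = time.perf_counter()
    # gather schedules all coroutines together and preserves input order in its result
    results = await asyncio.gather(*(fake_subflow(x) for x in range(n)))
    elapsed = time.perf_counter() - start
    return results, elapsed


if __name__ == "__main__":
    results, elapsed = asyncio.run(run_all())
    print(f"results={results}, elapsed={elapsed:.2f}s")
```

Run sequentially, five 0.2-second waits would take about a second; gathered, they should finish in roughly the time of one.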

### Documentation

You can read more about composing and running subflows in the Prefect documentation.

### Example

Here is an additional example where tasks are submitted and executed concurrently:
```python
import time
from prefect import flow, task

@task
def sleep(t: int):
    print(f"Going to sleep for {t} seconds...")
    time.sleep(t)
    print("...and I'm awake!")

@flow(log_prints=True)
def my_flow():
    """flows run submitted tasks concurrently by default"""
    sleep(3)  # block the flow for 3 seconds (not concurrently)

    future = sleep.submit(2)  # submit a task run to the task runner, doesn't block flow

    future.wait()  # block the flow until the submitted task run is complete

    sleep.map([5] * 10)  # submit 10 at once, each sleeps for 5 seconds, don't block

    # even though the flow is done, we won't exit until all the task runs are complete

if __name__ == "__main__":
    my_flow()
```
This example demonstrates concurrent execution using task submission and mapping.

### Reference

For more details and additional examples, you can visit the following resources:

- Concurrent Subflows Documentation
- Code Example for Concurrent Tasks

Hopefully, this helps you run your subflows concurrently with less pain and suffering. Or perhaps more. Who knows these days.
k
@Marvin, when running subflows concurrently using `AnyIO` task groups or `asyncio.gather`, do all of the tasks of the subflow need to be async?
m
When using `asyncio.gather` or `AnyIO` task groups to run subflows concurrently, it is not strictly necessary for all tasks within the subflows to be asynchronous. However, asynchronous tasks yield control while they wait, which lets the other subflows make progress instead of being blocked. You can mix synchronous and asynchronous tasks within your subflows, but here's how you should manage them:

### Asynchronous Subflow Example

Here's how you can run subflows concurrently using `asyncio.gather`:
```python
import asyncio
import time
from prefect import flow, task

@task
async def async_task(x):
    await asyncio.sleep(1)
    return x

@task
def sync_task(x):
    time.sleep(1)
    return x

@flow
async def my_subflow(x):
    result_async = await async_task(x)
    result_sync = sync_task(x)  # This will still block, consider refactoring if performance is critical
    print(f"Subflow async result: {result_async}, sync result: {result_sync}")
    return result_async, result_sync

@flow
async def my_parent_flow():
    tasks = [my_subflow(x) for x in range(5)]
    # Run subflows concurrently using asyncio.gather
    results = await asyncio.gather(*tasks)
    print(f"Parent flow results: {results}")

if __name__ == "__main__":
    asyncio.run(my_parent_flow())
```
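If the blocking `sync_task` call above turns out to matter, one general asyncio technique (not a Prefect-specific API, and not something confirmed here as the recommended Prefect pattern) is to push the blocking call onto a worker thread with `asyncio.to_thread`, available in Python 3.9+. The sketch below uses plain functions; `blocking_work`, `offloaded`, and `run_all` are hypothetical names for illustration only.

```python
import asyncio
import time


def blocking_work(x: int, delay: float = 0.3) -> int:
    """Hypothetical synchronous function that blocks its thread while it 'works'."""
    time.sleep(delay)
    return x * 2


async def offloaded(x: int) -> int:
    # asyncio.to_thread (Python 3.9+) runs the sync call in a worker thread,
    # so the event loop stays free to run other coroutines in the meantime
    return await asyncio.to_thread(blocking_work, x)


async def run_all(n: int = 3):
    """Run n blocking calls concurrently via threads; return (results, elapsed seconds)."""
    start = time.perf_counter()
    results = await asyncio.gather(*(offloaded(x) for x in range(n)))
    return results, time.perf_counter() - start


if __name__ == "__main__":
    results, elapsed = asyncio.run(run_all())
    print(f"results={results}, elapsed={elapsed:.2f}s")
```

Calling `blocking_work` directly inside each coroutine would serialize the waits. Offloading trades that for thread overhead, so it is mainly worth it for I/O-bound or sleep-like work.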

### Handling Mixed Tasks

While the example above showcases running both async and sync tasks, keep in mind:

1. Asynchronous Subflows: Mark subflows `async` to use `await`.
2. Synchronous Tasks: If sync tasks are part of async subflows, they will block.

### Additional Resources

Here is a simple async flow example: Async Basic Flow

Hopefully, this makes your concurrent subflow execution slightly less dreary. For more specifics on async usage in Prefect, you might want to peruse the official documentation.