<@ULVA73B9P> whats the best way to call multiple s...
# marvin-in-the-wild
g
@Marvin what's the best way to call multiple subflows in parallel / concurrently?
m
To run multiple subflows concurrently in Prefect, you can use `asyncio.gather` or AnyIO task groups. This approach relies on asynchronous subflows, which the event loop can interleave instead of running one after the other.
Documentation excerpts:
- Subflows with AnyIO or asyncio
- Example Code
Code example:
import asyncio
from prefect import flow

async def some_async_function():
    await asyncio.sleep(1)
    return "I'm an async function"

@flow(log_prints=True) # send `print` statements as INFO logs
async def my_async_subflow(name):
    print(f"Subflow {name} started")
    await some_async_function()
    print(f"Subflow {name} completed")

@flow(log_prints=True)
async def parent_flow():
    subflows = [my_async_subflow(name) for name in ["A", "B", "C"]]
    await asyncio.gather(*subflows)

if __name__ == "__main__":
    asyncio.run(parent_flow()) # run it like a normal async python function
In this example, `parent_flow` runs the `my_async_subflow` subflows in parallel using `asyncio.gather`.
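The subflows in the example return nothing. If they returned values, `asyncio.gather` would hand them back in the order the awaitables were passed in, not in the order they finished. A minimal sketch of that behaviour, using plain coroutines as stand-ins for async subflows (`fake_subflow`, `parent`, and the delays are hypothetical, and no Prefect is involved):

```python
import asyncio


async def fake_subflow(name, delay):
    # Hypothetical stand-in for an async subflow; no Prefect involved
    await asyncio.sleep(delay)
    return f"subflow {name} done"


async def parent():
    # "A" takes the longest, yet its result still comes back first
    delays = {"A": 0.3, "B": 0.1, "C": 0.2}
    return await asyncio.gather(
        *(fake_subflow(name, delay) for name, delay in delays.items())
    )


if __name__ == "__main__":
    print(asyncio.run(parent()))
```

This assumes an awaited async subflow behaves like an ordinary awaitable here, as the example above treats it.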
g
ok i see. could you tell me why this one doesn't run in parallel?
import asyncio
import time

from prefect import flow, task


@task
async def shout(number):
    time.sleep(5)
    print(f"#{number}")


@flow()
async def count_to(highest_number):
    results = [shout.submit(number) for number in range(highest_number)]
    return results


@task
async def count_to_proxy(highest_number):
    return await count_to(highest_number)


@flow()
async def many_flows(highest_number: int, times: int):
    results = [count_to(highest_number) for _ in range(times)]
    return await asyncio.gather(*results)
    # return [count_to_proxy.submit(highest_number) for _ in range(times)]


if __name__ == "__main__":
    asyncio.run(many_flows(10, 3))
@Marvin
m
Your code seems correct at first glance, but there are a few specific points to check:
1. Synchronous Blocking Call: The `time.sleep(5)` call in your `shout` task is blocking. Even though the task function is async, a blocking call inside it holds the event loop and prevents concurrent execution.
2. Prefect's Task Management: When tasks are submitted to Prefect's task runner, a synchronous blocking call inside them can disrupt concurrency.
Here's the corrected version of your code:
import asyncio

from prefect import flow, task

# Correct the blocking call
@task
async def shout(number):
    await asyncio.sleep(5)  # Change from time.sleep to asyncio.sleep
    print(f"#{number}")

@flow()
async def count_to(highest_number):
    results = [shout.submit(number) for number in range(highest_number)]
    return results

@task
async def count_to_proxy(highest_number):
    return await count_to(highest_number)

@flow()
async def many_flows(highest_number: int, times: int):
    results = [count_to(highest_number) for _ in range(times)]
    return await asyncio.gather(*results)  # This should work fine with corrected shout task

if __name__ == "__main__":
    asyncio.run(many_flows(10, 3))
You can find more relevant details in the documentation:
- Documentation on async subflows
- Documentation on using asyncio
By using `asyncio.sleep(5)` instead of `time.sleep(5)`, you ensure that the coroutine yields control back to the event loop, allowing other tasks to proceed concurrently.
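The difference between the two sleeps can be measured without Prefect. The sketch below is a stdlib-only illustration, not the thread's actual flow: it runs three copies of each coroutine under `asyncio.gather` and times them. The function names and the 0.2 second delay are made up for the demo. `asyncio.to_thread` (Python 3.9+) is included as one option for blocking calls that cannot be rewritten as awaits.

```python
import asyncio
import time


async def blocking_shout(number):
    time.sleep(0.2)  # blocks the whole event loop; nothing else runs meanwhile
    return number


async def yielding_shout(number):
    await asyncio.sleep(0.2)  # suspends this coroutine and lets others run
    return number


async def threaded_shout(number):
    # For blocking calls that cannot be rewritten, push them onto a worker thread
    await asyncio.to_thread(time.sleep, 0.2)
    return number


async def timed(coro_fn, count=3):
    """Run `count` copies of coro_fn under gather and return elapsed seconds."""
    start = time.perf_counter()
    await asyncio.gather(*(coro_fn(n) for n in range(count)))
    return time.perf_counter() - start


async def main():
    return {
        "blocking": await timed(blocking_shout),
        "yielding": await timed(yielding_shout),
        "threaded": await timed(threaded_shout),
    }


if __name__ == "__main__":
    for name, seconds in asyncio.run(main()).items():
        print(f"{name}: {seconds:.2f}s")
```

The blocking variant should take roughly three times as long as the other two, since each `time.sleep` has to finish before the next coroutine gets a turn.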
g
@Marvin count_to’s are still executing one after the other, not in parallel
m
From the relevant GitHub issues, it appears that Prefect has had some challenges with concurrently running subflows, and some of these issues might still affect your current setup. However, there's a workaround that might help you achieve parallel execution. You can ensure that each subflow uses a separate task runner. Here's an updated example incorporating this mechanism:
import asyncio
from prefect import flow, task
from prefect.context import FlowRunContext

# Correct the blocking call
@task
async def shout(number):
    await asyncio.sleep(5)  # Use asyncio.sleep instead of time.sleep
    print(f"#{number}")

@flow()
async def count_to(highest_number):
    # Tasks do not accept a `task_runner` option in `with_options`;
    # they are submitted to the enclosing flow's task runner.
    results = [shout.submit(number) for number in range(highest_number)]
    return results

@task
async def count_to_proxy(highest_number):
    return await count_to(highest_number)

@flow()
async def many_flows(highest_number: int, times: int):
    task_runner_type = type(FlowRunContext.get().task_runner)
    results = [count_to.with_options(task_runner=task_runner_type())(highest_number) for _ in range(times)]
    return await asyncio.gather(*results)  # This should work fine with corrected shout task

if __name__ == "__main__":
    asyncio.run(many_flows(10, 3))
### Key Changes:
1. Using `asyncio.sleep`: Ensures non-blocking sleep within tasks.
2. Separate Task Runners: Each subflow is configured to use its own task runner instance, which is intended to let the subflows run concurrently.
References:
- Issue on running subflows concurrently
- Issue on parent flow status when running subflows
This adaptation should help in achieving parallel execution of subflows. If issues persist, consider tracking the ongoing discussion and resolution in the linked issues.
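To check whether runs actually overlap rather than inferring it from log order, one option is to record start and end times around each awaitable and compare the intervals. This is a rough stdlib-only sketch: `record` and `any_overlap` are hypothetical helper names, and `asyncio.sleep` stands in for the subflow calls. The same wrapper could presumably be placed around each `count_to(...)` call before passing it to `asyncio.gather`.

```python
import asyncio
import time


async def record(name, intervals, work):
    # Wrap any awaitable and note when it started and when it finished
    start = time.perf_counter()
    result = await work
    intervals[name] = (start, time.perf_counter())
    return result


def any_overlap(intervals):
    """Return True if at least two recorded (start, end) intervals overlap."""
    spans = sorted(intervals.values())
    # After sorting by start time, any overlap shows up between neighbours
    return any(later[0] < earlier[1] for earlier, later in zip(spans, spans[1:]))


async def main():
    intervals = {}
    await asyncio.gather(
        *(record(f"run-{i}", intervals, asyncio.sleep(0.1)) for i in range(3))
    )
    return any_overlap(intervals)


if __name__ == "__main__":
    print("runs overlapped:", asyncio.run(main()))
```

If `any_overlap` reports no overlap for the real subflows, the runs are executing one after the other and something is still blocking the event loop.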