<@ULVA73B9P> I have a flow that runs two async sub...
# marvin-in-the-wild
s
@Marvin I have a flow that runs two async sub-flows, both subflows call tasks that are async with a time sleep. The first task of my 2 concurrent subflows executes in parallel. Then each task after that first task is run sequentially. I can’t seem to figure out the issue.
@flow
async def run_sleep_multi_flow():
    flows = [
        wait_flow_1(),
        wait_flow_2(),
    ]
    await asyncio.gather(*flows)

@flow(task_runner=ConcurrentTaskRunner)
async def wait_flow_1():
    await sleep_task_async(5)
    await sleep_task_async(5)
    await sleep_task_async(5)


@flow(task_runner=ConcurrentTaskRunner)
async def wait_flow_2():
    await sleep_task_async(5)
    await sleep_task_async(5)
    await sleep_task_async(5)

@task
async def sleep_task_async(time_in_seconds):
    await asyncio.sleep(time_in_seconds)
    return "Done sleep"
m
It appears you're running into a concurrency issue in your flows while using the `ConcurrentTaskRunner`. Based on the information I gathered, here are some considerations and potential solutions:
1. Subflow Execution: Each subflow creates its own task runner. If you use `ConcurrentTaskRunner` in both the parent and the subflows, Prefect might handle them independently, which can affect concurrency.
2. Dependency Management: Make sure there is no hidden dependency between tasks that could force sequential execution.
3. Task Runner Configuration: Verify that `ConcurrentTaskRunner` works as expected and isn't overridden by any local settings.
To maintain concurrent execution, consider the following adjustments and checks:
1. Explicit Task Runner for Subflows: Make sure each subflow explicitly defines the `ConcurrentTaskRunner`.
2. Use `asyncio.gather` in Tasks: Make sure your async code uses `asyncio.gather` wherever it involves multiple awaits that should overlap.
3. Review Task Runner Docs: Review the Prefect Task Runners Documentation for nuances in usage.
Here's a refined example that uses `asyncio.gather` for concurrency in both the subflows and the tasks:
from prefect import flow, task
from prefect.task_runners import ConcurrentTaskRunner
import asyncio

@flow
async def run_sleep_multi_flow():
    flows = [
        wait_flow_1(),
        wait_flow_2(),
    ]
    await asyncio.gather(*flows)

@flow(task_runner=ConcurrentTaskRunner())
async def wait_flow_1():
    await asyncio.gather(
        sleep_task_async(5),
        sleep_task_async(5),
        sleep_task_async(5)
    )

@flow(task_runner=ConcurrentTaskRunner())
async def wait_flow_2():
    await asyncio.gather(
        sleep_task_async(5),
        sleep_task_async(5),
        sleep_task_async(5)
    )

@task
async def sleep_task_async(time_in_seconds):
    await asyncio.sleep(time_in_seconds)
    return "Done sleep"

if __name__ == "__main__":
    # A bare top-level await is a syntax error in a script; start the event loop instead.
    asyncio.run(run_sleep_multi_flow())

### References:
- Prefect Task Runners Documentation
- Relevant GitHub Issues: Issue #12351, Issue #12804

Ensure your environment setup supports concurrent execution and that no other flows or settings override the expected behavior.
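
To see why awaiting calls one after another serializes them while `asyncio.gather` lets them overlap, here is a minimal sketch in plain `asyncio`, with no Prefect involved. The names `sleep_step`, `run_sequential`, `run_gathered`, and `compare` are hypothetical helpers made up for illustration. It only demonstrates general `asyncio` behavior and is not a confirmed diagnosis of how Prefect schedules tasks inside subflows.

```python
import asyncio
import time


async def sleep_step(seconds: float) -> str:
    # Plain coroutine standing in for the sleep task (hypothetical helper, not a Prefect task).
    await asyncio.sleep(seconds)
    return "Done sleep"


async def run_sequential(seconds: float) -> float:
    # Each await completes before the next call starts, so the durations add up.
    start = time.perf_counter()
    await sleep_step(seconds)
    await sleep_step(seconds)
    await sleep_step(seconds)
    return time.perf_counter() - start


async def run_gathered(seconds: float) -> float:
    # gather schedules all three coroutines at once, so their sleeps overlap.
    start = time.perf_counter()
    await asyncio.gather(
        sleep_step(seconds),
        sleep_step(seconds),
        sleep_step(seconds),
    )
    return time.perf_counter() - start


async def compare(seconds: float = 0.2) -> tuple[float, float]:
    # Returns (sequential_elapsed, gathered_elapsed) in seconds.
    sequential = await run_sequential(seconds)
    gathered = await run_gathered(seconds)
    return sequential, gathered


if __name__ == "__main__":
    sequential, gathered = asyncio.run(compare())
    print(f"sequential: {sequential:.2f}s, gathered: {gathered:.2f}s")
```

The sequential version should take roughly three times the sleep duration and the gathered version roughly one. If the tasks in your subflows need to stay in order, keep the sequential awaits there and gather only at the level where you want overlap.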