<@ULVA73B9P> Why do my tasks run sequentially in c...
# ask-marvin
j
@Marvin Why do my tasks run sequentially in concurrent subflows?
```python
from prefect import flow, get_run_logger, task
import asyncio

@flow
async def simple_flow():
    await asyncio.sleep(10)

@task
def simple_task():
    asyncio.run(asyncio.sleep(10))

@flow
async def flow_with_tasks():
    simple_task1 = simple_task.submit()
    simple_task2 = simple_task.submit()
    simple_task1.result()
    simple_task2.result()

@flow
async def flow_with_one_task():
    simple_task()

@flow
def sync_flow():
    simple_task()

@flow
async def test_pipeline():

    logger = get_run_logger()
    logger.info("Local timezone: Australia/Melbourne")

    sync_flow()

    flow1 = await flow_with_one_task()

    flow1 = asyncio.create_task(flow_with_one_task(return_state=True))

    flow2 = asyncio.create_task(simple_flow(return_state=True))
    flow3 = asyncio.create_task(flow_with_one_task(return_state=True))
    flow4 = asyncio.create_task(flow_with_tasks(return_state=True))

    flow5 = asyncio.create_task(flow_with_one_task(return_state=True))
    flow6 = asyncio.create_task(flow_with_one_task(return_state=True))

    await asyncio.gather(flow3, flow4, flow2, flow5, flow6, flow1)

if __name__ == '__main__':
    asyncio.run(test_pipeline())
```
m
The issue you're encountering, where tasks run sequentially in concurrent subflows, seems to stem from how Python's `asyncio` and Prefect handle concurrency and task runners. Here are key points and references from the Prefect documentation and issues:

### Understanding Concurrency in Prefect

1. Task Runners and Default Behavior:
   - Prefect's `flow` function has a default task runner (`ConcurrentTaskRunner`), but your customized use of `asyncio` might conflict with how Prefect manages its event loops and concurrency.
   - Reference: Specifying Upstream Dependencies in Prefect
2. Tasks vs. Subflows:
   - Tasks in Prefect are better suited for parallel execution compared to subflows. If you run tasks within a flow, Prefect ensures they are managed according to the flow’s concurrency settings.
   - Reference: Concepts of Flows

### Specific Issues and Discussions from Prefect's GitHub:

- RuntimeError with Multiple Subflows:
  - When running the same subflow concurrently multiple times, a `RuntimeError("The task runner is already started!")` can be encountered.
  - Solution: Convert those subflows into tasks if they need to run in parallel.
  - Reference: GitHub Issue #5853
- Future Enhancements and Submit API:
  - There's an ongoing enhancement request for a `Flow.submit` interface which might better handle future subflow executions.
  - Reference: GitHub Issue #6689

### Recommendations:

- Convert Subflows to Tasks:
  - Instead of running the subflows concurrently, convert them into tasks to utilize Prefect's native concurrency handling mechanisms.
- Refactor to Use `asyncio` Correctly:
  - Ensure proper use of `asyncio` when combined with Prefect. For instance, avoid calling `asyncio.run` inside tasks or flows, as it can conflict with how Prefect orchestrates flow runs.

Here’s a refactored example solution:
```python
from prefect import flow, task, get_run_logger
import asyncio

@task
async def simple_task():
    await asyncio.sleep(10)

@flow
async def flow_with_tasks():
    # Assumes Prefect 2.x semantics: submitting an async task from an async
    # flow is awaited and yields a PrefectFuture. Both tasks are submitted
    # before either result is awaited, so they can run concurrently.
    future1 = await simple_task.submit()
    future2 = await simple_task.submit()
    await future1.result()
    await future2.result()

@flow
async def main_pipeline():
    logger = get_run_logger()
    logger.info("Local timezone: Australia/Melbourne")

    # Run two subflow coroutines concurrently on the same event loop.
    tasks = [
        flow_with_tasks(),
        flow_with_tasks()
    ]

    await asyncio.gather(*tasks)

if __name__ == '__main__':
    asyncio.run(main_pipeline())
```
This refactoring ensures the proper use of `asyncio` alongside Prefect's task concurrency mechanisms. For other details, refer directly to the provided documentation links and GitHub Issues. If needed, continue monitoring the Prefect GitHub repository for new updates or enhancements regarding concurrent subflows and task runners.
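
To see why a synchronous call inside an async subflow can serialize everything gathered alongside it, here is a minimal plain-`asyncio` sketch with no Prefect involved. The names `blocking_worker`, `cooperative_worker`, and `timed_gather` are hypothetical, and the 0.2-second delay stands in for the 10-second sleeps in the thread. Whether this fully explains the behaviour in your pipeline depends on how your Prefect version runs sync tasks called from async flows, so treat it as an assumption to verify rather than a diagnosis.

```python
import asyncio
import time

DELAY = 0.2  # short stand-in for the 10-second sleeps in the thread


async def blocking_worker() -> None:
    # A synchronous call inside a coroutine never yields to the event loop,
    # so no other coroutine can make progress while it runs.
    time.sleep(DELAY)


async def cooperative_worker() -> None:
    # Awaiting yields control, so the sleeps of all workers overlap.
    await asyncio.sleep(DELAY)


async def timed_gather(worker, count: int = 3) -> float:
    """Run `count` copies of `worker` under asyncio.gather; return elapsed seconds."""
    start = time.perf_counter()
    await asyncio.gather(*(worker() for _ in range(count)))
    return time.perf_counter() - start


def compare() -> tuple[float, float]:
    # Each measurement gets its own event loop via asyncio.run.
    blocking = asyncio.run(timed_gather(blocking_worker))
    cooperative = asyncio.run(timed_gather(cooperative_worker))
    return blocking, cooperative


if __name__ == "__main__":
    blocking, cooperative = compare()
    print(f"blocking: {blocking:.2f}s, cooperative: {cooperative:.2f}s")
```

The blocking variant takes roughly `count * DELAY` because each coroutine holds the loop for its whole sleep, while the cooperative variant takes roughly `DELAY`. If the same pattern applies to your subflows, making the task `async` and awaiting it is the likely fix.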