# best-practices
Hello, I’m trying to run multiple subflows at the same time, with each subflow running an extract, a transform and a load task (see test code below). I expected each subflow to run its three tasks one right after the other, independently of the other subflows. From the UI (see attachment), it seems that ‘subflow / 4’ does do that. But ‘subflow / 3’ waits for ‘subflow / 4’ to finish before starting its transform and load tasks. Then ‘subflow / 2’ waits for ‘subflow / 3’ to finish before running its transform and load, and finally ‘subflow / 1’ runs its transform and load only after ‘subflow / 2’ has completed. Are my expectations misplaced? How could I improve my code so that all subflows run their respective tasks as soon as possible?
import asyncio

from prefect import flow, task
from prefect.runtime import flow_run


def flow_run_name():
    return f"{flow_run.parameters['flow_index']}"

@task(name='extract')
async def extract():
    await asyncio.sleep(2)
    return 'extracted'

@task(name='transform')
async def transform(extracted):
    await asyncio.sleep(2)
    return 'transformed'

@task(name='load')
async def load(transformed):
    await asyncio.sleep(2)
    return 'loaded'

@flow(name='subflow', flow_run_name=flow_run_name, log_prints=True)
async def subflow(flow_index):
    extracted = await extract()
    transformed = await transform(extracted=extracted)
    loaded = await load(transformed=transformed)
    print(f"flow index: {flow_index} {extracted} + {transformed} + {loaded}")

@flow(name='main_flow')
async def main_flow():
    subflows = [subflow(1), subflow(2), subflow(3), subflow(4)]
    await asyncio.gather(*subflows)

if __name__ == '__main__':
    asyncio.run(main_flow())
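
To make the expectation concrete, here is a minimal sketch of the same extract/transform/load pipeline in plain asyncio, with the Prefect decorators left out on purpose. It does not explain or reproduce the Prefect behaviour described above; it only shows the interleaving I would expect, where four pipelines finish in roughly the time of one. The names `step`, `pipeline`, `run_all` and `STEP_SECONDS` are made up for this illustration, and the sleeps are shortened from 2 seconds.

```python
import asyncio
import time

STEP_SECONDS = 0.05  # hypothetical, shortened stand-in for the 2-second sleeps


async def step(name, flow_index, log):
    # Simulate one unit of awaited work, then record when it completed.
    await asyncio.sleep(STEP_SECONDS)
    log.append((flow_index, name))
    return name


async def pipeline(flow_index, log):
    # Within one pipeline the three steps run strictly in sequence.
    extracted = await step("extracted", flow_index, log)
    transformed = await step("transformed", flow_index, log)
    loaded = await step("loaded", flow_index, log)
    return (flow_index, extracted, transformed, loaded)


async def run_all(count=4):
    # Across pipelines, gather lets every coroutine progress while the others sleep.
    log = []
    started = time.perf_counter()
    results = await asyncio.gather(*(pipeline(i, log) for i in range(1, count + 1)))
    elapsed = time.perf_counter() - started
    return results, log, elapsed


if __name__ == "__main__":
    results, log, elapsed = asyncio.run(run_all())
    print(log)
    print(f"elapsed: {elapsed:.2f}s")
```

With this version no pipeline waits for another one to finish before starting its transform or load step, which is the behaviour I was hoping to see from the subflows.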