# ask-marvin
m
How do I run concurrent flows? I understand that tasks can run concurrently, but I have flows that are already compositions of tasks, and the flow needs to run concurrently for sets of different parameters.
d
make sure to @marvin at the beginning of your question to ask Marvin 😄
j
my understanding is that you need to mark each flow as async with
```python
from prefect import flow


@flow
async def flow_1():
    return None


@flow
async def flow_2():
    return None
```
and then you can use normal asyncio functions with them, e.g.
```python
# inside an async function or flow, with asyncio imported:
res1, res2 = await asyncio.gather(flow_1(), flow_2())
```
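The same pattern extends to any number of parameter sets. A minimal sketch in plain asyncio, assuming a hypothetical coroutine `process_params` standing in for an async `@flow`; `asyncio.gather` returns the results in the same order as the inputs:

```python
import asyncio


async def process_params(num: int) -> int:
    # Hypothetical stand-in for an async flow; the sleep simulates I/O-bound work.
    await asyncio.sleep(0.1)
    return num * 2


async def run_all(param_sets: list[int]) -> list[int]:
    # Schedule one coroutine per parameter set and wait for all of them.
    # gather preserves input order in its result list.
    return await asyncio.gather(*(process_params(n) for n in param_sets))


if __name__ == "__main__":
    print(asyncio.run(run_all([1, 2, 3])))  # [2, 4, 6]
```

Because the three calls overlap, the whole run takes about as long as one call rather than three.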
m
thanks Jason, I have been running `run_deployment` within async TaskGroups, but that was based on a workaround suggested in this Discourse thread from 2022, and I'm not sure whether there is an updated best practice that avoids the errors called out in that discussion.
It looks like the original bug was resolved in this issue, closed in 2023. I haven't tried it yet, but hopefully it now works fine to run concurrent subflows rather than rebuilding the work as tasks inside a subflow.
j
Looking at the thread, it seems like the semantics changed between Prefect 1 and Prefect 2. I'm using the above pattern without any issues, but my usage profile may not line up exactly with yours.
m
I've always been on Prefect 2. The problem was that when an identical subflow was run concurrently with different parameters, it would fail. Hence the `run_deployment` idea, but that has some drawbacks. In my case, the job that needs to be replicated is too complex to squeeze into a single task.
@jason baker the difference from your example is running the same flow concurrently with different parameters. I'll report back if I have success with the closed ticket 7322.
j
wouldn't this minimal example pose the same problem? The same top-level flow "parent" runs the `double` function/flow with different parameters:
```python
from prefect import flow
import asyncio


@flow(name="double")
async def double(num: int) -> int:
    await asyncio.sleep(1)
    return num * 2


@flow(name="parent")
async def parent():
    c1, c2 = await asyncio.gather(double(1), double(2))
    return c1, c2


if __name__ == "__main__":
    asyncio.run(parent())
```
Maybe it's different when tasks are running on different threads, or when it runs on your actual infrastructure.
m
awesome, thanks for the tight example! I just ran it successfully on Prefect version 2.16.0.
I also tested it with a task component, and it looks good:
```python
from prefect import flow, task, get_run_logger
import asyncio

@task
async def base_task():
    logger = get_run_logger()
    await asyncio.sleep(5)
    logger.info("This task is running")
    return 42

@flow(name="double")
async def double(num: int) -> int:
    await asyncio.sleep(1)
    return num * (await base_task())

@flow(name="parent")
async def parent():
    logger = get_run_logger()
    c1, c2 = await asyncio.gather(double(1), double(2))
    logger.info(f"c1, c2: {c1, c2}")
    return c1, c2


if __name__ == "__main__":
    asyncio.run(parent())
```
The tasks run concurrently and finish at around the same time.
j
great to hear!
It sounds like the one difference with the `run_deployment` approach is that it enables true parallelism by running on separate infrastructure, whereas this runs on the same infrastructure as the parent. So if your flows run CPU-bound blocking code, it will block the main thread and limit your concurrency. Tasks, on the other hand, always run on separate threads (or maybe processes), as I understand it.
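To illustrate the blocking point in plain asyncio (no Prefect involved): a coroutine that calls blocking code such as `time.sleep` stalls the event loop, so "concurrent" runs execute one after another. A minimal sketch, assuming a hypothetical `blocking_work` function for the blocking part; `asyncio.to_thread` (Python 3.9+) moves it onto a worker thread. Threads only help when the blocking call releases the GIL (I/O, `time.sleep`, many C extensions); pure-Python CPU-bound work still needs separate processes or separate infrastructure for true parallelism.

```python
import asyncio
import time


def blocking_work(seconds: float) -> float:
    # Hypothetical blocking call (e.g. a sync client or heavy computation).
    time.sleep(seconds)
    return seconds


async def blocks_loop(seconds: float) -> float:
    # Calls the blocking function directly: the event loop cannot switch
    # to other coroutines until it returns.
    return blocking_work(seconds)


async def offloaded(seconds: float) -> float:
    # Runs the blocking function in the default thread pool, so the
    # event loop stays free to run the other coroutines meanwhile.
    return await asyncio.to_thread(blocking_work, seconds)


async def timed(coro_fn, n: int = 3, seconds: float = 0.2) -> float:
    # Run n copies "concurrently" with gather and return the wall-clock time.
    start = time.perf_counter()
    await asyncio.gather(*(coro_fn(seconds) for _ in range(n)))
    return time.perf_counter() - start


if __name__ == "__main__":
    print(f"blocking:  {asyncio.run(timed(blocks_loop)):.2f}s")
    print(f"to_thread: {asyncio.run(timed(offloaded)):.2f}s")
```

On a typical machine the blocking version takes about `n * seconds`, while the offloaded version takes about `seconds`.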
m
Good point, and I have used `run_deployment` extensively in this case. On the flip side, if the parallel `run_deployment` flows all run on the same infrastructure, you will hit an `OSError: [Errno 24] Too many open files`, and if the storage is on GitHub you can end up doing a lot of unnecessary pulls from remote storage.
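One common way to stay under an open-file limit is to cap how many runs are in flight at once. A minimal sketch using `asyncio.Semaphore`, assuming a hypothetical `launch_run` coroutine in place of `run_deployment` (or a subflow call); `max_concurrent` is an illustrative value, not a Prefect setting:

```python
import asyncio


async def launch_run(num: int, state: dict) -> int:
    # Hypothetical stand-in for run_deployment or a subflow call.
    state["in_flight"] += 1
    state["peak"] = max(state["peak"], state["in_flight"])
    await asyncio.sleep(0.05)  # simulated work while holding resources
    state["in_flight"] -= 1
    return num * 2


async def run_bounded(nums: list[int], max_concurrent: int = 3) -> tuple[list[int], int]:
    sem = asyncio.Semaphore(max_concurrent)
    state = {"in_flight": 0, "peak": 0}

    async def guarded(n: int) -> int:
        # At most max_concurrent coroutines get past this point at a time;
        # the rest wait here before starting any work.
        async with sem:
            return await launch_run(n, state)

    results = await asyncio.gather(*(guarded(n) for n in nums))
    return results, state["peak"]


if __name__ == "__main__":
    results, peak = asyncio.run(run_bounded(list(range(10))))
    print(results, "peak concurrency:", peak)
```

All ten runs still complete, but never more than three at the same time.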