Jack P
11/15/2023, 4:03 PMasyncio.run(async_function())
in a Prefect 2 sub flow?Marvin
11/15/2023, 4:03 PMasyncio.run(async_function())
in a Prefect 2 subflow. Asynchronous subflows can be run in parallel by using AnyIO task groups or asyncio.gather. Here is an example:
import asyncio
from prefect import flow
@flow
async def subflow_1():
print("Subflow 1 started!")
await asyncio.sleep(1)
@flow
async def subflow_2():
print("Subflow 2 started!")
await asyncio.sleep(1)
@flow
async def subflow_3():
print("Subflow 3 started!")
await asyncio.sleep(1)
For more details, you can refer to this discussion.Jack P
11/15/2023, 4:04 PMMarvin
11/15/2023, 4:04 PMfrom prefect import Flow
@Flow
async def main_flow():
await subflow_1.run()
await subflow_2.run()
await subflow_3.run()
This will run the subflows sequentially. If you want to run them in parallel, you can use `asyncio.gather`:
from prefect import Flow
import asyncio
@Flow
async def main_flow():
await asyncio.gather(subflow_1.run(), subflow_2.run(), subflow_3.run())
Please note that the run
method is used to execute the subflows.Nate
11/15/2023, 4:05 PMrun
method is used to execute the subflows." untrue friendMarvin
11/15/2023, 4:05 PMrun
method to execute flows or subflows. Instead, you just call the flow like a function. Here's how you would do it:
from prefect import Flow
@Flow
async def main_flow():
await subflow_1()
await subflow_2()
await subflow_3()
And for parallel execution:
from prefect import Flow
import asyncio
@Flow
async def main_flow():
await asyncio.gather(subflow_1(), subflow_2(), subflow_3())
I hope this clears up the misunderstanding.Jack P
11/15/2023, 4:05 PM