merlin
05/28/2024, 6:40 PM
Dylan
@marvin at the beginning of your question to ask Marvin 😄
jason baker
05/28/2024, 6:50 PM
from prefect import flow
import asyncio

@flow
async def flow_1():
    return None

@flow
async def flow_2():
    return None
and then you can use normal asyncio functions with it, e.g.
res1, res2 = await asyncio.gather(flow_1(), flow_2())
merlin
05/28/2024, 6:56 PM
run_deployment within async TaskGroups, but that was based on a suggested workaround from this discourse starting in 2022. Not sure if there is an updated best practice to avoid the errors called out in that discussion.
merlin
05/28/2024, 7:00 PM
jason baker
05/28/2024, 7:00 PM
merlin
05/28/2024, 7:02 PM
merlin
05/28/2024, 7:11 PM
jason baker
05/28/2024, 7:20 PM
double function/flow with different parameters
from prefect import flow
import asyncio

@flow(name="double")
async def double(num: int) -> int:
    await asyncio.sleep(1)
    return num * 2

@flow(name="parent")
async def parent():
    c1, c2 = await asyncio.gather(double(1), double(2))
    return c1, c2

if __name__ == "__main__":
    asyncio.run(parent())
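A rough way to check that the two calls overlap rather than run back to back is to time the gather. The sketch below is an assumption-laden stand-in, not Prefect code: `double` is a plain coroutine with the `@flow` decorator dropped so it runs with only the standard library, and the sleep is shortened to 0.5 seconds.

```python
import asyncio
import time


async def double(num: int) -> int:
    # Plain-coroutine stand-in for the flow above (no @flow decorator).
    await asyncio.sleep(0.5)
    return num * 2


async def parent():
    start = time.perf_counter()
    # Both coroutines are scheduled together; results keep argument order.
    c1, c2 = await asyncio.gather(double(1), double(2))
    elapsed = time.perf_counter() - start
    return (c1, c2), elapsed


if __name__ == "__main__":
    results, elapsed = asyncio.run(parent())
    print(results, f"{elapsed:.2f}s")
```

If the calls ran one after the other, the elapsed time would be about the sum of the two sleeps; with gather it should stay close to a single sleep.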
jason baker
05/28/2024, 7:21 PM
merlin
05/28/2024, 7:29 PM
merlin
05/28/2024, 7:44 PM
from prefect import flow, task, get_run_logger
import asyncio

@task
async def base_task():
    logger = get_run_logger()
    await asyncio.sleep(5)
    logger.info("This task is running")
    return 42

@flow(name="double")
async def double(num: int) -> int:
    await asyncio.sleep(1)
    return num * (await base_task())

@flow(name="parent")
async def parent():
    logger = get_run_logger()
    c1, c2 = await asyncio.gather(double(1), double(2))
    logger.info(f"c1, c2: {c1, c2}")
    return c1, c2

if __name__ == "__main__":
    asyncio.run(parent())
merlin
05/28/2024, 7:45 PM
jason baker
05/28/2024, 7:45 PM
jason baker
05/28/2024, 7:47 PM
merlin
05/28/2024, 8:05 PM
run_deployment flows are all on the same infrastructure you will hit an OSError: [Errno 24] Too many open files, and if the storage is on GitHub you can be doing a lot of unnecessary pulls from remote storage.
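One general asyncio pattern for keeping the number of simultaneous launches down is to gate each call behind a semaphore. The sketch below only illustrates that pattern. `trigger_run` is a hypothetical stand-in for whatever awaitable starts a run (it just sleeps), `launch_bounded` and its `limit` parameter are made up for the example, and the thread does not confirm that this resolves the error above.

```python
import asyncio


async def trigger_run(name: str) -> str:
    # Hypothetical stand-in for launching one run; it only sleeps briefly.
    await asyncio.sleep(0.05)
    return name


async def launch_bounded(names, limit=2):
    # At most `limit` calls to trigger_run are in flight at any moment.
    sem = asyncio.Semaphore(limit)
    active = 0
    peak = 0

    async def guarded(name):
        nonlocal active, peak
        async with sem:
            active += 1
            peak = max(peak, active)  # track the highest observed concurrency
            try:
                return await trigger_run(name)
            finally:
                active -= 1

    # gather returns results in the same order as the inputs.
    results = await asyncio.gather(*(guarded(n) for n in names))
    return results, peak


if __name__ == "__main__":
    names = [f"run-{i}" for i in range(6)]
    print(asyncio.run(launch_bounded(names, limit=2)))
```

The same semaphore could wrap tasks created in an asyncio.TaskGroup instead of gather. The limit would need tuning against the open-file limit of the machine doing the launching.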