Ofir
08/26/2022, 3:58 PMRyan Peden
08/26/2022, 4:16 PMimport asyncio
from prefect import task, flow
@task
async def task1():
print("task1")
@task
async def task2():
print("task2")
@flow(name="subflow 1")
async def sub_flow_one():
print("in subflow")
await task1()
return "returned from subflow 1"
@flow(name="subflow 2")
async def sub_flow_two(input):
print("in subflow")
print(input)
await task2()
return "returned from subflow 2"
@flow(name="main flow")
async def main_flow():
sub_one_result = await sub_flow_one()
sub_two_result = await sub_flow_two(sub_one_result)
print(sub_two_result)
if __name__ == '__main__':
asyncio.run(main_flow())
As the docs mention, one of the advantages of subflows is that you can use different task runners for each flow. So, for example, you might use a Dask task runner that connects to a CPU-only Dask cluster for your preprocessing subflow, but you might then want to use a different Dask cluster running on machines with GPUs for you training subflow.Ofir
08/26/2022, 6:10 PM