serial
# 01/30/2024, 7:09 PM  (chat-export timestamp, originally fused onto the decorator line)
@task(task_run_name="task-{id}-1")
def task_1(id: str):
    """Stand-in for real work: block the calling thread for one second.

    Args:
        id: Label for this run. The name is kept as-is (even though it
            shadows the builtin) because the ``task_run_name`` template
            above references ``{id}``.
    """
    time.sleep(1)
@task(task_run_name="task-{id}-2")
def task_2(id: str):
    """Stand-in for real work: block the calling thread for one second.

    Args:
        id: Label for this run; referenced by the ``task_run_name``
            template above as ``{id}``.
    """
    time.sleep(1)
@flow(task_runner=ConcurrentTaskRunner(),
      flow_run_name="my_flow-{name}")
async def my_flow(name: str):
    """Submit task_1 and task_2 for ``name`` and return both outcomes.

    Args:
        name: Value passed to both tasks; also referenced by the
            ``flow_run_name`` template above as ``{name}``.

    Returns:
        A 2-tuple: whatever ``.wait()`` returns for the task_1 submission,
        followed by the object returned by ``task_2.submit(name)``.
    """
    # NOTE(review): tuple elements are evaluated left to right, so `.wait()`
    # is called on task_1's submission before task_2 is submitted at all.
    # Presumably this makes the two tasks run one after the other and blocks
    # this coroutine while waiting -- confirm against Prefect's
    # PrefectFuture.wait() semantics.
    return(task_1.submit(name).wait(), task_2.submit(name))
@flow(task_runner=DaskTaskRunner())
async def master_flow():
    """Start one ``my_flow`` run per name ("A" through "E") and await them all.

    Every ``my_flow(name)`` call is made up front, and the resulting
    awaitables are handed to ``asyncio.gather`` in a single call. The
    gathered results are discarded; nothing is returned.
    """
    # Unpacking the generator with `*` creates every awaitable before
    # gather() is invoked, matching a build-the-list-then-gather sequence.
    await asyncio.gather(*(my_flow(name) for name in ['A', 'B', 'C', 'D', 'E']))
# Script entry point: when this file is executed directly, drive
# master_flow() to completion on a fresh asyncio event loop.
if __name__ == "__main__":
    asyncio.run(master_flow())
I hope you can help me.
Jake Kaplan
01/30/2024, 7:21 PM
`.wait()`
in `return(task_1.submit(name).wait(), task_2.submit(name))`
Jake Kaplan
01/30/2024, 7:22 PMserial
01/30/2024, 7:32 PMJake Kaplan
01/30/2024, 7:45 PMJake Kaplan
# 01/30/2024, 7:45 PM  (chat-export timestamp, originally fused onto the decorator line)
@task(task_run_name="task-{id}-1")
async def task_1(id: str):
    """Stand-in for real work: pause for one second without blocking the loop.

    Args:
        id: Label for this run. The name is kept as-is (even though it
            shadows the builtin) because the ``task_run_name`` template
            above references ``{id}``.
    """
    # Imported locally so this snippet is self-contained.
    import asyncio

    # A blocking time.sleep() inside a coroutine stalls the whole event loop,
    # so nothing else scheduled on that loop can make progress for the full
    # second. asyncio.sleep() yields control instead.
    await asyncio.sleep(1)
@task(task_run_name="task-{id}-2")
async def task_2(id: str):
    """Stand-in for real work: pause for one second without blocking the loop.

    Args:
        id: Label for this run; referenced by the ``task_run_name``
            template above as ``{id}``.
    """
    # Imported locally so this snippet is self-contained.
    import asyncio

    # A blocking time.sleep() inside a coroutine stalls the whole event loop,
    # so nothing else scheduled on that loop can make progress for the full
    # second. asyncio.sleep() yields control instead.
    await asyncio.sleep(1)
@flow(
    task_runner=ConcurrentTaskRunner(),
    flow_run_name="my_flow-{name}",
)
async def my_flow(name: str):
    """Run task_1 and then task_2 for ``name``, awaiting each in turn.

    Args:
        name: Value passed to both tasks; also referenced by the
            ``flow_run_name`` template above as ``{name}``.
    """
    # Strictly sequential: task_2 is not called until task_1's await returns.
    for step in (task_1, task_2):
        await step(name)
Jake Kaplan
01/30/2024, 7:45 PMserial
01/30/2024, 8:08 PMJake Kaplan
# 01/30/2024, 8:48 PM  (chat-export timestamp, originally fused onto the decorator line)
@task(task_run_name="task-{name}-1")
def task_1(name: str):
    """Stand-in for real work: block the calling thread for one second.

    Args:
        name: Label for this run; referenced by the ``task_run_name``
            template above as ``{name}``.
    """
    time.sleep(1)
@task(task_run_name="task-{name}-2")
def task_2(name: str):
    """Stand-in for real work: block the calling thread for one second.

    Args:
        name: Label for this run; referenced by the ``task_run_name``
            template above as ``{name}``.
    """
    time.sleep(1)
@flow(flow_run_name="my_flow-{name}")
async def sub_flow(name: str):
    """Submit task_1, then submit task_2 declared as waiting on task_1.

    Neither submission is waited on here and nothing is returned.

    Args:
        name: Value passed to both tasks by keyword; also referenced by the
            ``flow_run_name`` template above as ``{name}``.
    """
    upstream = task_1.submit(name=name)
    # `wait_for` carries the ordering constraint; no data flows between the
    # two tasks.
    task_2.submit(name=name, wait_for=[upstream])
Jake Kaplan
01/30/2024, 8:48 PMJake Kaplan
01/30/2024, 8:49 PMserial
01/30/2024, 9:17 PMserial
# 01/30/2024, 9:34 PM  (chat-export timestamp, originally fused onto the decorator line)
@task(task_run_name="task-{id}-2")
def task_2(id: str, task1_result):
    """Block for one second, then print ``task1_result`` to stdout.

    Args:
        id: Label for this run; referenced by the ``task_run_name``
            template above as ``{id}``.
        task1_result: Value to print after the pause.
    """
    time.sleep(1)
    print(task1_result)
@flow
async def my_flow(name: str):
    """Submit task_1, feed its submission into task_2, and return task_2's.

    Args:
        name: Value passed as the first argument to both tasks.

    Returns:
        The object returned by ``task_2.submit(...)``.
    """
    upstream = task_1.submit(name)
    # NOTE(review): `upstream` is passed both as task_2's second argument and
    # in `wait_for`. Presumably the argument alone already makes task_2 depend
    # on task_1, which would make `wait_for` redundant -- confirm against the
    # Prefect docs before removing it.
    downstream = task_2.submit(name, upstream, wait_for=[upstream])
    return downstream
Thank you again for your time; I was starting to get a headache 😄