jack
02/24/2023, 8:31 PMflow.set_dependencies(task_4, upstream_tasks=[task_2, task_3])
Jenia Varavva
02/24/2023, 8:32 PMRyan Peden
02/24/2023, 8:52 PMimport time
from prefect import flow, task
from prefect.task_runners import ConcurrentTaskRunner
@task
def task_1():
print("Task 1 running!")
time.sleep(3)
return "Task 1 result"
@task
def task_2(result):
print("Task 2 running!")
print(f"Task 2 got result: {result}")
time.sleep(3)
@task
def task_3(result):
print("Task 3 running!")
print(f"Task 3 got result: {result}")
@flow(name="hello-world", task_runner=ConcurrentTaskRunner())
def main_flow() -> None:
result_1 = task_1.submit()
result_2 = task_2.submit(result_1)
task_3.submit(result_2)
if __name__ == "__main__":
main_flow()
Normally, calling .submit
would cause all three tasks to run concurrently. But since task 1's result is an input to task 2, task 2 treats task 1 as a dependency and waits for it to finish. Same for task 3 waiting for the result of task 2.