jack

02/24/2023, 8:31 PM
In v2, what are the best ways to set dependencies between tasks? In v1 we called
flow.set_dependencies(task_4, upstream_tasks=[task_2, task_3])

Jenia Varavva

02/24/2023, 8:32 PM

Ryan Peden

02/24/2023, 8:52 PM
Also note that passing the result of one task to another task as an input parameter will make the downstream task wait for its input tasks to finish. For example:
import time
from prefect import flow, task
from prefect.task_runners import ConcurrentTaskRunner


@task
def task_1():
    print("Task 1 running!")
    time.sleep(3)
    return "Task 1 result"

@task
def task_2(result):
    print("Task 2 running!")
    print(f"Task 2 got result: {result}")
    time.sleep(3)

@task
def task_3(result):
    print("Task 3 running!")
    print(f"Task 3 got result: {result}")


@flow(name="hello-world", task_runner=ConcurrentTaskRunner())
def main_flow() -> None:
    result_1 = task_1.submit()
    result_2 = task_2.submit(result_1)
    task_3.submit(result_2)


if __name__ == "__main__":
    main_flow()
Normally, calling .submit would cause all three tasks to run concurrently. But since task 1's result is an input to task 2, task 2 treats task 1 as a dependency and waits for it to finish. The same applies to task 3, which waits for the result of task 2.