# ask-community
c
Documentation for `upstream_tasks` is a bit unclear (to me). The following code:
```python
from prefect import task, Flow

@task()
def task_a():
    ...

@task()
def task_b(param):
    ...

@task()
def task_c():
    ...

with Flow("example") as flow:
    a = task_a()
    task_b(a)
    task_c(upstream_tasks=[task_b])
```
produces a DAG with two separate `task_b` nodes:
```
task_a -> task_b
task_b -> task_c
```
But I want `task_a -> task_b -> task_c`, i.e. `task_b` must run only once, doesn't return anything, and must complete before `task_c` starts. To achieve this, I've assigned a fake return variable and passed that to `upstream_tasks` as follows:
```python
with Flow("example") as flow:
    a = task_a()
    b = task_b(a)
    task_c(upstream_tasks=[b])
```
Is this the best way to do this?
s
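As far as I know, this is the intended way, yes. The issue with your first version is that Prefect doesn't know you want only one instance of `task_b`: passing the un-called `task_b` object to `upstream_tasks` adds it to the flow as a second node, separate from the copy created by `task_b(a)`. There are use cases where you might want to run the same task several times with different inputs.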
a
Yes, that's how I would do it.
a
Great answers! I’d only add that your `task_b` doesn’t need to return a fake value for it to be set as a downstream dependency of `task_a`.
```python
b = task_b()
```
Inside the `Flow` context, the above creates a copy of the task, i.e. `b` is a copy of `task_b`, and since it’s a copy, you can reference it in `task_c(upstream_tasks=[b])`.
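```python
from prefect import task, Flow


@task()
def task_a():
    return "a"


@task(log_stdout=True)
def task_b():
    # returns nothing; log_stdout=True sends the print output to the task logs
    print("b")


@task()
def task_c():
    return "c"


with Flow("example") as flow:
    a = task_a()
    # state-only dependencies: no data is passed, only the run order is set
    b = task_b(upstream_tasks=[a])
    c = task_c(upstream_tasks=[b])

if __name__ == '__main__':
    flow.visualize()  # needs graphviz installed
```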
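Even though `task_b` receives no data from `task_a`, the `upstream_tasks` edges alone fix the order. As a rough sketch that uses Python's standard-library `graphlib` rather than Prefect's scheduler, the two edges from the flow above (task names stand in for the task copies `a`, `b`, `c`) admit only one valid run order:

```python
from graphlib import TopologicalSorter

# Plain strings stand in for the task copies; this is graphlib, not Prefect.
# Mapping is {task: set of tasks that must finish before it}.
dependencies = {
    "task_b": {"task_a"},  # b = task_b(upstream_tasks=[a])
    "task_c": {"task_b"},  # c = task_c(upstream_tasks=[b])
}

order = list(TopologicalSorter(dependencies).static_order())
print(order)  # ['task_a', 'task_b', 'task_c']
```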
You can also set the dependencies explicitly with `set_downstream`:
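```python
# reuses task_a, task_b and task_c from the example above
with Flow("example") as flow:
    a = task_a()
    b = task_b()
    c = task_c()
    a.set_downstream(b)  # task_a must finish before task_b
    b.set_downstream(c)  # task_b must finish before task_c

if __name__ == '__main__':
    flow.visualize()
```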
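`a.set_downstream(b)` and `b = task_b(upstream_tasks=[a])` describe the same edge from opposite ends. The toy sketch below is not Prefect's code: `ToyTask` is a hypothetical class, and its `set_upstream` method simply stands in for passing `upstream_tasks=[...]`. It records both styles as (upstream, downstream) pairs and checks that they produce the same graph.

```python
# Toy model, not Prefect's implementation; ToyTask is a hypothetical name.
class ToyTask:
    def __init__(self, name, edges):
        self.name = name
        self.edges = edges  # shared set of (upstream, downstream) name pairs

    def set_downstream(self, other):
        # a.set_downstream(b) records the edge a -> b
        self.edges.add((self.name, other.name))

    def set_upstream(self, other):
        # stands in for b(upstream_tasks=[a]): records the same edge a -> b
        self.edges.add((other.name, self.name))


def with_set_downstream():
    edges = set()
    a, b, c = (ToyTask(n, edges) for n in ("task_a", "task_b", "task_c"))
    a.set_downstream(b)
    b.set_downstream(c)
    return edges


def with_upstream_tasks():
    edges = set()
    a, b, c = (ToyTask(n, edges) for n in ("task_a", "task_b", "task_c"))
    b.set_upstream(a)  # like task_b(upstream_tasks=[a])
    c.set_upstream(b)  # like task_c(upstream_tasks=[b])
    return edges


print(with_set_downstream() == with_upstream_tasks())  # True
```

Which style to use is mostly a matter of taste: `upstream_tasks` keeps the dependency next to the call, while `set_downstream` lets you wire the graph after all tasks are created.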
c
Thanks all!