Chris Arderne
11/08/2021, 12:29 PM
`upstream_tasks` is a bit unclear (to me). The following code:
```python
from prefect import task, Flow

@task()
def task_a():
    ...

@task()
def task_b(param):
    ...

@task()
def task_c():
    ...

with Flow("example") as flow:
    a = task_a()
    task_b(a)
    task_c(upstream_tasks=[task_b])
```
produces a DAG as follows (i.e. two separate `task_b` nodes):
`task_a -> task_b`
`task_b -> task_c`
But I want `task_a -> task_b -> task_c`, i.e. `task_b` must run only once, doesn't return anything, and must complete before `task_c` starts. To achieve this, I've just assigned a fake return variable and passed that to `upstream_tasks` as follows:
```python
with Flow("example") as flow:
    a = task_a()
    b = task_b(a)
    task_c(upstream_tasks=[b])
```
Is this the best way to do this?
Sylvain Hazard
11/08/2021, 12:33 PM
`task_b`. There are use cases where you might want to run it multiple times with different inputs.
Amanda Wee
11/08/2021, 12:33 PM
Anna Geller
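`b = task_b()`
The above creates a copy of the task, i.e. `b` is a copy of `task_b`. Because `b` is the specific copy that was added to the flow, you can reference it in `task_c(upstream_tasks=[b])`, and `task_c` then depends on that same instance rather than on a second `task_b` node.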
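To see why the two versions in the question differ, here is a minimal, self-contained toy model. The `ToyFlow`/`ToyTask` classes and the `build`/`has_path` helpers are hypothetical and are not Prefect's API; they only mimic the copy-on-call behavior described above: calling a task adds a fresh copy to the flow, while passing the un-called `task_b` object to `upstream_tasks` adds that object as a node of its own.

```python
import copy
from typing import List, Optional, Set, Tuple


class ToyTask:
    """Hypothetical task object. Calling it binds a *copy* into the flow,
    mimicking the copy-on-call behavior described above (not Prefect code)."""

    def __init__(self, name: str) -> None:
        self.name = name

    def __call__(self, flow: "ToyFlow", *args: "ToyTask",
                 upstream_tasks: Optional[List["ToyTask"]] = None) -> "ToyTask":
        new = copy.copy(self)  # every call creates a fresh node
        flow.add(new)
        for up in list(args) + list(upstream_tasks or []):
            flow.add(up)  # an un-called task object becomes a node of its own
            flow.edges.add((up, new))
        return new


class ToyFlow:
    """Hypothetical flow: records nodes (by identity) and dependency edges."""

    def __init__(self) -> None:
        self.nodes: List[ToyTask] = []
        self.edges: Set[Tuple[ToyTask, ToyTask]] = set()  # (upstream, downstream)

    def add(self, task: ToyTask) -> None:
        if all(t is not task for t in self.nodes):
            self.nodes.append(task)


def build(pass_copy: bool):
    task_a, task_b, task_c = ToyTask("task_a"), ToyTask("task_b"), ToyTask("task_c")
    flow = ToyFlow()
    a = task_a(flow)
    b = task_b(flow, a)
    # The question passes the template task_b; the workaround passes the copy b
    c = task_c(flow, upstream_tasks=[b if pass_copy else task_b])
    return flow, a, c


def has_path(flow: ToyFlow, start: ToyTask, end: ToyTask) -> bool:
    """Depth-first search along upstream -> downstream edges."""
    stack, seen = [start], set()
    while stack:
        node = stack.pop()
        if node is end:
            return True
        if id(node) not in seen:
            seen.add(id(node))
            stack.extend(down for up, down in flow.edges if up is node)
    return False


for pass_copy in (False, True):
    flow, a, c = build(pass_copy)
    print(
        "upstream_tasks=[b]" if pass_copy else "upstream_tasks=[task_b]",
        [t.name for t in flow.nodes],
        "task_a reaches task_c:", has_path(flow, a, c),
    )
```

In this model, `upstream_tasks=[task_b]` yields four nodes (two named `task_b`) and no path from `task_a` to `task_c`, even though the edge list still reads `task_a -> task_b`, `task_b -> task_c` by name. Passing the copy `b` yields the single three-node chain.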
```python
from prefect import task, Flow

@task()
def task_a():
    return "a"

@task(log_stdout=True)
def task_b():
    print("b")

@task()
def task_c():
    return "c"

with Flow("example") as flow:
    a = task_a()
    # upstream_tasks only adds an ordering dependency; no data is passed
    b = task_b(upstream_tasks=[a])
    c = task_c(upstream_tasks=[b])

if __name__ == '__main__':
    flow.visualize()  # requires graphviz to be installed
```
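Conceptually, each `upstream_tasks` entry is just an ordering edge: a task waits until its upstream tasks have finished. As a rough sketch of what those edges imply, the snippet below uses Python's standard-library `graphlib` rather than Prefect (whose executor does its own scheduling); the `ready_batches` helper is hypothetical and simply groups tasks into waves that could start together.

```python
from graphlib import TopologicalSorter  # standard library, Python 3.9+

# Each task name maps to the set of tasks it waits for, mirroring
# task_b(upstream_tasks=[a]) and task_c(upstream_tasks=[b]) above.
chained = {"task_a": set(), "task_b": {"task_a"}, "task_c": {"task_b"}}
# Same tasks, but task_c has no upstream dependency.
unchained = {"task_a": set(), "task_b": {"task_a"}, "task_c": set()}


def ready_batches(deps):
    """Group tasks into waves that could start together once their upstreams finish."""
    sorter = TopologicalSorter(deps)
    sorter.prepare()
    batches = []
    while sorter.is_active():
        ready = sorted(sorter.get_ready())
        batches.append(ready)
        sorter.done(*ready)
    return batches


print(ready_batches(chained))    # [['task_a'], ['task_b'], ['task_c']]
print(ready_batches(unchained))  # [['task_a', 'task_c'], ['task_b']]
```

With the chain, `task_c` can only start after `task_b` finishes; without the edge, `task_c` is free to run alongside `task_a`.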
Anna Geller
```python
# Same task_a, task_b, task_c definitions as above; dependencies set explicitly
with Flow("example") as flow:
    a = task_a()
    b = task_b()
    c = task_c()
    a.set_downstream(b)
    b.set_downstream(c)

if __name__ == '__main__':
    flow.visualize()
```
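Both variants should describe the same dependency graph; they differ only in where each edge is declared (on the downstream task via `upstream_tasks`, or on the upstream task via `set_downstream`). A plain-Python sketch, with hypothetical helper names and each edge modeled as an `(upstream, downstream)` pair of task names, checks that the two declarations agree for this flow:

```python
from typing import Dict, Iterable, List, Set, Tuple

Edge = Tuple[str, str]  # (upstream, downstream)


def edges_from_upstream_tasks(upstream_of: Dict[str, List[str]]) -> Set[Edge]:
    """task_x(upstream_tasks=[...]) style: each task lists what it waits for."""
    return {(up, task) for task, ups in upstream_of.items() for up in ups}


def edges_from_set_downstream(calls: Iterable[Edge]) -> Set[Edge]:
    """up.set_downstream(down) style: each call contributes one edge."""
    return set(calls)


via_upstream = edges_from_upstream_tasks(
    {"task_a": [], "task_b": ["task_a"], "task_c": ["task_b"]}
)
via_downstream = edges_from_set_downstream([("task_a", "task_b"), ("task_b", "task_c")])
print(via_upstream == via_downstream)  # True
```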
Chris Arderne
11/08/2021, 12:53 PM