Philip MacMenamin
06/30/2020, 9:25 PM
a = task_a()
b = task_b(a)
task_c(upstream_tasks=[task_b])
This produces a strange-looking DAG. I would expect that to produce a DAG like
task_a ---> task_b ---> task_c
Avi A
06/30/2020, 9:33 PM
task_c(upstream_tasks=[b])
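A minimal sketch of why the call result b, rather than the function task_b, is the thing to reference. This is a toy model in plain Python, not Prefect's implementation; Node, toy_task and nodes are hypothetical names. It only shows that each call creates a separate graph node, so the function name alone cannot pick one of them out.

```python
import itertools

_ids = itertools.count()
nodes = []  # every node created so far, in creation order


class Node:
    """One invocation of a task function (one node in the graph)."""

    def __init__(self, name, args, upstream):
        self.id = next(_ids)
        self.name = name
        self.args = args
        self.upstream = list(upstream)
        nodes.append(self)


def toy_task(fn):
    """Hypothetical stand-in for a task decorator: calling builds a Node."""

    def call(*args, upstream_tasks=()):
        return Node(fn.__name__, args, upstream_tasks)

    call.__name__ = fn.__name__
    return call


@toy_task
def task_b(x):
    return x


b1 = task_b(1)
b2 = task_b(2)

# Two calls, two distinct nodes that share the name "task_b".
same_name = [n for n in nodes if n.name == "task_b"]
print(len(same_name), b1 is b2)
```

In this model, a reference to task_b could mean either node, while b1 and b2 are unambiguous.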
Philip MacMenamin
06/30/2020, 9:34 PM
Avi A
06/30/2020, 9:34 PM
task_b with different arguments, how do you differentiate which one should be the upstream for your task_c? a and b are technically tasks, not return results. This is how Prefect builds its DAG.
Philip MacMenamin
06/30/2020, 9:36 PM
Jim Crist-Harif
06/30/2020, 9:40 PM
upstream_tasks as you've done here). A task that returns nothing can still be referenced.
Avi A
06/30/2020, 9:41 PM
return, it returns a None, which is still a value. Prefect distinguishes between a None result and NoResult. See the box in this chapter:
https://docs.prefect.io/core/concepts/results.html#result-objects
Philip MacMenamin
06/30/2020, 9:46 PM
with Flow("PDB Flow") as f:
    pdb_id = Parameter("pdb_id")
    job = init_job(pdb_id)
    pdb_file = get_pdp_local(job)
    wc_task(job, upstream_tasks=[pdb_file])
shouldn't this just be a linear DAG?
Jim Crist-Harif
06/30/2020, 9:47 PM
job to both wc_task and get_pdp_local, so there's a diamond dependency for that task.
Philip MacMenamin
06/30/2020, 9:48 PM
Jim Crist-Harif
06/30/2020, 9:49 PM
init_job is being used by both wc_task and get_pdp_local, so the result needs to be passed to both of those tasks. wc_task depends on both init_job and get_pdp_local.
Philip MacMenamin
06/30/2020, 9:50 PM
Jim Crist-Harif
06/30/2020, 9:51 PM
Philip MacMenamin
06/30/2020, 9:52 PM
with Flow("PDB Flow") as f:
    pdb_id = Parameter("pdb_id")
    job = init_job(pdb_id)
    pdb_file = get_pdp_local(job)
    wc_task(job, upstream_tasks=[pdb_file])
    s3_task(job, upstream_tasks=[wc_task, get_pdp_local])
Jim Crist-Harif
06/30/2020, 10:03 PM
wc_task as an upstream task, when you want to be passing in the output of wc_task.
Philip MacMenamin
06/30/2020, 10:05 PM
Jim Crist-Harif
06/30/2020, 10:06 PM
@task decorated function returns a task representing running that function with those args. Those are the nodes in the graph. The @task decorated function also happens to be a task itself (there are implementation reasons for this), but usually you don't want to add those to the flow directly.
Philip MacMenamin
06/30/2020, 10:06 PM
with Flow("PDB Flow") as f:
    pdb_id = Parameter("pdb_id")
    job = init_job(pdb_id)
    pdb_file = get_pdp_local(job)
    wc_out = wc_task(job, upstream_tasks=[pdb_file])
    s3_task(job, upstream_tasks=[wc_out])
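To see why the extra edges from job do not change the run order, here is a sketch using Python's standard graphlib module (Python 3.9+) rather than Prefect itself. The edge map is transcribed by hand from the flow above, with the pdb_id parameter treated as a node. Treat it as an illustration of the dependency structure, not as Prefect's scheduler.

```python
from graphlib import TopologicalSorter

# Map each task to the set of tasks it depends on.
deps = {
    "init_job": {"pdb_id"},
    "get_pdp_local": {"init_job"},
    # data dependency on job, plus the ordering-only upstream pdb_file
    "wc_task": {"init_job", "get_pdp_local"},
    # data dependency on job, plus the ordering-only upstream wc_out
    "s3_task": {"init_job", "wc_task"},
}

order = list(TopologicalSorter(deps).static_order())
print(order)
```

The drawn graph has more edges than a straight chain because job feeds three tasks, but only one execution order satisfies all of them.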
Avi A
06/30/2020, 10:09 PM
Jim Crist-Harif
06/30/2020, 10:09 PM
from prefect import Flow, task

@task
def add(x, y):
    return x + y

@task
def echo(x):
    print(x)

@task
def do_after_echo():
    print("I run later, with no arguments")

with Flow("test") as flow:
    a = add(1, 2)
    b = add(3, 4)
    c = add(a, b)
    echo_c = echo(c)
    # echo_c returns None, but it can still be used to order tasks
    do_after_echo(upstream_tasks=[echo_c])
Philip MacMenamin
06/30/2020, 10:09 PM
Avi A
06/30/2020, 10:10 PM
wc_out that you defined saves the reference to the task output (which is None, but it doesn't matter), which is then used to produce the dependency graph.
Philip MacMenamin
06/30/2020, 10:10 PM
Jim Crist-Harif
06/30/2020, 10:12 PM
Pre_s3 needs to wait for wc_task to complete.
Philip MacMenamin
06/30/2020, 10:18 PM
_? I think that's the Pythonic way of saying you're kind of ignoring the var
Jim Crist-Harif
06/30/2020, 10:23 PM
from prefect import Flow, task

@task
def inc(x):
    return x + 1

with Flow("example") as flow:
    # 1 is a data dependency
    a = inc(1)
    # 2 is a data dependency
    b = inc(2)
    # b is a data dependency: the result of task `b` is passed as an argument
    # a is a non-data dependency: the result of `a` is not passed to task `c`,
    # but task `c` depends on task `a` and won't run until it has completed
    c = inc(b, upstream_tasks=[a])
a, b, and c represent tasks in your graph. How these tasks are passed as arguments to other tasks determines whether the return value from them is used as a "data dependency" (if you pass them as an argument to the task), or whether an implicit non-data dependency is used (if you pass them in upstream_tasks).
Philip MacMenamin
06/30/2020, 10:38 PM
None) and you assign that None to a var, and then use that None/var to set as an upstream task.
In the sense my wc_task returns nothing (really None) and I assign it to wc_out and then make that a dep to prevent the last task in the list from running until the wc_task is finished.
Or as you demo'd with
echo_c = echo(c)
do_after_echo(upstream_tasks=[echo_c])
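The distinction discussed above (a task whose result is None can still gate another task) can be sketched with a toy runner in plain Python. This is not how Prefect executes flows; graph, results, run, wc and upload are hypothetical names chosen to echo the thread. Data dependencies have their results passed as arguments, while ordering-only dependencies are run first and their results ignored.

```python
log = []


def wc():
    log.append("wc")
    # no return statement, so the result is None


def upload(job):
    log.append(f"upload {job}")
    return "done"


# name -> (function, data dependencies, ordering-only dependencies)
graph = {
    "wc_out": (wc, [], []),
    "s3": (upload, ["job"], ["wc_out"]),
}
results = {"job": "job-1"}  # pre-computed value standing in for init_job


def run(name):
    """Run `name` after everything it depends on; cache results by name."""
    if name in results:
        return results[name]
    fn, data_deps, order_deps = graph[name]
    for dep in order_deps:
        run(dep)  # result is ignored, only the ordering matters
    args = [run(dep) for dep in data_deps]  # results become arguments
    results[name] = fn(*args)
    return results[name]


run("s3")
print(log, results["wc_out"])
```

Here wc runs before upload even though its stored result is None, which mirrors using wc_out purely as an upstream reference.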
Nakul Goyal
07/21/2020, 11:18 AM
Jeremiah
07/21/2020, 11:35 AM
c(y=b(x=a())).
Nakul Goyal
07/21/2020, 11:46 AM
Jeremiah
07/21/2020, 12:07 PM