
Philip MacMenamin

06/30/2020, 9:25 PM
Where are the docs for how the DAG is generated? E.g., if I have
a = task_a()
b = task_b(a)
task_c(upstream_tasks=[task_b])
This produces a strange-looking DAG. I would expect it to produce a DAG like task_a ---> task_b ---> task_c

Avi A

06/30/2020, 9:33 PM
you want to use
task_c(upstream_tasks=[b])

Philip MacMenamin

06/30/2020, 9:34 PM
the return of the task, not the task itself?

Avi A

06/30/2020, 9:34 PM
yes, the reason is: suppose you have multiple runs of task_b with different arguments; how do you differentiate which one should be the upstream for your task_c?
btw, when the flow is executed, a and b are technically tasks, not return results. This is how Prefect builds its DAG

Philip MacMenamin

06/30/2020, 9:36 PM
ok. Got it. That works. Is there anything I can read that goes through the logic of this?
I guess knowing that a return value from a task is a task helps clarify a lot of the logic 🤔
Is the general approach to always have a task return something, even if that result isn't "important", in order to give you control of the DAG?

Jim Crist-Harif

06/30/2020, 9:40 PM
Think of calling a task as returning a reference to the output of the task. You can then pass that reference to other tasks (either as arguments, or as upstream_tasks, as you've done here). A task that returns nothing can still be referenced.
For something to read, https://docs.prefect.io/core/concepts/flows.html may be useful
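Jim's "reference" model can be sketched in plain Python. This is an illustrative toy only, not Prefect's actual implementation; TaskRef and call_task are invented names:

```python
# Toy model of the "calling a task returns a reference" idea.
# Illustrative sketch only -- this is NOT how Prefect is implemented.
class TaskRef:
    """Stands in for the reference returned when a task is called in a flow."""
    def __init__(self, name, upstream=()):
        self.name = name
        self.upstream = list(upstream)

def call_task(name, *args, upstream_tasks=()):
    # Both positional args and upstream_tasks become edges in the graph.
    deps = [a for a in args if isinstance(a, TaskRef)]
    return TaskRef(name, deps + list(upstream_tasks))

a = call_task("task_a")
b = call_task("task_b", a)
c = call_task("task_c", upstream_tasks=[b])  # pass the reference b, not task_b

assert [u.name for u in c.upstream] == ["task_b"]  # linear chain: a -> b -> c
```

Passing the reference `b` (rather than the task function `task_b`) is what ties `task_c` to that particular call of `task_b`.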

Avi A

06/30/2020, 9:41 PM
regarding reading material, I don’t know; I learned it on the go. It’s related to the imperative API, but I see now that the documentation is a bit lacking on that part https://docs.prefect.io/core/concepts/flows.html#imperative-api
Regarding “returning something”: in Python, every function returns something. If you don’t end it with a return, it returns None, which is still a value. Prefect distinguishes between a None result and NoResult. See the box in this chapter: https://docs.prefect.io/core/concepts/results.html#result-objects

Philip MacMenamin

06/30/2020, 9:46 PM
with Flow("PDB Flow") as f:
    pdb_id = Parameter("pdb_id")
    job = init_job(pdb_id)
    pdb_file = get_pdp_local(job)
    wc_task(job, upstream_tasks=[pdb_file])
shouldn't this just be a linear DAG?
as opposed to:

Jim Crist-Harif

06/30/2020, 9:47 PM
You're passing job to both wc_task and get_pdp_local, so there's a diamond dependency for that task.
:upvote: 1

Philip MacMenamin

06/30/2020, 9:48 PM
Ah, ok. It's assuming job is being potentially mutated in both places?

Jim Crist-Harif

06/30/2020, 9:49 PM
Not mutated, no. The result of init_job is being used by both wc_task and get_pdp_local, so the result needs to be passed to both of those tasks.
The edges represent task dependencies. wc_task depends on both init_job and get_pdp_local.
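The diamond can be made concrete by writing out the edge list for that flow by hand (a sketch, not output generated by Prefect):

```python
# Hand-written edge list for the "PDB Flow" above (sketch, not Prefect output).
edges = [
    ("pdb_id", "init_job"),
    ("init_job", "get_pdp_local"),  # job is a data dependency here...
    ("init_job", "wc_task"),        # ...and here, since job is passed to both
    ("get_pdp_local", "wc_task"),   # pdb_file is an upstream-only dependency
]

# wc_task has two incoming edges -- that's the diamond.
upstream_of_wc = sorted(src for src, dst in edges if dst == "wc_task")
assert upstream_of_wc == ["get_pdp_local", "init_job"]
```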

Philip MacMenamin

06/30/2020, 9:50 PM
ok - so, is that DAG saying that wc_task can only run after both init_job and get_pdp_local?

Jim Crist-Harif

06/30/2020, 9:51 PM
Exactly

Philip MacMenamin

06/30/2020, 9:52 PM
ok. I was reading it as saying it can run after init_job, and after get_pdp_local.
in the sense that it's runnable directly after init_job, as well as after get_pdp_local, i.e. a shortcut. But, generally speaking, is it fair to say that returning something from a task, and using that to control the flow, is a good idea?
with Flow("PDB Flow") as f:
    pdb_id = Parameter("pdb_id")
    job = init_job(pdb_id)
    pdb_file = get_pdp_local(job)
    wc_task(job, upstream_tasks=[pdb_file])
    s3_task(job, upstream_tasks=[wc_task, get_pdp_local])
This is the graph if I add another task, which has two tasks marked as upstream. Those two replicated nodes hanging in the middle on the LHS: what's the significance of them being there twice? Are they considered entry points?

Jim Crist-Harif

06/30/2020, 10:03 PM
You're passing in wc_task as an upstream task, when you want to be passing in the output of wc_task.

Philip MacMenamin

06/30/2020, 10:05 PM
ok, so this goes back to a previous question. It's just a good idea to always have returns from these tasks?

Jim Crist-Harif

06/30/2020, 10:06 PM
Calling a @task-decorated function returns a task representing running that function with those args. Those are the nodes in the graph. The @task-decorated function also happens to be a task itself (there are implementation reasons for this), but usually you don't want to add those to the flow directly.
You don't need to return anything.
One sec, writing up an example

Philip MacMenamin

06/30/2020, 10:06 PM
ok, using the default return is good enough, as Avi said?
with Flow("PDB Flow") as f:
    pdb_id = Parameter("pdb_id")
    job = init_job(pdb_id)
    pdb_file = get_pdp_local(job)
    wc_out = wc_task(job, upstream_tasks=[pdb_file])
    s3_task(job, upstream_tasks=[wc_out])

Avi A

06/30/2020, 10:09 PM
yes. that should work

Jim Crist-Harif

06/30/2020, 10:09 PM
@task
def add(x, y):
    return x + y

@task
def echo(x):
    print(x)

@task
def do_after_echo():
    print("I run later, with no arguments")

with Flow("test") as flow:
    a = add(1, 2)
    b = add(3, 4)
    c = add(a, b)
    echo_c = echo(c)

    do_after_echo(upstream_tasks=[echo_c])
👍 1

Philip MacMenamin

06/30/2020, 10:09 PM
produces:

Avi A

06/30/2020, 10:10 PM
as Jim pointed out, the wc_out that you defined saves the reference to the task output (which is None, but it doesn’t matter), which is then used to produce the dependency graph

Philip MacMenamin

06/30/2020, 10:10 PM
right, ok. So we're using implicit returns.
yup yup. Got it!
😎 1
haha. Not the sharpest knife in the drawer at times. OK!

Jim Crist-Harif

06/30/2020, 10:12 PM
Prefect has two types of dependencies:
• data dependencies. These are used as direct arguments to tasks
• upstream dependencies. These are used to mark that one task depends on another task, but the output won't be passed in directly as an argument.
We're not using "implicit returns", since you don't actually want to use the return value - you want to note that Pre_s3 needs to wait for wc_task to complete.
👍 1
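The way both dependency kinds constrain run order can be sketched with a tiny hand-rolled scheduler. The graph structure and function below are invented for illustration; Prefect's real engine is far more involved:

```python
# Minimal sketch: both dependency kinds become edges that constrain run order.
# Invented structure, not Prefect internals.
graph = {
    # task: (data_deps, upstream_only_deps)
    "init_job": ((), ()),
    "get_pdp_local": (("init_job",), ()),
    "wc_task": (("init_job",), ("get_pdp_local",)),
    "s3_task": (("init_job",), ("wc_task",)),
}

def run_order(graph):
    """Depth-first topological order: every edge, data or not, is respected."""
    order, seen = [], set()
    def visit(task):
        if task in seen:
            return
        seen.add(task)
        data_deps, upstream_deps = graph[task]
        for dep in (*data_deps, *upstream_deps):
            visit(dep)
        order.append(task)
    for task in graph:
        visit(task)
    return order

print(run_order(graph))  # -> ['init_job', 'get_pdp_local', 'wc_task', 's3_task']
```

Only the data dependencies pass values into the task; the upstream-only edges just force ordering, which is the distinction Jim draws above.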

Philip MacMenamin

06/30/2020, 10:18 PM
ok, I was meaning the implicit return of None (ie I'm not writing that line of code) at the end of the task is being used to mark the task as needing to happen before the next task can. So, type 2, upstream deps in the above. And do you guys ever assign those `None`s as _ ? I think that's the pythonic way of saying you're kind of ignoring the var
actually, no, you need to assign them to unique things, otherwise the bookkeeping gets messed up. OK.

Jim Crist-Harif

06/30/2020, 10:23 PM
Sorry, I think we're still talking past each other.
@task
def inc(x):
    return x + 1

with Flow("example") as flow:
    # 1 is a data dependency
    a = inc(1)

    # 2 is a data dependency
    b = inc(2)

    # b is a data dependency, the result of task `b` is passed as an argument
    # a is a non-data dependency. The result of `a` is not passed to task `c`, but task `c` depends on task `a`, and won't run until it's completed
    c = inc(b, upstream_tasks=[a])
:upvote: 1
In the above, a, b, and c represent tasks in your graph. How these tasks are passed to other tasks determines whether the return value from them is used as a "data dependency" (if you pass them as an argument to the task), or whether an implicit non-data dependency is used (if you pass them in upstream_tasks).
👍 1

Philip MacMenamin

06/30/2020, 10:38 PM
I think I'm OK with the above logic. I was talking about the cases where we have a task that has no explicit return (ie implicitly returns None) and you assign that None to a var, and then use that None/var to set as an upstream task. In the sense my wc_task returns nothing (really None) and I assign it to wc_out and then make that a dep to prevent the last task in the list from running until wc_task is finished. Or as you demo'd with
echo_c = echo(c)

    do_after_echo(upstream_tasks=[echo_c])
Thanks very much for your help guys! I appreciate it.

Nakul Goyal

07/21/2020, 11:18 AM
Hey, can't we create a fully functional dynamic DAG (flow a->b->c) at runtime? The ask is: I am able to create a dynamic DAG using the "chain()" method, but what about the data dependencies? How do I manage data dependencies between tasks a, b & c? Can you please help here?

Jeremiah

07/21/2020, 11:35 AM
@Nakul Goyal to create a DAG with data dependencies, call the tasks on each other like functions - c(y=b(x=a())).
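In plain Python terms (ignoring Prefect entirely), the nested-call pattern looks like this, and it extends to chains built dynamically at runtime. The function bodies here are invented placeholders:

```python
from functools import reduce

# Plain-Python analogue of c(y=b(x=a())): each step consumes the previous
# result. Function bodies are placeholders for illustration.
def a():
    return 1

def b(x):
    return x + 1

def c(y):
    return y * 10

assert c(y=b(x=a())) == 20  # static nesting: a -> b -> c

# The same chain built dynamically at runtime from a list of callables:
tasks = [b, c]
result = reduce(lambda acc, task: task(acc), tasks, a())
assert result == 20
```

When the callables are Prefect tasks inside a `with Flow(...)` block, the same nesting is what creates the data-dependency edges between them.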

Nakul Goyal

07/21/2020, 11:46 AM
@Jeremiah
Do you mean that instead of using Prefect, this should be done in a native Pythonic way?

Jeremiah

07/21/2020, 12:07 PM
Yes, this is part of Prefect’s design. Please see the docs here.