# ask-community
p
Where are the docs for how the DAG is generated? E.g., if I have
```
a = task_a()
b = task_b(a)
task_c(upstream_tasks=[task_b])
```
This produces a strange-looking DAG. I would expect it to produce a DAG like `task_a -> task_b -> task_c`.
a
you want to use
```
task_c(upstream_tasks=[b])
```
p
the return of the task, not the task itself?
a
yes, the reason is: suppose you have multiple runs of `task_b` with different arguments; how do you differentiate which one should be the upstream for your `task_c`?
btw, when the flow is executed, `a` and `b` are technically tasks, not return results. This is how Prefect builds its DAG.
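A minimal sketch of that ambiguity, assuming Prefect 1.x's functional API (the task bodies here are made up for illustration): each call to `task_b` creates a separate node in the graph, and the returned reference picks out which run `task_c` should follow.
```
from prefect import task, Flow

@task
def task_b(n):
    return n * 2

@task
def task_c():
    print("runs after the chosen task_b")

with Flow("disambiguation") as f:
    b1 = task_b(1)  # first run of task_b
    b2 = task_b(2)  # second run with a different argument: a separate node
    task_c(upstream_tasks=[b2])  # unambiguous: depend on the second run only
```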
p
ok. Got it. That works. Is there anything I can read that goes through the logic of this?
I guess knowing that a return value from a task is a task helps clarify a lot of the logic 🤔
Is the general approach to always have a task return something, even if that result isn't "important", in order to give you control of the DAG?
j
Think of calling a task as returning a reference to the output of the task. You can then pass that reference to other tasks (either as arguments, or as `upstream_tasks`, as you've done here). A task that returns nothing can still be referenced.
For something to read, https://docs.prefect.io/core/concepts/flows.html may be useful
a
regarding reading material, I don't know; I learned it on the go. It's related to the imperative API, but I see now that the documentation is a bit lacking on that part: https://docs.prefect.io/core/concepts/flows.html#imperative-api
Regarding "returning something": in Python, every function returns something. If you don't end it with a `return`, it returns `None`, which is still a value. Prefect distinguishes between a `None` result and `NoResult`. See the box in this chapter: https://docs.prefect.io/core/concepts/results.html#result-objects
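The plain-Python half of that is easy to check (Prefect's `NoResult` sentinel is a separate concept, covered in the docs linked above):
```
# Every Python function returns a value; without an explicit
# `return`, that value is None.
def no_explicit_return():
    pass

assert no_explicit_return() is None
```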
p
```
with Flow("PDB Flow") as f:
    pdb_id = Parameter("pdb_id")
    job = init_job(pdb_id)
    pdb_file = get_pdb_local(job)
    wc_task(job, upstream_tasks=[pdb_file])
```
shouldn't this just be a linear DAG?
as opposed to the diamond I'm getting?
j
You're passing `job` to both `wc_task` and `get_pdb_local`, so there's a diamond dependency for that task.
p
Ah, ok. It's assuming `job` is being potentially mutated in both places?
j
Not mutated, no. The result of `init_job` is being used by both `wc_task` and `get_pdb_local`, so the result needs to be passed to both of those tasks.
The edges represent task dependencies. `wc_task` depends on both `init_job` and `get_pdb_local`.
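A runnable sketch of that diamond, assuming Prefect 1.x and with made-up task bodies standing in for the real ones:
```
from prefect import task, Flow, Parameter

@task
def init_job(pdb_id):
    return {"id": pdb_id}

@task
def get_pdb_local(job):
    return f"/tmp/{job['id']}.pdb"

@task
def wc_task(job):
    print("word count for", job)

with Flow("PDB Flow") as f:
    pdb_id = Parameter("pdb_id")
    job = init_job(pdb_id)
    pdb_file = get_pdb_local(job)
    # wc_task gets two upstream edges: a data edge from init_job (the
    # `job` argument) and an ordering-only edge from get_pdb_local,
    # hence the diamond shape.
    wc_task(job, upstream_tasks=[pdb_file])
```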
p
ok - so, is that DAG saying that `wc_task` can only run after both `init_job` and `get_pdb_local`?
j
Exactly
p
ok. I was reading it as saying it can run after `init_job`, and after `get_pdb_local`.
In the sense that it's runnable directly after `init_job`, as well as after `get_pdb_local`, i.e. a shortcut. But, generally speaking, is it fair to say that returning something from a task, and using that to control the flow, is a good idea?
```
with Flow("PDB Flow") as f:
    pdb_id = Parameter("pdb_id")
    job = init_job(pdb_id)
    pdb_file = get_pdb_local(job)
    wc_task(job, upstream_tasks=[pdb_file])
    s3_task(job, upstream_tasks=[wc_task, get_pdb_local])
```
This is the graph if I add another task, which has two tasks marked as upstream. Those two replicated nodes hanging in the middle on the LHS: what's the significance of them being there twice? Are they considered entrypoints?
j
You're passing in `wc_task` as an upstream task, when you want to be passing in the output of `wc_task`.
p
ok, so this goes back to a previous question. Is it just a good idea to always have returns from these tasks?
j
Calling a `@task`-decorated function returns a task representing running that function with those args. Those are the nodes in the graph. The `@task`-decorated function also happens to be a task itself (there are implementation reasons for this), but usually you don't want to add those to the flow directly.
You don't need to return anything.
One sec, writing up an example
p
ok, using the default return is good enough, as Avi said?
```
with Flow("PDB Flow") as f:
    pdb_id = Parameter("pdb_id")
    job = init_job(pdb_id)
    pdb_file = get_pdb_local(job)
    wc_out = wc_task(job, upstream_tasks=[pdb_file])
    s3_task(job, upstream_tasks=[wc_out])
```
a
yes. that should work
j
```
@task
def add(x, y):
    return x + y

@task
def echo(x):
    print(x)

@task
def do_after_echo():
    print("I run later, with no arguments")

with Flow("test") as flow:
    a = add(1, 2)
    b = add(3, 4)
    c = add(a, b)
    echo_c = echo(c)

    do_after_echo(upstream_tasks=[echo_c])
```
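(For reference, running this flow: `a` evaluates to 3, `b` to 7, `echo` prints 10, and `do_after_echo` runs last with no arguments.)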
p
produces:
a
as Jim pointed out, the `wc_out` that you defined saves the reference to the task output (which is `None`, but it doesn't matter), which is then used to produce the dependency graph
p
right, ok. So we're using implicit returns.
yup yup. Got it!
haha. Not the sharpest knife in the drawer at times. OK!
j
Prefect has two types of dependencies:
• data dependencies: these are used as direct arguments to tasks
• upstream dependencies: these mark that one task depends on another task, but the output won't be passed in directly as an argument
We're not using "implicit returns", since you don't actually want to use the return value; you want to note that `Pre_s3` needs to wait for `wc_task` to complete.
p
ok, I was meaning that the implicit return of `None` (i.e., I'm not writing that line of code) at the end of the task is being used to mark the task as needing to happen before the next task can run. So, type 2, upstream deps, in the above. And do you guys ever assign those `None`s as `_`? I think that's the Pythonic way of saying you're kind of ignoring the var.
actually, no, you need to assign them to unique names, otherwise the bookkeeping gets messed up. OK.
j
Sorry, I think we're still talking past each other.
```
@task
def inc(x):
    return x + 1

with Flow("example") as flow:
    # 1 is a data dependency
    a = inc(1)

    # 2 is a data dependency
    b = inc(2)

    # b is a data dependency: the result of task `b` is passed as an argument.
    # a is a non-data dependency: the result of `a` is not passed to task `c`,
    # but task `c` depends on task `a` and won't run until it has completed.
    c = inc(b, upstream_tasks=[a])
```
In the above, `a`, `b`, and `c` represent tasks in your graph. How these tasks are passed to other tasks determines whether the return value from them is used as a "data dependency" (if you pass them as an argument to the task) or whether an implicit non-data dependency is used (if you pass them in `upstream_tasks`).
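One way to see the two kinds of edges in that example, assuming Prefect 1.x (where a flow's `edges` carry a `key` naming the argument for data dependencies, and `key=None` for ordering-only dependencies):
```
# Continuing the `inc` flow above:
for edge in flow.edges:
    print(edge.upstream_task, "->", edge.downstream_task, "key:", edge.key)

# Expected: the b -> c edge reports key='x' (a data dependency),
# while the a -> c edge reports key=None (ordering only).
```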
p
I think I'm OK with the above logic. I was talking about the cases where we have a task that has no explicit return (i.e., it implicitly returns `None`), and you assign that `None` to a var and then use that var to set an upstream task. In the sense that my `wc_task` returns nothing (really `None`), and I assign it to `wc_out` and then make that a dep to prevent the last task in the list from running until `wc_task` is finished. Or as you demo'd with:
```
echo_c = echo(c)
do_after_echo(upstream_tasks=[echo_c])
```
Thanks very much for your help, guys! I appreciate it.
n
Hey, can't we create a fully functional dynamic DAG (flow a -> b -> c) at runtime? The ask is: I'm able to create a dynamic DAG using the `chain()` method, but what about the data dependencies? How do I manage data dependencies between tasks `a`, `b`, and `c`? Can you please help here?
j
@Nakul Goyal to create a DAG with data dependencies, call the tasks on each other like functions: `c(y=b(x=a()))`.
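A minimal end-to-end sketch of that, assuming Prefect 1.x and with illustrative task bodies:
```
from prefect import task, Flow

@task
def a():
    return 1

@task
def b(x):
    return x + 1

@task
def c(y):
    print(y)

with Flow("dynamic-data-deps") as flow:
    # One expression builds both the ordering (a -> b -> c)
    # and the data flow between the tasks:
    c(y=b(x=a()))

flow.run()  # c prints 2
```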
n
@Jeremiah Do you mean that, instead of using Prefect, this should be done in a native Pythonic way?
j
Yes, this is part of Prefect’s design. Please see the docs here.