
Bob Colner

03/15/2020, 11:06 PM
Hi folks! I'm a disgruntled Airflow user here looking to learn more about Prefect. I've started to port over a simple ETL workflow with some success.
My issue is that some tasks are getting triggered more than necessary (2 runs vs. 1). I'm trying to use the functional API, and my tasks do not have data dependencies.
for reference this is the 'flow':
with Flow("core-etl", schedule=schedule) as flow:
    
    events_itc()
    
    dims_subscribers(
        upstream_tasks=[events_subscriptions()]
    )
        
    dims_devices_made(
        upstream_tasks=[dims_devices_made_installed, dims_adjust]
    )
    
    dims_devices_instasize(
        upstream_tasks=[dims_devices_instasize_installed, dims_adjust]
    )
    
    dims_merged_subscribers(
        upstream_tasks=[dims_devices_instasize, dims_subscribers]
    )
flow.visualize()
dims_devices_instasize and dims_subscribers both run 2 times in the flow.
Perhaps I need to use the imperative API?

Chris White

03/15/2020, 11:23 PM
Hi @Bob Colner and welcome! Anytime you call a task using the functional API, a new task is created. That means that in the following example, a and b are completely independent tasks:
with Flow("example") as flow:
    a = my_task()
    b = my_task()
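To see why a and b end up as separate tasks, here is a toy model of the behavior Chris describes (plain Python, not Prefect's actual implementation; the Task/flow names are illustrative stand-ins):

```python
# Toy model of Prefect's functional API: each call inside a Flow
# context registers a new, independent copy of the task.

class Task:
    def __init__(self, name):
        self.name = name

    def __call__(self, flow):
        copy = Task(self.name)   # calling creates a distinct copy...
        flow.append(copy)        # ...which is added to the flow's DAG
        return copy

my_task = Task("my_task")
flow = []                        # stand-in for Flow("example")

a = my_task(flow)
b = my_task(flow)

print(len(flow))   # 2 -- two separate task nodes, not one
print(a is b)      # False -- same logic, distinct DAG nodes
```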
in your case, if you store the output of
dims_inst = dims_devices_instasize(
        upstream_tasks=[dims_devices_instasize_installed, dims_adjust]
    )
and use dims_inst as the upstream dependency of dims_merged_subscribers, you should be good to go (and see a different viz)

Bob Colner

03/15/2020, 11:30 PM
thanks, in my case there is no output to store. Each task is updating tables in a remote DB
basically, I need a '3 layer' DAG but I'm not sure how to get that

Chris White

03/15/2020, 11:31 PM
Sorry I didn’t mean the output of the task itself but rather the return value of calling the task; (see my code snippet)

Bob Colner

03/15/2020, 11:31 PM
oh, so in the flow context the output is special?
I've also tried the .set_downstream() method. I guess I'm not grokking how to express all the dependencies in Prefect after reading the docs + examples

Chris White

03/15/2020, 11:34 PM
the result of calling a task instance is a new copy of the task, in the same way that f() and f() are two totally different function calls with distinct return values; this tutorial actually covers the distinction: https://docs.prefect.io/core/advanced_tutorials/task-guide.html#adding-tasks-to-flows See the section header “Calling Tasks creates copies”

Bob Colner

03/15/2020, 11:36 PM
I'll check that out; still not understanding. In my example each task is unique and only needs to run once, but the ordering matters

Chris White

03/15/2020, 11:41 PM
So it’s all about what API calls you make to build your flow; here’s an example of what I mean:
@task
def my_task():
    return 42

with Flow("example") as flow:

    # calling my_task produces a copy
    # that is distinct from my_task
    # but has the same runtime logic

    a = my_task()
    b = my_task(upstream_tasks=[a])
which is a flow with two tasks as can be seen in the following DAG viz:

Bob Colner

03/15/2020, 11:45 PM
thanks, and I've got that basic flow working (see my DAG viz).
Trying to get this 3-layer DAG:
sorry if this is confusing!

Chris White

03/15/2020, 11:48 PM
no worries at all! I understand where you’re coming from. Give me a few minutes and I’ll refactor your code snippet to work as you expect

Bob Colner

03/15/2020, 11:51 PM
thanks so much

Chris White

03/15/2020, 11:58 PM
Anytime! Try this out and see if it does what you expect:
with Flow("core-etl", schedule=schedule) as flow:

    sub_task = dims_subscribers(
        upstream_tasks=[events_subscriptions]
    )

    made_task = dims_devices_made(
        upstream_tasks=[dims_devices_made_installed, dims_adjust]
    )

    inst_task = dims_devices_instasize(
        upstream_tasks=[dims_devices_instasize_installed, dims_adjust]
    )

    dims_merged_subscribers(
        upstream_tasks=[inst_task, sub_task]
    )
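The refactor above can be sketched as a plain-Python dependency graph (not Prefect code; a dict stands in for the DAG, and the task names come from the snippet): storing each call's return value lets the final task point at the same copies instead of creating new ones.

```python
# Sketch of the dependency graph the refactored flow builds.
upstreams = {}

def register(name, upstream_tasks=()):
    # records a task's upstream edges and returns a handle ("output var")
    # that later tasks should reference
    upstreams[name] = list(upstream_tasks)
    return name

sub_task  = register("dims_subscribers", ["events_subscriptions"])
made_task = register("dims_devices_made",
                     ["dims_devices_made_installed", "dims_adjust"])
inst_task = register("dims_devices_instasize",
                     ["dims_devices_instasize_installed", "dims_adjust"])

# referencing the stored handles, not new calls, gives one copy per task
register("dims_merged_subscribers", [inst_task, sub_task])

print(upstreams["dims_merged_subscribers"])
# ['dims_devices_instasize', 'dims_subscribers']
```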

Bob Colner

03/16/2020, 12:01 AM
nice! ah much better
and it ran like I intended
cool, so assigning the output var is key (even if the task has no other 'real' output)
then the later task can reference the output var (vs. the task itself, like I was doing)
makes sense now.

Chris White

03/16/2020, 12:04 AM
yup exactly!

Bob Colner

03/16/2020, 12:05 AM
thanks again, I'm liking the design of the tool/service; reminds me a bit of AWS SWF
the 'hybrid' cloud approach, that is

Chris White

03/16/2020, 12:06 AM
anytime! and thanks - let us know if you have any other questions