hayssam

08/29/2019, 12:06 AM
Hi, I'm just getting started evaluating Prefect, and it's looking good so far: as simple and expressive as it needs to be to be usable 😉 Quick noob question about the flow semantics. Suppose I define this ETL-like flow:
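from prefect import Flow, task
from prefect.tasks.control_flow.conditional import ifelse
from prefect.tasks.core.constants import Constant

@task
def parse_file():
    [...]

@task(skip_on_upstream_skip=False)
def compute_aggregates_from_db():
    [...]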
with Flow('ETL ISM') as flow:
    e = parse_file() 
    t = process_dataframe(e) 
    l = store_dataframe_in_db(t, table_name="test_prefect") 
    ifelse(should_refresh_table("test_prefect"), e, Constant("No need"))
    statistics = compute_aggregates_from_db(upstream_tasks=[l])  
flow.run()
The flow is expected to output the statistics at every run. Suppose I want to run the [e, t, l] part of the flow only if the file has been modified since the last insertion in Postgres. Rows in Postgres are timestamped, and `should_refresh_table` indicates whether an update is required. Should I:
a. Perform the check in `store_dataframe_in_db` and raise a `prefect.engine.signals.SKIP` accordingly => I can avoid the `l` part of the flow, but `e` and `t` are still executed
b. Add a `prefect.tasks.control_flow.conditional.ifelse` on `l`: same result
c. Add a `prefect.tasks.control_flow.conditional.ifelse` on `e`: all its downstream tasks are skipped, which is the desired behavior
Is this the approach you would recommend? What was a bit surprising for me is that the condition is applied on the upstream `e` task, while I (wrongly) tried to condition the downstream `l` task, expecting that all upstream tasks would be skipped as they are not needed.

Jeremiah

08/29/2019, 1:10 AM
Hi @hayssam this is an excellent question. Prefect evaluates your tasks “in order”, by which I mean it will not evaluate a downstream task until all of its upstream tasks have finished. In this way, it runs your code exactly how Python would if you were writing this as a script: if a function has an input, then the function doesn’t run until the input has been computed. Therefore, your `a` and `b` approaches don’t work because the control flow logic isn’t encountered until after `e` and `t` have run. Approach `c` works because the logic is checked first and only if it succeeds does control get transferred to the rest of the pipeline.
Does that make sense?
Put another way: tasks can pass information downstream; they cannot pass information upstream.
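
To make the "downstream only" rule concrete, here is a minimal toy model in plain Python. It is not Prefect's engine or API: `run_flow`, `Skip`, and `keep_running` are hypothetical names made up for this sketch, with `keep_running` standing in loosely for `skip_on_upstream_skip=False`. It only assumes that tasks run in dependency order and that a skip spreads to downstream tasks.

```python
from graphlib import TopologicalSorter  # standard library, Python 3.9+


class Skip(Exception):
    """Raised by a toy task to mark itself as skipped (hypothetical, not Prefect's SKIP)."""


def run_flow(tasks, upstream, keep_running=()):
    """Run toy tasks in dependency order and return {name: "ran" | "skipped"}.

    tasks:        {name: zero-argument callable}
    upstream:     {name: set of upstream task names}
    keep_running: names that still run even if an upstream task was skipped
    """
    states = {}
    graph = {name: upstream.get(name, set()) for name in tasks}
    # static_order() yields every task after all of its upstream tasks.
    for name in TopologicalSorter(graph).static_order():
        upstream_skipped = any(states[u] == "skipped" for u in graph[name])
        if upstream_skipped and name not in keep_running:
            states[name] = "skipped"  # the skip flows downstream
            continue
        try:
            tasks[name]()
            states[name] = "ran"
        except Skip:
            states[name] = "skipped"
    return states


def work():
    pass


def skip():
    raise Skip()


upstream = {"e": set(), "t": {"e"}, "l": {"t"}, "stats": {"l"}}

# Approaches a/b: the skip happens at `l`, after `e` and `t` have already run.
skip_at_l = run_flow(
    {"e": work, "t": work, "l": skip, "stats": work}, upstream, keep_running={"stats"}
)
print(skip_at_l)  # {'e': 'ran', 't': 'ran', 'l': 'skipped', 'stats': 'ran'}

# Approach c: the skip happens at `e`, so `t` and `l` are skipped as well.
skip_at_e = run_flow(
    {"e": skip, "t": work, "l": work, "stats": work}, upstream, keep_running={"stats"}
)
print(skip_at_e)  # {'e': 'skipped', 't': 'skipped', 'l': 'skipped', 'stats': 'ran'}
```

In both runs `stats` still executes because it opts out of skip propagation, which mirrors the requirement that the statistics are produced at every run.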