hayssam
08/29/2019, 12:06 AM
@task
def parse_file():
    [...]

@task(skip_on_upstream_skip=False)
def compute_aggregates_from_db():
    [...]

with Flow('ETL ISM') as flow:
    e = parse_file()
    t = process_dataframe(e)
    l = store_dataframe_in_db(t, table_name="test_prefect")
    ifelse(should_refresh_table("test_prefect"), e, Constant("No need"))
    statistics = compute_aggregates_from_db(upstream_tasks=[l])

flow.run()
The flow is expected to output the statistics at every run. Suppose that I want to conditionally run the [e, t, l] part of the flow only if the file has been modified since the last insertion in postgres. Rows in postgres are timestamped, and should_refresh_table indicates whether an update is required. Should I:
a. Perform the check in store_dataframe_in_db and raise a prefect.engine.signals.SKIP accordingly => I can avoid the l part of the flow, but e and t are still executed
b. Add a prefect.tasks.control_flow.conditional.ifelse on l: same result
c. Add a prefect.tasks.control_flow.conditional.ifelse on e: all its downstreams are skipped, which is the desired behavior
Is this the approach you would recommend? What was a bit surprising for me is that the condition is applied on the upstream e task, while I (wrongly) tried to condition the downstream l task, expecting that all upstreams would be skipped as they are not needed.
Jeremiah
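As an aside, the freshness check described above could be sketched roughly like this. This is only an illustration, not the original task: sqlite3 stands in for postgres, and the inserted_at column (stored as a Unix timestamp) is an assumed schema detail.

```python
import os
import sqlite3

def should_refresh_table(conn, table_name, source_path):
    """True when the source file was modified after the newest row
    in `table_name` (assumed to carry a Unix-timestamp `inserted_at`)."""
    row = conn.execute(f"SELECT MAX(inserted_at) FROM {table_name}").fetchone()
    last_insert = row[0]
    if last_insert is None:
        return True  # empty table: always load
    return os.path.getmtime(source_path) > last_insert
```

In the actual flow this logic would sit inside a @task and feed the ifelse condition, as in option c.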
08/29/2019, 1:10 AM
The a and b approaches don't work because the control-flow logic isn't encountered until after e and t have run. Approach c works because the logic is checked first, and only if it succeeds does control get transferred to the rest of the pipeline.
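The behavior described here can be modelled in a few lines of plain Python. This is a toy runner, not Prefect itself: skips propagate downstream only, so gating l leaves e and t untouched, while gating e skips everything that depends on it. The always_run set is a stand-in for skip_on_upstream_skip=False.

```python
# Toy model of skip propagation (not Prefect): a task is skipped if its
# gate condition is False, or if any upstream task was skipped -- unless
# it opted out, mimicking skip_on_upstream_skip=False.
def run_dag(order, deps, gated, condition, always_run=()):
    state = {}
    for task in order:  # `order` must be a topological ordering
        upstream_skipped = any(
            state[u] == "SKIPPED" for u in deps.get(task, ())
        )
        if task == gated and not condition:
            state[task] = "SKIPPED"   # the ifelse-style gate fires here
        elif upstream_skipped and task not in always_run:
            state[task] = "SKIPPED"   # skips flow downstream only
        else:
            state[task] = "SUCCESS"
    return state

deps = {"t": ["e"], "l": ["t"], "stats": ["l"]}
order = ["e", "t", "l", "stats"]

# Approach b: gate `l` -> e and t still run, as observed in the question.
b = run_dag(order, deps, gated="l", condition=False, always_run={"stats"})
# b == {"e": "SUCCESS", "t": "SUCCESS", "l": "SKIPPED", "stats": "SUCCESS"}

# Approach c: gate `e` -> t and l are skipped too; stats still runs
# because it ignores upstream skips.
c = run_dag(order, deps, gated="e", condition=False, always_run={"stats"})
# c == {"e": "SKIPPED", "t": "SKIPPED", "l": "SKIPPED", "stats": "SUCCESS"}
```

This makes the asymmetry concrete: a skip raised at l can never reach e or t, because by the time the gate is evaluated they have already run.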