hayssam  08/29/2019, 12:06 AM
The flow is expected to output the statistics at every run. Suppose that I want to conditionally run the [e,t,l] part of the flow only if the file has been modified since the last insertion in postgres. Rows in postgres are timestamped, and [...] indicate whether an update is required or not.
```
@task
def parse_file():
    [...]

@task(skip_on_upstream_skip=False)
def compute_aggregates_from_db():
    [...]

with Flow('ETL ISM') as flow:
    e = parse_file()
    t = process_dataframe(e)
    l = store_dataframe_in_db(t, table_name="test_prefect")
    ifelse(should_refresh_table("test_prefect"), e, Constant("No need"))
    statistics = compute_aggregates_from_db(upstream_tasks=[l])

flow.run()
```
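The `should_refresh_table` task itself is not shown in the message. A minimal sketch of what such a condition task could look like — the file path, connection string, and `inserted_at` column are all invented for illustration and do not appear in the original:

```
import os
from datetime import datetime

import psycopg2
from prefect import task


@task
def should_refresh_table(table_name: str) -> bool:
    """True if the source file changed after the most recent insert into `table_name`."""
    # Hypothetical file path, connection string, and timestamp column.
    file_mtime = datetime.fromtimestamp(os.path.getmtime("ism_report.csv"))
    with psycopg2.connect("dbname=etl") as conn:
        with conn.cursor() as cur:
            cur.execute(f"SELECT max(inserted_at) FROM {table_name}")
            last_insert = cur.fetchone()[0]
    return last_insert is None or file_mtime > last_insert
```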
Should I:
a. Perform the check in [...] and raise a [...] accordingly => I can avoid the [...] part of the flow, but [...] are still executed
b. Add a [...]: same result
c. Add a [...]: all its downstreams are skipped, which is the desired behavior
Is this the approach you would recommend? What was a bit surprising for me is that the condition is applied on the upstream task (e), while I (wrongly) tried to condition the downstream [...] task, expecting that all upstreams are skipped as they are not needed.
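For contrast, option a presumably refers to raising Prefect's SKIP signal from inside a task. A rough sketch of that shape — the `file_needs_reload` helper is invented for illustration:

```
from prefect import task
from prefect.engine import signals


def file_needs_reload() -> bool:
    # Hypothetical stand-in for whatever mtime-vs-timestamp comparison is used.
    return False


@task
def parse_file():
    # Option a: the task performs the check itself and raises SKIP.
    # The flow has already entered this task when the signal is raised,
    # so only tasks downstream of it end up skipped.
    if not file_needs_reload():
        raise signals.SKIP("file unchanged since the last load")
    ...
```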
Jeremiah  08/29/2019, 1:10 AM
Approaches a and b don’t work because the control flow logic isn’t encountered until after [...] have run. Approach c works because the logic is checked first, and only if it succeeds does control get transferred to the rest of the pipeline.
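To make that evaluation order concrete, here is a small self-contained flow in the same shape as approach c, using Prefect 0.x's `ifelse` and `skip_on_upstream_skip=False` as in the snippet above, with toy tasks standing in for the real ETL steps:

```
from prefect import task, Flow
from prefect.tasks.control_flow import ifelse
from prefect.tasks.core.constants import Constant


@task
def check():
    # Stands in for should_refresh_table(); False means "no refresh needed".
    return False


@task
def extract():
    print("extract ran")


@task
def load(df):
    print("load ran")


@task(skip_on_upstream_skip=False)
def report():
    print("report ran")


with Flow("conditional-demo") as flow:
    e = extract()
    l = load(e)
    # ifelse wires the check upstream of `e`: when it returns False, `e` is
    # skipped before it ever starts, and the skip cascades to `l`.
    ifelse(check(), e, Constant("no refresh needed"))
    # report still runs because skip_on_upstream_skip=False.
    report(upstream_tasks=[l])

state = flow.run()  # expected output: only "report ran"
```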