dherincx

03/17/2022, 2:03 AM
So I have an ETL flow that I'm executing (shown below). There is a task named `execute_ddls` that only executes if `new_ddl_exist` is True. When `execute_ddls` is skipped, all my downstream dbt tasks are skipped too, but I want all downstream tasks to run regardless of whether the case block is entered. I tried `skip_on_upstream_skip = False` on the DbtShellTask but it doesn't work. I'm sure I'm missing something trivial...
with Flow('bi_test_flow') as flow:

    # new DDLs (if any)
    ddls = new_ddls_to_run(loaded_files, os.listdir(DDL_PATH))

    # execute new DDLs ONLY if they exist
    new_ddl_exist = do_new_ddl_scripts_exist(ddls)
    with case(new_ddl_exist, True):
        execute_ddls = execute_sql(ddls)

    dbt = dbt(
        command="dbt run -m anlyz_base.views",
        upstream_tasks=[execute_ddls],
    )
    dbt_operations = dbt(
        command="dbt run-operation materialize_views"
    )

Kevin Kho

03/17/2022, 2:21 AM
This happens because a skipped task propagates a SKIP signal downstream: `execute_ddls` raises SKIP, and then dbt raises SKIP as well. To avoid that, set dbt’s trigger to always_run. You can find more on triggers in the Prefect docs.
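The propagation described above can be modeled in a few lines of plain Python. This is a toy sketch, not Prefect's implementation: `ToyTask`, `resolve`, and the string states are hypothetical names, and the ordering (skip check first, then trigger) is an assumption based on how the Prefect 1.x docs describe `skip_on_upstream_skip`.

```python
from dataclasses import dataclass
from typing import Callable, List

# Hypothetical state labels for this toy model.
SUCCESS, FAILED, SKIPPED = "Success", "Failed", "Skipped"


def all_successful(upstream_states: List[str]) -> bool:
    # In this model, skipped upstream tasks count as successes.
    return all(s in (SUCCESS, SKIPPED) for s in upstream_states)


def always_run(upstream_states: List[str]) -> bool:
    # Run no matter how the upstream tasks finished.
    return True


@dataclass
class ToyTask:
    name: str
    trigger: Callable[[List[str]], bool] = all_successful
    skip_on_upstream_skip: bool = True

    def resolve(self, upstream_states: List[str]) -> str:
        # Assumed ordering: the skip check happens before the trigger.
        if self.skip_on_upstream_skip and SKIPPED in upstream_states:
            return SKIPPED
        if not self.trigger(upstream_states):
            return "TriggerFailed"
        return SUCCESS


# Compare a few configurations when the upstream task was skipped.
for task in (
    ToyTask("dbt_default"),
    ToyTask("dbt_always_run", trigger=always_run),
    ToyTask("dbt_no_skip", skip_on_upstream_skip=False),
):
    print(task.name, "->", task.resolve([SKIPPED]))
```

Under this model the trigger is only consulted once `skip_on_upstream_skip` is False, so the two settings may need to be combined on the downstream task.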

dherincx

03/17/2022, 2:43 AM
Hey Kevin, when I try this, I get an `unexpected keyword argument: trigger` error. I understand the propagation, but I'm not sure why the trigger parameter isn't recognized here, even though the task inherits from the Task class.
dbt_operations = dbt(
    command="dbt run-operation materialize_views",
    trigger=all_successful
)

Kevin Kho

03/17/2022, 2:53 AM
The trigger needs to be set where the task is defined (when you instantiate it), not where you call it inside the flow.
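A rough illustration of why the call inside the flow rejects `trigger`: settings such as the trigger belong to the task object, while keyword arguments passed when calling the task are matched against its `run()` signature. The sketch below is plain Python, not Prefect code; `ToyTask` and its methods are hypothetical stand-ins for that split.

```python
import inspect


def always_run(upstream_states):
    # Toy trigger: run regardless of upstream results.
    return True


class ToyTask:
    def __init__(self, name, trigger=None, skip_on_upstream_skip=True):
        # Definition-time settings live on the task object itself.
        self.name = name
        self.trigger = trigger
        self.skip_on_upstream_skip = skip_on_upstream_skip

    def run(self, command):
        # Runtime inputs only; there is no `trigger` parameter here.
        return f"ran: {command}"

    def __call__(self, **kwargs):
        # Call-time kwargs are checked against run()'s signature,
        # so anything run() does not accept raises TypeError.
        inspect.signature(self.run).bind(**kwargs)
        return self.run(**kwargs)


# Works: the trigger is given at definition time.
dbt = ToyTask("dbt", trigger=always_run, skip_on_upstream_skip=False)
print(dbt(command="dbt run-operation materialize_views"))

# Fails: the trigger is given at call time.
try:
    dbt(command="dbt run-operation materialize_views", trigger=always_run)
except TypeError as exc:
    print("TypeError:", exc)
```

In Prefect 1.x terms this would mean passing `trigger=...` to the `DbtShellTask(...)` constructor. As far as I know, calling a task inside a flow also accepts a `task_args` dict for overriding such attributes on that one copy, but check the Task API reference for your version before relying on it.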

dherincx

03/17/2022, 2:55 AM
Oh gosh, duh.....my gosh...it's been a long day...of course....
Thanks Kevin!