# ask-community
b
Hi everyone, I have a flow that is ending with “Flow run SUCCESS: all reference tasks succeeded” even though one of the tasks failed! Am I missing something? The task that fails is a DBTShellTask. The problem is that I have a flow state handler that sends Slack alerts if the overall flow state is FAILED, but that’s not happening, so we’re not getting any alerts.
z
Hi Bruno, could you share the code for your DBTShellTask?
b
It only runs “dbt test”.
z
Could you share your flow code?
b
with Flow("kensington_pipeline") as flow:
    env = Parameter('env')
    scope = Parameter('scope')
    config = get_config(env, scope)
    dbt_run_command = get_dbt_run_command(config)
    dbt_run_out = dbt_task(command=dbt_run_command, env=config['dbt_env'])

    with case(env, 'prod'):
        dbt_test_out = dbt_task(command="dbt test", env=config['dbt_env'], upstream_tasks=[dbt_run_out])
        dbt_docs_out = dbt_task(command="dbt docs generate", env=config['dbt_env'], upstream_tasks=[dbt_test_out])
@task
def get_dbt_run_command(config):
    env = config['env']
    scope = config['scope']

    scope_map = {
        "full": "dbt run",
        "local": extract_file_content("./src/local.sh"),
    }

    return scope_map[scope]
z
Thanks! The confusion here is how a flow's "reference" tasks are determined. Reference tasks are used to determine the final state of the flow. By default, the reference tasks are the last tasks executed, also called the "terminal" tasks. In this case, `dbt_test_out` and `dbt_docs_out` are expected to run last. Because `env` is not `'prod'`, the two tasks are skipped rather than failed, and a skipped state counts as a success. The exact solution will depend on your use case. If you're only using Prefect Core, one solution might be to set the reference tasks manually with `flow.set_reference_tasks([])`, like below:
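from prefect import task, Task, Flow, Parameter
from prefect.tasks.control_flow.case import case

@task
def fail():
    # Always fails, standing in for a failing upstream task
    raise Exception

my_task = Task()

ENV = 'local'


with Flow('case statement and terminal task') as flow:
    env = Parameter('env', default=ENV)
    fail_task = fail()
    with case(env, 'prod'):
        # Skipped unless env == 'prod'
        my_task_test = my_task(upstream_tasks=[fail_task])

    # This check runs at build time, so compare the plain ENV string;
    # the `env` Parameter has no value until the flow runs
    if ENV != 'prod':
        flow.set_reference_tasks([fail_task])

flow.run()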
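To make the default behaviour concrete, here is a toy model in plain Python of how terminal tasks are picked and how the flow state follows from them. It is only a sketch, not Prefect's actual implementation: the helpers `terminal_tasks` and `flow_state` and the string states are made up for illustration, and the graph is simplified to the two tasks from the example above.

```python
# Toy model only: plain Python, not Prefect's real code.
# Task names mirror the example above; the helpers are hypothetical.

def terminal_tasks(tasks, edges):
    """Return the tasks that have no downstream task."""
    has_downstream = {upstream for upstream, _ in edges}
    return {t for t in tasks if t not in has_downstream}


def flow_state(states, reference_tasks):
    """Flow fails only if a reference task failed; skipped counts as success."""
    if any(states[t] == "Failed" for t in reference_tasks):
        return "Failed"
    return "Success"


tasks = {"fail_task", "my_task_test"}
edges = {("fail_task", "my_task_test")}  # (upstream, downstream)

# Assumed run outcome when env is not 'prod': the task in the case block is skipped.
states = {"fail_task": "Failed", "my_task_test": "Skipped"}

default_refs = terminal_tasks(tasks, edges)
print(default_refs)                       # only the terminal task
print(flow_state(states, default_refs))   # the upstream failure is ignored
print(flow_state(states, {"fail_task"}))  # the failure now decides the result
```

With the default reference set, only the skipped terminal task is consulted, which is why the run reports success.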
b
hmmm I see! This is a bit odd haha but I think it makes sense… Also, I think the default only causes trouble when you have these “case” statements, right?
z
exactly!
b
Can I make it so that if any task fails, the flow fails?
I thought about adding all tasks to the function you mentioned above, but it sounds like there should be an easier way.
z
If you do want to fail the flow when any task fails, I'd recommend setting all tasks as reference tasks:
flow.set_reference_tasks(flow.tasks)
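As a rough illustration of what that changes, here is a small plain-Python sketch (hypothetical helper name, not Prefect's code). The task names are borrowed from your flow, the states are invented for the example, and the "terminal only" list is an assumption about which task ends the graph.

```python
# Toy model only; the states below are invented for illustration.
states = {
    "dbt_run_out": "Failed",
    "dbt_test_out": "Skipped",
    "dbt_docs_out": "Skipped",
}


def any_reference_failed(states, reference_tasks):
    """True if at least one reference task ended in a failed state."""
    return any(states[name] == "Failed" for name in reference_tasks)


terminal_only = ["dbt_docs_out"]  # assumed default: the last task in the graph
all_tasks = list(states)          # analogous to flow.set_reference_tasks(flow.tasks)

print(any_reference_failed(states, terminal_only))
print(any_reference_failed(states, all_tasks))
```

Skipped tasks still don't count against the flow here; only a task that actually fails does.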
b
ah that’s great! Thank you very much!