Thanks! The confusion here is how a flow's "reference" tasks are determined. Reference tasks are used to determine the final state of the flow. By default, the reference tasks are the last tasks executed, also called the "terminal" tasks.
In this case,
dbt_test_out
and
dbt_docs_out
are expected to run last. Because
env
is not
'prod'
, the two tasks are skipped and not failed.
The exact solution will depend on your use case. If you're only using Prefect Core, one solution might be to set the reference tasks manually with
flow.set_reference_tasks([])
like below
from prefect import task, Task, Flow, Parameter
from prefect.tasks.control_flow.case import case
@task
def fail():
raise Exception
my_task = Task()
ENV = 'local'
with Flow('case statement and terminal task') as flow:
env = Parameter('env', default=ENV)
fail_task = fail()
with case(env, 'prod'):
my_task_test = my_task(upstream_tasks=[fail_task])
if env != 'prod':
flow.set_reference_tasks([fail_task])
flow.run()