Bruno Murino
06/24/2021, 12:43 PMZach Angell
Bruno Murino
06/25/2021, 1:27 PMZach Angell
Bruno Murino
06/25/2021, 2:03 PMwith Flow("kensington_pipeline") as flow:
env = Parameter('env')
scope = Parameter('scope')
config = get_config(env, scope)
dbt_run_command = get_dbt_run_command(config)
dbt_run_out = dbt_task(command=dbt_run_command, env=config['dbt_env'])
with case(env, 'prod'):
dbt_test_out = dbt_task(command="dbt test", env=config['dbt_env'], upstream_tasks=[dbt_run_out])
dbt_docs_out = dbt_task(command="dbt docs generate", env=config['dbt_env'], upstream_tasks=[dbt_test_out])
Bruno Murino
06/25/2021, 2:04 PM@task
def get_dbt_run_command(config):
env = config['env']
scope = config['scope']
scope_map = {
"full": "dbt run",
"local": extract_file_content("./src/local.sh"),
}
return scope_map[scope]
Zach Angell
dbt_test_out
and dbt_docs_out
are expected to run last. Because env
is not 'prod'
, the two tasks are skipped and not failed.
The exact solution will depend on your use case. If you're only using Prefect Core, one solution might be to set the reference tasks manually with flow.set_reference_tasks([])
like below
from prefect import task, Task, Flow, Parameter
from prefect.tasks.control_flow.case import case
@task
def fail():
raise Exception
my_task = Task()
ENV = 'local'
with Flow('case statement and terminal task') as flow:
env = Parameter('env', default=ENV)
fail_task = fail()
with case(env, 'prod'):
my_task_test = my_task(upstream_tasks=[fail_task])
if env != 'prod':
flow.set_reference_tasks([fail_task])
flow.run()
Bruno Murino
06/25/2021, 4:06 PMZach Angell
Bruno Murino
06/25/2021, 4:12 PMBruno Murino
06/25/2021, 4:12 PMZach Angell
flow.set_reference_tasks(flow.tasks)
Bruno Murino
06/25/2021, 4:34 PM