Sumit Kumar Rai
08/11/2021, 4:28 AM
from prefect import task
from prefect.tasks.shell import ShellTask
from prefect.engine import signals

shell_task_invoke = ShellTask()

@task
def skip_if_non_prod(env):
    if env == "PROD":
        return shell_task_invoke(command="ls")
    else:
        raise signals.RETRY()
Can I skip subclasses of the Task class, e.g. ShellTask or DBTShellTask, by passing a parameter, something like below?
is_not_prod = False if env == "PROD" else True
ShellTask(skip=is_not_prod)
Kevin Kho
Task class, but there is nothing for skip. You can maybe make another task that raises a SKIP signal if True. Set this to be upstream of the ShellTasks, and the SKIP will propagate to downstream tasks.
Anurag Bajpai
08/11/2021, 3:49 PM
with case(is_not_prod, False):
    ShellTask()
here's the documentation on conditional logic
Kevin Kho
Sumit Kumar Rai
08/12/2021, 3:45 AM
Sumit Kumar Rai
08/30/2021, 4:18 AM
dbt_deps_task_invoke = dbt_task(
    command="dbt deps",
    task_args={"name": "Installing dbt dependencies"}
)
dbt_deps_task_invoke.set_upstream(git_clone_task_invoke)
dbt_deps_task_invoke.set_upstream(pipelinewise_cf2_flow)
dbt_deps_task_invoke.set_upstream(pipelinewise_cf3_flow)
dbt_deps_task_invoke.set_upstream(pipelinewise_passmark_flow)
dbt_deps_task_invoke.set_upstream(pipelinewise_talentlms_flow)
dbt_deps_task_invoke.set_upstream(sheetload_flow)
with case(ENV == PRODUCTION, True):
    dbt_snapshot_task_invoke = dbt_task(
        command="dbt snapshot --target " + DBT_ENV,
        task_args={
            "name": "Taking snapshot",
            "trigger": all_finished
        }
    )
    dbt_snapshot_task_invoke.set_upstream(dbt_deps_task_invoke)
snapshot_case_invoked_task = merge(dbt_snapshot_task_invoke, dbt_deps_task_invoke)
dbt_run_task_invoke = dbt_task(
    "dbt run --target " + DBT_ENV,
    task_args={"name": "Running dbt"}
)
Sumit Kumar Rai
08/31/2021, 6:13 AM
Kevin Kho
case looks a bit off though. It should take in a task as the first argument, so maybe make a task that does the comparison?