# ask-community
Sumit Kumar Rai
Hello everyone, I see that a task defined as an arbitrary function can be skipped by raising a SKIP signal:
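```python
from prefect import task
from prefect.tasks.shell import ShellTask
from prefect.engine import signals

shell_task_invoke = ShellTask()

@task
def skip_if_non_prod(env):
    if env == "PROD":
        # Call .run() directly; calling the ShellTask instance inside another task
        # would try to add it to a flow at run time.
        return shell_task_invoke.run(command="ls")
    else:
        # SKIP (not RETRY) marks this task as Skipped, and by default its downstream tasks too.
        raise signals.SKIP("Not running in PROD")
```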
Can I skip subclasses of the `Task` class, e.g. `ShellTask` or `DbtShellTask`, by passing a parameter, something like the following?
```python
is_not_prod = env != "PROD"
ShellTask(skip=is_not_prod)  # hypothetical `skip` parameter
```
Kevin Kho
Hey @Sumit Kumar Rai, all of the tasks in the task library take the init kwargs of the `Task` class, but there is nothing for skipping. You could make another task that raises a SKIP signal if the flag is `True`. Set it upstream of the `ShellTask`s, and the SKIP will propagate to the downstream tasks.
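The gate-task idea can be modelled without Prefect at all. The sketch below is a plain-Python toy, not Prefect's API: `Skip`, `run_chain`, `skip_unless_prod` and `shell_ls` are hypothetical names, and `Skip` only stands in for `signals.SKIP`. It assumes a simple linear chain where, as with Prefect 1.x's default `skip_on_upstream_skip=True`, a skipped task causes everything downstream of it to be skipped.

```python
# Plain-Python toy model of the "gate task" pattern; this is NOT Prefect's API.


class Skip(Exception):
    """Hypothetical stand-in for prefect.engine.signals.SKIP."""


def run_chain(tasks, env):
    """Run (name, fn) pairs in order and return {name: state}.

    Once a task raises Skip, every later task in the chain is marked
    "Skipped" without running, mimicking skip_on_upstream_skip=True.
    """
    states = {}
    upstream_skipped = False
    for name, fn in tasks:
        if upstream_skipped:
            states[name] = "Skipped"  # propagated from an upstream skip
            continue
        try:
            fn(env)
            states[name] = "Success"
        except Skip:
            states[name] = "Skipped"
            upstream_skipped = True
    return states


def skip_unless_prod(env):
    # Hypothetical gate task: raise Skip outside production.
    if env != "PROD":
        raise Skip(f"env={env!r} is not PROD")


def shell_ls(env):
    # Hypothetical placeholder for ShellTask(command="ls").
    return "ls output"


chain = [("gate", skip_unless_prod), ("shell", shell_ls)]
print(run_chain(chain, "DEV"))   # {'gate': 'Skipped', 'shell': 'Skipped'}
print(run_chain(chain, "PROD"))  # {'gate': 'Success', 'shell': 'Success'}
```

The shell step never has to know about the environment; the gate alone decides whether everything after it runs.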
Anurag Bajpai
I believe you can do something like:
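```python
from prefect import case
from prefect.tasks.shell import ShellTask
with case(is_not_prod, False):  # runs only when is_not_prod is False, i.e. in PROD
    ShellTask()(command="ls")  # calling the task adds it to the flow
```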
here's the documentation on conditional logic
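To make the `case` semantics concrete, here is a plain-Python toy model (not Prefect code) of a condition that selects one of two branches, plus a `merge` that, roughly like Prefect 1.x's `merge`, returns the first non-`None` result. `case_branch`, `merge` and `run` are hypothetical names, and a skipped branch is modelled simply as returning `None`.

```python
# Plain-Python toy model of case/merge semantics; this is NOT Prefect's API.

SKIPPED = None  # in this model a skipped branch produces no result


def case_branch(condition, value, fn):
    """Run fn only when condition == value; otherwise the branch is skipped."""
    return fn() if condition == value else SKIPPED


def merge(*results):
    """Return the first result that is not None (or None if all branches skipped)."""
    return next((r for r in results if r is not None), None)


def run(env):
    is_prod = env == "PROD"
    prod_result = case_branch(is_prod, True, lambda: "snapshot taken")
    dev_result = case_branch(is_prod, False, lambda: "snapshot skipped in " + env)
    return merge(prod_result, dev_result)


print(run("PROD"))  # snapshot taken
print(run("DEV"))   # snapshot skipped in DEV
```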
Kevin Kho
That’s a better idea @Anurag Bajpai!
Sumit Kumar Rai
Thanks @Kevin Kho and @Anurag Bajpai
With the help of the `case` statement, I was able to create the flow I wanted (see the attachment). I want to run the snapshot task only in production. However, the logs say that my task runs multiple times, one after another, even though it has 0 retries. Why did it run multiple times? What did I do wrong? Here's the source code I wrote:
```python
dbt_deps_task_invoke = dbt_task(
    command="dbt deps",
    task_args={"name": "Installing dbt dependencies"}
)

dbt_deps_task_invoke.set_upstream(git_clone_task_invoke)

dbt_deps_task_invoke.set_upstream(pipelinewise_cf2_flow)
dbt_deps_task_invoke.set_upstream(pipelinewise_cf3_flow)
dbt_deps_task_invoke.set_upstream(pipelinewise_passmark_flow)
dbt_deps_task_invoke.set_upstream(pipelinewise_talentlms_flow)
dbt_deps_task_invoke.set_upstream(sheetload_flow)

with case(ENV == PRODUCTION, True):
    dbt_snapshot_task_invoke = dbt_task(
        command="dbt snapshot --target " + DBT_ENV,
        task_args={
            "name": "Taking snapshot",
            "trigger": all_finished
        }
    )
    dbt_snapshot_task_invoke.set_upstream(dbt_deps_task_invoke)
snapshot_case_invoked_task = merge(dbt_snapshot_task_invoke, dbt_deps_task_invoke)

dbt_run_task_invoke = dbt_task(
    "dbt run --target " + DBT_ENV,
    task_args={"name": "Running dbt"}
)
```
@Kevin Kho @Anurag Bajpai
Kevin Kho
It's hard to say from this, @Sumit Kumar Rai. Which part is being repeated? Your graph also looks good. The `case` looks a bit off, though: it should take a task as its first argument, so maybe make a task that does the comparison?
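One plausible reason for making the comparison a task (an assumption; the thread doesn't spell it out) is timing: a plain Python expression like `ENV == PRODUCTION` is evaluated once, when the flow object is built, while a task is evaluated when the flow runs. The toy model below shows that difference in plain Python; `build_flow_eager` and `build_flow_lazy` are hypothetical names and nothing here is Prefect's API.

```python
# Plain-Python toy model of build-time vs run-time conditions; NOT Prefect's API.
import os


def build_flow_eager():
    # Like passing `ENV == PRODUCTION` to case: compared once, at build time.
    is_prod = os.environ.get("ENV") == "PROD"
    return lambda: ("snapshot" if is_prod else "skipped")


def build_flow_lazy():
    # Like passing a comparison task to case: compared each time the flow runs.
    def is_prod():
        return os.environ.get("ENV") == "PROD"
    return lambda: ("snapshot" if is_prod() else "skipped")


os.environ["ENV"] = "DEV"   # environment when the flow is built
eager, lazy = build_flow_eager(), build_flow_lazy()
os.environ["ENV"] = "PROD"  # environment when the flow actually runs
print(eager())  # skipped  (condition frozen at build time)
print(lazy())   # snapshot (condition read at run time)
```

In Prefect 1.x terms, the lazy version would correspond to something like a `@task` that compares a `Parameter` to `"PROD"`, with that task's output passed as the first argument to `case`.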