hi all, i'm trying to write a flow where all the t...
# prefect-community
d
hi all, i'm trying to write a flow where all the tasks should execute in a sequential order but some of them should optionally be skipped. basic example in normal python:
Copy code
print("hello world 1")
if not skip_task2:
    print("hello world 2")
if not skip_task3:
    print("hello world 3")
print("hello world 4")
here's an attempt in Prefect:
Copy code
@task()
def print_task(value: str) -> str:
    logger = prefect.context.get("logger")
    <http://logger.info|logger.info>(value)
    return value


with Flow("case_skip_flow") as flow:
    skip_task2 = Parameter("skip_task2", True)
    skip_task3 = Parameter("skip_task3", True)

    task1 = print_task("hello world 1")
    with case(skip_task2, False):
        task2 = print_task("hello world 2", upstream_tasks=[task1])
    with case(skip_task3, False):
        task3 = print_task(
            "hello world 3", upstream_tasks=[task1, task2], task_args={"skip_on_upstream_skip": False}
        )
    task4 = print_task("hello world 4", upstream_tasks=[task3], task_args={"skip_on_upstream_skip": False})
i'm getting into trouble with the upstream skip status for
task3
. • if I set
skip_on_upstream_skip
to
True
, then when
task2
is skipped, so is
task3
regardless of the value for
skip_task3
• if I set
skip_on_upstream_skip
to
False
, then
task3
runs regardless of the value for
skip_task3
how can i implement a pattern like this?
k
Hi @Dominick Olivito, looking now
This is pretty hard
Because there are two upstreams for task 3 (task 2 and the case), we only really care about the SKIP from the case for execution but the task 2 skip is affecting the trigger of task_3. also, the SKIP is not super distinguishable from the SUCCESS in the available triggers i think you actually need to make your own trigger for it
Yeah I don’t think I’m seeing a way, even if you create a custom trigger
d
thanks for taking a look. i tried a bit more, and it looks like if I put in an intermediate dummy task, then I can hack this and make this work:
Copy code
@task()
def dummy_task() -> None:
    pass


with Flow("case_skip_flow") as flow:
    skip_task2 = Parameter("skip_task2", True)
    skip_task3 = Parameter("skip_task3", False)

    task1 = print_task("hello world 1")
    with case(skip_task2, False):
        task2 = print_task("hello world 2", upstream_tasks=[task1])
    dummy_result = dummy_task(upstream_tasks=[task2], task_args={"skip_on_upstream_skip": False})

    with case(skip_task3, False):
        task3 = print_task(
            "hello world 3", upstream_tasks=[dummy_result]
        )
        task4 = print_task("hello world 4", upstream_tasks=[task3])
would this be more straightforward in Prefect 2.0?
k
yep definitely will be because you can just call
.wait()
and use the native Python if
👍 1