FuETL

01/11/2023, 6:20 PM
Hi everyone, can someone explain this behaviour to me? I have a flow with one case for when the condition is true and one for when it is false. But I noticed that when the condition is false, task_b() is always SKIPPED. After some struggling :face_holding_back_tears: I figured out that the task was being SKIPPED because inside the true case I'm reassigning the task_result variable (if I rename the variable, the task runs on the false condition). Is this the desired behaviour? Why is it like that? Is there a way to work around it without renaming?
from prefect import task, Flow, case


@task()
def task_a():
    print("task_a")
    return "task_a"


@task()
def task_condition():
    return False


@task()
def task_b(value: str):
    print("task_b")
    return f"task_b={value}"


@task()
def task_c():
    print("task_c")


with Flow("test-flow") as flow:
    task_result = task_a()
    cond = task_condition()

    with case(cond, True):
        task_result = task_b(task_result)
        task_c()

    with case(cond, False):
        task_result = task_b(task_result)

flow.run()
:prefect-duck: Sorry to bump, appreciate any help with this. Thank you!

Zanie

01/12/2023, 12:41 AM
Hi. with case: … always executes when constructing your DAG.
Your reassignment at task_result = task_b(task_result) will always override that variable to be the possibly skipped result of task_b.
I would strongly recommend not reusing variable names like this as it will lead to confusing behavior and make it hard to understand what’s going on in your flow.
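To make the point above concrete, here is a toy model in plain Python of what happens while the flow is being built. It is only a sketch: Node, is_skipped, and the two build_* functions are invented for illustration and are not Prefect's API. It assumes the Prefect 1.x default that a task is skipped when any of its upstream tasks is skipped.

```python
class Node:
    """Hypothetical stand-in for a task call recorded while building a flow."""

    def __init__(self, name, upstream=(), branch=None):
        self.name = name
        self.upstream = list(upstream)  # nodes whose results this one consumes
        self.branch = branch            # None = unconditional, else the case value


def is_skipped(node, cond):
    """Skipped if the node's own case does not match, or any upstream was skipped."""
    if node.branch is not None and node.branch != cond:
        return True
    return any(is_skipped(up, cond) for up in node.upstream)


def build_reusing_name():
    task_result = Node("task_a")
    # Both case bodies run at build time, one after the other.
    task_result = Node("task_b[True]", [task_result], branch=True)
    # task_result now points at task_b[True], so that becomes the upstream here.
    task_result = Node("task_b[False]", [task_result], branch=False)
    return task_result


def build_distinct_names():
    a = Node("task_a")
    b_true = Node("task_b[True]", [a], branch=True)
    b_false = Node("task_b[False]", [a], branch=False)
    return b_true, b_false


reused = build_reusing_name()
_, distinct = build_distinct_names()

print([up.name for up in reused.upstream])    # ['task_b[True]']
print(is_skipped(reused, cond=False))         # True
print(is_skipped(distinct, cond=False))       # False
```

With the reused name, the False-branch task_b ends up downstream of the True-branch task_b, so when cond is False it is skipped along with it. With distinct names, both branches read from task_a directly.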

FuETL

01/12/2023, 1:16 AM
Thanks for the response @Zanie. In my flow the usage is like this: I have a condition, and depending on what it evaluates to I change the content of a variable, in this case task_result. So this happens because of the way the DAG is constructed, right? I will probably need to read more about how DAGs are constructed (I'd appreciate it if you have some material to share). For sure I will avoid this type of shenanigans in future. 😅
DAG with the current code:
DAG with different variable names:

Zanie

01/12/2023, 3:23 AM
You can’t change the DAG like that with case statements. They’re evaluated at runtime, not when you are declaring the flow.
Or rather, while declaring the flow both cases are executed to construct the DAG. You can’t have the DAG come back together afterwards based on the value without doing something a little more complicated.
You might be able to do something like this
from prefect import task, Flow, case
from prefect.triggers import not_all_skipped

@task()
def task_a():
    print("task_a")
    return "task_a"


@task()
def task_condition():
    return False


@task()
def task_b(value: str):
    print("task_b")
    return f"task_b={value}"


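# skip_on_upstream_skip=False lets this task run even though one branch is skipped
@task(trigger=not_all_skipped, skip_on_upstream_skip=False)
def get_case(a, b):
    # a skipped upstream task passes None, so return whichever input is not None
    return a if a is not None else b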


@task
def task_c(value):
    print(value)


with Flow("test-flow") as flow:
    a = task_a()
    cond = task_condition()

    with case(cond, True):
        b_true = task_b(a)

    with case(cond, False):
        b_false = task_b(a)

    b = get_case(b_true, b_false)
    task_c(b)

FuETL

01/12/2023, 11:15 AM
Very nice! Thank you @Zanie
Also, it's possible to use merge as well, like:
    with case(cond, True):
        result1 = task_b("_result1;" + result)
        task_c()

    with case(cond, False):
        result2 = task_b("_result2;" + result)

    result = merge(result1, result2)
    another_task(result)

Zanie

01/12/2023, 5:48 PM
Oh yeah, merge. I figured we had something like this but couldn’t remember what it was called 🙂
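
For reference, my understanding is that in Prefect 1.x merge runs even when some of its inputs were skipped and returns the first input that is not None. Below is a rough plain-Python sketch of that selection rule only. pick_first_available is a made-up name, None stands in for the result of a skipped branch, and none of this is Prefect's own code.

```python
from typing import Any, Optional


def pick_first_available(*branch_results: Any) -> Optional[Any]:
    """Return the first result that is not None, or None if every branch was skipped.

    Hypothetical helper: None stands in for the result of a skipped branch.
    """
    return next((r for r in branch_results if r is not None), None)


# cond is False: the True branch was skipped, the False branch produced a value.
print(pick_first_available(None, "task_b=task_a"))   # task_b=task_a
# cond is True: only the True branch produced a value.
print(pick_first_available("task_b=task_a", None))   # task_b=task_a
```

One consequence of a rule like this is that a branch which legitimately returns None looks the same as a skipped one, so branches feeding it should return a real value.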