FuETL
01/11/2023, 6:20 PMcase
with flow when is true
and when is false
But i noticed that when the case is false
the task_b()
is always SKIPPED.
After some struggling :face_holding_back_tears: i figure out that the reason that the task was being SKIPPED because inside the case true
i'm changing the task_result
variable (if i rename the task run on false cond).
Is this the desired behaviour? Why is like that? Is there a way to bypass without renaming?
from prefect import task, Flow, case
@task()
def task_a():
print("task_a")
return "task_a"
@task()
def task_condition():
return False
@task()
def task_b(value: str):
print("task_b")
return f"task_b={value}"
@task()
def task_c():
print("task_c")
with Flow("test-flow") as flow:
task_result = task_a()
cond = task_condition()
with case(cond, True):
task_result = task_b(task_result)
task_c()
with case(cond, False):
task_result = task_b(task_result)
flow.run()
Zanie
01/12/2023, 12:41 AMwith case: …
always executes when constructing your DAGtask_result = task_b(task_result)
will always override that variable to be the possibly skipped result of task_b
FuETL
01/12/2023, 1:16 AMtask_result
, so this happen because of the way that DAG is contructed right? I will probably need to read further more about how DAG are constructed (i appreciate if you have some material to share).
For sure i will avoid this type of shenanigans in future. 😅Zanie
01/12/2023, 3:23 AMcase
statements — they’re evaluated at runtime not when you are declaring the flow.from prefect import task, Flow, case
from prefect.triggers import not_all_skipped
@task()
def task_a():
print("task_a")
return "task_a"
@task()
def task_condition():
return False
@task()
def task_b(value: str):
print("task_b")
return f"task_b={value}"
@task(trigger=not_all_skipped)
def get_case(a, b):
# select one that's not skipped (not implemented)
return a
@task
def task_c(value):
print(value)
with Flow("test-flow") as flow:
a = task_a()
cond = task_condition()
with case(cond, True):
b_true = task_b(a)
with case(cond, False):
b_false = task_b(a)
b = get_case(b_true, b_false)
task_c(b)
FuETL
01/12/2023, 11:15 AMwith case(cond, True):
result1 = task_b("_result1;" + result)
task_c()
with case(cond, False):
result2 = task_b("_result2;" + result)
result = merge(result1, result2)
another_task(result)
Zanie
01/12/2023, 5:48 PMmerge
— I figured we had something like this but couldn’t remember what it was called