FuETL
01/11/2023, 6:20 PMcasetruefalsefalsetask_b()truetask_resultfrom prefect import task, Flow, case
@task()
def task_a():
    print("task_a")
    return "task_a"
@task()
def task_condition():
    return False
@task()
def task_b(value: str):
    print("task_b")
    return f"task_b={value}"
@task()
def task_c():
    print("task_c")
with Flow("test-flow") as flow:
    task_result = task_a()
    cond = task_condition()
    with case(cond, True):
        task_result = task_b(task_result)
        task_c()
    with case(cond, False):
        task_result = task_b(task_result)
flow.run()FuETL
01/12/2023, 12:35 AMZanie
Zanie
with case: …Zanie
task_result = task_b(task_result)task_bZanie
FuETL
01/12/2023, 1:16 AMtask_resultFuETL
01/12/2023, 1:21 AMFuETL
01/12/2023, 1:22 AMZanie
caseZanie
Zanie
Zanie
from prefect import task, Flow, case
from prefect.triggers import not_all_skipped
@task()
def task_a():
    print("task_a")
    return "task_a"
@task()
def task_condition():
    return False
@task()
def task_b(value: str):
    print("task_b")
    return f"task_b={value}"
@task(trigger=not_all_skipped)
def get_case(a, b):
    # select one that's not skipped (not implemented)
    return a
@task
def task_c(value):
    print(value)
with Flow("test-flow") as flow:
    a = task_a()
    cond = task_condition()
    with case(cond, True):
        b_true = task_b(a)
    with case(cond, False):
        b_false = task_b(a)
    b = get_case(b_true, b_false)
    task_c(b)FuETL
01/12/2023, 11:15 AMFuETL
01/12/2023, 11:26 AMwith case(cond, True):
        result1 = task_b("_result1;" + result)
        task_c()
    with case(cond, False):
        result2 = task_b("_result2;" + result)
    result = merge(result1, result2)
    another_task(result)Zanie
mergeZanie
