    Tsang Yong

    2 years ago
    Hello. I have another question while looking at the ifelse() flow. What is the best approach if I have a lot of downstream tasks that depend on an ifelse decision? Do I wrap all the downstream tasks with an ifelse()?
    actually I was able to work around my flow logic with SKIP
    maybe…🤔
    Chris White

    2 years ago
    tasks which are downstream of the skipped task will also skip, so you shouldn’t have to worry about them!
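As a rough illustration of the propagation described above, here is a toy model in plain Python. It is not Prefect's implementation: the `run_with_skips` helper and the task names are hypothetical, and the only rule modelled is "a task skips if any of its upstream tasks skipped".

```python
from graphlib import TopologicalSorter  # standard library, Python 3.9+


def run_with_skips(upstream, skipped_at_source):
    """Toy model: `upstream` maps a task name to the set of its upstream task names.

    A task ends up "Skipped" if it is skipped directly or if any upstream task skipped.
    """
    states = {}
    # static_order() yields every task after all of its upstream tasks
    for name in TopologicalSorter(upstream).static_order():
        upstream_skipped = any(states[u] == "Skipped" for u in upstream.get(name, ()))
        if name in skipped_at_source or upstream_skipped:
            states[name] = "Skipped"
        else:
            states[name] = "Success"
    return states


# hypothetical flow: decision -> branch -> transform -> load, plus an unrelated task
upstream = {
    "branch": {"decision"},
    "transform": {"branch"},
    "load": {"transform"},
    "unrelated": set(),
}
states = run_with_skips(upstream, skipped_at_source={"branch"})
print(states)
```

Only `branch` is skipped directly here; `transform` and `load` skip because they sit downstream of it, while `decision` and `unrelated` still succeed.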
    Brett Naul

    2 years ago
    piggy-backing on this: is there a way to put multiple tasks behind an ifelse at once? the following runs both add tasks and only the List is skipped
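    from prefect import Flow, Parameter, task
    from prefect.tasks.control_flow import ifelse
    
    @task
    def add(x, y):
        print(x, y)
        return x + y
    
    with Flow('flow') as flow:
        do_stuff = Parameter('do_stuff')
        # both add tasks feed the list passed to ifelse, so they sit upstream of it
        ifelse(do_stuff, [add(1, 2), add(2, 3)], None)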
    curious if this was what @Tsang Yong was dealing with as well (maybe w/ a shared upstream dependency for all of the tasks that gets skipped by the ifelse?)
    Jeremiah

    2 years ago
    Hi Brett, we used to have a warning for this situation, but it was confusing because it also showed in other situations, so we removed it in https://github.com/PrefectHQ/prefect/pull/1514. Essentially, though it’s surprising, the add tasks are actually upstream of the List task, and therefore they aren’t gated by the ifelse.
    One way to resolve this is to introduce a pass-through task (Task() will do it) as the argument to ifelse, and have your addition tasks all sit downstream of it.
    That way the ifelse check will gate their execution via the pass-through task.
    Brett Naul

    2 years ago
    makes sense; this seemed to work for me
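    from prefect import Flow, Parameter, Task, task
    from prefect.tasks.control_flow import ifelse
    
    def do_if(condition: Task, *tasks: Task) -> None:
        # pass-through task: ifelse gates it, and everything set downstream skips with it
        flag = Task()
        ifelse(condition, flag, None)
        for downstream in tasks:
            downstream.set_upstream(flag)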
    
    @task
    def add(x, y):
        print(x, y)
        return x + y
    
    with Flow('flow') as flow:
        do_stuff = Parameter('do_stuff')
        total_1 = add(1, 2)
        total_2 = add(2, 3)
        do_if(do_stuff, total_1, total_2)
    
    state = flow.run(parameters={'do_stuff': False})
    Jeremiah

    2 years ago
    👍 Glad this works! However, interestingly, this could give you unexpected behavior in the future, depending on the complexity of your real-world tasks that are upstream of the add. Prefect executes tasks in a non-deterministic topological order. That means that your constant tasks (1, 2, 2, 3) MIGHT get executed before even do_stuff, simply because the system identifies that they have no upstream blockers. Obviously in this example it doesn’t matter but just wanted to call it out as an implementation detail for your real-world cases!
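To make the ordering point concrete, here is a small sketch using Python's standard `graphlib` rather than Prefect itself. The graph is a hand-written approximation of the flow above (the node names such as `const_1` and `flag` are made up for illustration), and it only shows which nodes have no upstream blockers, not how Prefect actually schedules them.

```python
from graphlib import TopologicalSorter  # standard library, Python 3.9+

# hypothetical dependency graph: each key lists the nodes that must finish before it
upstream = {
    "flag": {"do_stuff"},                              # pass-through task gated by the condition
    "add(1, 2)": {"flag", "const_1", "const_2"},       # first add task and its constant inputs
    "add(2, 3)": {"flag", "const_2b", "const_3"},      # second add task and its constant inputs
}

sorter = TopologicalSorter(upstream)
sorter.prepare()

# every node returned here has no upstream blockers, so any of them may run first
ready_first = set(sorter.get_ready())
print(sorter.is_active(), sorted(ready_first))
```

In this model the constants and `do_stuff` are all ready at the same time, so a valid topological order can start with any of them. That is harmless for constants, but the same would hold for an expensive task with no upstream dependencies.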
    Brett Naul

    2 years ago
    👍 should be fine here but good to keep in mind, thanks!