Hello. I have another question while checking on the ifelse() flow. What is my best approach if I have a lot of downstream tasks that depend on an ifelse decision? Do I wrap all the downstream tasks with an ifelse()?
Tsang Yong
11/07/2019, 8:06 PM
actually I was able to work around my flow logic with SKIP
Tsang Yong
11/07/2019, 8:14 PM
maybe…🤔
Chris White
11/07/2019, 9:38 PM
tasks which are downstream of the skipped task will also skip, so you shouldn’t have to worry about them!
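To make the skip behavior described above concrete, here is a minimal pure-Python sketch. It is a toy model, not Prefect's implementation: the `run` function, the state strings, and the task names are all hypothetical. The only assumption is that a task is skipped when it is skipped itself or when any of its upstream tasks was skipped.

```python
# Toy model of skip propagation; NOT Prefect's implementation.
# All names here (run, the task names, the state strings) are hypothetical.

def run(upstream, order, skipped):
    """upstream maps task name -> list of upstream task names.
    order is a valid topological order of the tasks.
    skipped is the set of tasks that skip themselves (e.g. by signalling SKIP)."""
    states = {}
    for name in order:
        upstream_skipped = any(states[u] == "Skipped" for u in upstream[name])
        if name in skipped or upstream_skipped:
            states[name] = "Skipped"
        else:
            states[name] = "Success"
    return states


upstream = {
    "decide": [],
    "branch": ["decide"],
    "transform": ["branch"],
    "load": ["transform"],
    "unrelated": [],
}
order = ["decide", "unrelated", "branch", "transform", "load"]

# Only "branch" skips itself; everything downstream of it follows.
states = run(upstream, order, skipped={"branch"})
print(states)
```

In this sketch only `branch` is marked as skipped, yet `transform` and `load` end up skipped too, while `decide` and `unrelated` still run.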
Brett Naul
11/11/2019, 6:24 PM
piggy-backing on this: is there a way to put multiple tasks behind an `ifelse` at once? the following runs both `add` tasks and only the `List` is skipped
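```python
from prefect import Flow, Parameter, task
from prefect.tasks.control_flow import ifelse

@task
def add(x, y):
    print(x, y)
    return x + y

with Flow('flow') as flow:
    do_stuff = Parameter('do_stuff')
    # the list literal is wrapped in a List task; both add tasks sit upstream of it
    ifelse(do_stuff, [add(1, 2), add(2, 3)], None)
```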
curious if this was what @Tsang Yong was dealing with as well (maybe w/ a shared upstream dependency for all of the tasks that gets skipped by the ifelse?)
Jeremiah
11/11/2019, 6:35 PM
Hi Brett, we used to have a warning for this situation, but it was confusing because it also showed in other situations, so we removed it in https://github.com/PrefectHQ/prefect/pull/1514. Essentially, though it’s surprising, the `add` tasks are actually upstream of the `list` task, and therefore they aren’t gated by the `ifelse`.
Jeremiah
11/11/2019, 6:37 PM
One way to resolve this is to introduce a pass-through task (`Task()` will do it) as the argument to `ifelse`, and have your addition tasks all sit downstream of it.
Jeremiah
11/11/2019, 6:37 PM
That way the `ifelse` check will gate their execution via the pass-through task.
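The difference between the two layouts can be sketched as a plain dependency graph. This is an illustration only, not Prefect code: the node names (`gate`, `list`, `flag`, `add_1`, `add_2`) and the `downstream_of` helper are hypothetical stand-ins for the tasks discussed above. The sketch just asks which tasks are reachable from the `ifelse` gate, since only those can be skipped by it.

```python
# Illustration only; node names and downstream_of are hypothetical, not Prefect APIs.

def downstream_of(edges, start):
    """Return every task reachable from start by following upstream -> downstream edges."""
    seen, stack = set(), [start]
    while stack:
        node = stack.pop()
        for child in edges.get(node, []):
            if child not in seen:
                seen.add(child)
                stack.append(child)
    return seen


# Layout 1: the list of add results is the ifelse argument.
# The add tasks feed INTO the list, so they are upstream of it, not downstream of the gate.
list_version = {
    "do_stuff": ["gate"],
    "gate": ["list"],
    "add_1": ["list"],
    "add_2": ["list"],
}

# Layout 2: a pass-through task is the ifelse argument,
# and the add tasks are explicitly placed downstream of it.
passthrough_version = {
    "do_stuff": ["gate"],
    "gate": ["flag"],
    "flag": ["add_1", "add_2"],
}

gated_before = downstream_of(list_version, "gate")
gated_after = downstream_of(passthrough_version, "gate")
print(sorted(gated_before))  # only the list task is gated
print(sorted(gated_after))   # the pass-through and both add tasks are gated
```

In the first layout nothing connects the gate to the `add` nodes, which matches the observation that both `add` tasks still run.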
Brett Naul
11/11/2019, 6:44 PM
makes sense; this seemed to work for me
```python
from prefect import Flow, Parameter, Task, task
from prefect.tasks.control_flow import ifelse

def do_if(condition: Task, *tasks: Task) -> None:
    # must be called inside a `with Flow(...)` block
    # pass-through task gated by the ifelse; every given task is placed downstream of it
    flag = Task()
    ifelse(condition, flag, None)
    for downstream in tasks:
        downstream.set_upstream(flag)

@task
def add(x, y):
    print(x, y)
    return x + y

with Flow('flow') as flow:
    do_stuff = Parameter('do_stuff')
    total_1 = add(1, 2)
    total_2 = add(2, 3)
    do_if(do_stuff, total_1, total_2)

state = flow.run(parameters={'do_stuff': False})
```
Jeremiah
11/11/2019, 6:48 PM
👍 Glad this works! However, interestingly, this could give you unexpected behavior in the future, depending on the complexity of your real-world tasks that are upstream of the `add`. Prefect executes tasks in a non-deterministic topological order. That means that your constant tasks (`1, 2, 2, 3`) MIGHT get executed before even `do_stuff`, simply because the system identifies that they have no upstream blockers. Obviously in this example it doesn’t matter but just wanted to call it out as an implementation detail for your real-world cases!
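The ordering point above can be illustrated by listing every valid topological order of a reduced version of this flow. This is a sketch under assumptions: the graph is simplified to a single constant and a single `add`, the names `const_1` and `flag` are hypothetical labels, and brute-force enumeration is used for clarity rather than anything Prefect does internally.

```python
# Sketch only: enumerates valid execution orders of a tiny hypothetical graph.
from itertools import permutations


def valid_orders(upstream):
    """Return every ordering in which each task comes after all of its upstream tasks."""
    names = list(upstream)
    result = []
    for candidate in permutations(names):
        position = {name: i for i, name in enumerate(candidate)}
        if all(position[u] < position[name] for name in names for u in upstream[name]):
            result.append(candidate)
    return result


# do_stuff -> flag -> add, and const_1 -> add (const_1 has no upstream blockers)
upstream = {
    "do_stuff": [],
    "flag": ["do_stuff"],
    "const_1": [],
    "add": ["flag", "const_1"],
}

orders = valid_orders(upstream)
constant_first = [o for o in orders if o.index("const_1") < o.index("do_stuff")]

for order in orders:
    print(" -> ".join(order))
```

All of the printed orders respect the dependencies, and one of them runs `const_1` before `do_stuff`. An upstream task with real side effects or cost could therefore run before the condition is evaluated, unless it is also placed downstream of the pass-through task.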
Brett Naul
11/11/2019, 6:50 PM
👍 should be fine here but good to keep in mind, thanks!