Tsang Yong
11/07/2019, 7:57 PMChris White
Brett Naul
11/11/2019, 6:24 PMifelse
at once? the following runs both add
tasks and only the List
is skipped
@task
def add(x, y):
print(x, y)
return x + y
with Flow('flow') as flow:
do_stuff = Parameter('do_stuff')
ifelse(do_stuff, [add(1, 2), add(2,3)], None)
curious if this was what @Tsang Yong was dealing with as well (maybe w/ a shared upstream dependency for all of the tasks that gets skipped by the ifelse?)Jeremiah
add
tasks are actually upstream of the list
task, and therefore they aren’t gated by the ifelse
.Task()
will do it) as the argument to ifelse
, and have your addition tasks all sit downstream of it.ifelse
check will gate their execution via the pass-through task.Brett Naul
11/11/2019, 6:44 PMfrom typing import List
from prefect import Task
def do_if(condition: Task, *tasks) -> None:
flag = Task()
ifelse(condition, flag, None)
for task in tasks:
task.set_upstream(flag)
@task
def add(x, y):
print(x, y)
return x + y
with Flow('flow') as flow:
do_stuff = Parameter('do_stuff')
total_1 = add(1, 2)
total_2 = add(2, 3)
do_if(do_stuff, total_1, total_2)
state = flow.run(parameters={'do_stuff': False})
Jeremiah
add
. Prefect executes tasks in a non-deterministic topological order. That means that your constant tasks (1, 2, 2, 3
) MIGHT get executed before even do_stuff
, simply because the system identifies that they have no upstream blockers. Obviously in this example it doesn’t matter but just wanted to call it out as an implementation detail for your real-world cases!Brett Naul
11/11/2019, 6:50 PM