Zach
07/07/2020, 6:16 PM
ifelse fn in prefect.
I have a task, we can call it Task A, that queries my database for a value and returns it. If Task A returns some value, then that value should get passed to Task B, whose output gets passed to Task C, whose output goes to Task D.
But if Task A returns None, then I don't want Task B/C/D to run.
Right now I have it set up like this:
TASK_A = query_database_task()
RUN_TASKS_B_C_d = dummy_task() # task does nothing
ifelse(is_null(TASK_A), RUN_TASKS_B_C_d, None)
TASK_B = some_task_b(TASK_A)
TASK_B.set_upstream(RUN_TASKS_B_C_d)
TASK_C = some_task_b(TASK_B)
TASK_C.set_upstream(RUN_TASKS_B_C_d)
TASK_D = some_task_b(TASK_C)
TASK_D.set_upstream(RUN_TASKS_B_C_d)
There has to be a better way to do this
Zachary Hughes
07/07/2020, 6:31 PM
Jim Crist-Harif
07/07/2020, 6:31 PM
from prefect import task, case, Flow

@task
def task_a():
    return None

@task
def is_null(a):
    return a is None

@task
def task_b(a):
    return a + 1

@task
def task_c(b):
    return b + 1

@task
def task_d(c):
    return c + 1

with Flow("example") as flow:
    a = task_a()
    with case(is_null(a), False):
        b = task_b(a)
        c = task_c(b)
        d = task_d(c)

flow.run()
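
For intuition, the control flow that the case block expresses can be sketched as an ordinary guard in plain Python. This is only an analogy, not Prefect code: run_pipeline is a hypothetical helper, and the task_b/task_c/task_d functions here are undecorated stand-ins for the tasks above. In the Prefect flow, the tasks inside a non-matching case block are skipped rather than called; in this sketch they are simply never invoked.

```python
from typing import Optional


def task_b(a: int) -> int:
    return a + 1


def task_c(b: int) -> int:
    return b + 1


def task_d(c: int) -> int:
    return c + 1


def run_pipeline(a: Optional[int]) -> Optional[int]:
    """Hypothetical helper: run B -> C -> D only when A produced a value."""
    # Guard playing the role of `case(is_null(a), False)`:
    # when A returned None, none of B/C/D is called.
    if a is None:
        return None
    return task_d(task_c(task_b(a)))


print(run_pipeline(None))  # None: B, C and D never ran
print(run_pipeline(1))     # 4: 1 -> 2 -> 3 -> 4
```

The single guard replaces the dummy task and the three set_upstream calls from the original setup, which is what the case block achieves inside the flow.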
Zach
07/07/2020, 7:52 PM