Maxime Lavoie
04/20/2020, 9:43 PM
Kyle Moon-Wright
04/20/2020, 10:08 PM
With flow.add_task(big_task), you would only add one task to the flow despite three tasks existing in big_task.
Maxime Lavoie
04/21/2020, 5:45 AM
class DTask(Task):
    def __init__(self):
        ...

    def run(self):
        a = ATask().run()
        b = BTask(a).run()
        c = CTask(b).run()
        return c
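To make the trade-off in this pattern concrete, here is a minimal sketch that does not depend on Prefect. The `Task` base class, the bodies of `ATask`, `BTask` and `CTask`, and the `registered_tasks` list are all hypothetical stand-ins invented for illustration. The point is only that the inner tasks run as ordinary method calls, so whatever holds the task list sees a single entry.

```python
# Stand-in base class: NOT Prefect's Task, just enough to mimic the pattern.
class Task:
    def run(self):
        raise NotImplementedError


# Hypothetical task bodies, chosen only so the example produces a value.
class ATask(Task):
    def run(self):
        return 5


class BTask(Task):
    def __init__(self, x):
        self.x = x

    def run(self):
        return self.x * 2


class CTask(Task):
    def __init__(self, x):
        self.x = x

    def run(self):
        return self.x - 1


class DTask(Task):
    def run(self):
        # A, B and C execute as plain method calls inside D's run().
        a = ATask().run()
        b = BTask(a).run()
        c = CTask(b).run()
        return c


# Hypothetical stand-in for a flow's task list.
registered_tasks = []
registered_tasks.append(DTask())

result = registered_tasks[0].run()
print(len(registered_tasks), result)  # 1 9
```

In this sketch the three inner steps still execute, but they are invisible as separate units: anything that works per task would only see DTask.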
emre
04/21/2020, 8:18 AM
from prefect import Flow, task

@task
def a(x):
    return 2 * x

@task
def b(x):
    return x - 1

@task
def c(x):
    return x / 2

@task
def e():
    return 5

# Plain function (not a task): calling it inside the Flow block adds a, b and c
def abc(upstream):
    a_res = a(upstream)
    b_res = b(a_res)
    return c(b_res)

with Flow('test') as f:
    upstream = e()
    abc(upstream)

f.run()
This produces E->A->B->C, and it seems downstream tasks can be added to C via the returned value of abc(upstream).
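The behavior described above can be illustrated with a toy model that uses only the standard library. This is a sketch of the general idea of building a graph at definition time, not Prefect's implementation: `ToyFlow`, `Node`, `toy_task` and the downstream task `d` are hypothetical names. Calling a decorated function inside the `with` block records a node and its upstream edges instead of running the function body, so a plain helper function that calls three tasks leaves three separate nodes in the graph.

```python
class ToyFlow:
    """Toy stand-in for a flow: records tasks and edges while a `with` block is open."""
    active = None  # the flow currently being built, if any

    def __init__(self, name):
        self.name = name
        self.tasks = []  # task names, in registration order
        self.edges = []  # (upstream_name, downstream_name) pairs

    def __enter__(self):
        ToyFlow.active = self
        return self

    def __exit__(self, exc_type, exc, tb):
        ToyFlow.active = None
        return False


class Node:
    """Placeholder for a task's future result."""
    def __init__(self, name):
        self.name = name


def toy_task(fn):
    """Calling the decorated function registers a node; fn itself is never run here."""
    def register(*upstream):
        flow = ToyFlow.active
        if flow is None:
            raise RuntimeError("call tasks inside a `with ToyFlow(...)` block")
        node = Node(fn.__name__)
        flow.tasks.append(node.name)
        flow.edges.extend(
            (up.name, node.name) for up in upstream if isinstance(up, Node)
        )
        return node
    return register


@toy_task
def a(x):
    return 2 * x

@toy_task
def b(x):
    return x - 1

@toy_task
def c(x):
    return x / 2

@toy_task
def e():
    return 5

@toy_task
def d(x):  # hypothetical downstream task, not in the original snippet
    return x + 1


def abc(upstream):
    # Plain function: each call below registers its own node.
    return c(b(a(upstream)))


with ToyFlow("test") as f:
    c_node = abc(e())
    d(c_node)  # attach a downstream task to the value returned by abc

print(f.tasks)  # ['e', 'a', 'b', 'c', 'd']
print(f.edges)  # [('e', 'a'), ('a', 'b'), ('b', 'c'), ('c', 'd')]
```

Because `abc` returns the node for `c`, the caller can treat the helper as if it were a single step while the graph still contains every task.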
Jeremiah
04/21/2020, 1:13 PM
For A → B → C in your flow, we’d recommend using a function to abstract away the complexity of creating three tasks (Prefect’s ifelse is a function that generates a few tasks for you, for example).
Maxime Lavoie
04/21/2020, 2:44 PM
Edidiong Etuk
04/21/2020, 5:17 PM