Chris L.
01/24/2022, 3:52 AMResourceManager().setup()
to my tasks and also have cleanup. Thanks!Kevin Kho
Chris L.
01/24/2022, 4:18 AMKevin Kho
from prefect import Flow, task
@task
def add_one(x):
res = x + 1
print(res)
return res
@task
def add_two(x):
res = x + 2
print(res)
return res
with Flow("forloop") as flow:
inputs = [1, 2, 3, 4, 5]
tasks = []
for _in in inputs:
a = add_one(_in)
b = add_two(a)
tasks.append(a)
tasks.append(b)
# set dependencies
for i in range(1, len(tasks)):
tasks[i].set_upstream(tasks[i - 1])
flow.run()
Chris L.
01/24/2022, 4:21 AMKevin Kho
inputs
is a pre-defined list. If it’s dynamic and the result of a Task, then you can’t loop over it because the value doesnt exist when the DAG is built