This seems quite hard. I would actually advise using a for-loop in the functional API to add the tasks to the Flow, if you can.
Chris L.
01/24/2022, 4:18 AM
Ah of course. I could just use task mapping.
Kevin Kho
01/24/2022, 4:20 AM
Yes but also you can do something like this:
from prefect import Flow, task


@task
def add_one(x):
    res = x + 1
    print(res)
    return res


@task
def add_two(x):
    res = x + 2
    print(res)
    return res


with Flow("forloop") as flow:
    inputs = [1, 2, 3, 4, 5]
    tasks = []
    for _in in inputs:
        a = add_one(_in)
        b = add_two(a)
        tasks.append(a)
        tasks.append(b)

    # set dependencies
    for i in range(1, len(tasks)):
        tasks[i].set_upstream(tasks[i - 1])

flow.run()
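(Editor's sketch, not part of the original message.) To see what the dependency loop above wires up, here is a plain-Python illustration that involves no Prefect at all. The string labels are hypothetical stand-ins for the task objects; the point is only that linking each entry of `tasks` to its predecessor produces one sequential chain, so `add_one` for the second input waits on `add_two` for the first.

```python
# Plain-Python stand-in: strings take the place of Prefect task objects.
inputs = [1, 2, 3, 4, 5]
tasks = []
for _in in inputs:
    a = f"add_one({_in})"
    b = f"add_two({a})"
    tasks.append(a)
    tasks.append(b)

# Same index pattern as the set_upstream loop: (upstream, downstream) pairs.
edges = [(tasks[i - 1], tasks[i]) for i in range(1, len(tasks))]

for upstream, downstream in edges:
    print(f"{upstream} -> {downstream}")
```

With five inputs this gives ten tasks and nine edges, all in a single line rather than five independent branches.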
Chris L.
01/24/2022, 4:21 AM
Didn't know that was possible! Thanks :O
Kevin Kho
01/24/2022, 4:23 AM
Only if `inputs` is a pre-defined list. If it's dynamic and the result of a Task, then you can't loop over it, because the value doesn't exist when the DAG is built.
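(Editor's sketch, not part of the original message.) The build-time versus run-time distinction can be illustrated with a toy model in plain Python. The `Deferred` class and `get_inputs` function below are hypothetical and are not Prefect's API; they only mimic the idea that a task result is a placeholder until the flow actually runs.

```python
class Deferred:
    """Hypothetical placeholder for a value that only exists at run time."""

    def __init__(self, fn):
        self.fn = fn

    def run(self):
        return self.fn()


def get_inputs():
    # Stand-in for a task that produces the list dynamically.
    return [1, 2, 3]


# Build time: a pre-defined list can be looped over immediately.
static_inputs = [1, 2, 3]
built = [x + 1 for x in static_inputs]

# Build time: the deferred result is just a placeholder, so looping fails.
dynamic_inputs = Deferred(get_inputs)
try:
    for x in dynamic_inputs:
        pass
    loop_error = None
except TypeError as exc:
    loop_error = exc

# Run time: once the placeholder is executed, the list exists.
resolved = [x + 1 for x in dynamic_inputs.run()]

print(built, type(loop_error).__name__, resolved)
```

For the dynamic case, the task mapping mentioned earlier in the thread is the usual route, since it defers the fan-out until the upstream value exists.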