Fabrice Toussaint
08/11/2021, 11:57 AM
Why does the following flow raise ValueError("Cycle found; flows must be acyclic!")? Or, to rephrase the question: "Why is it cyclic at all?"
Minimal example:
from prefect import Flow, apply_map, task


@task()
def do_something_one(item):
    return item + 1


@task()
def do_something_two(item, item_one):
    return item + item_one


@task()
def do_something_three(item_combination):
    return item_combination[0] + item_combination[1]


# mapping function one
def do_something_with_one_two(item):
    result_one = do_something_one(item)
    result_two = do_something_two(item, result_one)
    return (result_one, result_two)


# mapping function two
def do_something_with_three(item):
    return do_something_three(item)


with Flow("test flow") as flow:
    l = [1, 2, 3]
    results = apply_map(do_something_with_one_two, l)
    results_results = apply_map(do_something_with_three, results)

flow.run()
The visualization can be seen below.
Fabrice Toussaint
08/11/2021, 12:08 PM
from prefect import Flow, apply_map, task


@task()
def do_something_one(item):
    return item + 1


@task()
def do_something_two(item, item_one):
    print(item, item_one)
    return item + item_one


@task()
def do_something_three(item_combination):
    print(item_combination)
    return item_combination[0] + item_combination[1]


# mapping function one
@task()
def do_something_with_one_two(item):
    result_one = do_something_one.run(item)
    result_two = do_something_two.run(item, result_one)
    return (result_one, result_two)


# mapping function two
@task()
def do_something_with_three(item):
    return do_something_three.run(item)


@task()
def print_results(r):
    print(r)
    return None


with Flow("test flow") as flow:
    l = [1]
    results = apply_map(do_something_with_one_two, l)
    results_results = apply_map(do_something_with_three, results)
    print_results(results_results)

flow.run()
It works if I decorate the mapping functions as tasks themselves and call .run on the inner tasks inside them.
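A note on why the .run version behaves like ordinary code: calling .run on a task created with @task invokes the decorated function directly, so inside the outer task nothing new is added to the flow graph. Below is a minimal Prefect-free sketch of what each mapped item computes, assuming plain functions with the same bodies as the tasks above (run_pipeline is a hypothetical helper for illustration, not part of Prefect):

```python
# Plain-Python stand-ins for the task bodies above; Prefect is not required.
def do_something_one(item):
    return item + 1


def do_something_two(item, item_one):
    return item + item_one


def do_something_three(item_combination):
    return item_combination[0] + item_combination[1]


def run_pipeline(items):
    """Hypothetical helper: apply the three steps to each item in order."""
    outputs = []
    for item in items:
        result_one = do_something_one(item)
        result_two = do_something_two(item, result_one)
        # The tuple is built from concrete values, not from task objects.
        outputs.append(do_something_three((result_one, result_two)))
    return outputs


print(run_pipeline([1, 2, 3]))  # [5, 8, 11]
```

The trade-off of the .run approach is that the inner steps no longer show up as separate tasks in the flow, so they are not tracked, retried, or visualized individually.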