itay livni
02/22/2021, 9:12 PMStartFlowRun
in a task
. Where the flow inside the task only runs once. In the example below I would expect it to run three times.
Using
state = adder.run(parameters=dict(num_lst=num_lst))
# # Get results
param_tsk = adder.get_tasks("add_num")
num_lst = state.result[param_tsk[0]]._result.value
Works as expected. Any thoughts on how to run a Flow in a loop with Cloud? Thanks!Kyle Moon-Wright
02/22/2021, 10:26 PMKyle Moon-Wright
02/22/2021, 10:26 PMidempotency_key=iteration_counter
) to achieve what you’re after.itay livni
02/22/2021, 11:20 PMidempotency_key
to the run method but still no luck. Still unclear where `task_run_id={iteration_counter}`belongs. Thanks again
import prefect
from prefect import Flow, Parameter, task
from prefect.tasks.prefect import StartFlowRun
from prefect.engine.signals import LOOP
@task
def add_num(num_lst):
return [num_lst[0], sum(num_lst)]
# Flow that gets looped over
with Flow("adder") as adder:
num_lst = Parameter("num_lst")
new_lst = add_num(num_lst)
@task
def loop_flow(
my_num,
max_loops=3
):
# Has to be set because prefect.context has no "task_loop_count"
# until the second loop
task_loop_count = prefect.context.get("task_loop_count", 1)
# Get results from
loop_payload = prefect.context.get("task_loop_result", {})
num_lst = loop_payload.get("num_lst", [my_num])
# state = adder.run(parameters=dict(num_lst=num_lst))
# # Get results
# param_tsk = adder.get_tasks("add_num")
# num_lst = state.result[param_tsk[0]]._result.value
addNum = StartFlowRun(
flow_name='adder',
project_name='lmap',
wait=True
)
res_msg = addNum.run(
parameters=dict(num_lst=num_lst),
idempotency_key=str(task_loop_count)
# task_run_id={task_loop_count}
)
print(task_loop_count)
if (task_loop_count == max_loops):
print(num_lst)
return num_lst # return statements end the loop
raise LOOP(message="Loop count {task_run_count}", result=dict(num_lst=num_lst))
with Flow("outer-flow") as outer_fl:
num = Parameter("num", default=1)
num_lst = loop_flow(num)
# Register
adder.register(
project_name="lmap", labels=["make-lmap"],
build=True
)
outer_fl.run()
Kyle Moon-Wright
02/23/2021, 12:04 AMiteration_counter
to the idempotency_key
kwarg in the run method (I just realized the snippet I made was confusing so I fixed the code above). Also you will need to increment that counter in your LOOP signal to make the next pass, or AFAIK the next iteration will never occur since it occurs in your child flow.itay livni
02/23/2021, 3:15 AM