# ask-community
i
Hello again - I am running into issues running `StartFlowRun` in a `task`, where the flow inside the task only runs once. In the example below I would expect it to run three times. Using
state = adder.run(parameters=dict(num_lst=num_lst))
# # Get results
param_tsk = adder.get_tasks("add_num")
num_lst = state.result[param_tsk[0]]._result.value
works as expected. Any thoughts on how to run a Flow in a loop with Cloud? Thanks!
k
Hey @itay livni, Would you mind putting your code into a thread to allow for visibility from other questions?
Regarding next steps: Prefect uses idempotency keys based on the task run ID within your task, which prevents duplicate runs, so it may be best to create an idempotency key yourself and pass it to the StartFlowRun task's run method (maybe adding a counter like `idempotency_key=iteration_counter`) to achieve what you're after.
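To illustrate why a repeated key yields only one run, here is a minimal sketch of key-based deduplication. `ToyBackend` and its `create_flow_run` method are hypothetical stand-ins written for this example; they are not Prefect's API or implementation, only a model of the behavior described above.

```python
import uuid


class ToyBackend:
    """Hypothetical backend that deduplicates flow runs by idempotency key."""

    def __init__(self):
        self._runs_by_key = {}

    def create_flow_run(self, flow_name, idempotency_key):
        # A repeated (flow, key) pair returns the existing run instead of
        # creating a new one.
        key = (flow_name, idempotency_key)
        if key not in self._runs_by_key:
            self._runs_by_key[key] = str(uuid.uuid4())
        return self._runs_by_key[key]


backend = ToyBackend()

# Same key on every iteration: only one distinct run is ever created.
same_key_runs = {backend.create_flow_run("adder", "fixed-key") for _ in range(3)}

# A key that changes per iteration: one distinct run per iteration.
per_iteration_runs = {
    backend.create_flow_run("adder", f"iteration-{i}") for i in range(1, 4)
}

print(len(same_key_runs))       # 1
print(len(per_iteration_runs))  # 3
```

Under this model, a looped task that reuses one key would appear to run the child flow only once, which is why a per-iteration key is suggested.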
i
@Kyle Moon-Wright Here is the code. I added the `idempotency_key` to the run method but still no luck. Still unclear where `task_run_id={iteration_counter}` belongs. Thanks again
import prefect
from prefect import Flow, Parameter, task
from prefect.tasks.prefect import StartFlowRun
from prefect.engine.signals import LOOP

@task
def add_num(num_lst):
    return [num_lst[0], sum(num_lst)]

# Flow that gets looped over
with Flow("adder") as adder:
    num_lst = Parameter("num_lst")
    new_lst = add_num(num_lst)

@task
def loop_flow(
    my_num,
    max_loops=3
    ):

    # Has to be set because prefect.context has no "task_loop_count" 
    # until the second loop
    task_loop_count = prefect.context.get("task_loop_count", 1)

    # Get results from the previous loop iteration (empty on the first pass)
    loop_payload = prefect.context.get("task_loop_result", {})
    num_lst = loop_payload.get("num_lst", [my_num])

    # state = adder.run(parameters=dict(num_lst=num_lst))
    # # Get results
    # param_tsk = adder.get_tasks("add_num")
    # num_lst = state.result[param_tsk[0]]._result.value

    addNum = StartFlowRun(
        flow_name='adder', 
        project_name='lmap',
        wait=True
        )

    res_msg = addNum.run(
        parameters=dict(num_lst=num_lst),
        idempotency_key=str(task_loop_count)
        # task_run_id={task_loop_count}
    )
    

    print(task_loop_count)
    if (task_loop_count == max_loops):
        print(num_lst)
        return num_lst  # return statements end the loop

    raise LOOP(message=f"Loop count {task_loop_count}", result=dict(num_lst=num_lst))


with Flow("outer-flow") as outer_fl:
    num = Parameter("num", default=1)
    num_lst = loop_flow(num)

# Register 
adder.register(
    project_name="lmap", labels=["make-lmap"],
    build=True   
)

outer_fl.run()
k
Right, so passing something like an `iteration_counter` to the `idempotency_key` kwarg in the run method (I just realized the snippet I made was confusing so I fixed the code above). Also you will need to increment that counter in your LOOP signal to make the next pass, or AFAIK the next iteration will never occur since it occurs in your child flow.
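The suggestion above, carrying a counter through the LOOP payload and using it to build a fresh key each pass, might look roughly like the sketch below. This is plain Python that imitates the pattern so it can run without Prefect: the `LOOP` class and `run_with_loop` driver are toy stand-ins (assumptions, not Prefect's task runner), the `counter` and `keys` payload fields are hypothetical names, and the arithmetic from `add_num` is inlined as a stand-in for the child flow.

```python
class LOOP(Exception):
    """Toy stand-in for prefect.engine.signals.LOOP; carries a result payload."""

    def __init__(self, message="", result=None):
        super().__init__(message)
        self.result = result


def loop_body(my_num, payload, max_loops=3):
    # Read state carried over from the previous iteration. In a real task this
    # payload would come from prefect.context.get("task_loop_result", {}).
    counter = payload.get("counter", 1)
    num_lst = payload.get("num_lst", [my_num])
    keys = payload.get("keys", [])

    # Build a key that is unique to this iteration; in the thread's code this
    # is the value that would be passed as idempotency_key.
    keys = keys + [f"adder-{counter}"]

    # Stand-in for the child flow: same arithmetic as add_num.
    num_lst = [num_lst[0], sum(num_lst)]

    if counter == max_loops:
        return num_lst, keys  # returning ends the loop

    # Increment the counter inside the payload so the next pass sees it.
    raise LOOP(
        message=f"Loop count {counter}",
        result=dict(num_lst=num_lst, counter=counter + 1, keys=keys),
    )


def run_with_loop(my_num):
    """Toy driver: re-invoke loop_body with each LOOP payload until it returns."""
    payload = {}
    while True:
        try:
            return loop_body(my_num, payload)
        except LOOP as signal:
            payload = signal.result


final_lst, used_keys = run_with_loop(1)
print(final_lst)   # [1, 3]
print(used_keys)   # ['adder-1', 'adder-2', 'adder-3']
```

Each pass produces a distinct key, so a key-deduplicating backend would treat the three child runs as separate.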
i
@Kyle Moon-Wright Thank you