Matt Lampe

04/18/2023, 3:22 PM
Hi, any help would be greatly appreciated. I've been tasked with helping upgrade Prefect 1.x code to Prefect 2, and for the most part the migration has been smooth. I have a flow with a task that raises a LOOP signal, and I'm struggling to figure out how to recreate this functionality in Prefect 2. Is there a good way to do the same thing in Prefect 2?
raise LOOP(message=f"Status Run {n}={v_run_code}", result=dict(n=n + 1, code=v_run_code))

Zanie

04/18/2023, 3:52 PM
Hey! If you need to loop in Prefect 2 you can just write a loop using standard Python instead of raising a signal.
e.g.
for i in range(n):
    my_task(i)
or
n = 0
while my_task(n) is None:
    n += 1
In the second, you'd return None to indicate another iteration.
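A minimal sketch of the while-loop idea, carrying state between iterations the way the `result` payload of LOOP did in Prefect 1. It is written in plain Python so it runs without Prefect installed; in a Prefect 2 flow, `check_status` would presumably be decorated with `@task` and `poll_until_done` with `@flow`. The function names, the "DONE" stop code, and the iteration cap are all hypothetical and not from the original flow.

```python
# Plain-Python sketch. In Prefect 2, check_status would carry @task and
# poll_until_done would carry @flow (both imported from `prefect`).


def check_status(n: int) -> dict:
    """Hypothetical stand-in for one status check; reports DONE from run 3 on."""
    code = "DONE" if n >= 3 else "RUNNING"
    # Return the state for the next iteration, like LOOP's result payload.
    return {"n": n + 1, "code": code}


def poll_until_done(max_iterations: int = 10) -> dict:
    """Call check_status repeatedly until the stop condition is met."""
    state = {"n": 1, "code": None}
    for _ in range(max_iterations):
        state = check_status(state["n"])
        if state["code"] == "DONE":
            return state
    # Assumed safety net so the loop cannot run forever.
    raise RuntimeError("stop condition not reached")


final_state = poll_until_done()
```

Because each iteration is an ordinary call, the loop condition, the carried state, and the iteration cap are all visible in the flow body rather than hidden inside a signal.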

Jason Damiani

04/18/2023, 3:57 PM
@Zanie in Prefect 1 I have a flow that uses LOOP signal within a task that is run as a mapped task. What would be the best option in Prefect 2? A nested loop? (I'm essentially using LOOP to dynamically spawn sequential tasks for the given mapped task)

Zanie

04/18/2023, 5:28 PM
Hm that’s a bit trickier. A loop should work though like
futures = my_task.map(range(100))
for i, future in enumerate(futures):
    if future.result() is None:  # None signals that another iteration is needed
        my_task.submit(future, i + 1)
You’d need another loop that ensures everything is done too if you want multiple loops to happen
Can you share some more about your use-case? I haven’t seen someone loop in a mapped task like this before. cc @justabill / @Chris White

Jason Damiani

04/18/2023, 5:49 PM
The use case is essentially API pagination. We map over a set of IDs; each mapped task run then paginates over API calls until a stop condition is reached. Using LOOP, each individual API call can be retried using the standard Prefect machinery.

Zanie

04/18/2023, 7:08 PM
I think in the current world that would be best implemented with concurrent subflows (one for each ID), then a loop executing the tasks inside each subflow.
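A rough sketch of that shape, using plain asyncio so it runs without Prefect installed. The comments mark where the Prefect 2 decorators would presumably go: `@task(retries=...)` on the per-page call so each request can be retried on its own, and `@flow` on the per-ID paginator and the parent. The in-memory `FAKE_API`, the function names, and the "empty page" stop condition are hypothetical placeholders, not part of the original flow.

```python
import asyncio

# Hypothetical in-memory "API": each ID maps to its pages of records.
FAKE_API = {
    "a": [[1, 2], [3]],
    "b": [[4]],
}


# In Prefect 2 this would be a task, e.g. @task(retries=3), so that
# each page request is retried independently.
async def fetch_page(item_id: str, page: int) -> list:
    pages = FAKE_API[item_id]
    return pages[page] if page < len(pages) else []


# In Prefect 2 this would be an async @flow, called as a subflow per ID.
async def paginate(item_id: str) -> list:
    records, page = [], 0
    while True:
        batch = await fetch_page(item_id, page)
        if not batch:  # assumed stop condition: an empty page
            return records
        records.extend(batch)
        page += 1


# In Prefect 2 this would be the parent @flow.
async def main(ids: list) -> dict:
    # Run one paginator per ID concurrently; pages within an ID stay sequential.
    results = await asyncio.gather(*(paginate(i) for i in ids))
    return dict(zip(ids, results))


all_records = asyncio.run(main(["a", "b"]))
```

The IDs run concurrently while the pages for any one ID are fetched in order, which mirrors the mapped-task-plus-LOOP arrangement from Prefect 1.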