I used <task looping> in a really long flow that f...
# prefect-community
a
I used task looping in a really long flow that failed 6 hours in due to an obscure error. I was disappointed to see that when I restarted it from the Prefect Cloud UI, it restarted from the beginning rather than from the latest state. Is there a way to preserve "task_loop_result" context and use it in the task when manually restarting a failed flow?
k
I see what you mean. Could you tell me more on the use case? I think there are a couple of ways to approach this
a
I'm running ~1,000 Snowflake benchmarks in a sequence, and using LOOP between them. Attached what my task activity looked like.
k
I see so a lot of the context is just alive for the duration of the Flow run. Stuff like scheduled start time and the flow name and stuff persist, but this doesn’t. I would say there are two ways to tackle this. Create the DAG with a loop:
Copy code
with Flow(...) as flow:
    tasks = []
    for i in range(10):
        x = task_a(input1[i], input2[i])
        tasks.append(x)
    
    for i in range(1,10):
        tasks[i].set_upstream(tasks[i-1])
or you can use the KV Store to persist the state of the loop and fetch it so you can restart from where you left off
👍 1
a
Thanks Kevin, creating the DAG programmatically probably makes more sense if this is what I really want to do. Is there a way to create a different DAG based on a parameter's value? I have a checkbox that users can currently select that determines whether all 800 tests run or just 8 for a quick sanity check.
k
not quite, because you can’t loop over a task since that’s not a static DAG and a Parameter is a task.
a
I guess I could just SKIP tasks I don't want to run based on a parameter value, though I'm not sure how long 792 skips would take 😄
k
Yeah you can do that with
prefect.context.get("parameters")
and then raise the condition for sure. Uhhh…I hope not too much but those are still API calls to update the state haha so maybe 1 second each hopefully?
a
Quick follow-up question: When I loop 800 times in one task, does that count as 1 task run or 800 task runs for billing purposes?
k
I think it’s the 800 because they are first class tasks with state tracking and retries
Not restart retries, like task level retries 😅
a
Are you sure, because I just checked the flow run that had been looping for 6 hours in the API, and am only seeing one task run for the task that was looping. It must have gone through hundreds of loops, but I'm only seeing 10 task runs in the API.
k
I am not sure. You may be right then because it has one task id. I can leave a message with the team to confirm
a
That'd be awesome to know, because running 10 task runs vs 810 task runs could make a substantial financial difference in the long term
k
yep will get back to you when i find out
🙏 1
Looping is one task in billing
👍 2
a
Thanks again Kevin!