Aram Panasenco

03/03/2022, 6:16 PM
I used task looping in a really long flow that failed 6 hours in due to an obscure error. I was disappointed to see that when I restarted it from the Prefect Cloud UI, it started over from the beginning rather than from the latest loop state. Is there a way to preserve the "task_loop_result" context value and use it in the task when manually restarting a failed flow?

Kevin Kho

03/03/2022, 6:17 PM
I see what you mean. Could you tell me more about the use case? I think there are a couple of ways to approach this.

Aram Panasenco

03/03/2022, 6:19 PM
I'm running ~1,000 Snowflake benchmarks in sequence, using LOOP between them. I've attached what my task activity looked like.

Kevin Kho

03/03/2022, 6:25 PM
I see. A lot of the context only lives for the duration of the flow run. Things like the scheduled start time and the flow name persist, but `task_loop_result` doesn't. I'd say there are two ways to tackle this. The first is to create the DAG with a loop:
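from prefect import Flow, task

@task
def task_a(x, y):  # placeholder for the real benchmark task
    return x + y

input1, input2 = list(range(10)), list(range(10))  # hypothetical inputs

with Flow("benchmarks") as flow:
    tasks = []
    for i in range(10):
        tasks.append(task_a(input1[i], input2[i]))
    # run each task only after the previous one finishes
    for i in range(1, 10):
        tasks[i].set_upstream(tasks[i - 1])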
The other option is to use the KV Store to persist the loop's state and fetch it on restart, so the flow picks up where it left off.
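A minimal sketch of the KV Store approach, assuming the loop's progress can be captured as a single integer (the index of the next benchmark to run). The dict-backed `get_progress`/`set_progress` helpers, the key name, and the benchmark names are hypothetical stand-ins so the resume logic can run anywhere; in a Prefect 1.x flow you would back those helpers with `prefect.backend.get_key_value` and `set_key_value` and handle the case where the key has not been set yet.

```python
from typing import Callable, Dict, List

# Hypothetical stand-ins for a persistent key-value store. In a Prefect 1.x
# flow these would wrap prefect.backend.get_key_value / set_key_value; a plain
# dict is used here so the example runs without a Prefect backend.
def get_progress(store: Dict[str, int], key: str) -> int:
    # Index of the next benchmark to run; 0 if nothing has been saved yet.
    return store.get(key, 0)


def set_progress(store: Dict[str, int], key: str, next_index: int) -> None:
    store[key] = next_index


def run_from_checkpoint(
    benchmarks: List[str],
    run_one: Callable[[str], None],
    store: Dict[str, int],
    key: str,
) -> List[str]:
    """Run benchmarks in order, resuming after the last one recorded as done."""
    completed = []
    for i in range(get_progress(store, key), len(benchmarks)):
        run_one(benchmarks[i])  # if this raises, earlier progress is already saved
        completed.append(benchmarks[i])
        set_progress(store, key, i + 1)  # checkpoint after every success
    return completed


# Simulate an "obscure error" on the first attempt at q3, then a manual restart.
store: Dict[str, int] = {}
attempts: List[str] = []


def flaky_benchmark(name: str) -> None:
    attempts.append(name)
    if name == "q3" and attempts.count("q3") == 1:
        raise RuntimeError("obscure error")


benchmarks = ["q1", "q2", "q3", "q4"]
try:
    run_from_checkpoint(benchmarks, flaky_benchmark, store, "bench-progress")
except RuntimeError:
    pass  # first run fails after q1 and q2 were checkpointed

resumed = run_from_checkpoint(benchmarks, flaky_benchmark, store, "bench-progress")
print(resumed)  # ['q3', 'q4']
```

Checkpointing only after a benchmark succeeds means the one that failed is retried on restart rather than silently skipped. In a real flow you would probably also want the key to be unique per flow run (for example, by including the flow run ID, assuming a UI restart reuses the same run) and to reset it once the loop finishes, so the next scheduled run starts from zero.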

Aram Panasenco

03/03/2022, 6:35 PM
Thanks, Kevin. Creating the DAG programmatically probably makes more sense if this is what I really want to do. Is there a way to create a different DAG based on a parameter's value? Users can currently tick a checkbox that determines whether all 800 tests run or just 8 for a quick sanity check.

Kevin Kho

03/03/2022, 6:48 PM
Not quite. A Parameter is a task, and you can't loop over a task's value when building the flow, because then the DAG wouldn't be static.

Aram Panasenco

03/03/2022, 6:51 PM
I guess I could just SKIP tasks I don't want to run based on a parameter value, though I'm not sure how long 792 skips would take 😄

Kevin Kho

03/03/2022, 6:53 PM
Yeah, you can definitely do that: read the parameter with `prefect.context.get("parameters")` and then raise SKIP based on the condition. As for how long, uhh… hopefully not too long, but each skip is still an API call to update the state haha, so maybe around 1 second each?
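A minimal sketch of the skip-by-parameter idea, kept in plain Python so it runs without Prefect. `Skip` is a hypothetical stand-in for Prefect 1.x's `prefect.engine.signals.SKIP`, the `quick_check` parameter name and benchmark names are hypothetical, and "every 100th benchmark" is just one assumed way to pick 8 out of 800. If Kevin's rough guess of about one second per state update holds, the 792 skips would add roughly 13 minutes (792 s) to a quick run.

```python
from typing import Any, Dict, List


class Skip(Exception):
    """Hypothetical stand-in for prefect.engine.signals.SKIP (Prefect 1.x)."""


def quick_subset(benchmarks: List[str], step: int = 100) -> List[str]:
    # Every 100th benchmark: 800 benchmarks -> 8 for the sanity check.
    return benchmarks[::step]


def run_or_skip(name: str, params: Dict[str, Any], subset: List[str]) -> str:
    # In a real task, params would come from prefect.context.get("parameters").
    # "quick_check" is a hypothetical name for the checkbox parameter.
    if params.get("quick_check", False) and name not in subset:
        raise Skip(f"{name} is not part of the quick sanity check")
    return f"ran {name}"  # placeholder for actually running the benchmark


benchmarks = [f"bench_{i:03d}" for i in range(800)]
subset = quick_subset(benchmarks)
print(len(subset))  # 8
```

With the checkbox unticked every benchmark runs; with it ticked, only the 8 in the subset run and the rest end in a skipped state.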

Aram Panasenco

03/03/2022, 6:59 PM
Quick follow-up question: When I loop 800 times in one task, does that count as 1 task run or 800 task runs for billing purposes?

Kevin Kho

03/03/2022, 7:10 PM
I think it's 800, because each iteration is a first-class task run with its own state tracking and retries.
Not restart retries, I mean task-level retries 😅

Aram Panasenco

03/03/2022, 7:21 PM
Are you sure? I just checked the flow run that had been looping for 6 hours in the API, and I'm only seeing one task run for the looping task. It must have gone through hundreds of loops, but I'm only seeing 10 task runs in total in the API.

Kevin Kho

03/03/2022, 7:22 PM
I'm not sure. You may be right, since it has one task ID. I can leave a message with the team to confirm.
a

Aram Panasenco

03/03/2022, 7:26 PM
That'd be awesome to know, because 10 task runs vs. 810 task runs could make a substantial financial difference in the long term.

Kevin Kho

03/03/2022, 7:26 PM
Yep, I'll get back to you when I find out.
Looping counts as one task run for billing.

Aram Panasenco

03/03/2022, 7:48 PM
Thanks again Kevin!