Thread
#prefect-community
    Aram Panasenco

    Aram Panasenco

    6 months ago
    I used task looping in a really long flow that failed 6 hours in due to an obscure error. I was disappointed to see that when I restarted it from the Prefect Cloud UI, it restarted from the beginning rather than from the latest state. Is there a way to preserve "task_loop_result" context and use it in the task when manually restarting a failed flow?
    Kevin Kho

    Kevin Kho

    6 months ago
    I see what you mean. Could you tell me more on the use case? I think there are a couple of ways to approach this
    Aram Panasenco

    Aram Panasenco

    6 months ago
    I'm running ~1,000 Snowflake benchmarks in a sequence, and using LOOP between them. Attached what my task activity looked like.
    Kevin Kho

    Kevin Kho

    6 months ago
    I see so a lot of the context is just alive for the duration of the Flow run. Stuff like scheduled start time and the flow name and stuff persist, but this doesn’t. I would say there are two ways to tackle this. Create the DAG with a loop:
    with Flow(...) as flow:
        tasks = []
        for i in range(10):
            x = task_a(input1[i], input2[i])
            tasks.append(x)
        
        for i in range(1,10):
            tasks[i].set_upstream(tasks[i-1])
    or you can use the KV Store to persist the state of the loop and fetch it so you can restart from where you left off
    Aram Panasenco

    Aram Panasenco

    6 months ago
    Thanks Kevin, creating the DAG programmatically probably makes more sense if this is what I really want to do. Is there a way to create a different DAG based on a parameter's value? I have a checkbox that users can currently select that determines whether all 800 tests run or just 8 for a quick sanity check.
    Kevin Kho

    Kevin Kho

    6 months ago
    not quite, because you can’t loop over a task since that’s not a static DAG and a Parameter is a task.
    Aram Panasenco

    Aram Panasenco

    6 months ago
    I guess I could just SKIP tasks I don't want to run based on a parameter value, though I'm not sure how long 792 skips would take 😄
    Kevin Kho

    Kevin Kho

    6 months ago
    Yeah you can do that with
    prefect.context.get("parameters")
    and then raise the condition for sure. Uhhh…I hope not too much but those are still API calls to update the state haha so maybe 1 second each hopefully?
    Aram Panasenco

    Aram Panasenco

    6 months ago
    Quick follow-up question: When I loop 800 times in one task, does that count as 1 task run or 800 task runs for billing purposes?
    Kevin Kho

    Kevin Kho

    6 months ago
    I think it’s the 800 because they are first class tasks with state tracking and retries
    Not restart retries, like task level retries 😅
    Aram Panasenco

    Aram Panasenco

    6 months ago
    Are you sure, because I just checked the flow run that had been looping for 6 hours in the API, and am only seeing one task run for the task that was looping. It must have gone through hundreds of loops, but I'm only seeing 10 task runs in the API.
    Kevin Kho

    Kevin Kho

    6 months ago
    I am not sure. You may be right then because it has one task id. I can leave a message with the team to confirm
    Aram Panasenco

    Aram Panasenco

    6 months ago
    That'd be awesome to know, because running 10 task runs vs 810 task runs could make a substantial financial difference in the long term
    Kevin Kho

    Kevin Kho

    6 months ago
    yep will get back to you when i find out
    Looping is one task in billing
    Aram Panasenco

    Aram Panasenco

    6 months ago
    Thanks again Kevin!