# ask-community
b
Hi team - following on from https://github.com/PrefectHQ/prefect/issues/3619, I'm seeing a looping task exit early with `Cached` even though the loop hasn't finished. I wonder if this is the expected result or a bug. Example in thread:
Example
```python
import os
import shutil

import prefect
from prefect import task, Flow, Parameter
from prefect.engine.results import LocalResult
from prefect.engine.signals import LOOP


def make_filename(**kwargs):
    # One target file per loop iteration; default to 1 if task_loop_count isn't provided
    idx = kwargs.get('task_loop_count', 1)
    return f'test-{idx}.prefect'


@task(result=LocalResult(dir='./results'), target=make_filename, checkpoint=True)
def loop_test(count=2):
    # Payload handed over from the previous iteration via LOOP(result=...)
    loop_payload = prefect.context.get("task_loop_result", {})

    n = loop_payload.get("n", 1)

    print(f"Running for {n=}")  # the {n=} syntax needs Python 3.8+

    if n >= count:
        return n

    # Ask Prefect for another iteration, carrying n forward
    raise LOOP(f'Iteration {n}', result=dict(n=n+1))


with Flow("test") as flow:
    count = Parameter("count")
    x = loop_test(count=count)

# Cleanup cache
if os.path.exists("./results"):
    shutil.rmtree("./results")

# Simulate stopping halfway
flow.run(count=3)

# Restart
flow.run(count=5)
```
And output logs:
```
flows/test.py
[2021-09-14 12:53:41+1000] INFO - prefect.FlowRunner | Beginning Flow run for 'test'
[2021-09-14 12:53:41+1000] INFO - prefect.TaskRunner | Task 'count': Starting task run...
[2021-09-14 12:53:41+1000] INFO - prefect.TaskRunner | Task 'count': Finished task run for task with final state: 'Success'
[2021-09-14 12:53:41+1000] INFO - prefect.TaskRunner | Task 'loop_test': Starting task run...
Running for n=1
Running for n=2
Running for n=3
[2021-09-14 12:53:41+1000] INFO - prefect.TaskRunner | Task 'loop_test': Finished task run for task with final state: 'Success'
[2021-09-14 12:53:41+1000] INFO - prefect.FlowRunner | Flow run SUCCESS: all reference tasks succeeded
[2021-09-14 12:53:41+1000] INFO - prefect.FlowRunner | Beginning Flow run for 'test'
[2021-09-14 12:53:41+1000] INFO - prefect.TaskRunner | Task 'count': Starting task run...
[2021-09-14 12:53:41+1000] INFO - prefect.TaskRunner | Task 'count': Finished task run for task with final state: 'Success'
[2021-09-14 12:53:41+1000] INFO - prefect.TaskRunner | Task 'loop_test': Starting task run...
[2021-09-14 12:53:41+1000] INFO - prefect.TaskRunner | Task 'loop_test': Finished task run for task with final state: 'Cached'
[2021-09-14 12:53:41+1000] INFO - prefect.FlowRunner | Flow run SUCCESS: all reference tasks succeeded
```
Here I'm trying to simulate a task that might read from an API whose end isn't known in advance, and I would like to persist interim results in case of failover. But when I restart the same run, the first `Loop` result that the task finds causes the whole task to be set to `Cached` and exit early.
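To make the early exit concrete, here is a deliberately simplified, stdlib-only model of the behaviour seen in the logs above. It is not Prefect's real TaskRunner code; it only assumes what the logs show, namely that the target is formatted from the loop count and checked before an iteration runs. On restart the count is back at 1, so the leftover `test-1.prefect` short-circuits the whole task before a single `Running for` line is printed. The names `run_looping_task` and `results_dir` are hypothetical.

```python
import os
import tempfile


def make_filename(task_loop_count=1):
    # Same naming scheme as the example's target: one file per loop count.
    return f"test-{task_loop_count}.prefect"


def run_looping_task(results_dir, count):
    """Toy model (not Prefect's TaskRunner): before each iteration, check
    whether the target for the current loop count already exists."""
    n = 1           # mirrors the LOOP payload value
    loop_count = 1  # mirrors task_loop_count
    while True:
        target = os.path.join(results_dir, make_filename(loop_count))
        if os.path.exists(target):
            # An existing target ends the *whole* task, not just this iteration.
            with open(target) as f:
                return "Cached", int(f.read())
        print(f"Running for n={n}")
        with open(target, "w") as f:
            f.write(str(n))
        if n >= count:
            return "Success", n
        n += 1
        loop_count += 1


with tempfile.TemporaryDirectory() as results_dir:
    print(run_looping_task(results_dir, count=3))  # prints n=1..3, then ('Success', 3)
    print(run_looping_task(results_dir, count=5))  # ('Cached', 1), no iterations run
```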
My desired outcome here is that the individual loop iterations are recognised as cached, but not the parent task until it actually succeeds.
(However, if this behaviour is expected, I'll find another way to do what I want; just wanted to confirm.)
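A minimal sketch of the desired behaviour, written in plain Python rather than Prefect: each iteration's payload is stored as an interim checkpoint to resume from, not as a final cached result, and the function only returns once the loop is really done. In Prefect 1.x this would presumably mean dropping `target=` from the task and reading/writing checkpoints inside the task body (for example through a `LocalResult`), so the task-level cache check never short-circuits the loop; that is an assumption, not something confirmed in this thread. The file layout and the names `checkpoint_path`, `latest_checkpoint` and `resumable_loop` are hypothetical.

```python
import json
import os
import tempfile


def checkpoint_path(results_dir, n):
    # Hypothetical naming scheme: one checkpoint file per loop iteration.
    return os.path.join(results_dir, f"test-{n}.json")


def latest_checkpoint(results_dir):
    """Return the payload stored in the highest-numbered checkpoint, or None."""
    indices = [
        int(name[len("test-"):-len(".json")])
        for name in os.listdir(results_dir)
        if name.startswith("test-") and name.endswith(".json")
    ]
    if not indices:
        return None
    with open(checkpoint_path(results_dir, max(indices))) as f:
        return json.load(f)


def resumable_loop(results_dir, count):
    """Resume from the latest interim payload; return only when the loop is done."""
    payload = latest_checkpoint(results_dir) or {"n": 1}
    n = payload["n"]
    while True:
        print(f"Running for n={n}")
        if n >= count:
            return n
        # Persist the payload for the next iteration before moving on,
        # so a crash after this point resumes at n + 1 instead of n = 1.
        with open(checkpoint_path(results_dir, n + 1), "w") as f:
            json.dump({"n": n + 1}, f)
        n += 1


with tempfile.TemporaryDirectory() as results_dir:
    resumable_loop(results_dir, count=3)  # runs n=1, 2, 3
    resumable_loop(results_dir, count=5)  # resumes: runs n=3, 4, 5
```

Parsing the index as an integer, rather than sorting filenames, matters once there are ten or more checkpoints, because `test-10.json` sorts before `test-9.json` lexically.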
k
Oh sorry, forgot to respond, Brad. I'll take a closer look later, but at first glance this looks expected given the issue you linked, yep.
Do you think you can maybe implement this logic with the KV Store to store the caching information?
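A rough sketch of the KV Store idea, assuming (not confirmed here) that Prefect 1.x exposes its Cloud-backed KV Store as `prefect.backend.set_key_value` / `get_key_value`. Because the real store needs a Prefect Cloud backend, the sketch uses an in-memory dict with functions of the same names as a stand-in; their behaviour for missing keys is an assumption too. The key name `test/loop_test/n` and the function `loop_with_kv` are hypothetical.

```python
# In-memory stand-in for a key-value store. Assumption: the real Prefect 1.x
# KV Store exposes prefect.backend.set_key_value / get_key_value; check the
# docs before relying on the exact names or behaviour.
KV_STORE = {}


def set_key_value(key, value):
    KV_STORE[key] = value


def get_key_value(key):
    # Returns None for a missing key; the real API may behave differently.
    return KV_STORE.get(key)


def loop_with_kv(count, key="test/loop_test/n"):
    """Read progress from the KV store, loop, and record progress after each step."""
    n = get_key_value(key) or 1
    while True:
        print(f"Running for n={n}")
        if n >= count:
            return n
        n += 1
        set_key_value(key, n)  # progress survives a crash or restart


loop_with_kv(3)  # runs n=1, 2, 3
loop_with_kv(5)  # resumes: runs n=3, 4, 5
```

In practice you would probably also clear the key once the loop finally succeeds, so that a genuinely new run starts again from 1.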
b
Possibly - but I'm leveraging the state information for other things, so I'd prefer to use that.