Brad
09/14/2021, 2:55 AMCached
despite not being complete. I wonder if this is the expected result or a bug. Example in theadBrad
09/14/2021, 2:56 AMimport os
import shutil
import prefect
from prefect import task, Flow, Parameter
from prefect.engine.results import LocalResult
from prefect.engine.signals import LOOP
def make_filename(**kwargs):
idx = kwargs['task_loop_count'] if 'task_loop_count' in kwargs else 1
return f'test-{idx}.prefect'
@task(result=LocalResult(dir='./results'), target=make_filename, checkpoint=True)
def loop_test(count=2):
loop_payload = prefect.context.get("task_loop_result", {})
n = loop_payload.get("n", 1)\
print(f"Running for {n=}")
if n >= count:
return n
raise LOOP(f'Iteration {n}', result=dict(n=n+1))
with Flow("test") as flow:
count = Parameter("count")
x = loop_test(count=count)
# Cleanup cache
if os.path.exists("./results"):
shutil.rmtree("./results")
# Simulate stopping halfway
flow.run(count=3)
# Restart
flow.run(count=5)
And output logs:
flows/test.py
[2021-09-14 12:53:41+1000] INFO - prefect.FlowRunner | Beginning Flow run for 'test'
[2021-09-14 12:53:41+1000] INFO - prefect.TaskRunner | Task 'count': Starting task run...
[2021-09-14 12:53:41+1000] INFO - prefect.TaskRunner | Task 'count': Finished task run for task with final state: 'Success'
[2021-09-14 12:53:41+1000] INFO - prefect.TaskRunner | Task 'loop_test': Starting task run...
Running for n=1
Running for n=2
Running for n=3
[2021-09-14 12:53:41+1000] INFO - prefect.TaskRunner | Task 'loop_test': Finished task run for task with final state: 'Success'
[2021-09-14 12:53:41+1000] INFO - prefect.FlowRunner | Flow run SUCCESS: all reference tasks succeeded
[2021-09-14 12:53:41+1000] INFO - prefect.FlowRunner | Beginning Flow run for 'test'
[2021-09-14 12:53:41+1000] INFO - prefect.TaskRunner | Task 'count': Starting task run...
[2021-09-14 12:53:41+1000] INFO - prefect.TaskRunner | Task 'count': Finished task run for task with final state: 'Success'
[2021-09-14 12:53:41+1000] INFO - prefect.TaskRunner | Task 'loop_test': Starting task run...
[2021-09-14 12:53:41+1000] INFO - prefect.TaskRunner | Task 'loop_test': Finished task run for task with final state: 'Cached'
[2021-09-14 12:53:41+1000] INFO - prefect.FlowRunner | Flow run SUCCESS: all reference tasks succeeded
Brad
09/14/2021, 2:58 AMLoop
result that the task finds causes the whole task to be set to Cached
causing an early exitBrad
09/14/2021, 3:01 AMBrad
09/14/2021, 3:02 AMKevin Kho
Kevin Kho
Brad
09/15/2021, 10:07 PM