
Armin

03/27/2020, 8:50 AM
Hi, I'm having trouble checkpointing a task. Given a task definition:
class SomeTask(Task):
    def __init__(self, **kwargs):
        super().__init__(**kwargs)
        # SomeTask specific init code

    def run(self, input):
        output = process(input)
        return output
And flow:
some_task = SomeTask(checkpoint=True, output_handler=LocalResultHandler(dir=cache_location))

def some_flow():
    with Flow():
        output_handler(input)

if __name__ == '__main__':
    some_flow()
The `cache_location` directory remains empty, and on multiple runs of the flow (with the same input to `some_task`) the result is recomputed every time. Could somebody please suggest what I am missing?

Chris White

03/27/2020, 2:16 PM
What version of Prefect are you running?

Armin

03/27/2020, 4:07 PM
0.9.7

Chris White

03/27/2020, 4:38 PM
oh sorry, it looks like you aren’t actually running your flow, you’re only building it. You should instead do:
with Flow("you-need-a-name") as flow:
    output_handler(input)

if __name__ == "__main__":
    flow.run()

Armin

03/27/2020, 4:50 PM
My bad, I forgot to add the flow.run() part
this is what I was actually trying to run:
some_task = SomeTask(checkpoint=True, output_handler=LocalResultHandler(dir=cache_location))
def some_flow():
    with Flow() as flow:
        output_handler(input)
    flow.run()

if __name__ == '__main__':
    some_flow()

Chris White

03/27/2020, 4:52 PM
Did you add the `some_task` task to your Flow?

Armin

03/27/2020, 4:54 PM
yes, sorry I'm having trouble abstracting away my source code
some_task = SomeTask(checkpoint=True, output_handler=LocalResultHandler(dir=cache_location))
def some_flow():
    with Flow() as flow:
        some_task(input)
    flow.run()

if __name__ == '__main__':
    some_flow()
I'm certain the code executes, because it logs from inside the data loading loop and the subsequent analysis tasks complete as intended. The only issues are that the task doesn't save anything to the cache location and that it re-executes the data loading steps on every run.

Chris White

03/27/2020, 5:07 PM
hmmm gotcha
oh my apologies, I think this was incorrectly removed from the documentation: you need to set the checkpointing configuration setting to `true` (https://github.com/PrefectHQ/prefect/blob/0.9.7/src/prefect/config.toml#L58), or equivalently set the environment variable:
PREFECT__FLOWS__CHECKPOINTING=true
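For readers unsure how an environment variable relates to the `config.toml` setting linked above, here is a minimal sketch of the naming convention, assuming a `PREFECT__` prefix followed by section and key names separated by double underscores. The helper `env_to_nested_config` is hypothetical and written only for illustration; it is not Prefect's actual configuration loader.

```python
import os


def env_to_nested_config(environ, prefix="PREFECT__"):
    """Hypothetical helper: turn PREFIX__SECTION__KEY=value pairs into a nested dict."""
    config = {}
    for name, value in environ.items():
        if not name.startswith(prefix):
            continue
        # Drop the prefix, then split on the double-underscore separator.
        path = name[len(prefix):].lower().split("__")
        node = config
        for part in path[:-1]:
            node = node.setdefault(part, {})
        # Interpret "true"/"false" as booleans; keep everything else as a string.
        lowered = value.strip().lower()
        node[path[-1]] = {"true": True, "false": False}.get(lowered, value)
    return config


# Set the variable before the flow runs (same effect as `export ...` in a shell).
os.environ["PREFECT__FLOWS__CHECKPOINTING"] = "true"

settings = env_to_nested_config(os.environ)
print(settings["flows"]["checkpointing"])
```

Under this assumed convention, the variable corresponds to the `checkpointing` key in the `[flows]` section. It likely needs to be set before Prefect is imported so the configuration is read with the new value.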

Armin

03/28/2020, 5:34 AM
Alright, so now I managed to save the checkpoint, but how do I make sure that on successive runs the cache is retrieved?