Armin
03/27/2020, 8:50 AMclass SomeTask(Task):
def __init__(self, **kwargs):
super().__init__(**kwargs)
# SomeTask specific init code
def run(self, input):
output = process(input)
return output
And flow:
some_task = SomeTask(checkpoint=True, output_handler=LocalResultHandler(dir=cache_location))
def some_flow():
with Flow():
output_handler(input)
if __name__ == '__main__':
some_flow()
The cache_location
remains empty and on multiple runs of the flow (with same input to some_task
) the result is recomputed every time. Could somebody please suggest what am I missing?Chris White
03/27/2020, 2:16 PMArmin
03/27/2020, 4:07 PMChris White
03/27/2020, 4:38 PMwith Flow("you-need-a-name") as flow:
output_handler(input)
if __name__ == "__main__":
flow.run()
Armin
03/27/2020, 4:50 PMsome_task = SomeTask(checkpoint=True, output_handler=LocalResultHandler(dir=cache_location))
def some_flow():
with Flow() as flow:
output_handler(input)
flow.run()
if __name__ == '__main__':
some_flow()
Chris White
03/27/2020, 4:52 PMsome_task
task to your Flow?Armin
03/27/2020, 4:54 PMsome_task = SomeTask(checkpoint=True, output_handler=LocalResultHandler(dir=cache_location))
def some_flow():
with Flow() as flow:
some_task(input)
flow.run()
if __name__ == '__main__':
some_flow()
Chris White
03/27/2020, 5:07 PMPREFECT__FLOWS___CHECKPOINTING=true
Armin
03/28/2020, 5:34 AM