Manuel Mourato
08/25/2020, 2:10 PMfrom prefect.tasks.shell import ShellTask
from prefect.engine.results import LocalResult
from datetime import timedelta
from default_task_handler import tasks_notifications_handler
import os
os.environ["PREFECT__FLOWS__CHECKPOINTING"] = "true"
a=ShellTask(max_retries=3, retry_delay=timedelta(minutes=60), timeout=1800,
state_handlers=[tasks_notifications_handler],checkpoint=True, result=LocalResult(dir="/home/my-user/weekly_execution"),
command="ls /home/my-user/")
a.run()
The task runs, and the weekly_execution
directory is created, but nothing is persisted.
What am I doing wrong?
Is it mandatory that the task be part of a flow?
Thank you
UPDATE
Indeed if I run the task inside a flow, checkpointing works.
Is there a way to do it for individual tasks?Jim Crist-Harif
08/25/2020, 2:57 PMrun
on a task only runs the task's execution code (the bit that actually does work), the rest of the prefect machinery is configured on a task but execution of it is done by the TaskRunner
. If you want to handle checkpointing on a task, you should be able to wrap a task with a TaskRunner
and call the run
method of the runner. Something like:
from prefect import task
from prefect.engine import TaskRunner
from prefect.engine.results import LocalResult
@task(result=LocalResult(location="test.prefect"))
def test():
print("Hello!")
return 1
runner = TaskRunner(test)
runner.run()
Manuel Mourato
08/25/2020, 4:03 PM