Manuel Mourato

08/25/2020, 2:10 PM
Hello all. Apologies if this is a basic question, but I am trying to checkpoint the output of a task, like so:
from prefect.tasks.shell import ShellTask
from prefect.engine.results import LocalResult
from datetime import timedelta
from default_task_handler import tasks_notifications_handler
import os

os.environ["PREFECT__FLOWS__CHECKPOINTING"] = "true"

a = ShellTask(
    max_retries=3, retry_delay=timedelta(minutes=60), timeout=1800,
    state_handlers=[tasks_notifications_handler], checkpoint=True,
    result=LocalResult(dir="/home/my-user/weekly_execution"),
    command="ls /home/my-user/",
)
The task runs, and the result directory is created, but nothing is persisted. What am I doing wrong? Is it mandatory that the task be part of a flow? Thank you.
UPDATE: Indeed, if I run the task inside a flow, checkpointing works. Is there a way to do it for individual tasks?

Jim Crist-Harif

08/25/2020, 2:57 PM
Hi Manuel, calling `.run()` on a task only runs the task's execution code (the bit that actually does the work). The rest of the Prefect machinery is configured on the task, but executing it is the job of the `TaskRunner`. If you want checkpointing for a single task, you should be able to wrap the task with a `TaskRunner` and call the runner's `run` method. Something like:
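from prefect import task
from prefect.engine import TaskRunner
from prefect.engine.results import LocalResult

# Decorator arguments are assumed; the directory is the one from the question above.
@task(checkpoint=True, result=LocalResult(dir="/home/my-user/weekly_execution"))
def test():
    return 1

runner = TaskRunner(test)
state = runner.run()  # runs the task through the runner, which handles checkpointing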
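To make the distinction concrete, here is a rough stdlib-only sketch of the idea. It is not Prefect's actual implementation: `ToyTask`, `ToyRunner`, and the `result.json` file name are hypothetical names invented for illustration. It only shows why calling `run()` on the task skips persistence while going through a runner does not.

```python
import json
import os
import tempfile


class ToyTask:
    """Hypothetical stand-in for a task: configuration plus the work itself."""

    def __init__(self, fn, checkpoint=False, result_dir=None):
        self.fn = fn
        self.checkpoint = checkpoint
        self.result_dir = result_dir

    def run(self):
        # Only the execution code; the checkpoint settings are ignored here.
        return self.fn()


class ToyRunner:
    """Hypothetical stand-in for a runner: applies the task's configuration."""

    def __init__(self, task):
        self.task = task

    def run(self):
        value = self.task.run()
        if self.task.checkpoint and self.task.result_dir:
            # Persist the return value because the task asked for it.
            os.makedirs(self.task.result_dir, exist_ok=True)
            path = os.path.join(self.task.result_dir, "result.json")
            with open(path, "w") as fh:
                json.dump(value, fh)
        return value


result_dir = os.path.join(tempfile.mkdtemp(), "results")
result_file = os.path.join(result_dir, "result.json")
toy = ToyTask(lambda: 1, checkpoint=True, result_dir=result_dir)

direct_value = toy.run()  # bare call: nothing is written
persisted_after_direct = os.path.exists(result_file)

runner_value = ToyRunner(toy).run()  # via the runner: the result is written
persisted_after_runner = os.path.exists(result_file)
```

In this toy model the task object only carries the settings, and the runner is the piece that acts on them, which mirrors the split described above.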

Manuel Mourato

08/25/2020, 4:03 PM
Worked like a charm, thank you! 🎉