Hello all Apologies if this is a basic question, b...
# prefect-community
m
Hello all Apologies if this is a basic question, but I am trying to checkpoint the output of a task, like so:
Copy code
from prefect.tasks.shell import ShellTask
from prefect.engine.results import LocalResult
from datetime import timedelta
from default_task_handler import tasks_notifications_handler
import os

os.environ["PREFECT__FLOWS__CHECKPOINTING"] = "true"

a=ShellTask(max_retries=3, retry_delay=timedelta(minutes=60), timeout=1800,
                                state_handlers=[tasks_notifications_handler],checkpoint=True, result=LocalResult(dir="/home/my-user/weekly_execution"),
                                command="ls /home/my-user/")
a.run()
The task runs, and the
weekly_execution
directory is created, but nothing is persisted. What am I doing wrong? Is it mandatory that the task be part of a flow? Thank you UPDATE Indeed if I run the task inside a flow, checkpointing works. Is there a way to do it for individual tasks?
j
Hi Manuel, calling
run
on a task only runs the task's execution code (the bit that actually does work), the rest of the prefect machinery is configured on a task but execution of it is done by the
TaskRunner
. If you want to handle checkpointing on a task, you should be able to wrap a task with a
TaskRunner
and call the
run
method of the runner. Something like:
Copy code
from prefect import task
from prefect.engine import TaskRunner
from prefect.engine.results import LocalResult

@task(result=LocalResult(location="test.prefect"))
def test():
    print("Hello!")
    return 1

runner = TaskRunner(test)
runner.run()
m
Worked like a charm, thank you! 🎉