    Manuel Mourato

    2 years ago
    Hello all. Apologies if this is a basic question, but I am trying to checkpoint the output of a task, like so:
    from prefect.tasks.shell import ShellTask
    from prefect.engine.results import LocalResult
    from datetime import timedelta
    from default_task_handler import tasks_notifications_handler
    import os
    
    os.environ["PREFECT__FLOWS__CHECKPOINTING"] = "true"
    
    a = ShellTask(
        max_retries=3,
        retry_delay=timedelta(minutes=60),
        timeout=1800,
        state_handlers=[tasks_notifications_handler],
        checkpoint=True,
        result=LocalResult(dir="/home/my-user/weekly_execution"),
        command="ls /home/my-user/",
    )
    a.run()
    The task runs, and the weekly_execution directory is created, but nothing is persisted. What am I doing wrong? Is it mandatory that the task be part of a flow? Thank you.

    UPDATE: Indeed, if I run the task inside a flow, checkpointing works. Is there a way to do it for individual tasks?
    Jim Crist-Harif

    2 years ago
    Hi Manuel, calling run() on a task only runs the task's execution code (the bit that actually does the work). The rest of the Prefect machinery is configured on the task, but it is executed by the TaskRunner. If you want checkpointing for an individual task, you should be able to wrap the task in a TaskRunner and call the run method of the runner. Something like:
    from prefect import task
    from prefect.engine import TaskRunner
    from prefect.engine.results import LocalResult
    
    @task(result=LocalResult(location="test.prefect"))
    def test():
        print("Hello!")
        return 1
    
    # Wrap the task in a runner; the runner, not test.run(), applies the task's configuration.
    runner = TaskRunner(test)
    runner.run()
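To make the split between a task's run method and the runner concrete, here is a toy model in plain Python. It is only a sketch of the idea and not Prefect's implementation: ToyTask, ToyRunner, and the result.pkl file name are hypothetical names invented for this illustration. The task object only stores its result directory, and the runner is the piece that acts on it.

```python
import pickle
import tempfile
from pathlib import Path


class ToyTask:
    """Hypothetical stand-in for a task: holds configuration plus the work to do."""

    def __init__(self, fn, result_dir=None):
        self.fn = fn
        self.result_dir = result_dir  # configuration only; run() never reads it

    def run(self):
        # Only the user's code executes here; nothing is persisted.
        return self.fn()


class ToyRunner:
    """Hypothetical stand-in for a runner: executes the task, then applies its config."""

    def __init__(self, task):
        self.task = task

    def run(self):
        value = self.task.run()
        if self.task.result_dir is not None:
            out = Path(self.task.result_dir)
            out.mkdir(parents=True, exist_ok=True)
            # Persist the return value, which is the "checkpoint" in this toy model.
            (out / "result.pkl").write_bytes(pickle.dumps(value))
        return value


result_dir = Path(tempfile.mkdtemp()) / "checkpoints"
task = ToyTask(lambda: 1, result_dir=result_dir)

direct_value = task.run()                      # runs the function only
persisted_after_direct = result_dir.exists()   # nothing has been written yet

runner_value = ToyRunner(task).run()           # runs the function and persists the result
persisted_after_runner = (result_dir / "result.pkl").exists()
```

In this sketch, calling task.run() directly returns the value but leaves the disk untouched, while going through the runner both returns the value and writes it out, which mirrors the behaviour described in the reply above.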
    Manuel Mourato

    2 years ago
    Worked like a charm, thank you! 🎉