
Scott Brownlie

01/14/2020, 9:22 PM
Hello folks, I've just started researching Prefect as a possible alternative to Luigi and Airflow. One thing I like about Luigi is the ability to save the output of a task to disk, to be used by another task downstream. It appears that Prefect's LocalResultHandler does a similar thing, however I can't seem to get it to work as I expected. I have implemented the following toy example:
from prefect.engine.result_handlers import LocalResultHandler
from prefect import task, Flow

results_dir = 'prefect_results/'

@task(checkpoint=True)
def set_input():
    return 10

@task(checkpoint=True)
def square(x):
    return x**2

with Flow("test", result_handler=LocalResultHandler(dir=results_dir)) as flow:
    task1 = set_input()
    task2 = square(task1)

flow.run()
I would expect the output from each task to be saved to the specified directory automatically but it's not. Is that not what is supposed to happen?

Chris White

01/14/2020, 9:31 PM
Hi @Scott Brownlie - if you set the environment variable PREFECT__FLOWS__CHECKPOINTING=true it will work as you expect; this behavior will change when we release 0.9.0 so that the default behavior is to checkpoint any time a result handler is present!
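For reference, the variable can also be set from Python before the flow is run - a minimal sketch:

```python
import os

# Must be set before flow.run() is called; Prefect reads this setting
# at run time to decide whether task outputs are checkpointed via the
# flow's result handler.
os.environ["PREFECT__FLOWS__CHECKPOINTING"] = "true"
```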

Scott Brownlie

01/15/2020, 12:23 PM
Hello @Chris White, thanks for your reply. That worked, it's now saving the task outputs. When you save the output of a task in Luigi, if you then run the flow again it will not re-run tasks whose outputs are already on disk, it will simply load the task outputs to be used downstream. In Prefect, every time I run the flow it appears to re-run the tasks and save a new version of the outputs, using the current timestamp to construct the filename. Ideally, I would only like the tasks to be re-run if the parameter values change, otherwise the task output should be loaded from disk. Is this possible with Prefect?

Chris White

01/15/2020, 4:12 PM
Hey @Scott Brownlie yup this is certainly possible; a few notes about the differences between Prefect and Luigi:
- Prefect data exchange between tasks actually occurs _in memory_; the saving / loading of task results from disk is for tracking and recovering from failure
- we call the feature you are describing "caching"; it is described more fully here: https://docs.prefect.io/core/concepts/persistence.html#output-caching

Note that if you are using Prefect Core without Prefect Cloud, the cache is only stored in memory so you'll need to take extra steps to restore it within each new process (we are currently working on a way around this)

Scott Brownlie

01/15/2020, 4:25 PM
Thanks @Chris White. The problem with storing task outputs in memory is that I'm working with large datasets and the memory would run out pretty quickly.
I read the documentation but it's still not 100% clear to me. There are use cases other than tracking and recovering from failure where I think that saving / loading of task outputs is useful. For example, I may run a cross-validation pipeline with a certain set of model hyperparameters. This pipeline would include the data pre-processing. If I want to validate a new set of model hyperparameters tomorrow I don't want to have to run the pre-processing tasks again, only the model training and validation tasks. I would like the pipeline engine to know that the pre-processing has already been run, and that the model hyperparameters don't affect this, and simply load the pre-processed data from disk. I'm not quite seeing how I can do this elegantly in Prefect.

Chris White

01/15/2020, 4:48 PM
To be clear, Prefect Cloud is our recommended solution for managing Flows across multiple processes, so you will need to introduce a few hooks yourself for this to completely work as you expect; in particular you'll need to save / load the full cache to disk for each process in which you run your flow.

That being said, your use case is a perfect one for caching. In your case it sounds like you want your data preprocessing tasks cached. To enable this, first choose a time horizon on which you want the cache to be considered valid, and also choose a cache validator - in your case, if you implement your hyperparameters as Prefect Parameters then you might use the all_parameters cache validator. Next, provide the values you chose above to the initialization of each of the pre-processing tasks as described in the above doc. On each run, they will now look at the available cached states (which are stored in memory if you run with Prefect Core alone) and decide whether a valid cached state is available or not. If one is available, the task will not rerun and instead return the value from its most recent cached run. For a silly but easy example, check out: https://docs.prefect.io/core/examples/cached_task.html