# ask-community
Peyton Murray:
I want to define a task that stores results before running it inside a flow, like this:
```python
import prefect as pf
from prefect.engine.results import LocalResult

path_to_result = 'results'  # placeholder: fixed once, when the task is defined

def do_stuff(a, b, c):  # placeholder for the real work
    return a, b, c

@pf.task(checkpoint=True, result=LocalResult(dir=path_to_result))
def my_task(a, b, c):
    return do_stuff(a, b, c)

with pf.Flow('my flow') as flow:
    my_task(1, 2, 'foo')  # <--- I want to be able to specify path_to_result here

flow.run()
```
What's the right way to structure this so that I can specify `path_to_result` at the indicated location?
Kevin Kho:
Hey @Peyton Murray, I think you need to subclass `Task`, like this:
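```python
from prefect import Flow, Task
from prefect.engine.results import LocalResult

class MyTask(Task):
    def __init__(self, a, b, c):
        # c is used as the result directory for this task instance
        super().__init__(checkpoint=True, result=LocalResult(dir=c))
        self.a = a
        self.b = b

    def run(self):
        return self.a + self.b

with Flow('my flow') as flow:
    # The result directory ('foo') is now chosen where the task is created
    abc = MyTask(1, 2, 'foo')()

# When running locally, results are only written if checkpointing is enabled,
# e.g. with the environment variable PREFECT__FLOWS__CHECKPOINTING=true
flow.run()
```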
Peyton Murray:
Thanks @Kevin Kho, and what if my task returns more than one value in a tuple? Do I need to subclass `prefect.engine.serializers.Serializer`? I have a task which returns train and test DataFrames and I'd like to write them to two separate local files.
Kevin Kho:
I am not sure this can work with results and serializers. I tried using the `PandasSerializer`, and it doesn't work because it expects a single DataFrame and calls the matching `to_<file_type>` method on it (e.g. `to_csv`); it doesn't expect a tuple of DataFrames. I suggest you just handle the files yourself with pandas (`to_csv()`, `pd.read_csv()` or whatever you need): write them inside the task and load them in the subsequent tasks, with `nout=2` on the task so its two outputs can be unpacked. I assume you tried using the `PandasSerializer` and got an error, right?
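A minimal sketch of that suggestion, using plain functions as stand-ins for the task bodies. The function names, the 25% split and the file names are hypothetical. In a Prefect 1.x flow the first function would be decorated with `@task(nout=2)` so its two returned paths can be unpacked, and the second with `@task`.

```python
import os
import tempfile

import pandas as pd


def split_and_save(df, out_dir, test_frac=0.25):
    """Hypothetical upstream task body: write train/test to two CSV files
    and return their paths instead of the DataFrames themselves."""
    n_test = int(len(df) * test_frac)
    test = df.iloc[:n_test]
    train = df.iloc[n_test:]
    train_path = os.path.join(out_dir, "train.csv")
    test_path = os.path.join(out_dir, "test.csv")
    train.to_csv(train_path, index=False)
    test.to_csv(test_path, index=False)
    return train_path, test_path


def load(path):
    """Hypothetical downstream task body: read one saved file back."""
    return pd.read_csv(path)


# Usage: toy data written to a temporary directory, then read back
out_dir = tempfile.mkdtemp()
df = pd.DataFrame({"x": range(8), "y": range(8, 16)})
train_path, test_path = split_and_save(df, out_dir)
train, test = load(train_path), load(test_path)
```

Passing paths between tasks keeps each file's format fully under your control, at the cost of giving up Prefect's own result caching for these outputs.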
Peyton Murray:
Hmm, okay, thanks for the help. And no, it doesn't look like a single `PandasSerializer` will work. I was hoping to be able to do something like this to have two results cached (and targeted) for a single task:
```python
from prefect import Task
from prefect.engine.results import LocalResult
from prefect.engine.serializers import PandasSerializer
import pandas as pd


class MyTask(Task):

    def __init__(self, a, b, storage_path_a, storage_path_b):
        self.a = a
        self.b = b
        self.storage_path_a = storage_path_a
        self.storage_path_b = storage_path_b
        # Hoped-for API: Task takes a single `result` and a single `target`,
        # so these lists show what I'd like to do, not something Prefect supports
        super().__init__(
            checkpoint=True,
            result=[
                LocalResult(location=storage_path_a, serializer=PandasSerializer('csv')),
                LocalResult(location=storage_path_b, serializer=PandasSerializer('csv')),
            ],
            target=[
                storage_path_a,
                storage_path_b,
            ]
        )

    def run(self):
        return pd.DataFrame(self.a), pd.DataFrame(self.b)
```