#prefect-community
    Peyton Murray

    1 year ago
    I want to define a task that stores results before running it inside a flow, like this:
    import prefect as pf
    from prefect.engine.results import LocalResult
    
    @pf.task(checkpoint=True, result=LocalResult(dir=path_to_result))
    def my_task(a, b, c):
        return do_stuff(a, b, c)
    
    with pf.Flow('my flow') as flow:
        my_task(1, 2, 'foo')  # <--- I want to be able to specify path_to_result here
    
    flow.run()
    What's the right way to structure this to specify `path_to_result` at the indicated location?
    Kevin Kho

    1 year ago
    Hey @Peyton Murray, I think you need to subclass the Task class, like:
    from prefect import Flow, Task
    from prefect.engine.results import LocalResult
    
    class MyTask(Task):
        def __init__(self, a, b, result_dir, **kwargs):
            self.a = a
            self.b = b
            # The result directory is fixed when the task object is created
            super().__init__(result=LocalResult(dir=result_dir), **kwargs)
    
        def run(self):
            return self.a + self.b
    
    with Flow('my flow') as flow:
        abc = MyTask(1, 2, 'foo')()  # <--- path_to_result ('foo') is specified here
        
    flow.run()
    Peyton Murray

    1 year ago
    Thanks @Kevin Kho - and what if my task returns more than one value in a tuple? Do I need to subclass `prefect.engine.serializers.Serializer`? I have a task which returns train and test dataframes and I'd like to write them to two separate local files.
    Kevin Kho

    1 year ago
    I am not sure this can work with results and serializers. I tried using the `PandasSerializer`, and it doesn't work because the `PandasSerializer` expects a single DataFrame and calls the corresponding `to_...` method (e.g. `to_csv`) on it; it doesn't expect a tuple of DataFrames. I suggest you just handle the files yourself: write them inside the task, load them in the subsequent tasks with `pd.read_csv()` (or whatever you need), and use `nout=2` so the task's two outputs can be unpacked. I assume you tried using the `PandasSerializer` and got an error, right?
    Peyton Murray

    1 year ago
    Hmm, okay, thanks for the help - and no, it doesn't look like a single `PandasSerializer` will work. I was hoping to be able to do something like this to have two results cached (and targeted) for a single task:
    from prefect import Task
    from prefect.engine.results import LocalResult
    from prefect.engine.serializers import PandasSerializer
    import pandas as pd
    
    
    class MyTask(Task):
    
        def __init__(self, a, b, storage_path_a, storage_path_b):
            self.a = a
            self.b = b
            self.storage_path_a = storage_path_a
            self.storage_path_b = storage_path_b
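            # Wished-for API: a Prefect 1.x Task takes a single Result and a
            # single target, so lists like these are not supported
            super().__init__(
                checkpoint=True,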
                result=[
                    LocalResult(location=storage_path_a, serializer=PandasSerializer('csv')),
                    LocalResult(location=storage_path_b, serializer=PandasSerializer('csv')),
                ],
                target=[
                    storage_path_a,
                    storage_path_b,
                ]
            )
    
        def run(self):
            return pd.DataFrame(self.a), pd.DataFrame(self.b)
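    A Prefect 1.x task takes one `result` and one `target`. One workaround for caching two outputs is to do the existence check by hand inside the task. Below is a minimal sketch in plain pandas; `load_or_build_split` and `build` are hypothetical names. Unlike a real `target`, this does not skip the task run. It only avoids redoing the expensive `build` step when both files already exist:

```python
import os
import tempfile

import pandas as pd


def load_or_build_split(path_a, path_b, build):
    """Return two DataFrames, reusing the CSVs at path_a/path_b if both exist.

    A hand-rolled stand-in for two targets on one task: `build` is only
    called when either file is missing.
    """
    if os.path.exists(path_a) and os.path.exists(path_b):
        return pd.read_csv(path_a), pd.read_csv(path_b)
    df_a, df_b = build()
    df_a.to_csv(path_a, index=False)
    df_b.to_csv(path_b, index=False)
    return df_a, df_b


calls = []


def build():
    # Hypothetical expensive computation; records each call
    calls.append(1)
    return pd.DataFrame({"x": [1, 2]}), pd.DataFrame({"x": [3]})


tmp = tempfile.mkdtemp()
path_a = os.path.join(tmp, "train.csv")
path_b = os.path.join(tmp, "test.csv")

first = load_or_build_split(path_a, path_b, build)
second = load_or_build_split(path_a, path_b, build)  # served from the CSVs
```

    Assuming Prefect 1.x, the helper could be wrapped as a task with `@task(nout=2)` and called with `storage_path_a` and `storage_path_b`, so downstream tasks receive the two DataFrames as separate outputs.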