What's the right way to structure this to specify `path_to_result`
import prefect as pf
from prefect.engine.results import LocalResult


# NOTE: the decorator arguments are evaluated when this module is loaded, so
# `path_to_result` has to exist before `my_task` is defined; it is not defined
# anywhere in this snippet.
@pf.task(checkpoint=True, result=LocalResult(dir=path_to_result))
def my_task(a, b, c):
    """Run ``do_stuff`` on the three inputs and return its value.

    ``do_stuff`` is not defined in this snippet -- presumably it is supplied
    by the surrounding project (TODO confirm).
    """
    return do_stuff(a, b, c)


with pf.Flow('my flow') as flow:
    my_task(1, 2, 'foo')  # <--- I want to be able to specify path_to_result here

flow.run()
at the indicated location?
from prefect import task, Flow, Task
from prefect.engine.results import LocalResult


class MyTask(Task):
    """Task that combines two stored operands with ``+``.

    Args:
        a: First operand, kept on the instance.
        b: Second operand, kept on the instance.
        c: Value handed to ``LocalResult`` as its directory argument; the
            resulting ``LocalResult`` becomes this task's ``result``.
    """

    def __init__(self, a, b, c):
        self.a = a
        self.b = b
        local_result = LocalResult(dir=c)
        super().__init__(result=local_result)

    def run(self):
        """Return ``self.a + self.b``."""
        total = self.a + self.b
        return total


with Flow('my flow') as flow:
    # The third constructor argument is forwarded to LocalResult in __init__.
    abc = MyTask(1, 2, 'foo')()  # <--- I want to be able to specify path_to_result here

flow.run()
? I have a task which returns train and test dataframes and I'd like to write them to two separate local files.
and it doesn’t work because the PandasSerializer expects a single DataFrame and calls
on it. It doesn’t expect a tuple of DataFrames. I suggest you just use
or what you need inside the task and load it in the subsequent tasks to support the
. I assume you tried using the
and got an error right?
will work. I was hoping to be able to do something like this to have two results cached (and targeted) for a single task:
from prefect import Task
from prefect.engine.results import LocalResult
from prefect.engine.serializers import PandasSerializer
import pandas as pd


class MyTask(Task):
    """Task that returns two DataFrames and asks for one result/target per frame.

    NOTE(review): ``result`` and ``target`` are given as two-element lists
    here. This looks like a wished-for API rather than a documented one --
    confirm against the Prefect ``Task`` signature before relying on it.

    Args:
        a: Data passed to ``pd.DataFrame`` for the first returned frame.
        b: Data passed to ``pd.DataFrame`` for the second returned frame.
        storage_path_a: Location/target used for the first result.
        storage_path_b: Location/target used for the second result.
    """

    def __init__(self, a, b, storage_path_a, storage_path_b):
        self.a = a
        self.b = b
        self.storage_path_a = storage_path_a
        self.storage_path_b = storage_path_b

        # One LocalResult (each with its own CSV serializer) per storage path,
        # in the same order as the frames returned by run().
        paths = (storage_path_a, storage_path_b)
        results = [
            LocalResult(location=path, serializer=PandasSerializer('csv'))
            for path in paths
        ]
        super().__init__(
            checkpoint=True,
            result=results,
            target=list(paths),
        )

    def run(self):
        """Return ``(pd.DataFrame(self.a), pd.DataFrame(self.b))``."""
        frame_a = pd.DataFrame(self.a)
        frame_b = pd.DataFrame(self.b)
        return frame_a, frame_b