Peyton Murray
07/06/2021, 11:45 PM
import prefect as pf
from prefect.engine.results import LocalResult

@pf.task(checkpoint=True, result=LocalResult(dir=path_to_result))
def my_task(a, b, c):
    return do_stuff(a, b, c)

with pf.Flow('my flow') as flow:
    my_task(1, 2, 'foo')  # <--- I want to be able to specify path_to_result here

flow.run()
What's the right way to structure this to specify path_to_result at the indicated location?
Kevin Kho
from prefect import task, Flow, Task
from prefect.engine.results import LocalResult

class MyTask(Task):
    def __init__(self, a, b, c):
        self.a = a
        self.b = b
        super().__init__(result=LocalResult(c))

    def run(self):
        return self.a + self.b

with Flow('my flow') as flow:
    abc = MyTask(1, 2, 'foo')()  # <--- I want to be able to specify path_to_result here

flow.run()
Peyton Murray
07/07/2021, 10:30 PM
prefect.engine.serializers.Serializer()? I have a task which returns train and test DataFrames, and I'd like to write them to two separate local files.
Kevin Kho
PandasSerializer and it doesn’t work because the PandasSerializer expects a DataFrame and calls to_whatever on it. It doesn’t expect a tuple of DataFrames. I suggest you just use pd.read_csv() or whatever you need inside the task and load it in the subsequent tasks to support nout=2. I assume you tried using the PandasSerializer and got an error, right?
Peyton Murray
07/08/2021, 4:36 AM
PandasSerializer will work. I was hoping to be able to do something like this to have two results cached (and targeted) for a single task:
from prefect import Task
from prefect.engine.results import LocalResult
from prefect.engine.serializers import PandasSerializer
import pandas as pd

class MyTask(Task):
    def __init__(self, a, b, storage_path_a, storage_path_b):
        self.a = a
        self.b = b
        self.storage_path_a = storage_path_a
        self.storage_path_b = storage_path_b
        super().__init__(
            checkpoint=True,
            result=[
                LocalResult(location=storage_path_a, serializer=PandasSerializer('csv')),
                LocalResult(location=storage_path_b, serializer=PandasSerializer('csv')),
            ],
            target=[
                storage_path_a,
                storage_path_b,
            ]
        )

    def run(self):
        return pd.DataFrame(self.a), pd.DataFrame(self.b)
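
For comparison, here is a minimal sketch of the workaround Kevin describes above, assuming plain pandas and no Prefect result handling at all: the task body writes each DataFrame to its own file and returns the two paths, and a later step reads them back. The names write_train_test and load_frame and the file names are hypothetical, made up for illustration. In a flow these would presumably be wrapped as tasks, with the first one declared with nout=2 so the two paths can be unpacked.

```python
import tempfile
from pathlib import Path

import pandas as pd


def write_train_test(a, b, storage_path_a, storage_path_b):
    """Build two DataFrames, write each to its own CSV, return both paths."""
    train = pd.DataFrame(a)
    test = pd.DataFrame(b)
    train.to_csv(storage_path_a, index=False)
    test.to_csv(storage_path_b, index=False)
    # Return the two file locations rather than the frames themselves,
    # so nothing downstream needs to serialize a tuple of DataFrames.
    return str(storage_path_a), str(storage_path_b)


def load_frame(path):
    """Downstream step: read one of the CSV files back into a DataFrame."""
    return pd.read_csv(path)


# Usage with a throwaway directory (hypothetical data and file names).
out_dir = Path(tempfile.mkdtemp())
path_a, path_b = write_train_test(
    {"x": [1, 2, 3]},
    {"x": [4, 5]},
    out_dir / "train.csv",
    out_dir / "test.csv",
)
train_df = load_frame(path_a)
test_df = load_frame(path_b)
```

Because the files are written by hand here, this would not by itself give the per-result caching and targeting that the MyTask sketch above is hoping for; it only covers getting two separate files out of one task.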