Alex Furrier
07/02/2021, 10:03 PM
prefect.engine.serializers.PandasSerializer
and passing that to the task as a local result. Like so:
@task(result=LocalResult(serializer=PandasSerializer(
    'csv', serialize_kwargs={'index': False})))
def my_df_task(df: DataFrame) -> DataFrame:
    ...  # task body not shown in the original message
However, using this method for the task I described earlier (DataFrame in, List out) fails, because the result tries to use PandasSerializer to serialize the returned list.
What's the most sensible way around this?
My hacky workaround is to create a serializer that uses PandasSerializer
methods for deserializing the DataFrame and PickleSerializer
methods for serializing the list.
I feel like there has to be a smarter way to do this.
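The workaround described above can be generalized into a single type-dispatching serializer. This is a hypothetical sketch: the class name DataFrameOrPickleSerializer and the one-byte format tag are illustrative and not part of Prefect. It mirrors the serialize(value) -> bytes / deserialize(bytes) -> value pair that Prefect 1.x serializers expose, writes DataFrames as CSV, pickles everything else, and depends only on pandas and the standard library. To hand it to LocalResult you would presumably subclass prefect.engine.serializers.Serializer instead of object (not shown).

```python
import io
import pickle
from typing import Any

import pandas as pd


class DataFrameOrPickleSerializer:
    """Hypothetical serializer: CSV for DataFrames, pickle for everything else.

    A one-byte tag at the start of the payload records which format was used,
    so deserialize() knows how to read the bytes back.
    """

    CSV_TAG = b"C"
    PICKLE_TAG = b"P"

    def __init__(self, index: bool = False) -> None:
        # Mirrors serialize_kwargs={'index': False} from the thread.
        self.index = index

    def serialize(self, value: Any) -> bytes:
        if isinstance(value, pd.DataFrame):
            return self.CSV_TAG + value.to_csv(index=self.index).encode("utf-8")
        # Lists and any other picklable object fall through to pickle.
        return self.PICKLE_TAG + pickle.dumps(value)

    def deserialize(self, value: bytes) -> Any:
        tag, payload = value[:1], value[1:]
        if tag == self.CSV_TAG:
            return pd.read_csv(io.BytesIO(payload))
        if tag == self.PICKLE_TAG:
            return pickle.loads(payload)
        raise ValueError(f"unknown format tag: {tag!r}")
```

The tag byte is what lets one serializer handle both types; the trade-off is that the checkpoint file is no longer a plain CSV that other tools can open directly.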
Kevin Kho
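from typing import List

import pandas as pd
from prefect import Flow, task
from prefect.engine.results import LocalResult
from prefect.engine.serializers import PandasSerializer, PickleSerializer

# Upstream task: its DataFrame output is checkpointed as CSV.
@task(result=LocalResult(serializer=PandasSerializer(
    'csv', serialize_kwargs={'index': False})))
def do_nothing(df: pd.DataFrame) -> pd.DataFrame:
    return df

# Downstream task: its list output is pickled instead.
@task(result=LocalResult(serializer=PickleSerializer()))
def some_operation(df: pd.DataFrame) -> List:
    return df['a'].to_list()

testdf = pd.DataFrame({"a": [1, 2, 3], "b": [1, 2, 3]})
with Flow("test") as flow:
    df = do_nothing(testdf)
    a_list = some_operation(df)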
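The answer above works because a task's result serializes only that task's own return value; the downstream task receives the DataFrame in memory, so each task can use the serializer that matches its output type. A minimal simulation of that pattern without Prefect follows. The helpers checkpoint and load are hypothetical stand-ins for LocalResult writing to and reading from a directory, and the csv_serialize/csv_deserialize pair and pickle.dumps/pickle.loads play the roles of PandasSerializer('csv', serialize_kwargs={'index': False}) and PickleSerializer().

```python
import io
import pickle
import tempfile
from pathlib import Path
from typing import Any, Callable, List

import pandas as pd


# Stand-ins for the two serializers in the thread (not Prefect's classes).
def csv_serialize(df: pd.DataFrame) -> bytes:
    return df.to_csv(index=False).encode("utf-8")


def csv_deserialize(data: bytes) -> pd.DataFrame:
    return pd.read_csv(io.BytesIO(data))


def checkpoint(directory: Path, name: str, value: Any,
               serialize: Callable[[Any], bytes]) -> Path:
    # Hypothetical stand-in for a LocalResult write: one file per task output.
    path = directory / name
    path.write_bytes(serialize(value))
    return path


def load(path: Path, deserialize: Callable[[bytes], Any]) -> Any:
    return deserialize(path.read_bytes())


def do_nothing(df: pd.DataFrame) -> pd.DataFrame:
    return df


def some_operation(df: pd.DataFrame) -> List:
    return df["a"].to_list()


testdf = pd.DataFrame({"a": [1, 2, 3], "b": [1, 2, 3]})
with tempfile.TemporaryDirectory() as tmp:
    out_dir = Path(tmp)
    df = do_nothing(testdf)
    # The DataFrame output is checkpointed as CSV...
    df_path = checkpoint(out_dir, "do_nothing.csv", df, csv_serialize)
    # ...while the list output is pickled, so the CSV serializer never sees it.
    a_list = some_operation(df)
    list_path = checkpoint(out_dir, "some_operation.pkl", a_list, pickle.dumps)
    restored_df = load(df_path, csv_deserialize)
    restored_list = load(list_path, pickle.loads)
```

Unlike a single combined serializer, this keeps the DataFrame checkpoint a plain CSV file.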