# ask-community
Alex Furrier:
I'm trying to run a task in Prefect server that takes a pandas DataFrame as input and outputs a Python list. The server uses a Dask backend and as such can be fairly particular about serialization. Previously, for tasks involving DataFrames, I've been serializing them with prefect.engine.serializers.PandasSerializer and passing that to the task as a local result, like so:
@task(result=LocalResult(serializer=PandasSerializer(
    'csv', serialize_kwargs={'index': False})))
def my_df_task(df: DataFrame) -> DataFrame:
    ...
However, using this method for the previously described task (input DataFrame, output list) fails, as Prefect tries to use the PandasSerializer to serialize the list. What's the most sensible way around this? My hacky workaround is to create a serializer that uses PandasSerializer methods for deserializing the DataFrame and PickleSerializer methods for serializing the list. I feel like there has to be a smarter way to do this.
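For reference, that mixed approach can be sketched standalone, without Prefect at all. The MixedSerializer class below is hypothetical: it implements the same serialize/deserialize pair that a prefect.engine.serializers.Serializer subclass would in Prefect 0.x, pickling the outgoing list while reading stored CSV bytes back into a DataFrame.

```python
import io
import pickle

import pandas as pd


class MixedSerializer:
    """Hypothetical sketch mixing the two serializers' behavior (not Prefect's API)."""

    def serialize(self, value) -> bytes:
        # Outgoing results (here, a plain list) are pickled,
        # as PickleSerializer would do.
        return pickle.dumps(value)

    def deserialize(self, value: bytes) -> pd.DataFrame:
        # Stored CSV bytes are read back into a DataFrame,
        # roughly what PandasSerializer('csv') does on the way in.
        return pd.read_csv(io.BytesIO(value))
```

The asymmetry (one format in, a different format out) is exactly what makes this feel hacky: the bytes this class writes can never be read back by its own deserialize.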
k
Hi @Alex Furrier, I think the serializer is just applied to the result, so you shouldn't need these different serialization and deserialization methods to read in the DataFrame (I assume it comes into the task as a DataFrame, not serialized). Let me test this. One sec.
What error do you get? I tried something like this and it worked.
from typing import List

import pandas as pd
from prefect import Flow, task
from prefect.engine.results import LocalResult
from prefect.engine.serializers import PandasSerializer, PickleSerializer


# DataFrame in, DataFrame out: PandasSerializer handles this task's result
@task(result=LocalResult(serializer=PandasSerializer(
    'csv', serialize_kwargs={'index': False})))
def do_nothing(df: pd.DataFrame) -> pd.DataFrame:
    return df


# DataFrame in, list out: give this task its own PickleSerializer result
@task(result=LocalResult(serializer=PickleSerializer()))
def some_operation(df: pd.DataFrame) -> List:
    return df['a'].to_list()


testdf = pd.DataFrame({"a": [1, 2, 3], "b": [1, 2, 3]})

with Flow("test") as flow:
    df = do_nothing(testdf)
    a_list = some_operation(df)
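For what it's worth, the failure mode and the fix can also be seen without Prefect: PandasSerializer's 'csv' mode roughly works by calling a DataFrame method like to_csv on the result, which a plain list doesn't have, whereas pickle round-trips arbitrary Python objects, lists included.

```python
import pickle

# A list has no to_csv, so a DataFrame-oriented serializer
# has nothing to call on it -- hence the original error.
assert not hasattr([1, 2, 3], "to_csv")

# Pickle, by contrast, round-trips any Python object.
data = [1, 2, 3]
assert pickle.loads(pickle.dumps(data)) == data
```

That's why attaching a PickleSerializer result to just the list-returning task is enough, with no custom serializer needed.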