
    Alex Furrier

    1 year ago
    I'm trying to run a task on Prefect Server that takes a pandas DataFrame as input and outputs a Python list. The server uses a Dask backend and as such can be fairly particular about serialization. Previously, for tasks involving DataFrames, I've been serializing them with prefect.engine.serializers.PandasSerializer and passing that to the task as a local result, like so:
    @task(result=LocalResult(serializer=PandasSerializer(
        'csv', serialize_kwargs={'index': False})))
    def my_df_task(df: DataFrame) -> DataFrame:
        ...
    However, using this method for the task described above (input DataFrame, output list) fails because it tries to use the PandasSerializer to serialize the list. What's the most sensible way around this? My hacky workaround is to create a serializer that uses PandasSerializer methods for deserializing the DataFrame and PickleSerializer methods for serializing the list. I feel like there has to be a smarter way to do this.
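A sketch of that hybrid workaround, stripped down to the serialize/deserialize interface that Prefect 1.x serializers expose (in real use you would subclass prefect.engine.serializers.Serializer; the class name here is hypothetical):

```python
import io
import pickle

import pandas as pd


class DataFrameInListOutSerializer:
    """Hypothetical hybrid: deserialize cached inputs as CSV-backed
    DataFrames (what PandasSerializer wrote), but serialize outputs
    (plain Python lists) with pickle, as PickleSerializer would."""

    def serialize(self, value) -> bytes:
        # The task's output is a plain Python object (a list), so pickle it.
        return pickle.dumps(value)

    def deserialize(self, value: bytes) -> pd.DataFrame:
        # The task's cached input was written as CSV bytes.
        return pd.read_csv(io.BytesIO(value))
```

The asymmetry (CSV in, pickle out) is exactly what makes this feel hacky: one serializer object is doing two unrelated jobs.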
    Kevin Kho

    1 year ago
    Hi @Alex Furrier, I think the serializer is only applied to the result, so you shouldn't need these different serialization and deserialization methods to read in the DataFrame (I assume it comes into the task as a DataFrame, not serialized). Let me test this. One sec.
    What error do you get? I tried something like this and it worked.
    from typing import List

    import pandas as pd
    from prefect import Flow, task
    from prefect.engine.results import LocalResult
    from prefect.engine.serializers import PandasSerializer, PickleSerializer

    # DataFrame in, DataFrame out: PandasSerializer handles the result
    @task(result=LocalResult(serializer=PandasSerializer(
        'csv', serialize_kwargs={'index': False})))
    def do_nothing(df: pd.DataFrame) -> pd.DataFrame:
        return df

    # DataFrame in, list out: give this task its own PickleSerializer
    @task(result=LocalResult(serializer=PickleSerializer()))
    def some_operation(df: pd.DataFrame) -> List:
        return df['a'].to_list()

    testdf = pd.DataFrame({"a": [1,2,3], "b": [1,2,3]})

    with Flow("test") as flow:
        df = do_nothing(testdf)
        a_list = some_operation(df)

    flow.run()