# prefect-community
b
Hi all, I'm working on some custom prefect serializers for pandas df results. I have something that seems to work but I don't think it is very robust (requires creating a local file).
from io import BytesIO
from pathlib import Path
from time import time_ns

import pandas as pd
from prefect.engine.serializers import Serializer


class ParquetSerializer(Serializer):

    def serialize(self, value: pd.DataFrame) -> bytes:
        # transform a Python object into bytes:
        # write to a uniquely named local file, read the bytes back, then clean up
        tmp_filename = str(time_ns()) + '.parquet'
        value.to_parquet(
            path=tmp_filename,
            index=False
        )
        with open(tmp_filename, 'rb') as in_file:
            df_bytes = in_file.read()
        Path(tmp_filename).unlink()
        return df_bytes

    def deserialize(self, value: bytes) -> pd.DataFrame:
        # recover a Python object from bytes
        df_bytes_io = BytesIO(value)
        df = pd.read_parquet(df_bytes_io)
        return df
Does anyone have thoughts about the above approach? (saving as a local file then reading the bytes from the file?)
Seems like there should be a better way...
I'm using a timestamp in the file name so parallel tasks don't interfere with each other
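(As a sketch of where a serializer like this plugs in: Prefect 1.x results accept a serializer argument, so the class above can be attached to a task's checkpointed result. The names below are illustrative only, and checkpointing has to be enabled, e.g. via PREFECT__FLOWS__CHECKPOINTING=true.)

import pandas as pd
from prefect import Flow, task
from prefect.engine.results import LocalResult

@task(
    checkpoint=True,
    result=LocalResult(dir="./results", serializer=ParquetSerializer()),
)
def make_df() -> pd.DataFrame:
    # illustrative task; its checkpointed return value goes through the custom serializer
    return pd.DataFrame({"a": [1, 2, 3]})

with Flow("parquet-serializer-demo") as flow:
    make_df()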
j
I’m not familiar enough with Python parquet tools to know if there’s an in-memory bytes representation that would avoid the write-to-file step. Perhaps someone else will know!
b
thanks, there is no method to return parquet bytes that I'm aware of. Do you think my approach is the next best option? (I'm not a 'real' programmer lol)
j
Don’t tell anyone, neither am I 🙂 yes, I think there’s nothing wrong with your approach, even though I understand why you’re looking to optimize it
b
cool, I'll give it a try for a bigger workload and see how it goes. If it works I can try to do a pull request..
👍 1
improved version
from io import BytesIO
from tempfile import NamedTemporaryFile

import pandas as pd
from prefect.engine.serializers import Serializer


class ParquetSerializer(Serializer):

    def serialize(self, value: pd.DataFrame) -> bytes:
        # transform a Python object into bytes:
        # write parquet into a named temp file, then read the bytes back through a second handle
        # (reopening the temp file by name works on POSIX; Windows won't allow it while it's open)
        with NamedTemporaryFile(mode='w+b') as tmp_ref1, open(tmp_ref1.name, 'rb') as tmp_ref2:
            value.to_parquet(
                path=tmp_ref1,
                index=False
            )
            tmp_ref1.flush()  # push any buffered bytes to disk before the second handle reads
            df_bytes = tmp_ref2.read()

        return df_bytes

    def deserialize(self, value: bytes) -> pd.DataFrame:
        # recover a Python object from bytes
        df_bytes_io = BytesIO(value)
        df = pd.read_parquet(df_bytes_io)
        return df
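(If the installed pandas/pyarrow combination accepts a file-like object in to_parquet, which recent versions do, the temp file can be skipped entirely. A minimal in-memory sketch, not tested against the exact versions used in this thread:)

from io import BytesIO

import pandas as pd
from prefect.engine.serializers import Serializer


class InMemoryParquetSerializer(Serializer):

    def serialize(self, value: pd.DataFrame) -> bytes:
        # pyarrow can write parquet straight into a file-like object
        buffer = BytesIO()
        value.to_parquet(buffer, index=False)
        return buffer.getvalue()

    def deserialize(self, value: bytes) -> pd.DataFrame:
        return pd.read_parquet(BytesIO(value))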
b
do you have a specific reason for wanting to use parquet as the intermediate format? I believe dask just uses cloudpickle for dataframes, and I think it has special handling that makes it pretty efficient; it seems unlikely that this file roundtrip would be faster
b
b
looks cool! it doesn't seem like your parquet serializer would be able to take advantage of those features though...? I'm sure there are use cases for this but I guess I would just suggest you benchmark it and see how performance compares to cloudpickle, since that requires no work on your part to use 🙂 we pass pretty big dataframes around like this (a couple GB) w/o issue; anything bigger than that I assume you wouldn't use a result for anyway
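(A rough way to run that comparison: time a round trip through cloudpickle against the ParquetSerializer from earlier in the thread, assuming that class is in scope. The dataframe shape here is made up, so treat the numbers as indicative only.)

import time

import cloudpickle
import numpy as np
import pandas as pd

# made-up dataframe; real timings depend heavily on shape and dtypes
df = pd.DataFrame(np.random.rand(1_000_000, 10), columns=[f"c{i}" for i in range(10)])

start = time.perf_counter()
blob = cloudpickle.dumps(df)
_ = cloudpickle.loads(blob)
print(f"cloudpickle: {time.perf_counter() - start:.2f}s, {len(blob) / 1e6:.1f} MB")

serializer = ParquetSerializer()  # the class defined earlier in the thread
start = time.perf_counter()
blob = serializer.serialize(df)
_ = serializer.deserialize(blob)
print(f"parquet:     {time.perf_counter() - start:.2f}s, {len(blob) / 1e6:.1f} MB")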
b
the parquet/feather datasets are working for me with my serializers (improved from the above version). Lots of other 'big data' tools expect parquet files in this dir structure (hive, presto, etc.); very common use case, in my world anyway
none of these tools would work with cloudpickles
b
sure, totally; but you can't leverage those by passing around the actual bytes, you'd need to write them to a persistent (not tmp) path and pass around paths instead, which is definitely something I would be interested in too 👍
b
yeah, I think the confusion is I'm 'abusing' the results checkpoint feature. I'm able to get my task to upload a parquet/feather file into a dir structure on s3. This lets me use the data with lots of useful tools that can query over all the files (arrow datasets, presto, etc.). It's working for me 🤷‍♂️
👍 1
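(For reference, the kind of layout being described: hive-style partitioned parquet written straight to S3, which tools like presto, hive, and arrow datasets can query as one table. A sketch assuming s3fs and pyarrow are installed; the bucket, prefix, and column names are made up.)

import pandas as pd

df = pd.DataFrame({
    "event_date": ["2021-01-01", "2021-01-01", "2021-01-02"],
    "value": [1, 2, 3],
})

# writes s3://my-bucket/events/event_date=2021-01-01/<part>.parquet and so on
df.to_parquet(
    "s3://my-bucket/events/",
    partition_cols=["event_date"],
    index=False,
)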