
Bob Colner

07/03/2020, 4:51 PM
Hi all, I'm working on some custom Prefect serializers for pandas DataFrame results. I have something that seems to work, but I don't think it is very robust (it requires creating a local file).
from io import BytesIO
from pathlib import Path
from time import time_ns

import pandas as pd
from prefect.engine.serializers import Serializer


class ParquetSerializer(Serializer):

    def serialize(self, value: pd.DataFrame) -> bytes:
        # transform a Python object into bytes
        # unique filename so parallel tasks don't clobber each other's temp files
        tmp_filename = str(time_ns()) + '.parquet'
        value.to_parquet(
            path=tmp_filename,
            index=False
        )
        with open(tmp_filename, 'rb') as in_file:
            df_bytes = in_file.read()
        Path(tmp_filename).unlink()
        return df_bytes

    def deserialize(self, value: bytes) -> pd.DataFrame:
        # recover a Python object from bytes
        df_bytes_io = BytesIO(value)
        df = pd.read_parquet(df_bytes_io)
        return df
Does anyone have thoughts about the above approach (saving as a local file, then reading the bytes from the file)?
Seems like there should be a better way...
I'm using a timestamp in the file name so parallel tasks don't interfere with each other
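For context, a serializer like this would get attached to a task's result, roughly like the sketch below (written against the 0.12-era Result API; the bucket name and task body are just placeholders):

import pandas as pd
from prefect import task
from prefect.engine.results import S3Result

# attach the custom serializer to a result; "my-bucket" is a placeholder
parquet_result = S3Result(bucket="my-bucket", serializer=ParquetSerializer())

@task(result=parquet_result, checkpoint=True)
def make_df() -> pd.DataFrame:
    # toy dataframe just to exercise checkpointing through the serializer
    return pd.DataFrame({'a': [1, 2, 3]})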

Jeremiah

07/03/2020, 4:58 PM
I’m not familiar enough with Python parquet tools to know if there’s an in-memory bytes representation that would avoid the write-to-file step. Perhaps someone else will know!

Bob Colner

07/03/2020, 5:01 PM
thanks, there is no method to return parquet bytes that I'm aware of. Do you think my approach is the next best option? (I'm not a 'real' programmer lol)

Jeremiah

07/03/2020, 5:02 PM
Don’t tell anyone, neither am I 🙂 yes, I think there’s nothing wrong with your approach, even though I understand why you’re looking to optimize it

Bob Colner

07/03/2020, 5:04 PM
cool, I'll give it a try for a bigger workload and see how it goes. If it works I can try to do a pull request...
👍 1
improved version
from io import BytesIO
from tempfile import NamedTemporaryFile

import pandas as pd
from prefect.engine.serializers import Serializer


class ParquetSerializer(Serializer):

    def serialize(self, value: pd.DataFrame) -> bytes:
        # transform a Python object into bytes
        with NamedTemporaryFile(mode='w+b') as tmp_ref1, open(tmp_ref1.name, 'rb') as tmp_ref2:
            value.to_parquet(
                path=tmp_ref1,
                index=False
            )
            tmp_ref1.flush()  # make sure everything pandas wrote is on disk before reading it back
            df_bytes = tmp_ref2.read()

        return df_bytes

    def deserialize(self, value: bytes) -> pd.DataFrame:
        # recover a Python object from bytes
        df_bytes_io = BytesIO(value)
        df = pd.read_parquet(df_bytes_io)
        return df
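Side note: on pandas >= 1.0 (pyarrow engine), to_parquet will also write to an in-memory buffer, which would skip the temp file entirely. Untested sketch:

from io import BytesIO

import pandas as pd
from prefect.engine.serializers import Serializer


class InMemoryParquetSerializer(Serializer):
    # sketch: same round trip, but through a BytesIO buffer instead of a temp file

    def serialize(self, value: pd.DataFrame) -> bytes:
        # transform a Python object into bytes without touching disk
        buffer = BytesIO()
        value.to_parquet(buffer, index=False)
        return buffer.getvalue()

    def deserialize(self, value: bytes) -> pd.DataFrame:
        # recover a Python object from bytes
        return pd.read_parquet(BytesIO(value))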

Brett Naul

07/03/2020, 11:08 PM
do you have a specific reason for wanting to use parquet as the intermediate format? I believe dask just uses cloudpickle for dataframes, and I think it has special handling for dataframes that makes it pretty efficient, so it seems unlikely that this file round trip would be faster

Bob Colner

07/08/2020, 4:57 PM

Brett Naul

07/08/2020, 5:03 PM
looks cool! it doesn't seem like your parquet serializer would be able to take advantage of those features though...? I'm sure there are use cases for this but I guess I would just suggest you benchmark it and see how performance compares to cloudpickle, since that requires no work on your part to use 🙂 we pass pretty big dataframes around like this (a couple GB) w/o issue; anything bigger than that I assume you wouldn't use a result for anyway
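e.g. a rough way to benchmark it against the parquet serializer above (PickleSerializer is the default cloudpickle-based serializer; the dataframe size here is arbitrary):

import time

import numpy as np
import pandas as pd
from prefect.engine.serializers import PickleSerializer

# arbitrary ~16 MB dataframe, just for a rough comparison
df = pd.DataFrame({'x': np.random.rand(1_000_000), 'y': np.random.rand(1_000_000)})

for name, serializer in [('pickle', PickleSerializer()), ('parquet', ParquetSerializer())]:
    start = time.time()
    blob = serializer.serialize(df)
    serializer.deserialize(blob)
    print(name, round(time.time() - start, 3), 'sec,', len(blob), 'bytes')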

Bob Colner

07/08/2020, 5:06 PM
the parquet/feather datasets are working for me with my serializers (improved from the version above). Lots of other 'big data' tools expect parquet files in this dir structure (Hive, Presto, etc.); very common use case, in my world anyway
none of these tools would work with cloudpickle

Brett Naul

07/08/2020, 5:14 PM
sure, totally; but you can't leverage those by passing around the actual bytes, you'd need to write them to a persistent (not tmp) path and pass around paths instead, which is definitely something I would be interested in too 👍
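e.g. a sketch of that idea: write to a persistent, partitioned location and return the path (the bucket/layout here are made up; pandas needs s3fs installed for s3:// paths):

import pandas as pd
from prefect import task


@task
def write_partition(df: pd.DataFrame, partition: str) -> str:
    # hypothetical Hive-style layout; Presto/Hive/pyarrow datasets can query the whole prefix
    path = f"s3://my-bucket/my-table/date={partition}/data.parquet"
    df.to_parquet(path, index=False)  # needs s3fs for s3:// paths
    return path  # downstream tasks get the path, not the bytes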

Bob Colner

07/08/2020, 5:19 PM
yeah, I think the confusion is that I'm 'abusing' the results checkpoint feature. I'm able to get my task to upload a parquet/feather file into a dir structure on S3. This lets me use the data with lots of useful tools that can query over all the files (Arrow datasets, Presto, etc.). It's working for me 🤷‍♂️
👍 1