# prefect-community
v
Hello everyone! I have been building an ETL pipeline using Prefect, pandas, and PyArrow (Parquet). I would like to use the `PandasSerializer` and `GCSResult` to convert the output of a task from a pandas DataFrame into a partitioned Parquet dataset before uploading it to GCS. This worked well when serializing to a single Parquet file (without partitions). However, when using partitions, the serializer fails. My initial look into the source code revealed that the `Result` class expects a binary stream from the serializer, whereas `pandas.DataFrame.to_parquet(*, partition_cols=["col1"])` does not return binary output, since it creates multiple Parquet files instead. Example code:
```python
import pandas as pd
from prefect import Task
from prefect.engine.results import GCSResult
from prefect.engine.serializers import PandasSerializer

pandas_serializer = PandasSerializer(
    file_type="parquet",
    serialize_kwargs=dict(
        engine="pyarrow", partition_cols=["date"]
    ),
)

gcs_result = GCSResult(
    bucket="bucket-name", serializer=pandas_serializer
)

class Extract(Task):
    def __init__(self, **kwargs):
        super().__init__(**kwargs, result=gcs_result, target="target_file_location")

    def run(self, df: pd.DataFrame) -> pd.DataFrame:
        # Some processing steps
        return df
```
From my assessment, it looks like this pattern (i.e. serializing to multiple files) is not supported right now? If that is the case, what are your thoughts on the "Prefect way" to achieve the above.
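To illustrate the mismatch I mean, here is a pandas-free sketch of the two serializer "shapes" (the function names and JSON format are purely illustrative, not Prefect's API): a single-file serializer can hand back one `bytes` payload, but a partitioned write produces a directory of files and has nothing to return.

```python
import json
import os
import tempfile

def serialize_single(rows) -> bytes:
    # Single-file shape: the whole value becomes one bytes payload,
    # which a Result class can store at a single target location.
    return json.dumps(rows).encode("utf-8")

def serialize_partitioned(rows, out_dir: str) -> None:
    # Partitioned shape: one file per partition key is written into
    # out_dir; there is no single bytes payload to hand back.
    parts = {}
    for row in rows:
        parts.setdefault(row["date"], []).append(row)
    for key, chunk in parts.items():
        path = os.path.join(out_dir, f"date={key}.json")
        with open(path, "w") as f:
            json.dump(chunk, f)

rows = [
    {"date": "2021-01-01", "value": 1},
    {"date": "2021-01-02", "value": 2},
    {"date": "2021-01-01", "value": 3},
]

payload = serialize_single(rows)             # bytes: fits a Result's write path
with tempfile.TemporaryDirectory() as d:
    result = serialize_partitioned(rows, d)  # None: nothing to give the Result
    files = sorted(os.listdir(d))
```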
d
Hi @vish, I just pinged the team to see if this is a pattern we'd like to explore more broadly (specifically, multi-file results). For now, you definitely have a couple of options:
1. Write a custom result & serializer to achieve exactly what you'd like.
2. Upload the partitioned files to GCS as part of your Task logic, then return the GCS file URIs as the result of your task, and consume those in subsequent Task logic.
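Option 2 can be sketched roughly like this, using a local directory as a stand-in for a GCS bucket (the helper names, CSV format, and `file://` URIs are all illustrative, not Prefect or google-cloud-storage API):

```python
import csv
import os
import tempfile

def extract_and_upload(rows, bucket_dir: str) -> list:
    # Stand-in for Task.run: write one file per partition inside the
    # task itself, then return the file locations as the task result.
    os.makedirs(bucket_dir, exist_ok=True)
    parts = {}
    for row in rows:
        parts.setdefault(row["date"], []).append(row)
    uris = []
    for key, chunk in parts.items():
        path = os.path.join(bucket_dir, f"date={key}.csv")
        with open(path, "w", newline="") as f:
            writer = csv.DictWriter(f, fieldnames=["date", "value"])
            writer.writeheader()
            writer.writerows(chunk)
        # In a real flow this would be a GCS upload, and the URI would
        # look like gs://bucket-name/date=.../part.csv instead.
        uris.append(f"file://{path}")
    return uris

def consume(uris: list) -> int:
    # Stand-in for a downstream task that takes the URIs as input.
    total = 0
    for uri in uris:
        with open(uri[len("file://"):], newline="") as f:
            total += sum(1 for _ in csv.DictReader(f))
    return total

rows = [
    {"date": "2021-01-01", "value": "1"},
    {"date": "2021-01-02", "value": "2"},
    {"date": "2021-01-01", "value": "3"},
]
bucket = tempfile.mkdtemp()
uris = extract_and_upload(rows, bucket)
row_count = consume(uris)
```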
v
Thank you very much. I took your suggestion of writing custom result/serializer classes. Got it working, but I'm not super happy about the separation of concerns in my first implementation: the serializer does both serializing (converting to Parquet) and uploading/downloading to GCS, while the result class simply manages the location and checks whether the target exists. Does the job for now tho. :)
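The rough shape of that split, as a pure-Python mock (local filesystem standing in for GCS, class interfaces simplified from Prefect's actual `Serializer`/`Result`, and JSON standing in for Parquet):

```python
import json
import os
import tempfile

class PartitionedSerializer:
    # Mock serializer: serialize() both converts the value AND writes
    # the partition files to the target, mixing the two concerns.
    def serialize(self, rows, location: str) -> list:
        os.makedirs(location, exist_ok=True)
        parts = {}
        for row in rows:
            parts.setdefault(row["date"], []).append(row)
        written = []
        for key, chunk in parts.items():
            path = os.path.join(location, f"date={key}.json")
            with open(path, "w") as f:
                json.dump(chunk, f)
            written.append(path)
        return written

class PartitionedResult:
    # Mock result: only manages the target location and existence checks.
    def __init__(self, root: str, serializer: PartitionedSerializer):
        self.root = root
        self.serializer = serializer

    def exists(self, target: str) -> bool:
        return os.path.isdir(os.path.join(self.root, target))

    def write(self, rows, target: str) -> list:
        return self.serializer.serialize(rows, os.path.join(self.root, target))

rows = [
    {"date": "2021-01-01", "value": 1},
    {"date": "2021-01-02", "value": 2},
]
root = tempfile.mkdtemp()
res = PartitionedResult(root, PartitionedSerializer())
existed_before = res.exists("run1")
paths = res.write(rows, "run1")
```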
d
Awesome!