vish
02/22/2021, 3:17 PM
I'm using PandasSerializer and GCSResult to convert the output of a task from a pandas DataFrame to a partitioned Parquet dataset before uploading it to GCS. This worked well when serializing to a single parquet file (without partitions).
However, when using partitions, the serializer fails. My initial look into the source code revealed that the Result class expects a binary stream from the serializer. However, `pandas.DataFrame.to_parquet(path, partition_cols=["col1"])` does not return a binary output, since it writes multiple parquet files to a directory tree.
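A minimal sketch of the mismatch (assuming pandas and pyarrow are installed; the DataFrame and column names here are made up for illustration):
```python
import io

import pandas as pd

df = pd.DataFrame({"date": ["2021-01-01", "2021-01-02"], "value": [1, 2]})

# Without partitions, to_parquet can write to an in-memory buffer,
# producing the single binary blob a Result can upload.
buf = io.BytesIO()
df.to_parquet(buf, engine="pyarrow")
assert len(buf.getvalue()) > 0

# With partition_cols, pyarrow writes a directory tree
# (out_dir/date=2021-01-01/..., out_dir/date=2021-01-02/...),
# so there is no single byte stream to hand back.
df.to_parquet("out_dir", engine="pyarrow", partition_cols=["date"])
```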
Example code:
```python
import pandas as pd
from prefect import Task
from prefect.engine.results import GCSResult
from prefect.engine.serializers import PandasSerializer

pandas_serializer = PandasSerializer(
    file_type="parquet",
    serialize_kwargs=dict(engine="pyarrow", partition_cols=["date"]),
)

gcs_result = GCSResult(bucket="bucket-name", serializer=pandas_serializer)


class Extract(Task):
    def __init__(self, **kwargs):
        super().__init__(**kwargs, result=gcs_result, target="target_file_location")

    def run(self, df: pd.DataFrame) -> pd.DataFrame:
        # Some processing steps
        return df
```
From my assessment, it looks like this pattern (i.e. serializing to multiple files) is not supported right now? If that's the case, what are your thoughts on the "prefect-way" to achieve the above?
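One possible workaround, sketched below under the assumption that gcsfs is installed and that the `gs://` path is illustrative (this is not an official Prefect pattern): skip the Result/Serializer machinery for this task and let pandas write the partitioned dataset straight to GCS, returning the path instead of the DataFrame.
```python
import pandas as pd
from prefect import Task


class ExtractPartitioned(Task):
    """Sketch: write the partitioned dataset to GCS inside the task
    instead of relying on a Result serializer that must return bytes."""

    def run(self, df: pd.DataFrame) -> str:
        # Hypothetical target; pandas resolves gs:// URLs via gcsfs, and
        # pyarrow writes one parquet file per "date" partition under it.
        target = "gs://bucket-name/target_file_location"
        df.to_parquet(target, engine="pyarrow", partition_cols=["date"])
        return target
```
Note the trade-off: the data no longer flows through a Result, so Prefect's target-based caching for this task would need to be handled separately.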
Dylan
vish
02/23/2021, 12:37 AM
Dylan