Cody Peterson
05/10/2023, 12:04 AM
Marvin
05/10/2023, 12:06 AM
prefect.engine.serializers.Serializer.
2. Implement the serialize method to convert your data to Parquet and return it as bytes. The Result class takes care of writing those bytes to storage.
3. Implement the deserialize method to read the Parquet bytes and convert them back to your original data format.
Once you've created your custom serializer, you can use it with your Result class. For example, if you're using the LocalResult class, you can pass your custom serializer as the serializer argument when creating a new instance of LocalResult.
Here's a simple example of a custom serializer for parquet files:
```python
from io import BytesIO

import pandas as pd
from prefect.engine.serializers import Serializer


class ParquetSerializer(Serializer):
    def __init__(self, compression=None):
        self.compression = compression

    def serialize(self, value, **kwargs):
        # Build a DataFrame (e.g. from a list of records) and encode it as Parquet;
        # with no path, DataFrame.to_parquet returns the file contents as bytes
        df = pd.DataFrame(value)
        return df.to_parquet(compression=self.compression)

    def deserialize(self, serialized_value, **kwargs):
        # read_parquet expects a path or file-like object, so wrap the raw bytes
        df = pd.read_parquet(BytesIO(serialized_value))
        return df.to_dict(orient="records")
```
Remember, this is just a starting point, and you might need to adjust the implementation based on your specific requirements.
Now go forth and serialize, my human friend.
Cody Peterson
05/10/2023, 12:07 AM
Cody Peterson
05/10/2023, 12:10 AM
Marvin
05/10/2023, 12:12 AM
ParquetSerializer with a LocalResult in a task within a flow. As for the deserialize method, it can return any object, including a DataFrame. I've updated the deserialize method in the example below to return the DataFrame directly.
```python
from io import BytesIO

import pandas as pd
from prefect import task, Flow
from prefect.engine.serializers import Serializer
from prefect.engine.results import LocalResult


class ParquetSerializer(Serializer):
    def __init__(self, compression=None):
        self.compression = compression

    def serialize(self, value, **kwargs):
        # Encode the value as Parquet bytes (to_parquet returns bytes when no path is given)
        df = pd.DataFrame(value)
        return df.to_parquet(compression=self.compression)

    def deserialize(self, serialized_value, **kwargs):
        # Wrap the bytes in a file-like object and return the DataFrame as-is
        return pd.read_parquet(BytesIO(serialized_value))


@task(result=LocalResult(serializer=ParquetSerializer()))
def create_dataframe():
    data = {
        "col1": [1, 2, 3],
        "col2": ["A", "B", "C"],
    }
    return pd.DataFrame(data)


@task
def print_dataframe(df):
    print(df)


with Flow("parquet_example") as flow:
    df = create_dataframe()
    print_dataframe(df)

flow.run()
```
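In this example, the create_dataframe task creates a DataFrame, and its result is stored through the LocalResult class using the custom ParquetSerializer. The print_dataframe task then receives the DataFrame as input and prints it. Note that Prefect 1 only writes task results when checkpointing is enabled; for local flow.run() calls, set the environment variable PREFECT__FLOWS__CHECKPOINTING=true.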
Remember, life is like a DataFrame, full of rows and columns, and sometimes missing values.
Cody Peterson
05/10/2023, 12:34 AM
Nate
05/10/2023, 1:59 AM
```python
from io import BytesIO
from typing import TypeVar

import pandas as pd
import pyarrow as pa
import pyarrow.parquet as pq
from prefect.serializers import Serializer
from typing_extensions import Literal

D = TypeVar("D")


class ParquetSerializer(Serializer):
    """
    Serializes data to Parquet.
    """

    type: Literal["parquet"] = "parquet"

    def dumps(self, obj: D) -> bytes:
        """Encode the object into a blob of bytes."""
        table = pa.Table.from_pandas(obj)
        buf = BytesIO()
        pq.write_table(table, buf)
        return buf.getvalue()

    def loads(self, blob: bytes) -> D:
        """Decode the blob of bytes into an object."""
        buf = BytesIO(blob)
        table = pq.read_table(buf)
        return table.to_pandas()


## Example usage:
from prefect import flow, task


@task(result_serializer=ParquetSerializer())
def create_dataframe() -> pd.DataFrame:
    # Return the pandas DataFrame itself: dumps() converts it to a PyArrow Table
    return pd.DataFrame({
        'A': [1, 2, 3, 4, 5],
        'B': ['a', 'b', 'c', 'd', 'e'],
        'C': pd.date_range('2023-05-01', periods=5),
    })


@flow
def parquet_flow():
    return create_dataframe()


if __name__ == "__main__":
    parquet_flow()
```
Nate
05/10/2023, 2:02 AM
src/prefect/serializers.py and write a new subclass ParquetSerializer that uses pyarrow and pandas
Marvin
05/10/2023, 2:04 AM
ParquetSerializer subclass that uses pyarrow and pandas.
```python
import pandas as pd
import pyarrow as pa
import pyarrow.parquet as pq
from prefect.engine.serializers import Serializer


class ParquetSerializer(Serializer):
    def __init__(self, compression=None):
        self.compression = compression

    def serialize(self, value, **kwargs):
        df = pd.DataFrame(value)
        table = pa.Table.from_pandas(df)
        # Write the table into an in-memory Arrow buffer instead of a file
        parquet_buffer = pa.BufferOutputStream()
        pq.write_table(table, parquet_buffer, compression=self.compression)
        return parquet_buffer.getvalue().to_pybytes()

    def deserialize(self, serialized_value, **kwargs):
        # BufferReader exposes the bytes as a readable Arrow file
        table = pq.read_table(pa.BufferReader(serialized_value))
        return table.to_pandas()
```
This implementation uses pyarrow and pandas to create a custom ParquetSerializer. The serialize method converts a DataFrame to Parquet using pyarrow, and the deserialize method reads the Parquet data back into a DataFrame.
Now you can use this serializer in your tasks and flows, just like in the previous example. Remember, the more serializers, the merrier... or so they say.
Nate
05/10/2023, 2:04 AM
Nate
05/10/2023, 2:07 AM
src/prefect/serializers.py and it shows that you need to implement a dumps and loads method
it should also have a type: Literal["parquet"] = "parquet" field
Marvin
05/10/2023, 2:08 AM
ParquetSerializer subclass for Prefect 2, implementing the dumps and loads methods, and including the type field:
```python
from typing import Optional

import pandas as pd
import pyarrow as pa
import pyarrow.parquet as pq
from prefect.serializers import Serializer
from typing_extensions import Literal


class ParquetSerializer(Serializer):
    # Serializer is a pydantic model, so options are declared as fields
    # rather than assigned in a custom __init__
    type: Literal["parquet"] = "parquet"
    compression: Optional[str] = None

    def dumps(self, value) -> bytes:
        df = pd.DataFrame(value)
        table = pa.Table.from_pandas(df)
        parquet_buffer = pa.BufferOutputStream()
        pq.write_table(table, parquet_buffer, compression=self.compression)
        return parquet_buffer.getvalue().to_pybytes()

    def loads(self, serialized_value: bytes):
        table = pq.read_table(pa.BufferReader(serialized_value))
        return table.to_pandas()
```
Now the ParquetSerializer is in line with Prefect 2's requirements. You can use this updated serializer in your tasks and flows. Remember, even robots make mistakes, but we learn from them.
Cody Peterson
05/11/2023, 6:29 PM