Cody Peterson
05/10/2023, 12:04 AM
Marvin
05/10/2023, 12:06 AM
prefect.engine.serializers.Serializer.
2. Implement the serialize method to convert your data to parquet format and write it to a file.
3. Implement the deserialize method to read the parquet file and convert it back to your original data format.
Once you've created your custom serializer, you can use it with your Result class. For example, if you're using the LocalResult class, you can pass your custom serializer as the serializer argument when creating a new instance of LocalResult.
Here's a simple example of a custom serializer for parquet files:
```python
import io

import pandas as pd
from prefect.engine.serializers import Serializer  # Prefect 1.x module path


class ParquetSerializer(Serializer):
    def __init__(self, compression=None):
        self.compression = compression

    def serialize(self, value, **kwargs):
        # With no path argument, to_parquet returns the file contents as bytes.
        df = pd.DataFrame(value)
        return df.to_parquet(compression=self.compression)

    def deserialize(self, serialized_value, **kwargs):
        # read_parquet expects a path or file-like object, so wrap the bytes.
        df = pd.read_parquet(io.BytesIO(serialized_value))
        return df.to_dict(orient="records")
```
Remember, this is just a starting point, and you might need to adjust the implementation based on your specific requirements.
Now go forth and serialize, my human friend.
Cody Peterson
05/10/2023, 12:07 AM
Marvin
05/10/2023, 12:12 AM
ParquetSerializer with a LocalResult in a task within a flow. As for the deserialize method, it can return any arbitrary object, including a DataFrame. I've updated the deserialize method in the example below to return a DataFrame directly.
```python
import io

import pandas as pd
from prefect import task, Flow
from prefect.engine.results import LocalResult
from prefect.engine.serializers import Serializer  # Prefect 1.x module path


class ParquetSerializer(Serializer):
    def __init__(self, compression=None):
        self.compression = compression

    def serialize(self, value, **kwargs):
        # With no path argument, to_parquet returns the file contents as bytes.
        df = pd.DataFrame(value)
        return df.to_parquet(compression=self.compression)

    def deserialize(self, serialized_value, **kwargs):
        # Return the DataFrame directly instead of converting it to records.
        return pd.read_parquet(io.BytesIO(serialized_value))


@task(result=LocalResult(serializer=ParquetSerializer()))
def create_dataframe():
    data = {
        "col1": [1, 2, 3],
        "col2": ["A", "B", "C"],
    }
    return pd.DataFrame(data)


@task
def print_dataframe(df):
    print(df)


with Flow("parquet_example") as flow:
    df = create_dataframe()
    print_dataframe(df)

flow.run()
```
In this example, the create_dataframe task creates a DataFrame and stores the result using the LocalResult class with the custom ParquetSerializer. The print_dataframe task then takes the DataFrame as input and prints it.
Remember, life is like a DataFrame, full of rows and columns, and sometimes missing values.
Cody Peterson
05/10/2023, 12:34 AM
Nate
05/10/2023, 1:59 AM
```python
from io import BytesIO
from typing import TypeVar

import pandas as pd
import pyarrow as pa
import pyarrow.parquet as pq
from prefect.serializers import Serializer
from typing_extensions import Literal

D = TypeVar("D")


class ParquetSerializer(Serializer):
    """
    Serializes data to Parquet.
    """

    type: Literal["parquet"] = "parquet"

    def dumps(self, obj: D) -> bytes:
        """Encode the object into a blob of bytes."""
        table = pa.Table.from_pandas(obj)
        buf = BytesIO()
        pq.write_table(table, buf)
        return buf.getvalue()

    def loads(self, blob: bytes) -> D:
        """Decode the blob of bytes into an object."""
        buf = BytesIO(blob)
        table = pq.read_table(buf)
        return table.to_pandas()


# Example usage:
from prefect import flow, task


@task(result_serializer=ParquetSerializer())
def create_dataframe() -> pd.DataFrame:
    df = pd.DataFrame({
        'A': [1, 2, 3, 4, 5],
        'B': ['a', 'b', 'c', 'd', 'e'],
        'C': pd.date_range('2023-05-01', periods=5),
    })
    # Return the DataFrame itself; dumps() converts it to a PyArrow Table.
    return df


@flow
def parquet_flow():
    return create_dataframe()


if __name__ == "__main__":
    parquet_flow()
```
src/prefect/serializers.py and write a new subclass ParquetSerializer that uses pyarrow and pandas
Marvin
05/10/2023, 2:04 AM
ParquetSerializer subclass that uses pyarrow and pandas.
```python
import pandas as pd
import pyarrow as pa
import pyarrow.parquet as pq
from prefect.engine.serializers import Serializer  # Prefect 1.x module path


class ParquetSerializer(Serializer):
    def __init__(self, compression=None):
        self.compression = compression

    def serialize(self, value, **kwargs):
        # Build an Arrow table and write it to an in-memory buffer.
        df = pd.DataFrame(value)
        table = pa.Table.from_pandas(df)
        parquet_buffer = pa.BufferOutputStream()
        pq.write_table(table, parquet_buffer, compression=self.compression)
        return parquet_buffer.getvalue().to_pybytes()

    def deserialize(self, serialized_value, **kwargs):
        # BufferReader exposes the bytes as a readable Arrow file.
        table = pq.read_table(pa.BufferReader(serialized_value))
        return table.to_pandas()
```
This implementation uses pyarrow and pandas to create a custom ParquetSerializer. The serialize method converts a DataFrame to Parquet format using pyarrow, and the deserialize method reads the Parquet data and converts it back to a DataFrame.
Now you can use this serializer in your tasks and flows, just like in the previous example. Remember, the more serializers, the merrier... or so they say.
Nate
05/10/2023, 2:04 AM
src/prefect/serializers.py and it shows that you need to implement a dumps and loads method
it should also have a type: Literal = "parquet" field
Marvin
05/10/2023, 2:08 AM
ParquetSerializer subclass for Prefect 2, implementing the dumps and loads methods, and including the type field:
```python
from typing import Optional

import pandas as pd
import pyarrow as pa
import pyarrow.parquet as pq
from prefect.serializers import Serializer
from typing_extensions import Literal


class ParquetSerializer(Serializer):
    type: Literal["parquet"] = "parquet"
    # Serializer is a pydantic model, so options are declared as fields
    # rather than assigned in a custom __init__.
    compression: Optional[str] = None

    def dumps(self, value) -> bytes:
        df = pd.DataFrame(value)
        table = pa.Table.from_pandas(df)
        parquet_buffer = pa.BufferOutputStream()
        pq.write_table(table, parquet_buffer, compression=self.compression)
        return parquet_buffer.getvalue().to_pybytes()

    def loads(self, serialized_value: bytes):
        # BufferReader exposes the bytes as a readable Arrow file.
        table = pq.read_table(pa.BufferReader(serialized_value))
        return table.to_pandas()
```
Now the ParquetSerializer is in line with Prefect 2's requirements. You can use this updated serializer in your tasks and flows. Remember, even robots make mistakes, but we learn from them.
Cody Peterson
05/11/2023, 6:29 PM