<@ULVA73B9P> I want to persist my task results as ...
# ask-marvin
j
@Marvin I want to persist my task results as parquet files. So, my serializer would involve calling pandas.DataFrame.to_parquet method. The documentation only mentions json and pickle as options. Can I write a custom serializer that persists the data as parquet?
m
To persist task results in Prefect, the results must be stored somewhere in your infrastructure; Prefect then stores a reference to the result. Both the serializer and the storage location can be customized. Although the documentation mainly discusses the JSON and pickle serializers, you can write a custom serializer. To serialize task results as Parquet files, you would create a custom serializer that calls `pandas.DataFrame.to_parquet`. The general steps are:
1. Define a custom serializer class that implements the `dumps` and `loads` methods.
2. In `dumps`, call `to_parquet` on your DataFrame.
3. Configure your task to use the custom serializer when saving results.
The information I have does not include a precise implementation example for a Parquet serializer, but one would follow the same principles as any other custom serializer built on Prefect's serializer interface. If you need more detail or examples, I can look up documentation or GitHub issues related to custom serialization in Prefect. Would you like me to do that?
j
@Marvin Does this look like the right idea?
class ParquetSerializer(Serializer):
    def __init__(self):
        super().__init__(type="parquet")
        
    def dumps(self, obj: pd.DataFrame):
        return obj.to_parquet(path=None, engine="pyarrow")
    
    def loads(self, blob: bytes):
        fp = BytesIO(blob)
        return pd.read_parquet(fp, engine="pyarrow")
m
Your `ParquetSerializer` class is on the right track: it is a custom serializer that overrides `dumps` and `loads` to handle Parquet data with pandas. A few things to check:
1. In recent pandas versions, `to_parquet` returns the serialized bytes when `path=None`, so your `dumps` should work as written. Older versions require a path or file-like object, in which case writing to an in-memory `BytesIO` buffer achieves the same thing.
2. Make sure the `engine` parameter matches your environment. `pyarrow` is fine if it is installed; otherwise you can use `fastparquet`.
3. Include the necessary imports at the top of your script (e.g., `import pandas as pd`, `from prefect.serializers import Serializer`, `from io import BytesIO`).
Here's a refined version of your class:
from io import BytesIO
from typing import Literal

import pandas as pd
from prefect.serializers import Serializer


class ParquetSerializer(Serializer):
    # Declare the serializer type as a field rather than passing it to __init__
    type: Literal["parquet"] = "parquet"

    def dumps(self, obj: pd.DataFrame) -> bytes:
        # Write the Parquet data to an in-memory buffer and return its bytes
        buffer = BytesIO()
        obj.to_parquet(buffer, engine="pyarrow")
        return buffer.getvalue()

    def loads(self, blob: bytes) -> pd.DataFrame:
        # Read the Parquet bytes back into a DataFrame
        return pd.read_parquet(BytesIO(blob), engine="pyarrow")
This updated version uses a bytes buffer to write and read the parquet data in memory. When you want to persist task results with this serializer, you will need to configure your task with it:
from prefect import task

@task(result_serializer=ParquetSerializer())
def my_task():
    # Your task logic that produces a pandas DataFrame
    ...
Do remember to test your custom serializer thoroughly to make sure it works as expected before deploying it in production.
j
@Marvin Here is my test code:
from io import BytesIO

import pandas as pd
from prefect import flow, task
from prefect.serializers import Serializer
from typing import Literal

class ParquetSerializer(Serializer):
    type: Literal["parquet"] = "parquet"

    def dumps(self, obj: pd.DataFrame):
        if not isinstance(obj, pd.DataFrame):
            raise ValueError("Hey, this is for dataframes")
            
        fp = BytesIO()
        obj.to_parquet(path=fp, engine="pyarrow")
        fp.seek(0)
        return fp.read()

    def loads(self, blob: bytes):
        fp = BytesIO(blob)
        return pd.read_parquet(fp, engine="pyarrow")


@task(
    persist_result=True,
    result_serializer=ParquetSerializer(),
    result_storage_key="test_parquet.parquet",
)
def save_these_results():
    return pd.DataFrame(
        {
            "a": [1, 2, 3],
            "b": [4, 5, 6],
        }
    )


@flow
async def test_parquet_serializer():
    save_these_results()


if __name__ == "__main__":
    test_parquet_serializer.serve(name="Serialize-Test")
Here is the stack trace:
Encountered exception during execution:
Traceback (most recent call last):
  File "/home/echodyne.int/janet/new_rcs/new_rcs/lib/python3.10/site-packages/prefect/engine.py", line 849, in orchestrate_flow_run
    result = await flow_call.aresult()
  File "/home/echodyne.int/janet/new_rcs/new_rcs/lib/python3.10/site-packages/prefect/_internal/concurrency/calls.py", line 293, in aresult
    return await asyncio.wrap_future(self.future)
  File "/home/echodyne.int/janet/new_rcs/new_rcs/lib/python3.10/site-packages/prefect/_internal/concurrency/calls.py", line 355, in _run_async
    result = await coro
  File "/home/echodyne.int/janet/new_rcs/jds-pipeline/jds_pipeline/tools/revise_rcs_7.4/test_serializer.py", line 41, in test_parquet_serializer
    save_these_results()
  File "/home/echodyne.int/janet/new_rcs/new_rcs/lib/python3.10/site-packages/prefect/tasks.py", line 549, in __call__
    return enter_task_run_engine(
  File "/home/echodyne.int/janet/new_rcs/new_rcs/lib/python3.10/site-packages/prefect/engine.py", line 1364, in enter_task_run_engine
    return from_sync.wait_for_call_in_loop_thread(begin_run)
  File "/home/echodyne.int/janet/new_rcs/new_rcs/lib/python3.10/site-packages/prefect/_internal/concurrency/api.py", line 243, in wait_for_call_in_loop_thread
    return call.result()
  File "/home/echodyne.int/janet/new_rcs/new_rcs/lib/python3.10/site-packages/prefect/_internal/concurrency/calls.py", line 284, in result
    return self.future.result(timeout=timeout)
  File "/home/echodyne.int/janet/new_rcs/new_rcs/lib/python3.10/site-packages/prefect/_internal/concurrency/calls.py", line 168, in result
    return self.__get_result()
  File "/usr/lib/python3.10/concurrent/futures/_base.py", line 403, in __get_result
    raise self._exception
  File "/home/echodyne.int/janet/new_rcs/new_rcs/lib/python3.10/site-packages/prefect/_internal/concurrency/calls.py", line 355, in _run_async
    result = await coro
  File "/home/echodyne.int/janet/new_rcs/new_rcs/lib/python3.10/site-packages/prefect/engine.py", line 1532, in get_task_call_return_value
    return await future._result()
  File "/home/echodyne.int/janet/new_rcs/new_rcs/lib/python3.10/site-packages/prefect/futures.py", line 237, in _result
    return await final_state.result(raise_on_failure=raise_on_failure, fetch=True)
  File "/home/echodyne.int/janet/new_rcs/new_rcs/lib/python3.10/site-packages/prefect/states.py", line 91, in _get_state_result
    raise await get_state_exception(state)
  File "/home/echodyne.int/janet/new_rcs/new_rcs/lib/python3.10/site-packages/prefect/task_runners.py", line 231, in submit
    result = await call()
  File "/home/echodyne.int/janet/new_rcs/new_rcs/lib/python3.10/site-packages/prefect/engine.py", line 1762, in begin_task_run
    state = await orchestrate_task_run(
  File "/home/echodyne.int/janet/new_rcs/new_rcs/lib/python3.10/site-packages/prefect/engine.py", line 2064, in orchestrate_task_run
    terminal_state = await return_value_to_state(
  File "/home/echodyne.int/janet/new_rcs/new_rcs/lib/python3.10/site-packages/prefect/states.py", line 305, in return_value_to_state
    return Completed(data=await result_factory.create_result(data))
  File "/home/echodyne.int/janet/new_rcs/new_rcs/lib/python3.10/site-packages/prefect/results.py", line 377, in create_result
    return await PersistedResult.create(
  File "/home/echodyne.int/janet/new_rcs/new_rcs/lib/python3.10/site-packages/prefect/results.py", line 574, in create
    await storage_block.write_path(key, content=blob.to_bytes())
  File "/home/echodyne.int/janet/new_rcs/new_rcs/lib/python3.10/site-packages/prefect/results.py", line 615, in to_bytes
    return self.json().encode()
  File "pydantic/main.py", line 504, in pydantic.main.BaseModel.json
  File "/usr/lib/python3.10/json/__init__.py", line 238, in dumps
    **kw).encode(obj)
  File "/usr/lib/python3.10/json/encoder.py", line 199, in encode
    chunks = self.iterencode(o, _one_shot=True)
  File "/usr/lib/python3.10/json/encoder.py", line 257, in iterencode
    return _iterencode(o, 0)
  File "pydantic/json.py", line 88, in pydantic.json.pydantic_encoder
  File "pydantic/json.py", line 45, in pydantic.json.lambda
UnicodeDecodeError: 'utf-8' codec can't decode byte 0xe4 in position 115: invalid continuation byte
Why is Prefect calling a JSON encoder when I defined a ParquetSerializer on my task?
m
The stack trace shows that Prefect is attempting to encode the data as JSON even though you specified a `ParquetSerializer`. That is unexpected: by defining a custom `ParquetSerializer` and assigning it to the task, you instructed Prefect to serialize with Parquet rather than JSON. One possible cause is a bug, or a mismatch between the custom serializer and Prefect's internals. A search for similar issues on GitHub returned no results, so this may not be a known problem. To troubleshoot, I'd recommend checking the following:
1. Your environment is set up correctly and the installed Prefect version supports custom serializers.
2. The `ParquetSerializer` class correctly subclasses Prefect's `Serializer` class and implements `dumps` and `loads` as required.
3. The `result_storage_key` value is consistent with how Prefect expects storage keys to be used.
4. The steps in the Prefect documentation and any examples or guides have been followed correctly.
If the problem persists, consider reaching out to the Prefect community or opening an issue on the Prefect GitHub repository. Also check the documentation and the Prefect changelog for changes in how custom serializers should be implemented. If you recently updated Prefect, the release notes may list related regressions or breaking changes.
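The last frames of the trace hint at the cause: `to_bytes` in `prefect/results.py` calls `self.json().encode()` on the object that wraps the serializer's output, so the bytes returned by `dumps` are JSON-encoded after the custom serializer has already run. A minimal, dependency-free sketch of that failure mode follows. It assumes the JSON encoder turns `bytes` into `str` with a UTF-8 decode (which the `UnicodeDecodeError` suggests); `wrap_as_json` and the sample payload are hypothetical stand-ins, not Prefect code.

```python
import base64
import json


def wrap_as_json(data: bytes) -> str:
    """Hypothetical stand-in for a wrapper that stores serializer output in JSON.

    bytes are not JSON-native, so the fallback encoder decodes them as UTF-8.
    """
    return json.dumps({"data": data}, default=lambda b: b.decode("utf-8"))


# Made-up binary payload: the Parquet magic bytes followed by a non-UTF-8 sequence.
raw = b"PAR1\xe4\x00\x01"

try:
    wrap_as_json(raw)
    raw_ok = True
except UnicodeDecodeError:
    # Arbitrary binary data is generally not valid UTF-8.
    raw_ok = False

# Base64 output is pure ASCII, so the same wrapper accepts it.
encoded_ok = json.loads(wrap_as_json(base64.encodebytes(raw)))["data"]
```

Under that assumption, any serializer whose `dumps` returns arbitrary binary data would hit the same error, while ASCII-only output passes through.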
j
@Marvin I started looking at the prefect source code. The Prefect pickle encoder base64-encodes after pickle encoding here https://github.com/PrefectHQ/prefect/blob/c3aece29b042b4ad9097fa9e430bde75294b2044/src/prefect/serializers.py#L159C33-L159C33 and if I change my serializer to parquet-encode and then base64-encode after that, the serializer will not crash. However, this defeats the purpose of creating a parquet serializer as no parquet utilities will be able to open the resulting persisted results. I will have to write code to unbase64 all the data and turn it back into actual parquet format.
# Does not crash, but does not create a valid parquet file
import base64
from io import BytesIO
from typing import Literal

import pandas as pd
from prefect.serializers import Serializer


class ParquetSerializer(Serializer):
    type: Literal["parquet"] = "parquet"

    def dumps(self, obj: pd.DataFrame) -> bytes:
        if not isinstance(obj, pd.DataFrame):
            raise ValueError("Hey, this is for dataframes")

        fp = BytesIO()
        obj.to_parquet(path=fp, engine="pyarrow")
        # Base64-encode the raw parquet bytes so the result can be JSON-encoded
        return base64.encodebytes(fp.getvalue())

    def loads(self, blob: bytes) -> pd.DataFrame:
        # Undo the base64 encoding before reading the parquet data
        fp = BytesIO(base64.decodebytes(blob))
        return pd.read_parquet(fp, engine="pyarrow")
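If the base64 workaround is kept, external Parquet tools need the encoding undone first. A minimal sketch of that extra step follows, using only the standard library. `to_raw_parquet` is a hypothetical helper, the payload is made-up bytes rather than a real Parquet file, and the sketch assumes its input is exactly what `dumps` returned, without any wrapper Prefect may add around it when persisting.

```python
import base64
import tempfile
from pathlib import Path


def to_raw_parquet(encoded: bytes, target: Path) -> Path:
    """Decode base64 serializer output and write the raw bytes to `target`."""
    target.write_bytes(base64.decodebytes(encoded))
    return target


# Made-up payload standing in for real Parquet bytes.
raw = b"PAR1\xe4\x00\x01PAR1"
encoded = base64.encodebytes(raw)

with tempfile.TemporaryDirectory() as tmp:
    restored = to_raw_parquet(encoded, Path(tmp) / "restored.parquet").read_bytes()
```

The restored file holds the original bytes again, so it is the decoded copy, not the persisted result, that other Parquet readers would open.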
m
From the documentation I retrieved, the result serializer is normally inferred from context: tasks use the result serializer configured on the flow unless a serializer is set on the task itself. If there is no context to load a serializer from, the one defined by the `PREFECT_RESULTS_DEFAULT_SERIALIZER` environment variable is used. Prefect does support custom serializers, but the documents I found do not give implementation details.
As for the pickle serializer base64-encoding its output in the Prefect source: custom serializers are expected to behave the same way, producing content that can be stored or transmitted safely. The crash you hit suggests that the serialized bytes must be JSON-encodable. Since base64 encoding avoids the crash but leaves you with a file that is not valid Parquet, you appear to be right that Prefect expects serializer output in a different form than raw Parquet bytes. The excerpts I have do not spell out that format, so confirmation from the official documentation or the Prefect development team may be necessary.
One option is to keep the Parquet serialization and the base64 encoding as separate steps: your tasks produce the actual Parquet bytes, and a wrapper base64-encodes them for storage to comply with Prefect's internal handling. This does add a step to your data handling.
Another option is to bypass custom serializers and use Prefect's block storage to handle the Parquet files directly: write the Parquet file to a location of your choice (such as a cloud storage bucket) and return only its path or URL as the task result that Prefect persists.
If you need further help, it may be best to open an issue on the Prefect repository and ask the developers about custom result serialization and compatibility with formats like Parquet.
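The last suggestion, writing the Parquet file yourself and returning only its location, can be sketched without any Prefect API. In the sketch below `write_and_return_path` is a hypothetical helper and the payload is made-up bytes. In a real task the bytes would come from the DataFrame (for example from `to_parquet` writing into a buffer, as in the serializer above) and the function would be wrapped in `@task`. The point is that the returned value is a plain string, which a JSON encoder handles without trouble.

```python
import json
import tempfile
from pathlib import Path


def write_and_return_path(payload: bytes, directory: Path, name: str) -> str:
    """Write `payload` under `directory` and return the file path as a string."""
    directory.mkdir(parents=True, exist_ok=True)
    target = directory / name
    target.write_bytes(payload)
    return str(target)


# Made-up payload standing in for real Parquet bytes.
payload = b"PAR1\xe4\x00\x01PAR1"

with tempfile.TemporaryDirectory() as tmp:
    result = write_and_return_path(payload, Path(tmp) / "results", "test_parquet.parquet")
    # The task result is just text, so JSON encoding succeeds.
    as_json = json.dumps({"data": result})
    on_disk = Path(result).read_bytes()
```

The trade-off is that Prefect then tracks only the path; reading the data back and managing the file's lifetime become the caller's responsibility.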