Janet Carson
12/22/2023, 6:52 PM

Marvin
12/22/2023, 6:53 PM
To save a pandas DataFrame as Parquet, you can use the `pandas.DataFrame.to_parquet` method.
Here are the steps you would generally follow to implement a custom serializer:
1. Define a custom serializer class that implements the `dumps` and `loads` methods.
2. Within the `dumps` method, call the `to_parquet` method on your DataFrame to produce bytes.
3. Configure your task to use your custom serializer when saving results.
Unfortunately, the information at hand does not include a ready-made implementation example for a Parquet serializer, but creating one follows the same principles as any other custom serializer built on Prefect's serializer interface.
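For context, the parquet round trip itself, independent of Prefect, only needs an in-memory buffer. A minimal sketch using pandas with the pyarrow engine (assuming pyarrow is installed):

```python
from io import BytesIO

import pandas as pd

df = pd.DataFrame({"a": [1, 2, 3]})

# Serialize: write parquet into an in-memory buffer instead of a file path
buf = BytesIO()
df.to_parquet(buf, engine="pyarrow")
blob = buf.getvalue()  # raw parquet bytes

# Deserialize: read the bytes back into a DataFrame
restored = pd.read_parquet(BytesIO(blob), engine="pyarrow")
assert restored.equals(df)
```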
If you need more detailed information or examples, I can look up specific documentation or GitHub issues related to custom serialization in Prefect. Would you like me to do that?

Janet Carson
12/22/2023, 7:33 PM

```python
class ParquetSerializer(Serializer):
    def __init__(self):
        super().__init__(type="parquet")

    def dumps(self, obj: pd.DataFrame):
        return obj.to_parquet(path=None, engine="pyarrow")

    def loads(self, blob: bytes):
        fp = BytesIO(blob)
        return pd.read_parquet(fp, engine="pyarrow")
```
Marvin
12/22/2023, 8:14 PM
Your `ParquetSerializer` class seems to be on the right track. You're creating a custom serializer that overrides the `dumps` and `loads` methods to handle parquet files with the help of pandas. However, there are a few things you might want to adjust:
1. When `to_parquet` is given a file path, it writes to disk and returns `None`. To keep the serialized data in memory, pass a file-like object (an in-memory buffer) instead.
2. Ensure that the `engine` parameter matches your environment. If you have `pyarrow` installed and prefer to use it, that's fine; otherwise, you can use `fastparquet` if that is what you have.
3. Ensure that any necessary imports are included at the top of your script (e.g., `import pandas as pd`, `from prefect.serializers import Serializer`, `from io import BytesIO`).
Here's a refined version of your class:
```python
from io import BytesIO
from typing import Literal

import pandas as pd
from prefect.serializers import Serializer


class ParquetSerializer(Serializer):
    type: Literal["parquet"] = "parquet"

    def dumps(self, obj: pd.DataFrame) -> bytes:
        buffer = BytesIO()
        obj.to_parquet(buffer, engine="pyarrow")
        buffer.seek(0)
        return buffer.read()

    def loads(self, blob: bytes) -> pd.DataFrame:
        fp = BytesIO(blob)
        return pd.read_parquet(fp, engine="pyarrow")
```
This updated version uses a bytes buffer to write and read the parquet data in memory. When you want to persist task results with this serializer, you will need to configure your task with it:
```python
from prefect import task


@task(result_serializer=ParquetSerializer())
def my_task():
    # Your task logic that produces a pandas DataFrame
    ...
```
Do remember to test your custom serializer thoroughly to make sure it works as expected before deploying it in production.
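For instance, a minimal round-trip check (an ad-hoc sketch, not from Prefect's docs) could be:

```python
serializer = ParquetSerializer()
df = pd.DataFrame({"x": [1.0, 2.0], "y": ["a", "b"]})

# dumps followed by loads should reproduce the original DataFrame exactly
assert serializer.loads(serializer.dumps(df)).equals(df)
```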
Janet Carson
12/22/2023, 10:56 PM

```python
from io import BytesIO
from typing import Literal

import pandas as pd
from prefect import flow, task
from prefect.serializers import Serializer


class ParquetSerializer(Serializer):
    type: Literal["parquet"] = "parquet"

    def dumps(self, obj: pd.DataFrame):
        if not isinstance(obj, pd.DataFrame):
            raise ValueError("Hey, this is for dataframes")
        fp = BytesIO()
        obj.to_parquet(path=fp, engine="pyarrow")
        fp.seek(0)
        return fp.read()

    def loads(self, blob: bytes):
        fp = BytesIO(blob)
        return pd.read_parquet(fp, engine="pyarrow")


@task(
    persist_result=True,
    result_serializer=ParquetSerializer(),
    result_storage_key="test_parquet.parquet",
)
def save_these_results():
    return pd.DataFrame(
        {
            "a": [1, 2, 3],
            "b": [4, 5, 6],
        }
    )


@flow
async def test_parquet_serializer():
    save_these_results()


if __name__ == "__main__":
    test_parquet_serializer.serve(name="Serialize-Test")
```
Here is the stack trace:
```
Encountered exception during execution:
Traceback (most recent call last):
  File "/home/echodyne.int/janet/new_rcs/new_rcs/lib/python3.10/site-packages/prefect/engine.py", line 849, in orchestrate_flow_run
    result = await flow_call.aresult()
  File "/home/echodyne.int/janet/new_rcs/new_rcs/lib/python3.10/site-packages/prefect/_internal/concurrency/calls.py", line 293, in aresult
    return await asyncio.wrap_future(self.future)
  File "/home/echodyne.int/janet/new_rcs/new_rcs/lib/python3.10/site-packages/prefect/_internal/concurrency/calls.py", line 355, in _run_async
    result = await coro
  File "/home/echodyne.int/janet/new_rcs/jds-pipeline/jds_pipeline/tools/revise_rcs_7.4/test_serializer.py", line 41, in test_parquet_serializer
    save_these_results()
  File "/home/echodyne.int/janet/new_rcs/new_rcs/lib/python3.10/site-packages/prefect/tasks.py", line 549, in __call__
    return enter_task_run_engine(
  File "/home/echodyne.int/janet/new_rcs/new_rcs/lib/python3.10/site-packages/prefect/engine.py", line 1364, in enter_task_run_engine
    return from_sync.wait_for_call_in_loop_thread(begin_run)
  File "/home/echodyne.int/janet/new_rcs/new_rcs/lib/python3.10/site-packages/prefect/_internal/concurrency/api.py", line 243, in wait_for_call_in_loop_thread
    return call.result()
  File "/home/echodyne.int/janet/new_rcs/new_rcs/lib/python3.10/site-packages/prefect/_internal/concurrency/calls.py", line 284, in result
    return self.future.result(timeout=timeout)
  File "/home/echodyne.int/janet/new_rcs/new_rcs/lib/python3.10/site-packages/prefect/_internal/concurrency/calls.py", line 168, in result
    return self.__get_result()
  File "/usr/lib/python3.10/concurrent/futures/_base.py", line 403, in __get_result
    raise self._exception
  File "/home/echodyne.int/janet/new_rcs/new_rcs/lib/python3.10/site-packages/prefect/_internal/concurrency/calls.py", line 355, in _run_async
    result = await coro
  File "/home/echodyne.int/janet/new_rcs/new_rcs/lib/python3.10/site-packages/prefect/engine.py", line 1532, in get_task_call_return_value
    return await future._result()
  File "/home/echodyne.int/janet/new_rcs/new_rcs/lib/python3.10/site-packages/prefect/futures.py", line 237, in _result
    return await final_state.result(raise_on_failure=raise_on_failure, fetch=True)
  File "/home/echodyne.int/janet/new_rcs/new_rcs/lib/python3.10/site-packages/prefect/states.py", line 91, in _get_state_result
    raise await get_state_exception(state)
  File "/home/echodyne.int/janet/new_rcs/new_rcs/lib/python3.10/site-packages/prefect/task_runners.py", line 231, in submit
    result = await call()
  File "/home/echodyne.int/janet/new_rcs/new_rcs/lib/python3.10/site-packages/prefect/engine.py", line 1762, in begin_task_run
    state = await orchestrate_task_run(
  File "/home/echodyne.int/janet/new_rcs/new_rcs/lib/python3.10/site-packages/prefect/engine.py", line 2064, in orchestrate_task_run
    terminal_state = await return_value_to_state(
  File "/home/echodyne.int/janet/new_rcs/new_rcs/lib/python3.10/site-packages/prefect/states.py", line 305, in return_value_to_state
    return Completed(data=await result_factory.create_result(data))
  File "/home/echodyne.int/janet/new_rcs/new_rcs/lib/python3.10/site-packages/prefect/results.py", line 377, in create_result
    return await PersistedResult.create(
  File "/home/echodyne.int/janet/new_rcs/new_rcs/lib/python3.10/site-packages/prefect/results.py", line 574, in create
    await storage_block.write_path(key, content=blob.to_bytes())
  File "/home/echodyne.int/janet/new_rcs/new_rcs/lib/python3.10/site-packages/prefect/results.py", line 615, in to_bytes
    return self.json().encode()
  File "pydantic/main.py", line 504, in pydantic.main.BaseModel.json
  File "/usr/lib/python3.10/json/__init__.py", line 238, in dumps
    **kw).encode(obj)
  File "/usr/lib/python3.10/json/encoder.py", line 199, in encode
    chunks = self.iterencode(o, _one_shot=True)
  File "/usr/lib/python3.10/json/encoder.py", line 257, in iterencode
    return _iterencode(o, 0)
  File "pydantic/json.py", line 88, in pydantic.json.pydantic_encoder
  File "pydantic/json.py", line 45, in pydantic.json.lambda
UnicodeDecodeError: 'utf-8' codec can't decode byte 0xe4 in position 115: invalid continuation byte
```
Why is prefect calling a json encoder when I defined a ParquetSerializer on my task?

Marvin
12/22/2023, 10:56 PM
You're hitting a JSON encoding path even though you defined a custom `ParquetSerializer`. This seems unusual because, by defining a custom `ParquetSerializer` and assigning it to the task, you were instructing Prefect to handle serialization with parquet rather than JSON.
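As a minimal illustration of the failure mode (a sketch of why raw parquet bytes trip a JSON encoder, not a depiction of Prefect's internals):

```python
import base64
import json

# Arbitrary bytes that are not valid UTF-8, standing in for a parquet payload
raw = b"\x80\xe4\x00parquet-ish"

try:
    json.dumps(raw.decode("utf-8"))  # roughly what a JSON encoder must do with bytes
except UnicodeDecodeError as exc:
    print(exc)  # mirrors the UnicodeDecodeError in the traceback above

# Base64 text survives JSON encoding, which is why the later workaround avoids the crash
safe = base64.encodebytes(raw).decode("ascii")
print(json.dumps({"data": safe}))
```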
One potential reason for this issue could be a bug or a misunderstanding in the interaction between the custom serializer and the Prefect internals. Unfortunately, a search for similar issues on GitHub did not yield any results, so this might not be a known problem at the moment.
To troubleshoot this issue, I'd recommend verifying the following:
1. Ensure that your environment is set up correctly and that Prefect is using the correct version where custom serializers are supported.
2. Revisit the custom `ParquetSerializer` class to confirm that it correctly subclasses Prefect's `Serializer` class and that the `dumps` and `loads` methods are implemented as required.
3. Double-check the `result_storage_key` assignment to ensure it is consistent with how Prefect expects to use storage keys.
4. Review the Prefect documentation and any examples or guides to confirm that all steps are followed correctly.
If the problem persists after the usual debugging steps, consider reaching out to the Prefect community or opening an issue on the Prefect GitHub repository for further assistance.
Additionally, look for any updates in the documentation or the Prefect changelog that might shed light on changes in how custom serializers should be implemented. If you have recently updated Prefect, it is also worth checking the release notes for related regressions or breaking changes.

Janet Carson
12/23/2023, 12:43 AM

```python
import base64  # in addition to the imports from the previous script


# Does not crash, but does not create a valid parquet file
class ParquetSerializer(Serializer):
    type: Literal["parquet"] = "parquet"

    def dumps(self, obj: pd.DataFrame) -> bytes:
        if not isinstance(obj, pd.DataFrame):
            raise ValueError("Hey, this is for dataframes")
        fp = BytesIO()
        obj.to_parquet(path=fp, engine="pyarrow")
        fp.seek(0)
        raw = fp.getvalue()
        return base64.encodebytes(raw)

    def loads(self, blob: bytes) -> pd.DataFrame:
        decode_blob = base64.decodebytes(blob)
        fp = BytesIO(decode_blob)
        return pd.read_parquet(fp, engine="pyarrow")
```
Marvin
12/23/2023, 12:43 AM
If a task does not specify a result serializer, the serializer named by the `PREFECT_RESULTS_DEFAULT_SERIALIZER` setting will be used.
Prefect does support creating custom serializers, but the relevant documents do not provide specific implementation details. As for your finding in the Prefect source code that the pickle serializer base64-encodes data after serialization: custom serializers should behave similarly, producing content that can be transmitted or stored safely.
The traceback supports this: `to_bytes` calls `self.json()`, so the bytes your serializer returns are embedded in a JSON document before being written to storage and must therefore be JSON-encodable.
Given that base64 encoding fixes the crash but leaves a file that is not valid parquet on its own (what lands on disk is the JSON envelope, not raw parquet bytes), you seem to be right that Prefect expects a different serialization format. Since this is not spelled out in the available documentation excerpts, further clarification from the official Prefect documentation or directly from the Prefect development team might be necessary.
A possible solution could be to handle the serialization into parquet separately from the task and then encode the resulting bytes with base64 to comply with Prefect's internal handling. Your tasks could then produce the actual parquet files or bytes while another system wraps this output in base64 for storage, though this would indeed add another step to your data handling process.
Another potential approach is to use Prefect's block storage system to handle parquet files directly, though this deviates from using custom serializers. This could involve writing the parquet file to a location of your choice (like a cloud storage bucket) and returning the path or URL of the file as the task result that Prefect persists, as sketched below.
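For example, a rough sketch of that pattern (hypothetical directory and names, writing locally where a bucket could equally be used):

```python
from pathlib import Path

import pandas as pd
from prefect import flow, task

RESULTS_DIR = Path("/tmp/results")  # hypothetical location; could be cloud storage


@task
def save_dataframe() -> str:
    """Write the parquet file ourselves and return its path as the result."""
    RESULTS_DIR.mkdir(parents=True, exist_ok=True)
    df = pd.DataFrame({"a": [1, 2, 3], "b": [4, 5, 6]})
    out_path = RESULTS_DIR / "test_parquet.parquet"
    df.to_parquet(out_path, engine="pyarrow")
    # The task result is just a JSON-friendly string, so the default
    # serializer handles it without any custom Serializer subclass.
    return str(out_path)


@flow
def parquet_via_path():
    return save_dataframe()


if __name__ == "__main__":
    parquet_via_path()
```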
If you need further assistance, it may be best to open an issue with the Prefect repository to seek guidance from the developers regarding the custom serialization of task results and compatibility with other formats like parquet.