Hello,
I am having an issue related to serializing my results while setting checkpoint=False. I have a task that returns a Google Protobuf object, for example:
@task(checkpoint=False)
def fetch() -> ProtoBufObject:
    return fetch_proto_buff_object()
In the past I have set checkpoint=False to avoid serialization errors with Protobuf objects, but now, even with checkpoint=False, I get the following error when I try to register the flow:
TypeError: can't pickle google.protobuf.pyext._message.MessageDescriptor objects
To resolve this, I tried to create a serializer for the Protobuf object like so:
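from typing import Any
from prefect.engine.serializers import Serializer

class PBSerializer(Serializer):
    def serialize(self, value: Any) -> bytes:
        # transform a Python object into bytes
        return value.SerializeToString()

    def deserialize(self, value: bytes) -> Any:
        # recover a Python object from bytes
        # ParseFromString fills an instance in place, so create the message first and return it
        ts = amanzi_pb2.TimeSeries()
        ts.ParseFromString(value)
        return ts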
and used it like this:
@task(checkpoint=False, result=LocalResult(serializer=PBSerializer()))
def fetch() -> ProtoBufObject:
    return fetch_proto_buff_object()
But I still get the same error. The result serializer does not seem to be used, yet it does not raise any errors either. I have tried to debug and step through the code but can't figure out what is happening.
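As a sanity check outside the flow, the round trip that PBSerializer is meant to perform can be sketched in isolation. This is only a minimal sketch under stated assumptions: FakeTimeSeries is a hypothetical stand-in for amanzi_pb2.TimeSeries (it uses JSON rather than the real protobuf wire format so the example has no dependencies), and PBSerializerSketch and round_trip are illustrative names that are not part of Prefect.

```python
import json
from typing import Any


class FakeTimeSeries:
    """Hypothetical stand-in for amanzi_pb2.TimeSeries (not a real protobuf class)."""

    def __init__(self, values=None):
        self.values = list(values or [])

    def SerializeToString(self) -> bytes:
        # Real protobuf messages emit a binary wire format; JSON is used here
        # only to keep the sketch dependency-free.
        return json.dumps(self.values).encode("utf-8")

    def ParseFromString(self, data: bytes) -> int:
        # Mirrors the protobuf method in one respect: it fills the instance
        # in place and does not return the message itself.
        self.values = json.loads(data.decode("utf-8"))
        return len(data)


class PBSerializerSketch:
    """Same shape as PBSerializer, without the Prefect base class."""

    def serialize(self, value: Any) -> bytes:
        return value.SerializeToString()

    def deserialize(self, value: bytes) -> Any:
        ts = FakeTimeSeries()      # create an instance first
        ts.ParseFromString(value)  # populate it in place
        return ts                  # return the message, not the call's result


def round_trip(values):
    # Serialize a message to bytes, then rebuild it from those bytes.
    serializer = PBSerializerSketch()
    payload = serializer.serialize(FakeTimeSeries(values))
    return serializer.deserialize(payload).values
```

If a round trip like this works on the real message class, the serializer itself is probably not what raises the pickling error at registration time.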
Any help, guidance, or ideas for how to resolve this would be welcome.
I am on v0.13.19.