Can prefect cloud support PickleSerializer on `Pre...
# ask-community
j
Can prefect cloud support PickleSerializer on
PrefectResult
? I’m currently encountering this error:
Copy code
TypeError: PrefectResult only supports JSONSerializer or DateTimeSerializer
k
Hey @John T, I guess not then, but I feel like someone did it. Let me double check
j
Thanks!
z
If you pickle something then base64 encode the bytes to a string then it’s “JSON”
PrefectResult
stores something directly in the database column which has to be a string (ie JSON) not bytes.
j
ahh I see, so if I provide my own subclass of
PrefectResult
, that adds that extra layer of b64 encode/decode then I should be fine?
k
I guess so yeah. More like you can provide your own serializer I think
z
Hm. It looks like our
JSONSerializer
actually encodes the string into bytes.
I think this is because we don’t want to store pickles in the database
It’s dangerous since unpickling can execute arbitary code
j
I think we should be fine, since it’s just a datetime object that we ourselves generate. But it’s good to think about these things.
Following the b64 encoding approach, with how
PrefectResult
leverages
location
and
value
, the Location Result in the UI has a b64 encoded string. I’m curious if I could just reassign
location
to a string representation of my value.
Here is some sample code:
Copy code
class CloudPicklePrefectResult(Result):
    def __init__(self, **kwargs: Any) -> None:
        super().__init__(**kwargs)

    def _encode(self, value: str) -> bytes:
        return value.encode('utf-8')
    
    def _decode(self, value: bytes) -> str:
        return value.decode('utf-8')

    def read(self, location: str) -> Result:
        new = self.copy()
        new.value = (
            cloudpickle.loads(
                base64.b64decode(
                    self._encode(location)
                )
            )
        )

        try:
            new.location = str(new.value)
        except:
            new.location = location

        return new

    def write(self, value_: Any, **kwargs: Any) -> Result:
        new = self.copy()
        new.value = value_
        new.location = (
            self._decode(
                base64.b64encode(
                    cloudpickle.dumps(new.value)
                )
            )
        )
        return new
k
Let me see what we can do here. I am not confident you can make your own result class and register with it because the backend will not know how to load it. The serializer is also limited. I feel like you may need to use the task itself to do the pickling and encoding. This will take me some time. Will get back to you
Ignore my last statement. I was thinking of results that read from databases won’t work. This will work perfectly fine. I see the issue you are saying now with the Result Location. Will dig in.
👍 1
Ok so I talked to Michael and this is easier to do (without displaying the location as a string) if you subclass the JSONSerializer and create one that uses cloudpickle. Example:
Copy code
class PickleJSONSerializer(JSONSerializer):
    def serialize(self, value: Any) -> bytes:
        return base64.b64encode(cloudpickle.dumps(value))

    def deserialize(self, value: bytes) -> Any:
        return cloudpickle.loads(base64.b64decode(value))

@task
def abc():
    return "this string"

with Flow("result_test", result = PrefectResult(serializer = PickleJSONSerializer())) as flow:
    abc()
But if you really want the result location to be a string in the dashboard, I am not 100% sure but it seemed to render right with this implementation (no serializer attached):
Copy code
class CloudPicklePrefectResult(Result):
    def __init__(self, **kwargs: Any) -> None:
        super().__init__(**kwargs)

    def read(self, location: str) -> Result:
        new = self.copy()
        new.value = (
            cloudpickle.loads(
                base64.b64decode((location.encode("utf-8"))
                )
            )
        )
        new.location = location

        return new

    def write(self, value_: Any, **kwargs: Any) -> Result:
        new = self.copy()
        new.value = base64.b64encode(cloudpickle.dumps(value_)).decode("utf-8")
        new.location = value_
        return new
j
ahhhh this seems super useful, thank you so much for this! I will test it out in the following hours and keep you posted! Thanks again for all the hard work you guys do! ❤️