John T
11/18/2021, 9:00 PMPrefectResultTypeError: PrefectResult only supports JSONSerializer or DateTimeSerializerKevin Kho
John T
11/18/2021, 9:02 PMZanie
Zanie
PrefectResultJohn T
11/18/2021, 9:37 PMPrefectResultKevin Kho
Zanie
JSONSerializerZanie
Zanie
John T
11/19/2021, 3:14 PMJohn T
11/19/2021, 3:17 PMPrefectResultlocationvaluelocationJohn T
11/19/2021, 3:17 PMclass CloudPicklePrefectResult(Result):
    def __init__(self, **kwargs: Any) -> None:
        super().__init__(**kwargs)
    def _encode(self, value: str) -> bytes:
        return value.encode('utf-8')
    
    def _decode(self, value: bytes) -> str:
        return value.decode('utf-8')
    def read(self, location: str) -> Result:
        new = self.copy()
        new.value = (
            cloudpickle.loads(
                base64.b64decode(
                    self._encode(location)
                )
            )
        )
        try:
            new.location = str(new.value)
        except:
            new.location = location
        return new
    def write(self, value_: Any, **kwargs: Any) -> Result:
        new = self.copy()
        new.value = value_
        new.location = (
            self._decode(
                base64.b64encode(
                    cloudpickle.dumps(new.value)
                )
            )
        )
        return newKevin Kho
Kevin Kho
Kevin Kho
class PickleJSONSerializer(JSONSerializer):
    def serialize(self, value: Any) -> bytes:
        return base64.b64encode(cloudpickle.dumps(value))
    def deserialize(self, value: bytes) -> Any:
        return cloudpickle.loads(base64.b64decode(value))
@task
def abc():
    return "this string"
with Flow("result_test", result = PrefectResult(serializer = PickleJSONSerializer())) as flow:
    abc()class CloudPicklePrefectResult(Result):
    def __init__(self, **kwargs: Any) -> None:
        super().__init__(**kwargs)
    def read(self, location: str) -> Result:
        new = self.copy()
        new.value = (
            cloudpickle.loads(
                base64.b64decode((location.encode("utf-8"))
                )
            )
        )
        new.location = location
        return new
    def write(self, value_: Any, **kwargs: Any) -> Result:
        new = self.copy()
        new.value = base64.b64encode(cloudpickle.dumps(value_)).decode("utf-8")
        new.location = value_
        return newJohn T
11/19/2021, 5:44 PM