John T
11/18/2021, 9:00 PM
PrefectResult? I’m currently encountering this error:
TypeError: PrefectResult only supports JSONSerializer or DateTimeSerializer
Kevin Kho
John T
11/18/2021, 9:02 PM
Zanie
PrefectResult stores something directly in the database column, which has to be a string (i.e. JSON), not bytes.
John T
11/18/2021, 9:37 PM
PrefectResult, that adds that extra layer of b64 encode/decode, then I should be fine?
Kevin Kho
Zanie
JSONSerializer actually encodes the string into bytes.
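A rough illustration of the string-versus-bytes point, as a sketch only. It uses the standard library, with pickle standing in for cloudpickle, and the helper names (json_bytes, pickle_b64_text, from_pickle_b64_text) are made up for this example rather than taken from Prefect. The assumption is that a JSON serializer produces JSON text encoded to bytes, while raw pickle output is not valid text until it is base64-encoded:

```python
import base64
import json
import pickle


def json_bytes(value):
    """Hypothetical stand-in: JSON text encoded to UTF-8 bytes."""
    return json.dumps(value).encode("utf-8")


def pickle_b64_text(value):
    """Pickle a value, then base64-encode it so the result is plain ASCII text."""
    return base64.b64encode(pickle.dumps(value)).decode("ascii")


def from_pickle_b64_text(text):
    """Reverse of pickle_b64_text: decode the base64 text and unpickle it."""
    return pickle.loads(base64.b64decode(text.encode("ascii")))


# Raw pickle bytes are binary and generally cannot be decoded as UTF-8 text.
raw = pickle.dumps({"a": 1})
try:
    raw.decode("utf-8")
    raw_is_text = True
except UnicodeDecodeError:
    raw_is_text = False

# The base64 form is a str, so it can live in a string column.
text = pickle_b64_text({"a": 1})
```

The base64 step is what turns arbitrary bytes into something a string-only column can hold, at the cost of the stored text no longer being human-readable.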
John T
11/19/2021, 3:14 PM
John T
11/19/2021, 3:17 PM
PrefectResult leverages location and value; the Location Result in the UI has a b64-encoded string. I’m curious if I could just reassign location to a string representation of my value.
John T
11/19/2021, 3:17 PM
import base64
from typing import Any

import cloudpickle
from prefect.engine.result import Result

class CloudPicklePrefectResult(Result):
    def __init__(self, **kwargs: Any) -> None:
        super().__init__(**kwargs)

    def _encode(self, value: str) -> bytes:
        return value.encode('utf-8')

    def _decode(self, value: bytes) -> str:
        return value.decode('utf-8')

    def read(self, location: str) -> Result:
        # The location itself carries the base64-encoded pickle of the value.
        new = self.copy()
        new.value = cloudpickle.loads(base64.b64decode(self._encode(location)))
        try:
            # Show a readable string as the location when the value allows it.
            new.location = str(new.value)
        except Exception:
            new.location = location
        return new

    def write(self, value_: Any, **kwargs: Any) -> Result:
        # Pickle the value, then base64-encode it so the location is a plain string.
        new = self.copy()
        new.value = value_
        new.location = self._decode(base64.b64encode(cloudpickle.dumps(new.value)))
        return new
Kevin Kho
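import base64
from typing import Any
import cloudpickle
from prefect import Flow, task
from prefect.engine.results import PrefectResult
from prefect.engine.serializers import JSONSerializer

class PickleJSONSerializer(JSONSerializer):
    # Subclassing JSONSerializer is meant to get past PrefectResult's serializer check.
    def serialize(self, value: Any) -> bytes:
        return base64.b64encode(cloudpickle.dumps(value))
    def deserialize(self, value: bytes) -> Any:
        return cloudpickle.loads(base64.b64decode(value))

@task
def abc():
    return "this string"

with Flow("result_test", result = PrefectResult(serializer = PickleJSONSerializer())) as flow:
    abc()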
But if you really want the result location to be a string in the dashboard, I am not 100% sure but it seemed to render right with this implementation (no serializer attached):
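import base64
from typing import Any

import cloudpickle
from prefect.engine.result import Result

class CloudPicklePrefectResult(Result):
    def __init__(self, **kwargs: Any) -> None:
        super().__init__(**kwargs)

    def read(self, location: str) -> Result:
        # Expects location to hold base64-encoded cloudpickle data.
        new = self.copy()
        new.value = cloudpickle.loads(base64.b64decode(location.encode("utf-8")))
        new.location = location
        return new

    def write(self, value_: Any, **kwargs: Any) -> Result:
        # NOTE: the raw value goes into location (so it renders as-is in the UI),
        # while the base64 text goes into value. read() above can therefore only
        # round-trip a location that is itself base64-encoded cloudpickle data.
        new = self.copy()
        new.value = base64.b64encode(cloudpickle.dumps(value_)).decode("utf-8")
        new.location = value_
        return new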
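For comparison, a minimal sketch of a write/read pair that round-trips, where the location always holds the base64 text. This is not Prefect code: SketchResult is a hypothetical stand-in for a Result class, and pickle from the standard library stands in for cloudpickle so the snippet runs on its own.

```python
import base64
import pickle
from dataclasses import dataclass, replace
from typing import Any, Optional


@dataclass
class SketchResult:
    """Hypothetical stand-in for a Result; not Prefect's class."""

    value: Any = None
    location: Optional[str] = None

    def write(self, value: Any) -> "SketchResult":
        # Pickle, base64-encode, and decode to str so the payload is plain text.
        payload = base64.b64encode(pickle.dumps(value)).decode("utf-8")
        return replace(self, value=value, location=payload)

    def read(self, location: str) -> "SketchResult":
        # Reverse the steps in write(): text -> bytes -> base64 decode -> unpickle.
        value = pickle.loads(base64.b64decode(location.encode("utf-8")))
        return replace(self, value=value, location=location)


written = SketchResult().write({"x": [1, 2, 3]})
restored = SketchResult().read(written.location)
```

The trade-off is that the location shown anywhere it is displayed would be the base64 text rather than a readable string, since the payload is the only place the value is kept.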
John T
11/19/2021, 5:44 PM