Has anyone run into any problems reading a result ...
# ask-community
a
Has anyone run into any problems reading a result written with a serializer other than the default pickle serializer? I set the result for a task to use GCSResult with the JSONSerializer. When I try to read the result from task runs obtained from `Client.get_flow_run_info(...)`, the read function is using the default pickle serializer. I used Prefect 0.15.1 both in the flow environment and in the environment running the read code. Am I missing something? Thank you in advance!
```python
import prefect
from prefect import task
from prefect.engine.results import GCSResult
from prefect.engine.serializers import JSONSerializer

# Store this task's return value in GCS as JSON rather than a pickle.
result = GCSResult(
    bucket="prefect-cloud-results",
    location="{flow_name}/{flow_run_id}/provenance.json",
    serializer=JSONSerializer(),
)

@task(result=result)
def set_provenance_data(flow_run_id: str, prefect_cloud_client: prefect.client.client.Client = None):
    ...
```
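The read side looks roughly like this (a sketch; the flow run ID is a placeholder, and it assumes the task run state exposes the stored result location via `state._result.location`):

```python
from prefect.client import Client

client = Client()
info = client.get_flow_run_info("<flow-run-id>")  # placeholder flow run ID

for task_run in info.task_runs:
    state = task_run.state
    # The state fetched from Cloud carries the result location, but not the
    # serializer, so reading here falls back to the default pickle serializer
    # and chokes on the JSON bytes the task actually wrote.
    if state._result is not None and state._result.location:
        value = state._result.read(state._result.location).value
```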
k
Hey @Ayla Khan, I have seen this happen specifically when the serialization and deserialization happened with different Python versions. Both the Python version and (to a lesser extent) the Prefect version need to match on both ends. Are you using a container that has a different Python version (3.6, 3.7, 3.8)?
a
Yes, they're different. Thank you - I'll try a container with a different Python version and see if that fixes it!
I tried running deserialization in a container started from the same Prefect Docker image (`prefecthq/prefect:0.15.1-python3.6`) that I used to run the flow that serialized the result. Still seeing the error.
k
Could you give me your `cloudpickle` version? I’ll make an environment and test this.
a
Thank you!
```
root@893f541d47cb:/# pip list | grep cloudpickle
cloudpickle                 1.6.0
```
k
Will try to replicate sometime today. Feel free to ping me if I don’t get back to you by tomorrow. I just have a bunch of stuff today.
a
ok, sounds good
k
I know what you are saying now. I can replicate that error when I serialize with JSON and read with the `PickleSerializer`. Did you attach the Result to the task or use it inside the task? Did you attach it to the Flow?
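For reference, the mismatch can be reproduced outside a flow along these lines (a sketch; the bucket and location are placeholders, and the exact exception depends on the bytes being read):

```python
from prefect.engine.results import GCSResult
from prefect.engine.serializers import JSONSerializer

# Write a value to GCS as JSON.
json_result = GCSResult(
    bucket="prefect-cloud-results",  # placeholder bucket
    location="repro/value.json",
    serializer=JSONSerializer(),
)
written = json_result.write({"key": "value"})

# Read the same location back with the default PickleSerializer:
# the JSON bytes are not a pickle stream, so deserialization fails.
pickle_result = GCSResult(bucket="prefect-cloud-results")
pickle_result.read(written.location)  # raises during unpickling
```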
a
I want to return the result from the task and use it (attach I guess?) inside a Flow
k
I mean did you do `@task(result = xxx)` or `Flow(result = xxx)` or

```python
@task
def abc():
    result.write(xxx)
```

Wondering how the result was defined.
a
Ah, ok! I'm doing `@task(result = xxx)`
k
So I talked to the team. The serializer is not included in the result by design; it is loaded when the Task runs, so it won’t work for the script that you posted originally. What are you trying to do?
a
I have some data about flow runs that, for ergonomic reasons, is better to serialize as JSON instead of pickling, and save that JSON file to a GCS bucket. It's also convenient to have this data available to query through `Client.get_flow_run_info` and the task run data that gets returned, which is what I was testing in the script snippet I posted. I have a workaround that calls raw Google Cloud Storage code directly, but it would be nice to get that data by reading from the result. Is there any way to specify which serializer a result should use when reading a value from a previously run task state's result? Would I set a Result on a task or flow that's instantiated with the serializer the result should use, and then call the result read function?
k
Yeah, so I was thinking maybe you should just instantiate the Result manually like `res = GCSResult(bucket="xxx", serializer=JSONSerializer())`, and then you can do `res.read(location)`
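Put together, that might look like the following sketch (the flow run ID and bucket are placeholders, and it assumes the stored location is available on the task run state as `state._result.location`):

```python
from prefect.client import Client
from prefect.engine.results import GCSResult
from prefect.engine.serializers import JSONSerializer

# Rebuild the Result by hand with the serializer the task used to write,
# since the serializer itself is not stored with the task run state.
res = GCSResult(bucket="prefect-cloud-results", serializer=JSONSerializer())

client = Client()
info = client.get_flow_run_info("<flow-run-id>")  # placeholder flow run ID

for task_run in info.task_runs:
    stored = task_run.state._result
    if stored is not None and stored.location:
        # Reading through the manually built Result applies JSON deserialization.
        provenance = res.read(stored.location).value
```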
a
Ok, I'll give that a try later today. Thank you!
Yes, that'll work and it's cleaner than using raw GCS code in a task. Thank you again for your help!
k
Thanks for your understanding!