Is it expected that a `Result` using a specific se...
# prefect-community
a
Is it expected that a `Result` using a specific serializer (e.g. `PandasSerializer`) would use the same serializer when loading the result using `prefect.tasks.prefect.get_task_run_result`? I have a task that uses the following task decorator:
```python
@task(slug="output", result=GCSResult("<path>", serializer=PandasSerializer(file_type="parquet")), checkpoint=True)
```
When I try to get the result by doing:
```python
result = get_task_run_result.run(flow_id, "output-copy", poll_time=5)
```
I get an error that seems to indicate it's trying to use `cloudpickle` instead of Pandas:
```
File "/usr/local/lib/python3.8/dist-packages/prefect/engine/serializers.py", line 86, in deserialize
    return cloudpickle.loads(value)
_pickle.UnpicklingError: A load persistent id instruction was encountered,
but no persistent_load function was specified.
```
Is there a way to have `get_task_run_result` use a specific serializer?
z
Hi! Can you include the full traceback?
a
Sure:
```
└── 20:17:35 | ERROR   | Unexpected error while reading from result handler: UnpicklingError('A load persistent id instruction was encountered,\nbut no persistent_load function was specified.')
Traceback (most recent call last):
  File "/usr/local/lib/python3.8/dist-packages/prefect/engine/serializers.py", line 90, in deserialize
    return cloudpickle.loads(base64.b64decode(value))
_pickle.UnpicklingError: invalid load key, '<'.

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "/usr/local/lib/python3.8/dist-packages/prefect/engine/results/gcs_result.py", line 105, in read
    new.value = new.serializer.deserialize(serialized_value)
  File "/usr/local/lib/python3.8/dist-packages/prefect/engine/serializers.py", line 94, in deserialize
    raise exc from e
  File "/usr/local/lib/python3.8/dist-packages/prefect/engine/serializers.py", line 86, in deserialize
    return cloudpickle.loads(value)
_pickle.UnpicklingError: A load persistent id instruction was encountered,
but no persistent_load function was specified.
└── 20:17:35 | ERROR   | Task 'get_flow_result[0]': Exception encountered during task execution!
Traceback (most recent call last):
  File "/usr/local/lib/python3.8/dist-packages/prefect/engine/serializers.py", line 90, in deserialize
    return cloudpickle.loads(base64.b64decode(value))
_pickle.UnpicklingError: invalid load key, '<'.

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "/usr/local/lib/python3.8/dist-packages/prefect/engine/task_runner.py", line 876, in get_task_run_state
    value = prefect.utilities.executors.run_task_with_timeout(
  File "/usr/local/lib/python3.8/dist-packages/prefect/utilities/executors.py", line 467, in run_task_with_timeout
    return task.run(*args, **kwargs)  # type: ignore
  File "/flows/preprocessing_v2.py", line 96, in get_flow_result
    result = get_task_run_result.run(flow_id, "output-copy", poll_time=5)
  File "/usr/local/lib/python3.8/dist-packages/prefect/tasks/prefect/flow_run.py", line 233, in get_task_run_result
    return task_run.get_result()
  File "/usr/local/lib/python3.8/dist-packages/prefect/backend/task_run.py", line 81, in get_result
    self._result = self._load_result()
  File "/usr/local/lib/python3.8/dist-packages/prefect/backend/task_run.py", line 90, in _load_result
    self.state.load_result()
  File "/usr/local/lib/python3.8/dist-packages/prefect/engine/state.py", line 153, in load_result
    self._result = result_reader.read(known_location)  # type: ignore
  File "/usr/local/lib/python3.8/dist-packages/prefect/engine/results/gcs_result.py", line 115, in read
    raise exc
  File "/usr/local/lib/python3.8/dist-packages/prefect/engine/results/gcs_result.py", line 105, in read
    new.value = new.serializer.deserialize(serialized_value)
  File "/usr/local/lib/python3.8/dist-packages/prefect/engine/serializers.py", line 94, in deserialize
    raise exc from e
  File "/usr/local/lib/python3.8/dist-packages/prefect/engine/serializers.py", line 86, in deserialize
    return cloudpickle.loads(value)
_pickle.UnpicklingError: A load persistent id instruction was encountered,
but no persistent_load function was specified.
└── 20:17:35 | INFO    | Task 'get_flow_result[0]': Finished task run for task with final state: 'Failed'
└── 20:17:44 | INFO    | Flow '4875bc43-7572-4965-aac6-3e5ceab9debd_node-2': Entered state <Success>: All reference tasks succeeded.
└── 20:17:46 | ERROR   | Unexpected error while reading from result handler: UnpicklingError('A load persistent id instruction was encountered,\nbut no persistent_load function was specified.')
Traceback (most recent call last):
  File "/usr/local/lib/python3.8/dist-packages/prefect/engine/serializers.py", line 90, in deserialize
    return cloudpickle.loads(base64.b64decode(value))
_pickle.UnpicklingError: invalid load key, '<'.

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "/usr/local/lib/python3.8/dist-packages/prefect/engine/results/gcs_result.py", line 105, in read
    new.value = new.serializer.deserialize(serialized_value)
  File "/usr/local/lib/python3.8/dist-packages/prefect/engine/serializers.py", line 94, in deserialize
    raise exc from e
  File "/usr/local/lib/python3.8/dist-packages/prefect/engine/serializers.py", line 86, in deserialize
    return cloudpickle.loads(value)
_pickle.UnpicklingError: A load persistent id instruction was encountered,
but no persistent_load function was specified.
└── 20:17:46 | ERROR   | Task 'get_flow_result[1]': Exception encountered during task execution!
Traceback (most recent call last):
  File "/usr/local/lib/python3.8/dist-packages/prefect/engine/serializers.py", line 90, in deserialize
    return cloudpickle.loads(base64.b64decode(value))
_pickle.UnpicklingError: invalid load key, '<'.

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "/usr/local/lib/python3.8/dist-packages/prefect/engine/task_runner.py", line 876, in get_task_run_state
    value = prefect.utilities.executors.run_task_with_timeout(
  File "/usr/local/lib/python3.8/dist-packages/prefect/utilities/executors.py", line 467, in run_task_with_timeout
    return task.run(*args, **kwargs)  # type: ignore
  File "/flows/preprocessing_v2.py", line 96, in get_flow_result
    result = get_task_run_result.run(flow_id, "output-copy", poll_time=5)
  File "/usr/local/lib/python3.8/dist-packages/prefect/tasks/prefect/flow_run.py", line 233, in get_task_run_result
    return task_run.get_result()
  File "/usr/local/lib/python3.8/dist-packages/prefect/backend/task_run.py", line 81, in get_result
    self._result = self._load_result()
  File "/usr/local/lib/python3.8/dist-packages/prefect/backend/task_run.py", line 90, in _load_result
    self.state.load_result()
  File "/usr/local/lib/python3.8/dist-packages/prefect/engine/state.py", line 153, in load_result
    self._result = result_reader.read(known_location)  # type: ignore
  File "/usr/local/lib/python3.8/dist-packages/prefect/engine/results/gcs_result.py", line 115, in read
    raise exc
  File "/usr/local/lib/python3.8/dist-packages/prefect/engine/results/gcs_result.py", line 105, in read
    new.value = new.serializer.deserialize(serialized_value)
  File "/usr/local/lib/python3.8/dist-packages/prefect/engine/serializers.py", line 94, in deserialize
    raise exc from e
  File "/usr/local/lib/python3.8/dist-packages/prefect/engine/serializers.py", line 86, in deserialize
    return cloudpickle.loads(value)
_pickle.UnpicklingError: A load persistent id instruction was encountered,
but no persistent_load function was specified.
```
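Both error messages in the traceback are consistent with Parquet bytes being handed to a pickle loader. The sketch below reproduces them with the standard library only. It assumes the stored blob starts with the Parquet magic number `PAR1`, and it uses `pickle.loads` as a stand-in for `cloudpickle.loads`; `try_unpickle` is a hypothetical helper, not a Prefect function.

```python
import base64
import pickle

# Parquet files begin with the 4-byte magic number b"PAR1".
PARQUET_MAGIC = b"PAR1"


def try_unpickle(data: bytes) -> str:
    """Return the unpickling error message for `data`, or "ok" if it loads."""
    try:
        pickle.loads(data)
    except pickle.UnpicklingError as exc:
        return str(exc)
    return "ok"


# First attempt: unpickle the raw bytes. The leading b"P" is the pickle
# PERSID opcode, which needs a persistent_load hook that plain loads() lacks.
raw_error = try_unpickle(PARQUET_MAGIC)

# Fallback attempt: base64-decode, then unpickle. b"PAR1" happens to be valid
# base64 and decodes to bytes starting with b"<", which is not a pickle opcode.
decoded = base64.b64decode(PARQUET_MAGIC)
fallback_error = try_unpickle(decoded)

print(repr(decoded[:1]))
print(raw_error)
print(fallback_error)
```

If that assumption holds, the data in GCS was written as Parquet as intended, and only the read path is falling back to the pickle-based serializer.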
z
Hmm, I don’t think we store the serializer in the backend.
Here’s a draft that might enable this: https://github.com/PrefectHQ/prefect/pull/5441/files
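In the meantime, one possible workaround is to skip `get_task_run_result` and read the blob directly with a `GCSResult` configured like the one on the writing task. This is only a sketch under assumptions: `read_dataframe_result` is a hypothetical helper, the import paths follow the Prefect 1.x module layout visible in the traceback, and the caller has to supply the bucket and the blob location of the checkpointed result.

```python
def read_dataframe_result(bucket: str, location: str, file_type: str = "parquet"):
    """Read a checkpointed DataFrame from GCS using PandasSerializer.

    Hypothetical helper (not part of Prefect); assumes the Prefect 1.x
    module layout seen in the traceback.
    """
    # Imported lazily so the sketch can be defined without Prefect installed.
    from prefect.engine.results import GCSResult
    from prefect.engine.serializers import PandasSerializer

    # Mirror the serializer used in the writing task's decorator.
    result = GCSResult(
        bucket=bucket,
        serializer=PandasSerializer(file_type=file_type),
    )
    # read() returns a new Result whose .value holds the deserialized object.
    return result.read(location).value
```

How to look up the location for a given task run is not shown in this thread, so it is left as an input here.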
a
@Zanie Thanks for getting this up so quickly! Will take a look 🙏
z
Did you have a chance to try this out?
a
Sorry, haven't had a chance to try this yet - will try to test next week