    Aric Huang

    7 months ago
    Is it expected that a Result using a specific serializer (e.g. PandasSerializer) would use the same serializer when loading the result using prefect.tasks.prefect.get_task_run_result? I have a task that uses the following task decorator:
    @task(slug="output", result=GCSResult("<path>", serializer=PandasSerializer(file_type="parquet")), checkpoint=True)
    When I try to get the result by doing:
    result = get_task_run_result.run(flow_id, "output-copy", poll_time=5)
    I get an error that seems to indicate it's trying to use cloudpickle instead of Pandas:
    File "/usr/local/lib/python3.8/dist-packages/prefect/engine/serializers.py", line 86, in deserialize
        return cloudpickle.loads(value)
    _pickle.UnpicklingError: A load persistent id instruction was encountered,
    but no persistent_load function was specified.
    Is there a way to have get_task_run_result use a specific serializer?
    Michael Adkins

    7 months ago
    Hi! Can you include the full traceback?
    Aric Huang

    7 months ago
    Sure:
    └── 20:17:35 | ERROR   | Unexpected error while reading from result handler: UnpicklingError('A load persistent id instruction was encountered,\nbut no persistent_load function was specified.')
    Traceback (most recent call last):
      File "/usr/local/lib/python3.8/dist-packages/prefect/engine/serializers.py", line 90, in deserialize
        return cloudpickle.loads(base64.b64decode(value))
    _pickle.UnpicklingError: invalid load key, '<'.
    
    The above exception was the direct cause of the following exception:
    
    Traceback (most recent call last):
      File "/usr/local/lib/python3.8/dist-packages/prefect/engine/results/gcs_result.py", line 105, in read
        new.value = new.serializer.deserialize(serialized_value)
      File "/usr/local/lib/python3.8/dist-packages/prefect/engine/serializers.py", line 94, in deserialize
        raise exc from e
      File "/usr/local/lib/python3.8/dist-packages/prefect/engine/serializers.py", line 86, in deserialize
        return cloudpickle.loads(value)
    _pickle.UnpicklingError: A load persistent id instruction was encountered,
    but no persistent_load function was specified.
    └── 20:17:35 | ERROR   | Task 'get_flow_result[0]': Exception encountered during task execution!
    Traceback (most recent call last):
      File "/usr/local/lib/python3.8/dist-packages/prefect/engine/serializers.py", line 90, in deserialize
        return cloudpickle.loads(base64.b64decode(value))
    _pickle.UnpicklingError: invalid load key, '<'.
    
    The above exception was the direct cause of the following exception:
    
    Traceback (most recent call last):
      File "/usr/local/lib/python3.8/dist-packages/prefect/engine/task_runner.py", line 876, in get_task_run_state
        value = prefect.utilities.executors.run_task_with_timeout(
      File "/usr/local/lib/python3.8/dist-packages/prefect/utilities/executors.py", line 467, in run_task_with_timeout
        return task.run(*args, **kwargs)  # type: ignore
      File "/flows/preprocessing_v2.py", line 96, in get_flow_result
        result = get_task_run_result.run(flow_id, "output-copy", poll_time=5)
      File "/usr/local/lib/python3.8/dist-packages/prefect/tasks/prefect/flow_run.py", line 233, in get_task_run_result
        return task_run.get_result()
      File "/usr/local/lib/python3.8/dist-packages/prefect/backend/task_run.py", line 81, in get_result
        self._result = self._load_result()
      File "/usr/local/lib/python3.8/dist-packages/prefect/backend/task_run.py", line 90, in _load_result
        self.state.load_result()
      File "/usr/local/lib/python3.8/dist-packages/prefect/engine/state.py", line 153, in load_result
        self._result = result_reader.read(known_location)  # type: ignore
      File "/usr/local/lib/python3.8/dist-packages/prefect/engine/results/gcs_result.py", line 115, in read
        raise exc
      File "/usr/local/lib/python3.8/dist-packages/prefect/engine/results/gcs_result.py", line 105, in read
        new.value = new.serializer.deserialize(serialized_value)
      File "/usr/local/lib/python3.8/dist-packages/prefect/engine/serializers.py", line 94, in deserialize
        raise exc from e
      File "/usr/local/lib/python3.8/dist-packages/prefect/engine/serializers.py", line 86, in deserialize
        return cloudpickle.loads(value)
    _pickle.UnpicklingError: A load persistent id instruction was encountered,
    but no persistent_load function was specified.
    └── 20:17:35 | INFO    | Task 'get_flow_result[0]': Finished task run for task with final state: 'Failed'
    └── 20:17:44 | INFO    | Flow '4875bc43-7572-4965-aac6-3e5ceab9debd_node-2': Entered state <Success>: All reference tasks succeeded.
    └── 20:17:46 | ERROR   | Unexpected error while reading from result handler: UnpicklingError('A load persistent id instruction was encountered,\nbut no persistent_load function was specified.')
    Traceback (most recent call last):
      File "/usr/local/lib/python3.8/dist-packages/prefect/engine/serializers.py", line 90, in deserialize
        return cloudpickle.loads(base64.b64decode(value))
    _pickle.UnpicklingError: invalid load key, '<'.
    
    The above exception was the direct cause of the following exception:
    
    Traceback (most recent call last):
      File "/usr/local/lib/python3.8/dist-packages/prefect/engine/results/gcs_result.py", line 105, in read
        new.value = new.serializer.deserialize(serialized_value)
      File "/usr/local/lib/python3.8/dist-packages/prefect/engine/serializers.py", line 94, in deserialize
        raise exc from e
      File "/usr/local/lib/python3.8/dist-packages/prefect/engine/serializers.py", line 86, in deserialize
        return cloudpickle.loads(value)
    _pickle.UnpicklingError: A load persistent id instruction was encountered,
    but no persistent_load function was specified.
    └── 20:17:46 | ERROR   | Task 'get_flow_result[1]': Exception encountered during task execution!
    Traceback (most recent call last):
      File "/usr/local/lib/python3.8/dist-packages/prefect/engine/serializers.py", line 90, in deserialize
        return cloudpickle.loads(base64.b64decode(value))
    _pickle.UnpicklingError: invalid load key, '<'.
    
    The above exception was the direct cause of the following exception:
    
    Traceback (most recent call last):
      File "/usr/local/lib/python3.8/dist-packages/prefect/engine/task_runner.py", line 876, in get_task_run_state
        value = prefect.utilities.executors.run_task_with_timeout(
      File "/usr/local/lib/python3.8/dist-packages/prefect/utilities/executors.py", line 467, in run_task_with_timeout
        return task.run(*args, **kwargs)  # type: ignore
      File "/flows/preprocessing_v2.py", line 96, in get_flow_result
        result = get_task_run_result.run(flow_id, "output-copy", poll_time=5)
      File "/usr/local/lib/python3.8/dist-packages/prefect/tasks/prefect/flow_run.py", line 233, in get_task_run_result
        return task_run.get_result()
      File "/usr/local/lib/python3.8/dist-packages/prefect/backend/task_run.py", line 81, in get_result
        self._result = self._load_result()
      File "/usr/local/lib/python3.8/dist-packages/prefect/backend/task_run.py", line 90, in _load_result
        self.state.load_result()
      File "/usr/local/lib/python3.8/dist-packages/prefect/engine/state.py", line 153, in load_result
        self._result = result_reader.read(known_location)  # type: ignore
      File "/usr/local/lib/python3.8/dist-packages/prefect/engine/results/gcs_result.py", line 115, in read
        raise exc
      File "/usr/local/lib/python3.8/dist-packages/prefect/engine/results/gcs_result.py", line 105, in read
        new.value = new.serializer.deserialize(serialized_value)
      File "/usr/local/lib/python3.8/dist-packages/prefect/engine/serializers.py", line 94, in deserialize
        raise exc from e
      File "/usr/local/lib/python3.8/dist-packages/prefect/engine/serializers.py", line 86, in deserialize
        return cloudpickle.loads(value)
    _pickle.UnpicklingError: A load persistent id instruction was encountered,
    but no persistent_load function was specified.
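One way to read the two UnpicklingError messages above: they are what Python's pickle module reports when handed Parquet bytes. Parquet files start with the magic bytes PAR1, and the byte P is pickle's PERSID opcode, which would account for the "persistent id" error on the raw read. Base64-decoding PAR1 gives bytes that start with '<', which would account for the "invalid load key, '<'" error on the base64 fallback. The sketch below reproduces both messages using only the standard library (cloudpickle.loads is pickle.loads under the hood). The payload after the magic bytes is a made-up placeholder, not data from this thread.

```python
import base64
import pickle

# Parquet files begin with the 4-byte magic number b"PAR1".
# The trailing bytes are a hypothetical stand-in for the rest of the file.
parquet_like = b"PAR1" + b"rest-of-file"


def try_unpickle(data: bytes) -> str:
    """Return the UnpicklingError message raised for `data`, or "" if it loads."""
    try:
        pickle.loads(data)
    except pickle.UnpicklingError as exc:
        return str(exc)
    return ""


# Attempt 1: unpickle the raw bytes. The first byte, b"P", is pickle's
# PERSID opcode, and no persistent_load function is configured.
raw_error = try_unpickle(parquet_like)

# Attempt 2: base64-decode first, then unpickle. Only the 4 magic bytes
# are decoded here so the placeholder payload cannot cause padding errors.
decoded_magic = base64.b64decode(parquet_like[:4])
b64_error = try_unpickle(decoded_magic)

print(repr(decoded_magic[:1]))  # first byte after base64-decoding b"PAR1"
print(raw_error)
print(b64_error)
```

If this reading is right, the stored file itself is valid Parquet and the failure is only on the read side, where a pickle-based serializer is applied instead of PandasSerializer.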
    Michael Adkins

    7 months ago
    I don’t think we store the serializer in the backend... hmm.
    Here’s a draft that might enable this… https://github.com/PrefectHQ/prefect/pull/5441/files
    Aric Huang

    7 months ago
    @Michael Adkins Thanks for getting this up so quickly! Will take a look 🙏
    Michael Adkins

    7 months ago
    Did you have a chance to try this out?
    Aric Huang

    7 months ago
    Sorry, haven't had a chance to try this yet - will try to test next week