#prefect-community

Cole Murray

09/10/2022, 1:43 AM
Hi all, I'm running into an odd issue with pickling a task result after a recent change to an existing flow. The task returns a List[<dataclass>], and both properties on the dataclass are strings. I've tried re-deploying the flow and am currently upgrading from 2.2.0 to 2.3.2. Has anyone run into issues with cloudpickle when returning an object from a task? Stack trace is linked in the comments.
Can't pickle local object '__create_fn__.<locals>.__init__'
Traceback (most recent call last):
  File "/opt/venv/lib/python3.8/site-packages/workflow_etl/flows/flow.py", line 265, in hello_world
    device_paths = get_object_paths(<redacted>)
  File "/opt/venv/lib/python3.8/site-packages/prefect/tasks.py", line 294, in __call__
    return enter_task_run_engine(
  File "/opt/venv/lib/python3.8/site-packages/prefect/engine.py", line 727, in enter_task_run_engine
    return run_async_from_worker_thread(begin_run)
  File "/opt/venv/lib/python3.8/site-packages/prefect/utilities/asyncutils.py", line 136, in run_async_from_worker_thread
    return anyio.from_thread.run(call)
  File "/opt/venv/lib/python3.8/site-packages/anyio/from_thread.py", line 49, in run
    return asynclib.run_async_from_thread(func, *args)
  File "/opt/venv/lib/python3.8/site-packages/anyio/_backends/_asyncio.py", line 970, in run_async_from_thread
    return f.result()
  File "/usr/local/lib/python3.8/concurrent/futures/_base.py", line 444, in result
    return self.__get_result()
  File "/usr/local/lib/python3.8/concurrent/futures/_base.py", line 389, in __get_result
    raise self._exception
  File "/opt/venv/lib/python3.8/site-packages/prefect/engine.py", line 859, in create_task_run_then_submit
    return await future._result()
  File "/opt/venv/lib/python3.8/site-packages/prefect/futures.py", line 227, in _result
    return final_state.result(raise_on_failure=raise_on_failure)
  File "/opt/venv/lib/python3.8/site-packages/prefect/orion/schemas/states.py", line 145, in result
    raise data
  File "/opt/venv/lib/python3.8/site-packages/prefect/task_runners.py", line 214, in submit
    result = await run_fn(**run_kwargs)
  File "/opt/venv/lib/python3.8/site-packages/prefect/engine.py", line 1000, in begin_task_run
    return await orchestrate_task_run(
  File "/opt/venv/lib/python3.8/site-packages/prefect/engine.py", line 1120, in orchestrate_task_run
    terminal_state = await return_value_to_state(
  File "/opt/venv/lib/python3.8/site-packages/prefect/states.py", line 129, in return_value_to_state
    return Completed(data=DataDocument.encode(serializer, result))
  File "/opt/venv/lib/python3.8/site-packages/prefect/orion/schemas/data.py", line 42, in encode
    blob = lookup_serializer(encoding).dumps(data, **kwargs)
  File "/opt/venv/lib/python3.8/site-packages/prefect/serializers.py", line 59, in dumps
    data_bytes = cloudpickle.dumps(data)
  File "/opt/venv/lib/python3.8/site-packages/cloudpickle/cloudpickle_fast.py", line 73, in dumps
    cp.dump(obj)
  File "/opt/venv/lib/python3.8/site-packages/cloudpickle/cloudpickle_fast.py", line 632, in dump
    return Pickler.dump(self, obj)
AttributeError: Can't pickle local object '__create_fn__.<locals>.__init__'
@task
def get_object_paths(<redacted>) -> List[DeviceDataFilePath]:
    return _get_object_paths(...)


def _get_object_paths(<redacted>) -> List[DeviceDataFilePath]:
    """
    redacted
    """
    time_now = datetime.datetime.utcnow()
    logger = get_run_logger()
    device_data = []

    for device_id in device_ids:
        for day in range(num_days):
            day_time = time_now - datetime.timedelta(days=day)
            year = day_time.year
            month = day_time.month
            day = day_time.day
            filter_pattern = f"<redacted>"
            logger.info(f"Filter pattern: {filter_pattern}")
            filtered_objs = aws_utils.list_bucket(bucket=input_bucket, prefix=filter_pattern)
            objs = list(filtered_objs)
            objs = sorted(objs, key=lambda obj: obj.last_modified, reverse=True)
            ...<redacted>
    return device_data
Calling flow:
@flow
def hello_world(...<redacted>...):
    try:
        bucket = input_bucket
        device_paths = get_object_paths(<redacted>)

   ...

Anna Geller

09/10/2022, 12:25 PM
It looks like your dataclass object isn't pickleable with cloudpickle. To troubleshoot, you could try to cloudpickle it by hand. Not sure how helpful this is, but your stack trace seems to refer to this function in the CPython codebase (https://github.com/python/cpython/blob/main/Lib/dataclasses.py#L413). Having said that, we're working on more flexible result configuration functionality, which will give you more control over which results are persisted and how.
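A minimal way to follow that suggestion is to round-trip one instance through cloudpickle outside of Prefect. The sketch below assumes a hypothetical `DeviceDataFilePath` with two string fields (`device_id` and `path` are made-up names; the real ones were redacted) and falls back to the standard-library `pickle` module if `cloudpickle` isn't installed, which makes the check less faithful to what the task run does. If `dumps` raises the same `AttributeError` here, the problem likely lies in how the class is defined or imported rather than in the flow itself.

```python
import dataclasses
from typing import List

try:
    import cloudpickle as pickler  # the library the traceback shows Prefect calling
except ImportError:
    import pickle as pickler  # fallback so the sketch still runs without cloudpickle


@dataclasses.dataclass
class DeviceDataFilePath:
    # Hypothetical field names; the real ones were redacted in the thread.
    device_id: str
    path: str


def check_roundtrip(obj):
    """Serialize and deserialize obj, returning the restored copy.

    Raises the same kind of error a task run would hit if obj can't be pickled.
    """
    blob = pickler.dumps(obj)
    return pickler.loads(blob)


paths: List[DeviceDataFilePath] = [
    DeviceDataFilePath(device_id="device-1", path="s3://example-bucket/2022/09/10/a.json"),
]
restored = check_roundtrip(paths)
print(restored == paths)
```

Running this from the same environment and Python version as the deployment (3.8 in the traceback) is what makes the check meaningful, since dataclass internals differ between Python versions.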

Cole Murray

09/10/2022, 7:49 PM
Yep, it's still not totally clear why the object isn't pickleable. I ended up swapping the dataclass for a pydantic model, which resolved the issue.
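For reference, a swap like that might look roughly like the sketch below. It assumes pydantic is installed and uses a hypothetical `DeviceDataFilePath` with two string fields, `device_id` and `path` (the real names were redacted). It uses the standard-library `pickle` to avoid an extra dependency, and it only shows that instances survive a pickle round-trip; it doesn't explain why the original dataclass failed.

```python
import pickle
from typing import List

from pydantic import BaseModel


class DeviceDataFilePath(BaseModel):
    # Hypothetical fields standing in for the two redacted string properties.
    device_id: str
    path: str


def build_paths() -> List[DeviceDataFilePath]:
    # Stand-in for the task body: returns pydantic models instead of dataclasses.
    return [DeviceDataFilePath(device_id="device-1", path="s3://example-bucket/a.json")]


paths = build_paths()
restored = pickle.loads(pickle.dumps(paths))
print(restored == paths)
```

Pydantic models are regular classes with their own pickling support, so they don't depend on the generated `__init__` that shows up in the error message.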