Tom Klein

07/09/2023, 12:40 PM
Hello, we have an issue with prefect-dask: if our tasks rely on a “complex” object being returned (e.g. one that cannot be pickled), we run into a problem when moving from the default ConcurrentTaskRunner to the DaskTaskRunner. Basically, we encounter the issue mentioned here: https://discourse.prefect.io/t/picklingerror-error-when-using-a-dasktaskrunner/908 What’s the best way to work around this, other than changing the object being returned (assuming that’s not feasible)? Is there some way to override how serialization is done on it? Or, why does it work with the regular task runner but not with Dask?
Although, from the error we get, it seems to be complaining about the task function itself rather than its arguments:
File "/Users/redacted/Library/Caches/pypoetry/virtualenvs/prefect-tool-nqGlPUR4-py3.9/lib/python3.9/site-packages/distributed/protocol/pickle.py", line 63, in dumps
    result = pickle.dumps(x, **dump_kwargs)
_pickle.PicklingError: Can't pickle <function get_feature_group_data at 0x1334dcee0>: it's not the same object as __main__.get_feature_group_data
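For reference, this kind of "not the same object" error can be reproduced with the standard library alone. The sketch below is only an analogy and does not use Prefect or Dask: `FakeTask` and `fake_task` are hypothetical stand-ins, assuming the task decorator rebinds the module-level name to a wrapper object while keeping the original function on an attribute. Pickle serializes plain functions by reference (module plus name), so it refuses when that name no longer points at the function being pickled.

```python
import pickle


class FakeTask:
    """Hypothetical stand-in for a task wrapper; not Prefect's actual class."""

    def __init__(self, fn):
        # Keep a handle on the original, undecorated function.
        self.fn = fn


def fake_task(fn):
    """Hypothetical decorator: the decorated name ends up bound to a FakeTask."""
    return FakeTask(fn)


@fake_task
def get_feature_group_data(feature_group):
    return feature_group


def try_pickle(obj):
    """Return the PicklingError raised while pickling obj, or None on success."""
    try:
        pickle.dumps(obj)
        return None
    except pickle.PicklingError as exc:
        return exc


# The module-level name now refers to the FakeTask wrapper, so pickling the
# original function by reference cannot resolve back to the same object.
error = try_pickle(get_feature_group_data.fn)
print(type(error).__name__, error)
```

If that is what is happening here, the failure would come from the decorated function itself rather than from the unpicklable argument, which matches the traceback above.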
Some more info from the error itself:
2023-07-09 15:50:48,768 - distributed.protocol.pickle - ERROR - Failed to serialize {'task': <prefect.tasks.Task object at 0x1065e7f70>, 'task_run': TaskRun(id=UUID('46e50a13-9074-4ebc-8c9d-e7e4dc2eb499'), name='Get Feature Group Data-0', flow_run_id=UUID('94c1a094-2865-47cd-8540-0f9e8146acfd'), task_key='__main__.get_feature_group_data', dynamic_key='0', cache_key=None, cache_expiration=None, task_version=None, empirical_policy=TaskRunPolicy(max_retries=0, retry_delay_seconds=0.0, retries=0, retry_delay=0, retry_jitter_factor=None), tags=[], state_id=UUID('fdd1635e-3468-45ff-a0ab-b31ca5491a18'), task_inputs={'feature_group': [TaskRunResult(input_type='task_run', id=UUID('676aedf2-f950-4712-94f4-7047fff48f36'))]}, state_type=StateType.PENDING, state_name='Pending', run_count=0, flow_run_run_count=0, expected_start_time=DateTime(2023, 7, 9, 12, 50, 48, 730929, tzinfo=Timezone('+00:00')), next_scheduled_start_time=None, start_time=None, end_time=None, total_run_time=datetime.timedelta(0), estimated_run_time=datetime.timedelta(0), estimated_start_time_delta=datetime.timedelta(microseconds=50067), state=Pending(message=None, type=PENDING, result=None)), 'parameters': {'feature_group': HBFeatureGroup(name='accounts_context', sagemaker_session=<sagemaker.session.Session object at 0x12c6180d0>, feature_definitions=[])}, 'wait_for': None, 'result_factory': ResultFactory(persist_result=False, cache_result_in_memory=True, serializer=PickleSerializer(type='pickle', picklelib='cloudpickle', picklelib_version='2.2.1'), storage_block_id=UUID('cef80357-3b80-4dcc-bbac-c554dff38b71'), storage_block=LocalFileSystem(basepath='/Users/redacted/.prefect/storage'), storage_key_fn=<function DEFAULT_STORAGE_KEY_FN at 0x128d511f0>), 'log_prints': False, 'settings': SettingsContext(profile=Profile(name='default', settings={<PREFECT_API_KEY: str>: 'redacted',