Tom Klein
07/09/2023, 12:40 PM
We are switching from the ConcurrentTaskRunner to the DaskTaskRunner.
Basically we encounter the issue mentioned here: https://discourse.prefect.io/t/picklingerror-error-when-using-a-dasktaskrunner/908
What's the best way to work around this, other than changing the object being returned (assuming that's not feasible)? Is there some way to override how it is serialized? Or why does it work with the regular task runner but not with Dask?
  File "/Users/redacted/Library/Caches/pypoetry/virtualenvs/prefect-tool-nqGlPUR4-py3.9/lib/python3.9/site-packages/distributed/protocol/pickle.py", line 63, in dumps
    result = pickle.dumps(x, **dump_kwargs)
_pickle.PicklingError: Can't pickle <function get_feature_group_data at 0x1334dcee0>: it's not the same object as __main__.get_feature_group_data
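For context, the "it's not the same object as" message is what the standard pickle module reports when a function's module-level name no longer points at that same function object. That can happen when a decorator rebinds the name to a wrapper. The sketch below reproduces this class of error with the standard library only. It assumes this is the mechanism at play in the traceback above; `Wrapper` and `get_data` are hypothetical stand-ins and not Prefect or Dask APIs.

```python
import pickle


class Wrapper:
    """Hypothetical stand-in for the object a decorator such as @task returns."""

    def __init__(self, fn):
        self.fn = fn  # the original function is kept as an attribute


def get_data():
    return 42


original = get_data           # keep a handle on the plain function
get_data = Wrapper(get_data)  # the module-level name now points at the wrapper

# pickle stores functions by reference (module name plus qualified name) and
# checks that looking the name up again yields the same object. The lookup
# now finds the Wrapper instead, so pickling the original function fails.
try:
    pickle.dumps(original)
    error_message = None
except pickle.PicklingError as exc:
    error_message = str(exc)

print(error_message)
```

If this is indeed the cause, it would also be consistent with the ConcurrentTaskRunner working: as far as I understand, it runs tasks inside the same process, so the task function never has to be pickled, whereas Dask serializes it before sending it to a worker.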
2023-07-09 15:50:48,768 - distributed.protocol.pickle - ERROR - Failed to serialize {'task': <prefect.tasks.Task object at 0x1065e7f70>, 'task_run': TaskRun(id=UUID('46e50a13-9074-4ebc-8c9d-e7e4dc2eb499'), name='Get Feature Group Data-0', flow_run_id=UUID('94c1a094-2865-47cd-8540-0f9e8146acfd'), task_key='__main__.get_feature_group_data', dynamic_key='0', cache_key=None, cache_expiration=None, task_version=None, empirical_policy=TaskRunPolicy(max_retries=0, retry_delay_seconds=0.0, retries=0, retry_delay=0, retry_jitter_factor=None), tags=[], state_id=UUID('fdd1635e-3468-45ff-a0ab-b31ca5491a18'), task_inputs={'feature_group': [TaskRunResult(input_type='task_run', id=UUID('676aedf2-f950-4712-94f4-7047fff48f36'))]}, state_type=StateType.PENDING, state_name='Pending', run_count=0, flow_run_run_count=0, expected_start_time=DateTime(2023, 7, 9, 12, 50, 48, 730929, tzinfo=Timezone('+00:00')), next_scheduled_start_time=None, start_time=None, end_time=None, total_run_time=datetime.timedelta(0), estimated_run_time=datetime.timedelta(0), estimated_start_time_delta=datetime.timedelta(microseconds=50067), state=Pending(message=None, type=PENDING, result=None)), 'parameters': {'feature_group': HBFeatureGroup(name='accounts_context', sagemaker_session=<sagemaker.session.Session object at 0x12c6180d0>, feature_definitions=[])}, 'wait_for': None, 'result_factory': ResultFactory(persist_result=False, cache_result_in_memory=True, serializer=PickleSerializer(type='pickle', picklelib='cloudpickle', picklelib_version='2.2.1'), storage_block_id=UUID('cef80357-3b80-4dcc-bbac-c554dff38b71'), storage_block=LocalFileSystem(basepath='/Users/redacted/.prefect/storage'), storage_key_fn=<function DEFAULT_STORAGE_KEY_FN at 0x128d511f0>), 'log_prints': False, 'settings': SettingsContext(profile=Profile(name='default', settings={<PREFECT_API_KEY: str>: 'redacted',