Domantas
06/17/2021, 10:36 AM
TypeError: cannot pickle '_thread.lock' object
I'm getting this error (I'll paste the full error in the comments) and I'm out of ideas on how to solve it properly.
It is related to DaskExecutor and appears when running this task (I'll upload a code sample in the comments):
1. Read the content of a YAML file stored in S3. For reading the file I'm using boto3 directly.
2. Read the bytes from the downloaded YAML file.
3. Load the YAML content and convert it into a list.
Does anyone know a solution to this problem?
Domantas
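06/17/2021, 10:37 AM
from typing import Any, Dict, List

import yaml
from prefect import task

# s3_client is a boto3 S3 client defined outside this task (not shown in the post)
@task
def read_tables_yaml(file_name: str) -> List[Dict[Any, Any]]:
    s3_object = s3_client.get_object(Bucket="bucket_name", Key=f"yml/{file_name}.yml")["Body"]
    yml_content = s3_object.read()
    return list(yaml.load_all(yml_content, Loader=yaml.FullLoader))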
Domantas
06/17/2021, 10:40 AM
Unexpected error: TypeError("cannot pickle '_thread.lock' object")
Traceback (most recent call last):
  File "/app/.venv/lib/python3.8/site-packages/distributed/protocol/pickle.py", line 49, in dumps
    result = pickle.dumps(x, **dump_kwargs)
_pickle.PicklingError: Can't pickle <function read_tables_yaml at 0x7f35b56c1700>: attribute lookup read_tables_yaml on __main__ failed

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/app/.venv/lib/python3.8/site-packages/prefect/engine/runner.py", line 48, in inner
    new_state = method(self, state, *args, **kwargs)
  File "/app/.venv/lib/python3.8/site-packages/prefect/engine/flow_runner.py", line 618, in get_flow_run_state
    task_states[task] = executor.submit(
  File "/app/.venv/lib/python3.8/site-packages/prefect/executors/dask.py", line 395, in submit
    fut = self.client.submit(
  File "/app/.venv/lib/python3.8/site-packages/distributed/client.py", line 1593, in submit
    futures = self._graph_to_futures(
  File "/app/.venv/lib/python3.8/site-packages/distributed/client.py", line 2580, in _graph_to_futures
    dsk = dsk.__dask_distributed_pack__(self, keyset, annotations)
  File "/app/.venv/lib/python3.8/site-packages/dask/highlevelgraph.py", line 991, in __dask_distributed_pack__
    "state": layer.__dask_distributed_pack__(
  File "/app/.venv/lib/python3.8/site-packages/dask/highlevelgraph.py", line 423, in __dask_distributed_pack__
    dsk = toolz.valmap(dumps_task, dsk)
  File "/app/.venv/lib/python3.8/site-packages/toolz/dicttoolz.py", line 83, in valmap
    rv.update(zip(d.keys(), map(func, d.values())))
  File "/app/.venv/lib/python3.8/site-packages/distributed/worker.py", line 3666, in dumps_task
    d["kwargs"] = warn_dumps(task[3])
  File "/app/.venv/lib/python3.8/site-packages/distributed/worker.py", line 3678, in warn_dumps
    b = dumps(obj, protocol=4)
  File "/app/.venv/lib/python3.8/site-packages/distributed/protocol/pickle.py", line 60, in dumps
    result = cloudpickle.dumps(x, **dump_kwargs)
  File "/app/.venv/lib/python3.8/site-packages/cloudpickle/cloudpickle_fast.py", line 73, in dumps
    cp.dump(obj)
  File "/app/.venv/lib/python3.8/site-packages/cloudpickle/cloudpickle_fast.py", line 563, in dump
    return Pickler.dump(self, obj)
TypeError: cannot pickle '_thread.lock' object
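The last line of the traceback can be reproduced with the standard library alone. In the traceback, plain pickle first fails to find read_tables_yaml on __main__, so distributed falls back to cloudpickle, which serializes the function by value together with the module-level globals it references. If s3_client is one of those globals, everything it holds gets serialized as well. The sketch below assumes a hypothetical FakeS3Client that holds nothing but a threading.Lock, standing in for a boto3 client (which presumably keeps locks internally; its real contents are not modeled here):

```python
import pickle
import threading


class FakeS3Client:
    """Hypothetical stand-in for boto3.client("s3"): it only holds a lock."""

    def __init__(self) -> None:
        self._lock = threading.Lock()


def pickle_error(obj) -> str:
    """Return the TypeError message raised while pickling obj, or "" on success."""
    try:
        pickle.dumps(obj)
    except TypeError as exc:
        return str(exc)
    return ""


message = pickle_error(FakeS3Client())
print(message)  # on Python 3.8 this matches the last line of the traceback
```

Anything that reaches the serializer with a lock inside it fails the same way, no matter how deeply the lock is nested.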
Amanda Wee
06/17/2021, 12:47 PM
Could it be that your s3_client, which is presumably a boto3.client("s3", ...), is created at flow level (outside the task) or somewhere similar, so that Prefect attempts to pickle it, but it cannot be pickled? A solution might be to create it within the task.
davzucky
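A minimal sketch of why creating the client within the task helps, assuming the same kind of lock-holding client: an object that builds the client up front carries it in its pickled state, while one that builds it inside the call has nothing unpicklable to serialize. FakeS3Client, EagerReader and LazyReader are hypothetical names; this is an analogy for what gets shipped to Dask workers, not Prefect's or boto3's own code.

```python
import pickle
import threading


class FakeS3Client:
    """Hypothetical stand-in for boto3.client("s3"), which cannot be pickled."""

    def __init__(self) -> None:
        self._lock = threading.Lock()

    def get_object(self, key: str) -> bytes:
        with self._lock:
            return f"- table: {key}\n".encode()


class EagerReader:
    """Client created up front: it becomes part of the state that gets pickled."""

    def __init__(self) -> None:
        self.client = FakeS3Client()

    def __call__(self, key: str) -> bytes:
        return self.client.get_object(key)


class LazyReader:
    """Client created inside the call, like creating it within the task."""

    def __call__(self, key: str) -> bytes:
        client = FakeS3Client()
        return client.get_object(key)


def is_picklable(obj) -> bool:
    try:
        pickle.dumps(obj)
    except (TypeError, pickle.PicklingError):
        return False
    return True


print(is_picklable(EagerReader()))  # False
print(is_picklable(LazyReader()))   # True
# The lazily built reader still works after a pickle round trip.
print(pickle.loads(pickle.dumps(LazyReader()))("yml/tables.yml"))  # b'- table: yml/tables.yml\n'
```

In the real task this would correspond to moving s3_client = boto3.client("s3") into the body of read_tables_yaml. Each task run then builds its own client on whichever worker executes it, at the cost of a little client setup per run, and nothing lock-holding should need to cross the wire to Dask.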
06/17/2021, 11:48 PM