Hello Prefect, I'm getting an error `TypeError: cannot pickle '_thread.lock' object`
# ask-community
d
Hello Prefect, I'm getting an error `TypeError: cannot pickle '_thread.lock' object` (I'll paste the full error in the comments) and I'm out of ideas on how to solve it properly. It is related to the `DaskExecutor` and appears when running this task (code sample in the comments):
1. Read file content from a YAML file located in S3 storage; for reading the file I'm using a raw `boto3` implementation.
2. Read the bytes from the downloaded YAML file.
3. Load the YAML and convert it into a list.

Does anyone know a solution to this problem?
Code sample:
```python
from typing import Any, Dict, List

import boto3
import yaml
from prefect import task

s3_client = boto3.client("s3")  # assumed: client created at module level (see the reply below)

@task
def read_tables_yaml(file_name: str) -> List[Dict[Any, Any]]:
    s3_object = s3_client.get_object(Bucket="bucket_name", Key=f"yml/{file_name}.yml")["Body"]
    yml_content = s3_object.read()
    return list(yaml.load_all(yml_content, Loader=yaml.FullLoader))
```
Full error traceback:
```
Unexpected error: TypeError("cannot pickle '_thread.lock' object")
Traceback (most recent call last):
  File "/app/.venv/lib/python3.8/site-packages/distributed/protocol/pickle.py", line 49, in dumps
    result = pickle.dumps(x, **dump_kwargs)
_pickle.PicklingError: Can't pickle <function read_tables_yaml at 0x7f35b56c1700>: attribute lookup read_tables_yaml on __main__ failed

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/app/.venv/lib/python3.8/site-packages/prefect/engine/runner.py", line 48, in inner
    new_state = method(self, state, *args, **kwargs)
  File "/app/.venv/lib/python3.8/site-packages/prefect/engine/flow_runner.py", line 618, in get_flow_run_state
    task_states[task] = executor.submit(
  File "/app/.venv/lib/python3.8/site-packages/prefect/executors/dask.py", line 395, in submit
    fut = self.client.submit(
  File "/app/.venv/lib/python3.8/site-packages/distributed/client.py", line 1593, in submit
    futures = self._graph_to_futures(
  File "/app/.venv/lib/python3.8/site-packages/distributed/client.py", line 2580, in _graph_to_futures
    dsk = dsk.__dask_distributed_pack__(self, keyset, annotations)
  File "/app/.venv/lib/python3.8/site-packages/dask/highlevelgraph.py", line 991, in __dask_distributed_pack__
    "state": layer.__dask_distributed_pack__(
  File "/app/.venv/lib/python3.8/site-packages/dask/highlevelgraph.py", line 423, in __dask_distributed_pack__
    dsk = toolz.valmap(dumps_task, dsk)
  File "/app/.venv/lib/python3.8/site-packages/toolz/dicttoolz.py", line 83, in valmap
    rv.update(zip(d.keys(), map(func, d.values())))
  File "/app/.venv/lib/python3.8/site-packages/distributed/worker.py", line 3666, in dumps_task
    d["kwargs"] = warn_dumps(task[3])
  File "/app/.venv/lib/python3.8/site-packages/distributed/worker.py", line 3678, in warn_dumps
    b = dumps(obj, protocol=4)
  File "/app/.venv/lib/python3.8/site-packages/distributed/protocol/pickle.py", line 60, in dumps
    result = cloudpickle.dumps(x, **dump_kwargs)
  File "/app/.venv/lib/python3.8/site-packages/cloudpickle/cloudpickle_fast.py", line 73, in dumps
    cp.dump(obj)
  File "/app/.venv/lib/python3.8/site-packages/cloudpickle/cloudpickle_fast.py", line 563, in dump
    return Pickler.dump(self, obj)
TypeError: cannot pickle '_thread.lock' object
```
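To make the traceback concrete: the final `TypeError` comes from cloudpickle hitting a `_thread.lock` somewhere in what Dask is asked to serialize (for example inside a `boto3` client, which holds locks internally). A minimal, standalone reproduction of that exact error:
```python
import pickle
import threading

# Thread locks are inherently unpicklable; any object that holds one
# fails the same way when Dask tries to serialize it.
pickle.dumps(threading.Lock())  # TypeError: cannot pickle '_thread.lock' object
```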
a
I notice that your code references `s3_client`, which presumably is a `boto3.client("s3", ...)`. Could it be that you have this at flow (module) level, such that Prefect attempts to pickle it, but it cannot be pickled? A solution might be to create it within the task.
👍 1
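A minimal sketch of that suggestion, assuming the same bucket and key layout as the snippet above — the client is constructed inside the task body, so it is created on the Dask worker rather than pickled and shipped to it:
```python
from typing import Any, Dict, List

import boto3
import yaml
from prefect import task

@task
def read_tables_yaml(file_name: str) -> List[Dict[Any, Any]]:
    # Build the client here: only picklable arguments cross the Dask
    # boundary, and the unpicklable client never leaves the worker process.
    s3_client = boto3.client("s3")
    body = s3_client.get_object(Bucket="bucket_name", Key=f"yml/{file_name}.yml")["Body"]
    return list(yaml.load_all(body.read(), Loader=yaml.FullLoader))
```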
d
We had this problem when running different Python versions between the agent and the worker (in this case, the Dask workers). This is because pickle serialization differs between versions.
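If that's the suspicion, `distributed` can report mismatches directly; a quick check, assuming a reachable scheduler (the address below is a placeholder):
```python
from distributed import Client

client = Client("tcp://scheduler-address:8786")  # placeholder address
# Returns Python/package versions for the client, scheduler, and workers;
# check=True raises if they disagree.
print(client.get_versions(check=True))
```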