# prefect-integrations
Hi! I am trying to run a dask distributed flow (from the UI) and I keep getting pickle errors. Basically
```
cbhm-worker        | 11:05:23.932 | ERROR   | prefect.engine - Engine execution of flow run '6199c661-7d1a-4fc9-babe-7a26d28fbd04' exited with unexpected exception
cbhm-worker        | Traceback (most recent call last):
cbhm-worker        |   File "/usr/local/lib/python3.11/site-packages/distributed/protocol/pickle.py", line 60, in dumps
cbhm-worker        |     result = pickle.dumps(x, **dump_kwargs)
cbhm-worker        |              ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
cbhm-worker        | _pickle.PicklingError: Can't pickle <function migrate_cluster_batch at 0xffffad09b600>: it's not the same object as cbhm.worker.flows.migrate_cluster_batch.migrate_cluster_batch
cbhm-worker        |
cbhm-worker        | During handling of the above exception, another exception occurred:
cbhm-worker        |
cbhm-worker        | Traceback (most recent call last):
cbhm-worker        |   File "/usr/local/lib/python3.11/site-packages/distributed/protocol/pickle.py", line 65, in dumps
cbhm-worker        |     pickler.dump(x)
cbhm-worker        | _pickle.PicklingError: Can't pickle <function migrate_cluster_batch at 0xffffad09b600>: it's not the same object as cbhm.worker.flows.migrate_cluster_batch.migrate_cluster_batch
cbhm-worker        |
cbhm-worker        | During handling of the above exception, another exception occurred:
cbhm-worker        |
cbhm-worker        | Traceback (most recent call last):
cbhm-worker        |   File "/usr/local/lib/python3.11/site-packages/distributed/protocol/serialize.py", line 366, in serialize
cbhm-worker        |     header, frames = dumps(x, context=context) if wants_context else dumps(x)
cbhm-worker        |                      ^^^^^^^^^^^^^^^^^^^^^^^^^
cbhm-worker        |   File "/usr/local/lib/python3.11/site-packages/distributed/protocol/serialize.py", line 78, in pickle_dumps
cbhm-worker        |     frames[0] = pickle.dumps(
cbhm-worker        |                 ^^^^^^^^^^^^^
cbhm-worker        |   File "/usr/local/lib/python3.11/site-packages/distributed/protocol/pickle.py", line 77, in dumps
cbhm-worker        |     result = cloudpickle.dumps(x, **dump_kwargs)
cbhm-worker        |              ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
cbhm-worker        |   File "/usr/local/lib/python3.11/site-packages/cloudpickle/cloudpickle.py", line 1529, in dumps
cbhm-worker        |     cp.dump(obj)
cbhm-worker        |   File "/usr/local/lib/python3.11/site-packages/cloudpickle/cloudpickle.py", line 1295, in dump
cbhm-worker        |     return super().dump(obj)
cbhm-worker        |            ^^^^^^^^^^^^^^^^^
cbhm-worker        | TypeError: cannot pickle '_thread.lock' object
```
The structure is that the flow lives in `cbhm.worker.flows_migrate_parallel` and it imports `migrate_cluster_batch` (a `Task` I execute via `submit`) from `cbhm.worker.flows.migrate_cluster_batch`. Any ideas what could be happening? I've been fighting with Dask for 5 days now and I can't get anything beyond a very simple flow to run.
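For context, the setup described above would look roughly like the sketch below. The module paths and the task name are taken from the message and traceback; the flow name, the argument types, and the `DaskTaskRunner` configuration are assumptions, since the actual flow definition isn't shown in the thread.

```python
# cbhm/worker/flows/migrate_cluster_batch.py  (assumed layout)
from prefect import task

@task
def migrate_cluster_batch(batch: list[str]) -> int:
    # Placeholder body: migrate one batch and report how many items were handled.
    return len(batch)


# cbhm/worker/flows_migrate_parallel.py  (assumed layout)
from prefect import flow
from prefect_dask import DaskTaskRunner

from cbhm.worker.flows.migrate_cluster_batch import migrate_cluster_batch

@flow(task_runner=DaskTaskRunner())
def migrate_parallel(batches: list[list[str]]):
    # Each .submit() call hands the task to Dask, which must pickle the
    # function and its arguments to ship them to a worker.
    futures = [migrate_cluster_batch.submit(batch) for batch in batches]
    return [future.result() for future in futures]
```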
Hi @Albert Puig, are you passing something like a client object into your task that you're trying to run with Dask?
```
TypeError: cannot pickle '_thread.lock' object
```
This means Dask is being asked to serialize something that holds a thread lock, and thread locks cannot be pickled.
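For illustration, here is a minimal sketch of the usual pattern for avoiding that error; it is not the asker's actual code. Objects that hold locks or live connections (a Dask client, a logger handler, a database connection, etc.) should be created inside the task on the worker, so that only picklable arguments cross the serialization boundary at `.submit()`. The sqlite3 connection and the table name below are hypothetical stand-ins.

```python
from prefect import flow, task
from prefect_dask import DaskTaskRunner
import sqlite3  # connection objects are not picklable


@task
def count_rows(db_path: str) -> int:
    # Create the unpicklable resource inside the task, on the Dask worker,
    # instead of building it in the flow and passing it in as an argument.
    conn = sqlite3.connect(db_path)
    try:
        return conn.execute("SELECT COUNT(*) FROM batches").fetchone()[0]
    finally:
        conn.close()


@flow(task_runner=DaskTaskRunner())
def report(db_path: str) -> int:
    # Only a plain string is passed to .submit(), so pickling succeeds.
    return count_rows.submit(db_path).result()
```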