# ask-community
Joe Schmid
We're running into intermittent failures running Flows for ML hyperparameter tuning on a small/medium-sized Dask cluster (12 workers) on AWS. The stack trace (I'll post it in a thread reply in a sec) shows an exception down in Dask/pickle, but it's unfortunately intermittent. Has anyone run into something similar? (This is on Dask version 2021.7.1 and a slightly older Prefect version: 0.13.19)
ERROR
CloudFlowRunner
Unexpected error: TypeError("__init__() missing 1 required positional argument: 'lock_file'")
Traceback (most recent call last):
  File "/opt/conda/lib/python3.8/site-packages/prefect/engine/runner.py", line 48, in inner
    new_state = method(self, state, *args, **kwargs)
  File "/opt/conda/lib/python3.8/site-packages/prefect/engine/flow_runner.py", line 632, in get_flow_run_state
    s.map_states = executor.wait(mapped_children[t])
  File "/opt/conda/lib/python3.8/site-packages/prefect/engine/executors/dask.py", line 452, in wait
    return self.client.gather(futures)
  File "/opt/conda/lib/python3.8/site-packages/distributed/client.py", line 2020, in gather
    return self.sync(
  File "/opt/conda/lib/python3.8/site-packages/distributed/client.py", line 861, in sync
    return sync(
  File "/opt/conda/lib/python3.8/site-packages/distributed/utils.py", line 326, in sync
    raise exc.with_traceback(tb)
  File "/opt/conda/lib/python3.8/site-packages/distributed/utils.py", line 309, in f
    result[0] = yield future
  File "/opt/conda/lib/python3.8/site-packages/tornado/gen.py", line 762, in run
    value = future.result()
  File "/opt/conda/lib/python3.8/site-packages/distributed/client.py", line 1914, in _gather
    response = await future
  File "/opt/conda/lib/python3.8/site-packages/distributed/client.py", line 1965, in _gather_remote
    response = await retry_operation(self.scheduler.gather, keys=keys)
  File "/opt/conda/lib/python3.8/site-packages/distributed/utils_comm.py", line 385, in retry_operation
    return await retry(
  File "/opt/conda/lib/python3.8/site-packages/distributed/utils_comm.py", line 370, in retry
    return await coro()
  File "/opt/conda/lib/python3.8/site-packages/distributed/core.py", line 866, in send_recv_from_rpc
    result = await send_recv(comm=comm, op=key, **kwargs)
  File "/opt/conda/lib/python3.8/site-packages/distributed/core.py", line 651, in send_recv
    response = await comm.read(deserializers=deserializers)
  File "/opt/conda/lib/python3.8/site-packages/distributed/comm/tcp.py", line 215, in read
    msg = await from_frames(
  File "/opt/conda/lib/python3.8/site-packages/distributed/comm/utils.py", line 78, in from_frames
    res = _from_frames()
  File "/opt/conda/lib/python3.8/site-packages/distributed/comm/utils.py", line 61, in _from_frames
    return protocol.loads(
  File "/opt/conda/lib/python3.8/site-packages/distributed/protocol/core.py", line 111, in loads
    return msgpack.loads(
  File "msgpack/_unpacker.pyx", line 195, in msgpack._cmsgpack.unpackb
  File "/opt/conda/lib/python3.8/site-packages/distributed/protocol/core.py", line 103, in _decode_default
    return merge_and_deserialize(
  File "/opt/conda/lib/python3.8/site-packages/distributed/protocol/serialize.py", line 474, in merge_and_deserialize
    return deserialize(header, merged_frames, deserializers=deserializers)
  File "/opt/conda/lib/python3.8/site-packages/distributed/protocol/serialize.py", line 406, in deserialize
    return loads(header, frames)
  File "/opt/conda/lib/python3.8/site-packages/distributed/protocol/serialize.py", line 85, in pickle_loads
    return pickle.loads(x, buffers=new)
  File "/opt/conda/lib/python3.8/site-packages/distributed/protocol/pickle.py", line 75, in loads
    return pickle.loads(x)
TypeError: __init__() missing 1 required positional argument: 'lock_file'
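For context, the failing call in the trace (`executor.wait(mapped_children[t])`) is where Prefect gathers the results of mapped tasks back from the Dask cluster. A mapped hyperparameter-tuning Flow of the kind described would look roughly like this with the Prefect 0.13-era API; the task names, search space, and scheduler address are assumptions, not the actual code:

from prefect import Flow, task
from prefect.engine.executors import DaskExecutor


@task
def make_param_grid():
    # Hypothetical search space
    return [{"lr": lr, "depth": d} for lr in (0.01, 0.1) for d in (3, 5, 7)]


@task
def train_and_score(params):
    # Placeholder for the real training/scoring logic
    return {"params": params, "score": 0.0}


with Flow("hyperparameter-tuning") as flow:
    grid = make_param_grid()
    # Mapped children -- these are the futures gathered by executor.wait()
    results = train_and_score.map(grid)

if __name__ == "__main__":
    # Scheduler address of the existing Dask cluster on AWS is an assumption
    flow.run(executor=DaskExecutor(address="tcp://dask-scheduler:8786"))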
Kevin Kho
Hey @Joe Schmid, I haven’t seen this specifically, but it looks like a version mismatch somewhere, right? I’d be surprised that it’s intermittent. That Dask version is pretty recent compared to the Prefect version, and the error does seem to come from the distributed library. What are your versions of distributed and Python on the client, scheduler, and workers?
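To illustrate the failure mode a version mismatch can cause here: if a pickle is created against one definition of a class and loaded in an environment where that class’s `__init__` signature has gained a required argument, the load fails with exactly this kind of TypeError. A minimal generic sketch (not Dask or Prefect code; `Resource` and `lock_file` are purely illustrative):

import pickle

# "Old" definition of some library class: pickles itself as (cls, (path,))
class Resource:
    def __init__(self, path):
        self.path = path

    def __reduce__(self):
        return (Resource, (self.path,))

payload = pickle.dumps(Resource("/tmp/data"))  # serialized by the "sender"

# "New" definition with an extra required argument, as after an upgrade.
# Rebinding the name simulates the receiving process importing a newer version.
class Resource:
    def __init__(self, path, lock_file):
        self.path = path
        self.lock_file = lock_file

    def __reduce__(self):
        return (Resource, (self.path, self.lock_file))

try:
    # pickle looks the class up by name, finds the *new* one, calls it with old args
    pickle.loads(payload)
except TypeError as exc:
    print(exc)  # __init__() missing 1 required positional argument: 'lock_file'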
Joe Schmid
@Kevin Kho Yeah, I totally agree -- that exception should be deterministic and should happen every time. (Yet somehow it's not!) dask and distributed versions are identical on all components (client, scheduler, worker) and Python version is 3.8.6:
dask                              2021.7.1
distributed                       2021.7.1
Kevin Kho
Did you try listing the versions of everything for all components? That’s the only thing left to do, but I presume you’re using an image, right?
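One way to run that check from code is distributed’s built-in version report, which compares the client, scheduler, and workers and can raise on mismatch (a sketch; the scheduler address is an assumption):

from distributed import Client

# Connect to the existing scheduler (address is an assumption)
client = Client("tcp://dask-scheduler:8786")

# check=True raises if client/scheduler/workers disagree on key package versions;
# with check=False it just returns the full report as a dict.
versions = client.get_versions(check=True)
print(versions)

Note that this only covers the core packages (Python, dask, distributed, msgpack, etc.), so diffing a full pip freeze from the image against the client environment is still worth doing.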