# prefect-community
Marwan Sarieddine
Hi folks, we occasionally face this error when running our flow on EKS with a static Dask cluster setup (i.e. using a LocalEnvironment with a DaskExecutor) and a kubernetes agent: (please see the thread for more details)
Jim Crist-Harif
Yeah, you'll want to set
DASK_DISTRIBUTED__COMM__TIMEOUTS__CONNECT
on the workers, scheduler, and flow-runner processes (e.g. pods if you're using k8s). See also this thread: https://prefect-community.slack.com/archives/CL09KU1K7/p1600165695297200. Note you'll need to be running 0.13.7 or above for that fix to be in effect.
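For example, in each pod spec (scheduler, workers, and the flow-runner job) you could add an env entry like the following; the 60s value here is just illustrative, not a recommendation:

```yaml
# Illustrative snippet for a container spec in the scheduler/worker/flow-runner
# manifests; raises Dask's connect timeout from the 10s default.
env:
  - name: DASK_DISTRIBUTED__COMM__TIMEOUTS__CONNECT
    value: "60s"
```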
Marwan Sarieddine
@Jim Crist-Harif - thank you! Will give it a try and see if it helps.
Jenny
Hi @Marwan Sarieddine - Thanks for the question. Looks like Jim has already answered it for you and hope that helps you. Would you mind moving the stack trace into the thread? We're trying to keep the main page user friendly and large stack traces make it harder to see all the questions.
Marwan Sarieddine
@Jenny sure - apologies for that
Jenny
Thank you!! No need to apologize!
Marwan Sarieddine
Full traceback of the issue is below:
Unexpected error: OSError("Timed out trying to connect to '<tcp://dask-scheduler:8786>' after 10 s: Timed out trying to connect to '<tcp://dask-scheduler:8786>' after 10 s: in <distributed.comm.tcp.TCPConnector object at 0x7f9ae227a190>: ConnectionRefusedError: [Errno 111] Connection refused")
Traceback (most recent call last):
  File "/usr/local/lib/python3.8/site-packages/distributed/comm/core.py", line 322, in connect
    _raise(error)
  File "/usr/local/lib/python3.8/site-packages/distributed/comm/core.py", line 275, in _raise
    raise IOError(msg)
OSError: Timed out trying to connect to '<tcp://dask-scheduler:8786>' after 10 s: in <distributed.comm.tcp.TCPConnector object at 0x7f9ae227a190>: ConnectionRefusedError: [Errno 111] Connection refused

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/usr/local/lib/python3.8/site-packages/prefect/engine/runner.py", line 48, in inner
    new_state = method(self, state, *args, **kwargs)
  File "/usr/local/lib/python3.8/site-packages/prefect/engine/flow_runner.py", line 418, in get_flow_run_state
    with self.check_for_cancellation(), executor.start():
  File "/usr/local/lib/python3.8/contextlib.py", line 113, in __enter__
    return next(self.gen)
  File "/usr/local/lib/python3.8/site-packages/prefect/engine/executors/dask.py", line 252, in start
    with Client(self.address, **self.client_kwargs) as client:
  File "/usr/local/lib/python3.8/site-packages/distributed/client.py", line 744, in __init__
    self.start(timeout=timeout)
  File "/usr/local/lib/python3.8/site-packages/distributed/client.py", line 949, in start
    sync(self.loop, self._start, **kwargs)
  File "/usr/local/lib/python3.8/site-packages/distributed/utils.py", line 339, in sync
    raise exc.with_traceback(tb)
  File "/usr/local/lib/python3.8/site-packages/distributed/utils.py", line 323, in f
    result[0] = yield future
  File "/usr/local/lib/python3.8/site-packages/tornado/gen.py", line 735, in run
    value = future.result()
  File "/usr/local/lib/python3.8/site-packages/distributed/client.py", line 1039, in _start
    await self._ensure_connected(timeout=timeout)
  File "/usr/local/lib/python3.8/site-packages/distributed/client.py", line 1096, in _ensure_connected
    comm = await connect(
  File "/usr/local/lib/python3.8/site-packages/distributed/comm/core.py", line 334, in connect
    _raise(error)
  File "/usr/local/lib/python3.8/site-packages/distributed/comm/core.py", line 275, in _raise
    raise IOError(msg)
OSError: Timed out trying to connect to '<tcp://dask-scheduler:8786>' after 10 s: Timed out trying to connect to '<tcp://dask-scheduler:8786>' after 10 s: in <distributed.comm.tcp.TCPConnector object at 0x7f9ae227a190>: ConnectionRefusedError: [Errno 111] Connection refused
The way we resolve this is simply by restarting the flow run, so it seems to me like the issue is some sort of race condition that is hard to replicate. Ideally we'd like to decrease the chances of this error arising, so we are wondering which environment variables we can set (on the Dask worker and Dask scheduler deployment manifests) to increase this timeout to, say, 30s or 60s. I see the following default setting in dask.distributed, which I think is what needs to be changed; it would be great if someone more familiar with Dask could confirm the exact variable names: https://github.com/dask/distributed/blob/master/distributed/distributed.yaml#L129 (any other suggestions that we could try are also much appreciated)
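For reference, Dask's documented convention for mapping environment variables to config keys is to strip the `DASK_` prefix, lowercase the rest, and treat double underscores as dots. A minimal sketch of that mapping (the helper name here is hypothetical, not part of Dask's API):

```python
def dask_env_to_config_key(env_var: str) -> str:
    """Map a DASK_* environment variable name to its dask.config key.

    Follows Dask's documented convention: strip the DASK_ prefix,
    lowercase, and convert double underscores to dots.
    """
    prefix = "DASK_"
    assert env_var.startswith(prefix), "expected a DASK_* variable"
    return env_var[len(prefix):].lower().replace("__", ".")

# The env var Jim suggested maps to the key in distributed.yaml:
print(dask_env_to_config_key("DASK_DISTRIBUTED__COMM__TIMEOUTS__CONNECT"))
# distributed.comm.timeouts.connect
```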