# prefect-community
j
Hi all, I have a problem with running flows on a dask-kubernetes cluster with a single worker. I'm sometimes getting an
Unexpected error: TimeoutError()
error. Sometimes the error happens before any tasks have started. I'm really unsure how to debug this, so any help is appreciated!
The full error message is:
Unexpected error: TimeoutError()
Traceback (most recent call last):
  File "/usr/local/lib/python3.7/site-packages/prefect/engine/runner.py", line 48, in inner
    new_state = method(self, state, *args, **kwargs)
  File "/usr/local/lib/python3.7/site-packages/prefect/engine/flow_runner.py", line 597, in get_flow_run_state
    for t in final_tasks
  File "/usr/local/lib/python3.7/site-packages/prefect/engine/executors/dask.py", line 375, in wait
    return self.client.gather(futures)
  File "/usr/local/lib/python3.7/site-packages/distributed/client.py", line 1988, in gather
    asynchronous=asynchronous,
  File "/usr/local/lib/python3.7/site-packages/distributed/client.py", line 833, in sync
    self.loop, func, *args, callback_timeout=callback_timeout, **kwargs
  File "/usr/local/lib/python3.7/site-packages/distributed/utils.py", line 339, in sync
    raise exc.with_traceback(tb)
  File "/usr/local/lib/python3.7/site-packages/distributed/utils.py", line 323, in f
    result[0] = yield future
  File "/usr/local/lib/python3.7/site-packages/tornado/gen.py", line 735, in run
    value = future.result()
  File "/usr/local/lib/python3.7/site-packages/distributed/client.py", line 1847, in _gather
    raise exception.with_traceback(traceback)
  File "/usr/local/lib/python3.7/site-packages/prefect/engine/executors/dask.py", line 54, in _maybe_run
    var = Variable(var_name, client=get_client())
  File "/usr/local/lib/python3.7/site-packages/distributed/worker.py", line 3124, in get_client
    return worker._get_client(timeout=timeout)
  File "/usr/local/lib/python3.7/site-packages/distributed/worker.py", line 3024, in _get_client
    timeout=timeout,
  File "/usr/local/lib/python3.7/site-packages/distributed/client.py", line 744, in __init__
    self.start(timeout=timeout)
  File "/usr/local/lib/python3.7/site-packages/distributed/client.py", line 949, in start
    sync(self.loop, self._start, **kwargs)
  File "/usr/local/lib/python3.7/site-packages/distributed/utils.py", line 339, in sync
    raise exc.with_traceback(tb)
  File "/usr/local/lib/python3.7/site-packages/distributed/utils.py", line 323, in f
    result[0] = yield future
  File "/usr/local/lib/python3.7/site-packages/tornado/gen.py", line 735, in run
    value = future.result()
  File "/usr/local/lib/python3.7/site-packages/distributed/client.py", line 1046, in _start
    await self._ensure_connected(timeout=timeout)
  File "/usr/local/lib/python3.7/site-packages/distributed/client.py", line 1127, in _ensure_connected
    msg = await asyncio.wait_for(comm.read(), timeout)
  File "/usr/local/lib/python3.7/asyncio/tasks.py", line 449, in wait_for
    raise futures.TimeoutError()
concurrent.futures._base.TimeoutError
n
Hi @Julien Allard - is there anything else that could be hogging up that worker? Perhaps another flow or another process somewhere?
j
Hi! I don't think so. I'm using the free cloud version so I can only run a single flow.
n
Got it - can you share a minimal reproducible example? That'll help us narrow down what could be happening from the Prefect side
j
Ok, I'll try to make one. It's really weird because it doesn't happen every time
n
That is odd, I'm sorry you're experiencing that! If we can reliably reproduce then hopefully we'll be able to get to the bottom of it 🙂
j
Yes! If you have any tips for debugging Dask on Kubernetes, I might be able to find a more useful error message. I'm relatively new to both Kubernetes and Dask
n
@Julien Allard - someone on the team just mentioned that you could try increasing the connection timeout with this environment variable in your worker pod:
DASK_DISTRIBUTED__COMM__TIMEOUTS__CONNECT=30s
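If you're building the cluster from Python with dask-kubernetes, one way to get that onto the worker pods is through the pod spec - a rough sketch (the image name is just a placeholder, swap in whatever you're actually deploying):

```python
# Hypothetical example: pass the connection-timeout env var to the Dask
# worker pods via dask-kubernetes. The image below is a placeholder.
from dask_kubernetes import KubeCluster, make_pod_spec

pod_spec = make_pod_spec(
    image="daskdev/dask:latest",  # placeholder - use your own worker image
    env={"DASK_DISTRIBUTED__COMM__TIMEOUTS__CONNECT": "30s"},
)
cluster = KubeCluster(pod_spec)
```

Setting it directly in your worker pod YAML works too - it just needs to be visible as an environment variable to the worker process.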
j
Thanks! I will try this
@nicholas Hi! I tried setting the timeout and it seems to work more consistently now. Looking through the worker logs, I often see the error
Event loop was unresponsive in Worker for 3.31s. This is often caused by long-running GIL-holding functions or moving large chunks of data. This can cause timeouts and instability.
I was wondering what could be the cause of this message? Is it possible that large dataframes are causing issues? Is the data sent from the worker to the scheduler after each task completes?
n
Yeah, that sounds like it could do it - I'm not totally sure of the internal Dask mechanics, but you could try a couple of things: increasing your worker's resources, and/or having your tasks read in the data, write it out to another location, and return only a reference to it for your downstream tasks.
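Something along these lines, just to illustrate the second idea - a rough sketch with made-up paths (any shared storage all your workers can reach would do):

```python
# Sketch: tasks write large data to shared storage and pass around only a
# small reference (a path string), so the big dataframe never has to move
# between workers and the scheduler. Paths and filenames are hypothetical.
import pandas as pd
from prefect import task, Flow

@task
def transform(raw_path: str) -> str:
    df = pd.read_csv(raw_path)
    # ... heavy work on the dataframe ...
    out_path = "/shared/intermediate.parquet"  # hypothetical shared location
    df.to_parquet(out_path)
    return out_path  # only this small string is passed downstream

@task
def load(parquet_path: str) -> None:
    df = pd.read_parquet(parquet_path)
    # ... load df into its final destination ...

with Flow("pass-references-not-data") as flow:
    load(transform("/shared/raw.csv"))
```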