# prefect-community
m
Hi folks, I am facing this error whenever my flow involves spinning up a considerable number of dask workers (more than 100) - note that the same flow runs fine when I set a smaller number of workers… (the error is raised after the flow goes into the Running state, but before any tasks are run)
Copy code
Unexpected error: TimeoutError()
Traceback (most recent call last):
  File "/usr/local/lib/python3.7/site-packages/prefect/engine/runner.py", line 48, in inner
    new_state = method(self, state, *args, **kwargs)
  File "/usr/local/lib/python3.7/site-packages/prefect/engine/flow_runner.py", line 410, in get_flow_run_state
    with executor.start():
  File "/usr/local/lib/python3.7/contextlib.py", line 112, in __enter__
    return next(self.gen)
  File "/usr/local/lib/python3.7/site-packages/prefect/engine/executors/dask.py", line 239, in start
    with Client(self.address, **self.client_kwargs) as client:
  File "/usr/local/lib/python3.7/site-packages/distributed/client.py", line 744, in __init__
    self.start(timeout=timeout)
  File "/usr/local/lib/python3.7/site-packages/distributed/client.py", line 949, in start
    sync(self.loop, self._start, **kwargs)
  File "/usr/local/lib/python3.7/site-packages/distributed/utils.py", line 339, in sync
    raise exc.with_traceback(tb)
  File "/usr/local/lib/python3.7/site-packages/distributed/utils.py", line 323, in f
    result[0] = yield future
  File "/usr/local/lib/python3.7/site-packages/tornado/gen.py", line 735, in run
    value = future.result()
  File "/usr/local/lib/python3.7/site-packages/distributed/client.py", line 1046, in _start
    await self._ensure_connected(timeout=timeout)
  File "/usr/local/lib/python3.7/site-packages/distributed/client.py", line 1108, in _ensure_connected
    await asyncio.wait_for(self._update_scheduler_info(), timeout)
  File "/usr/local/lib/python3.7/asyncio/tasks.py", line 449, in wait_for
    raise futures.TimeoutError()
concurrent.futures._base.TimeoutError
it seems to me the solution to avoid this timeout is to add a task that waits for the workers to be ready - has anyone else run into this? (I am using a DaskKubernetesEnvironment for execution on AWS EKS)
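something along these lines is what I have in mind - just a rough sketch, `wait_for_cluster` and `n_expected_workers` are placeholders I made up, but distributed's `Client.wait_for_workers` would do the blocking part:
Copy code
from distributed import Client

def wait_for_cluster(scheduler_address: str, n_expected_workers: int) -> None:
    # hypothetical helper: block until the scheduler reports that enough
    # workers have registered before letting the rest of the flow proceed
    with Client(scheduler_address) as client:
        client.wait_for_workers(n_workers=n_expected_workers)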
I see this issue, which seems very related - https://github.com/PrefectHQ/prefect/issues/2277 @Dylan - you mention that the latest version definitely resolved this - mind sharing the version number you are referencing?
I am using prefect version
0.12.6
I was thinking this has to do with the number of workers - but I just got the same error after I lowered the number of workers…
j
Hi Marwan, this issue is raising a TimeoutError like in #2277, but the reason here is likely different (old versions of prefect created many dask clients which could result in spurious timeouts). Recent versions of prefect only create a single client, which alleviated many of these issues. Here it looks like the initial connect is timing out - is your cluster failing to start quickly?
How are you creating a dask cluster? What environment/executor setup are you using with your flows?
m
@Jim Crist-Harif thank you for taking the time to respond and to explain what might have caused this on earlier versions - what is very peculiar is that the TimeoutError is thrown for one flow but not for another, and I am running both on the same cluster… I am using a kubernetes agent, and the flow uses a DaskKubernetesEnvironment where I am passing the same worker spec (just a different image) and a minimum and maximum number of workers
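for reference, the setup looks roughly like this (a simplified sketch - the flow name and numbers are placeholders, not my exact config; the image itself is set in the worker spec):
Copy code
from prefect import Flow
from prefect.environments import DaskKubernetesEnvironment

with Flow("my-flow") as flow:
    ...  # tasks elided

# same worker spec file for both flows, only the image inside it differs
flow.environment = DaskKubernetesEnvironment(
    min_workers=30,     # placeholder values
    max_workers=300,
    worker_spec_file="worker_spec.yaml",
)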
is your cluster failing to start quickly?
when I look at the k8s event logs, the dask-root worker pods are created instantly in both flow runs
j
Hmmm, that is odd. Can you provide any of the following?
• The versions of dask-kubernetes, prefect, and distributed you're running with (a quick snippet for grabbing these is below)
• The logs from all pods in a failed flow run (you might want to sanitize these - I'm mainly interested in the dask-related logs and tracebacks). Uploading these as a github gist might be easiest 🤷
• What k8s service you're using (e.g. aws, gke, etc.) and whether you have any non-default k8s networking configuration set up that might affect two pods communicating
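For the versions, something like this should grab them (a rough sketch - run it with the same image your flow uses):
Copy code
import dask
import dask_kubernetes
import distributed
import prefect

# print the package versions baked into the flow's image
for mod in (prefect, distributed, dask, dask_kubernetes):
    print(mod.__name__, mod.__version__)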
m
hi @Jim Crist-Harif - sorry, when I said it had nothing to do with the number of workers I was wrong - I thought I was changing the number of workers but I wasn't, due to a silly mistake on my end (sorry). [But it is good to pinpoint the number of workers as the cause of the error…] The flow that runs fine is the one with 30 workers; the one that crashes with a TimeoutError is the one with 300 workers.
dask-kubernetes == 0.10.1
prefect == 0.12.6
I am using AWS EKS with the AWS autoscaler to provision the nodes - default networking configuration. Here is a snippet of the logs from the only relevant pod (the other pods don't show any logs):
Copy code
$ kubectl logs pod/prefect-dask-job-4134a74e-aba5-4ee4-b62b-476ff83e676e-kq5k2
distributed.http.proxy - INFO - To route to workers diagnostics web server please install jupyter-server-proxy: python -m pip install jupyter-server-proxy
distributed.scheduler - INFO - Clear task state
distributed.scheduler - INFO -   Scheduler at:  <tcp://192.168.2.18:44869>
distributed.scheduler - INFO -   dashboard at:                     :8787
[2020-08-11 22:57:13] INFO - prefect.CloudFlowRunner | Beginning Flow run for 'fnma-construct-pipeline'
[2020-08-11 22:57:15] INFO - prefect.CloudFlowRunner | Starting flow run.
[2020-08-11 22:57:15] DEBUG - prefect.CloudFlowRunner | Flow 'xxx-xxx': Handling state change from Scheduled to Running
[2020-08-11 22:57:31] ERROR - prefect.CloudFlowRunner | Unexpected error: TimeoutError()
Traceback (most recent call last):
  File "/usr/local/lib/python3.7/site-packages/prefect/engine/runner.py", line 48, in inner
    new_state = method(self, state, *args, **kwargs)
  File "/usr/local/lib/python3.7/site-packages/prefect/engine/flow_runner.py", line 410, in get_flow_run_state
    with executor.start():
  File "/usr/local/lib/python3.7/contextlib.py", line 112, in __enter__
    return next(self.gen)
  File "/usr/local/lib/python3.7/site-packages/prefect/engine/executors/dask.py", line 239, in start
    with Client(self.address, **self.client_kwargs) as client:
  File "/usr/local/lib/python3.7/site-packages/distributed/client.py", line 744, in __init__
    self.start(timeout=timeout)
  File "/usr/local/lib/python3.7/site-packages/distributed/client.py", line 949, in start
    sync(self.loop, self._start, **kwargs)
  File "/usr/local/lib/python3.7/site-packages/distributed/utils.py", line 339, in sync
    raise exc.with_traceback(tb)
  File "/usr/local/lib/python3.7/site-packages/distributed/utils.py", line 323, in f
    result[0] = yield future
  File "/usr/local/lib/python3.7/site-packages/tornado/gen.py", line 735, in run
    value = future.result()
  File "/usr/local/lib/python3.7/site-packages/distributed/client.py", line 1046, in _start
    await self._ensure_connected(timeout=timeout)
  File "/usr/local/lib/python3.7/site-packages/distributed/client.py", line 1108, in _ensure_connected
    await asyncio.wait_for(self._update_scheduler_info(), timeout)
  File "/usr/local/lib/python3.7/asyncio/tasks.py", line 449, in wait_for
    raise futures.TimeoutError()
concurrent.futures._base.TimeoutError
[2020-08-11 22:57:32] DEBUG - prefect.CloudFlowRunner | Flow 'fnma-construct-pipeline': Handling state change from Running to Failed
distributed.comm.tcp - WARNING - Closing dangling stream in <TCP  local=tcp://192.168.2.18:49448 remote=tcp://192.168.2.18:44869>
distributed.comm.tcp - WARNING - Closing dangling stream in <TCP  local=tcp://192.168.2.18:49446 remote=tcp://192.168.2.18:44869>
distributed.core - INFO - Event loop was unresponsive in Scheduler for 33.70s.  This is often caused by long-running GIL-holding functions or moving large chunks of data. This can cause timeouts and instability.
distributed.core - INFO - Event loop was unresponsive in Scheduler for 7.71s.  This is often caused by long-running GIL-holding functions or moving large chunks of data. This can cause timeouts and instability.
distributed.scheduler - INFO - Register worker <Worker 'tcp://192.168.63.225:34299', name: 192, memory: 0, processing: 0>
distributed.scheduler - INFO - Starting worker compute stream, <tcp://192.168.63.225:34299>
distributed.core - INFO - Starting established connection
distributed.scheduler - INFO - Register worker <Worker 'tcp://192.168.1.106:35021', name: 210, memory: 0, processing: 0>
distributed.scheduler - INFO - Starting worker compute stream, <tcp://192.168.1.106:35021>
distributed.core - INFO - Starting established connection
distributed.scheduler - INFO - Register worker <Worker 'tcp://192.168.63.190:42053', name: 167, memory: 0, processing: 0>
distributed.scheduler - INFO - Starting worker compute stream, <tcp://192.168.63.190:42053>
distributed.core - INFO - Starting established connection
distributed.scheduler - INFO - Register worker <Worker 'tcp://192.168.15.173:36475', name: 278, memory: 0, processing: 0>
distributed.scheduler - INFO - Starting worker compute stream, <tcp://192.168.15.173:36475>
distributed.core - INFO - Starting established connection
distributed.scheduler - INFO - Register worker <Worker 'tcp://192.168.45.145:46167', name: 42, memory: 0, processing: 0>
distributed.scheduler - INFO - Starting worker compute stream, <tcp://192.168.45.145:46167>
distributed.core - INFO - Starting established connection
distributed.scheduler - INFO - Register worker <Worker 'tcp://192.168.27.15:40629', name: 216, memory: 0, processing: 0>
distributed.scheduler - INFO - Starting worker compute stream, <tcp://192.168.27.15:40629>
distributed.core - INFO - Starting established connection
distributed.core - INFO - Event loop was unresponsive in Scheduler for 5.60s.  This is often caused by long-running GIL-holding functions or moving large chunks of data. This can cause timeouts and instability.
distributed.core - INFO - Event loop was unresponsive in Scheduler for 3.50s.  This is often caused by long-running GIL-holding functions or moving large chunks of data. This can cause timeouts and instability.
distributed.core - INFO - Event loop was unresponsive in Scheduler for 4.40s.  This is often caused by long-running GIL-holding functions or moving large chunks of data. This can cause timeouts and instability.
distributed.core - INFO - Event loop was unresponsive in Scheduler for 3.50s.  This is often caused by long-running GIL-holding functions or moving large chunks of data. This can cause timeouts and instability.
distributed.scheduler - INFO - Remove worker <Worker 'tcp://192.168.27.15:40629', name: 216, memory: 0, processing: 0>
distributed.core - INFO - Removing comms to <tcp://192.168.27.15:40629>
distributed.scheduler - INFO - Remove worker <Worker 'tcp://192.168.1.106:35021', name: 210, memory: 0, processing: 0>
distributed.core - INFO - Removing comms to <tcp://192.168.1.106:35021>
distributed.scheduler - INFO - Remove worker <Worker 'tcp://192.168.63.225:34299', name: 192, memory: 0, processing: 0>
distributed.core - INFO - Removing comms to <tcp://192.168.63.225:34299>
distributed.scheduler - INFO - Remove worker <Worker 'tcp://192.168.63.190:42053', name: 167, memory: 0, processing: 0>
distributed.core - INFO - Removing comms to <tcp://192.168.63.190:42053>
distributed.scheduler - INFO - Remove worker <Worker 'tcp://192.168.15.173:36475', name: 278, memory: 0, processing: 0>
distributed.core - INFO - Removing comms to <tcp://192.168.15.173:36475>
distributed.scheduler - INFO - Register worker <Worker 'tcp://192.168.3.82:42369', name: 23, memory: 0, processing: 0>
distributed.scheduler - INFO - Starting worker compute stream, <tcp://192.168.3.82:42369>
distributed.core - INFO - Starting established connection
distributed.scheduler - INFO - Register worker <Worker 'tcp://192.168.14.230:36415', name: 195, memory: 0, processing: 0>
distributed.scheduler - INFO - Starting worker compute stream, <tcp://192.168.14.230:36415>
distributed.core - INFO - Starting established connection
distributed.scheduler - INFO - Remove worker <Worker 'tcp://192.168.45.145:46167', name: 42, memory: 0, processing: 0>
distributed.core - INFO - Removing comms to <tcp://192.168.45.145:46167>
distributed.scheduler - INFO - Scheduler closing...
distributed.scheduler - INFO - Scheduler closing all comms
distributed.scheduler - INFO - Remove worker <Worker 'tcp://192.168.3.82:42369', name: 23, memory: 0, processing: 0>
distributed.core - INFO - Removing comms to <tcp://192.168.3.82:42369>
j
Hmm, interesting - it looks like something is blocking the event loop (a blocking operation that should be offloaded to a thread clearly isn't, somewhere). I suspect this is a bug in distributed or dask-kubernetes, not prefect, but we'd need a better reproduction to narrow things down. What is the worker scaling configuration you're using here (adapt min/max)?
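(To clarify what I mean by "offloaded to a thread": the usual asyncio pattern is to push blocking calls onto an executor so the event loop stays responsive. A generic sketch of that pattern, not the actual dask-kubernetes code:)
Copy code
import asyncio
import time

def blocking_call():
    # stand-in for a slow, blocking operation (e.g. a synchronous API request)
    time.sleep(5)
    return "done"

async def main():
    loop = asyncio.get_running_loop()
    # running the blocking call in the default thread pool keeps the event
    # loop free to service heartbeats and comms in the meantime
    result = await loop.run_in_executor(None, blocking_call)
    print(result)

asyncio.run(main())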
m
@Jim Crist-Harif - I just faced this issue with a min of 30 workers and a max of 35, but I definitely seem to encounter it more as the number of workers increases. Below is a sample worker spec:
Copy code
kind: Pod
metadata:
  labels:
    app: prefect-dask-worker
spec:
  containers:
  - args:
    - dask-worker
    - --no-dashboard
    - --death-timeout
    - '60'
    - --nthreads
    - '1'
    - --nprocs
    - '1'
    - --memory-limit
    - 4GB
    env:
    - name: AWS_ACCESS_KEY_ID
      valueFrom:
        secretKeyRef:
          key: AWS_ACCESS_KEY_ID
          name: aws-s3-secret
    - name: AWS_SECRET_ACCESS_KEY
      valueFrom:
        secretKeyRef:
          key: AWS_SECRET_ACCESS_KEY
          name: aws-s3-secret
    image: <pathtoimage>
    imagePullPolicy: IfNotPresent
    name: dask-worker
    resources:
      limits:
        cpu: 1000m
        memory: 4G
      requests:
        cpu: 1000m
        memory: 4G
  restartPolicy: Never
Yes, it seems something is blocking the connection from the worker to the scheduler… It is also worth noting that these dask connection timeout settings can't currently be controlled through prefect - DaskKubernetesEnvironment doesn't expose any dask client kwargs that it could pass on to the DaskExecutor, for instance, so I can't try changing the timeout config settings…
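the only workaround I can think of for now is raising dask's connect timeout via dask config in the flow's image, since I believe that's what the Client falls back to when no explicit timeout is passed - a sketch of what I mean, not something prefect exposes directly:
Copy code
import dask

# distributed reads distributed.comm.timeouts.connect when the Client
# connects to the scheduler; raising it gives a busy scheduler more time
# to answer the initial handshake
dask.config.set({"distributed.comm.timeouts.connect": "60s"})

# equivalently, as an environment variable baked into the flow-run image:
#   DASK_DISTRIBUTED__COMM__TIMEOUTS__CONNECT=60s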