# ask-community
h
I've managed to get around most of the problems I had with retries and stability on Dask, but this one eludes me. I'm getting the
KilledWorker
error which seemingly fails the whole flow. Despite this, the workers are alive and fine (more in thread)
dask-worker-d4cdcb698-n9w86             1/1     Running   0          11m
dask-worker-d4cdcb698-p7f4c             1/1     Running   0          13m
dask-worker-d4cdcb698-qljjm             1/1     Running   0          5m30s
dask-worker-d4cdcb698-qpbpp             1/1     Running   0          8m9s
dask-worker-d4cdcb698-rrsf2             1/1     Running   0          14m
I've set GCSResult on most things, and the flow continues to run as planned, so I'm not sure what the killed worker was, why it fails the whole flow because of it, or what I can do about it.
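For reference, a minimal sketch of the GCSResult setup being referred to (Prefect 1.x); the bucket name and task body are hypothetical, not from the thread:

```python
from prefect import task
from prefect.engine.results import GCSResult

# Checkpoint the task's return value to GCS so it can be restored on retry.
@task(result=GCSResult(bucket="my-flow-results"), checkpoint=True)
def fit_spend2txs_model():
    ...  # long-running model fit
```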
Previously I ran preemptible nodes, but I'm happy to wait for this to be resolved before going back to them.
What I do have, however, is a HorizontalPodAutoscaler that scales the workers up and down depending on how much CPU they are consuming. This makes the prefect job (the Dask client) log workers being added and removed, which in turn might be hitting a buggy code path (?) that causes the "KilledWorker" message?
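And roughly how the prefect job acts as the Dask client here: a sketch, not the actual flow, with the DaskExecutor pointed at the external scheduler Service (the address matches the scheduler setup shared later in the thread). HPA-driven pod churn then shows up as the "Worker ... added/removed" lines below.

```python
from prefect import Flow
from prefect.executors import DaskExecutor

with Flow(
    "run_mmm",
    executor=DaskExecutor(address="tcp://dask-scheduler.flows.svc:8786"),
) as flow:
    fit_spend2txs_model()  # the task sketched above
```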
prefect-job-9c43910a-v6pqj flow [2021-11-27 12:19:29+0000] DEBUG - prefect.DaskExecutor | Worker <tcp://10.4.6.3:42033> removed
prefect-job-9c43910a-v6pqj flow [2021-11-27 12:19:33+0000] DEBUG - prefect.CloudFlowRunner | Checking flow run state...
prefect-job-9c43910a-v6pqj flow [2021-11-27 12:19:35+0000] DEBUG - prefect.DaskExecutor | Worker <tcp://10.4.10.4:44069> removed
prefect-job-9c43910a-v6pqj flow [2021-11-27 12:19:48+0000] DEBUG - prefect.CloudFlowRunner | Checking flow run state...
prefect-job-9c43910a-v6pqj flow [2021-11-27 12:20:04+0000] DEBUG - prefect.CloudFlowRunner | Checking flow run state...
prefect-job-9c43910a-v6pqj flow [2021-11-27 12:20:12+0000] DEBUG - prefect.DaskExecutor | Worker <tcp://10.4.10.4:44007> added
prefect-job-9c43910a-v6pqj flow [2021-11-27 12:20:19+0000] DEBUG - prefect.CloudFlowRunner | Checking flow run state...
prefect-job-9c43910a-v6pqj flow [2021-11-27 12:20:25+0000] DEBUG - prefect.DaskExecutor | Worker <tcp://10.4.6.3:41491> added
prefect-job-9c43910a-v6pqj flow [2021-11-27 12:20:34+0000] DEBUG - prefect.CloudFlowRunner | Checking flow run state...
prefect-job-9c43910a-v6pqj flow [2021-11-27 12:20:49+0000] DEBUG - prefect.CloudFlowRunner | Checking flow run state...

later

prefect-job-9c43910a-v6pqj flow [2021-11-27 12:31:52+0000] ERROR - prefect.CloudFlowRunner | Unexpected error: KilledWorker('fit_spend2txs_model-4-2e39f830c95848798566c200821d6e9a', <WorkerState '<tcp://10.4.6.3:38793>', name: <tcp://10.4.6.3:38793>, status: closed, memory: 0, processing: 11>)
prefect-job-9c43910a-v6pqj flow Traceback (most recent call last):
prefect-job-9c43910a-v6pqj flow   File "/usr/local/lib/python3.9/site-packages/prefect/engine/runner.py", line 48, in inner
prefect-job-9c43910a-v6pqj flow     new_state = method(self, state, *args, **kwargs)
prefect-job-9c43910a-v6pqj flow   File "/usr/local/lib/python3.9/site-packages/prefect/engine/flow_runner.py", line 643, in get_flow_run_state
prefect-job-9c43910a-v6pqj flow     final_states = executor.wait(
prefect-job-9c43910a-v6pqj flow   File "/usr/local/lib/python3.9/site-packages/prefect/executors/dask.py", line 440, in wait
prefect-job-9c43910a-v6pqj flow     return self.client.gather(futures)
prefect-job-9c43910a-v6pqj flow   File "/usr/local/lib/python3.9/site-packages/distributed/client.py", line 1969, in gather
prefect-job-9c43910a-v6pqj flow     return self.sync(
prefect-job-9c43910a-v6pqj flow   File "/usr/local/lib/python3.9/site-packages/distributed/client.py", line 865, in sync
prefect-job-9c43910a-v6pqj flow     return sync(
prefect-job-9c43910a-v6pqj flow   File "/usr/local/lib/python3.9/site-packages/distributed/utils.py", line 327, in sync
prefect-job-9c43910a-v6pqj flow     raise exc.with_traceback(tb)
prefect-job-9c43910a-v6pqj flow   File "/usr/local/lib/python3.9/site-packages/distributed/utils.py", line 310, in f
prefect-job-9c43910a-v6pqj flow     result[0] = yield future
prefect-job-9c43910a-v6pqj flow   File "/usr/local/lib/python3.9/site-packages/tornado/gen.py", line 762, in run
prefect-job-9c43910a-v6pqj flow     value = future.result()
prefect-job-9c43910a-v6pqj flow   File "/usr/local/lib/python3.9/site-packages/distributed/client.py", line 1834, in _gather
prefect-job-9c43910a-v6pqj flow     raise exception.with_traceback(traceback)
prefect-job-9c43910a-v6pqj flow distributed.scheduler.KilledWorker: ('fit_spend2txs_model-4-2e39f830c95848798566c200821d6e9a', <WorkerState '<tcp://10.4.6.3:38793>', name: <tcp://10.4.6.3:38793>, status: closed, memory: 0, processing: 11>)
prefect-job-9c43910a-v6pqj flow [2021-11-27 12:31:52+0000] DEBUG - prefect.CloudFlowRunner | Flow 'run_mmm': Handling state change from Running to Failed
a
How did you define the HorizontalPodAutoscaler? The memory: 0 in the error from client gather still looks to me like not enough memory being allocated on the worker, or did I misunderstand? If you believe it’s some error on the Dask executor side, would you mind creating an issue for it and giving us info on the circumstances under which it appeared (Prefect & distributed versions, Dask setup, etc.)? But if you think it’s still a dask distributed error similar to the previous one you had, perhaps it makes sense to add it to your issue in the Dask repo?
@haf I think this documentation page provides a solution to your problem, or at least gives hints about what you should check: http://distributed.dask.org/en/stable/killed.html
h
@Anna Geller Thank you for your replies. It was defined with
minReplicas: 3, maxReplicas: 9
but I've removed it and made the cluster a permanent 5 replicas for now. I honestly don't know where this problem might be: whether it's Dask or Prefect or something else. Perhaps when Orion is finished and it can ship traces (and possibly Dask traces too?) this will be easier to debug. Bumping the thread count on the Dask worker (nthreads) and making the workers permanent has made things more stable for now, and I got a couple of successful runs (2h 30m)
a
thanks for the update, nice work!
h
Thank you. I hope I can spend more time in the future debugging why scaling the servers in and out and using preemptibles fails.
Update on the KilledWorker; it still happens but not as frequently. This is the best log entry I've found of it:
distributed.scheduler - INFO - Task infer_quantities-8-56a955e4e6224f5f9822da85821c36f5 marked as failed because 3 workers died while trying to run it
The only problem is that I can't find any logs from the worker about this, and most runs succeed. Hmm
I know it was not killed by the Nanny (sounds like a board game!), because the "Worker exceeded X memory budget" message is not present in the logs. I also can't find "End worker", from the "Worker chose to exit" section.
Seeing a stack trace would be nice, but I'm not seeing one
I don't think it OOMed, because I've given them 30 GiB each and the workload should run in less than 3 GiB.
a
interesting! Were you able to see in the logs if this task/job “infer_quantities-8-56a955e4e6224f5f9822da85821c36f5” was retried on another worker? A retry on another worker seems to be what Dask should do in this case.
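A possibly relevant knob here, an assumption on my part rather than something stated in the thread: the scheduler gives up on a task once it has been on distributed.scheduler.allowed-failures dying workers (default 3), which lines up with the "3 workers died while trying to run it" message above. A minimal sketch of raising it:

```python
# Hedged sketch: allowed-failures is how many times a task may be on a dying
# worker before the scheduler raises KilledWorker. It has to be set in the
# process that starts the scheduler, not in the flow; the value is illustrative.
import dask

dask.config.set({"distributed.scheduler.allowed-failures": 10})

# On the scheduler pod the same thing can likely be done via an environment
# variable, something like DASK_DISTRIBUTED__SCHEDULER__ALLOWED_FAILURES=10.
```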
h
No, I didn't find any retry in this case
I didn't dig that deep for it, as there are a great many log lines per second, but a cursory search for
ies-8
yielded no results
👍 1
OK, so back to debugging here: now it's happening once a day. I've gotten metrics up and running, and it correlates with all 16 cores being fully used across all nodes in the cluster, plus a whole lot of context switching and network traffic
Any ideas?
k
If it’s memory that’s the issue, I have ideas, but not for CPU. It looks like it’s the CPU killing the worker. This is the first time I’ve seen this kind of thing
h
Exciting 🙂
It's literally thinking itself to death?
However, the algorithm is not written so well that it should be consuming all 16 cores at 100% for such long periods, so it's more likely we're actually stuck in a tight loop in library code (most likely in Dask)
k
I think so, right? Sorry, but I don’t really have any ideas on this. Are you using a mapped task that shares common inputs? Like, do you have a small dataframe that all the mapped tasks use?
h
For the task that's running when it crashes (the one that runs for 2 hours non-stop on four cores), there are two or three dataframes in play, unique to that task index
k
Do you load them inside the task or are they passed in?
h
They are passed in from other tasks.
The aim was to make that loading cacheable but I don't think I've succeeded in doing that despite using Results.
k
Are they pandas DataFrames or Dask DataFrames?
h
Pandas for now
k
If network usage is high along with CPU, I would explore how to reduce that. Can the task read them in maybe? This is all just a guess though
I thought you potentially had one dataframe that kept moving around or being copied for tasks. For that, you could try using Dask
scatter
to move it to the workers ahead of time
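A generic (non-Prefect) sketch of the scatter pattern being described, with stand-in data and a stand-in fit function; the scheduler address is the Service used elsewhere in the thread:

```python
from dask.distributed import Client
import pandas as pd

client = Client("tcp://dask-scheduler.flows.svc:8786")

small_df = pd.DataFrame({"spend": [1.0, 2.0], "txs": [3, 4]})  # stand-in data

def fit_chain(df, chain_id):  # stand-in for the real per-chain model fit
    return chain_id, len(df)

# Place one copy of the dataframe on every worker up front, then pass the
# resulting future to submit() so the data is not re-serialised for each task.
df_on_workers = client.scatter(small_df, broadcast=True)
futures = [client.submit(fit_chain, df_on_workers, i) for i in range(4)]
results = client.gather(futures)
```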
h
Looks like this [screenshot]: so I think only CPU is high
But I mean: 5 MiB
But there are no real errors in the logs from the worker, and while it takes a pod-wide lock (which fails once in a while), the task should be retried (and then it mostly works)
And Dask only reports nominal CPU usage
k
That’s even weirder (the Dask CPU Utilization)
h
Yes
g
Hey all! Catching up on the conversation
d
Which versions of dask and distributed are you using? Could it be that dask is rebalancing its cached objects? How many dask workers do you have per pod?
h
We're using dask 2021.10.0 (py3.9) and distributed 2021.10.0.
FROM daskdev/dask:2021.10.0-py3.9

RUN apt-get update \
    && DEBIAN_FRONTEND=noninteractive apt-get install -y --no-install-recommends tzdata build-essential \
    && apt-get clean \
    && rm -rf /var/lib/apt/lists/*

WORKDIR /opt/app

ENV POETRY_VERSION=1.1.11

RUN pip install "poetry==$POETRY_VERSION"

COPY poetry.lock pyproject.toml postinstall.py ./
COPY --chown=1000:100 infer ./infer

RUN POETRY_VIRTUALENVS_CREATE=false poetry install --no-interaction --no-ansi

RUN python postinstall.py
We run with
nprocs=2
per pod, with
nthreads=16
and we have five pods, sticky to five VMs, as the only workload pods running (monitoring pods for extracting system metrics run side-by-side).
dask-worker-85784599b8-2z79p dask-worker distributed.nanny - INFO - Worker process 2772 was killed by signal 11
dask-worker-85784599b8-2z79p dask-worker distributed.nanny - WARNING - Restarting worker
Is there any way to trace what's going on?
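One hedged idea for tracing the signal 11 (segfault) restarts: enable Python's faulthandler in every worker, so a Python traceback is written to stderr (and hence the pod logs) when the process dies on a fatal signal. The file path and wiring below are assumptions, not something from the thread; the simplest variant is just setting PYTHONFAULTHANDLER=1 on the worker pods.

```python
# faulthandler_preload.py (hypothetical path), loaded with:
#   dask-worker ... --preload /opt/app/faulthandler_preload.py
import faulthandler

def dask_setup(worker):
    # Dump a Python traceback to stderr on SIGSEGV/SIGABRT/SIGBUS/SIGILL/SIGFPE,
    # so the crash at least leaves a trace in the worker pod logs.
    faulthandler.enable()
```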
k
We’ll see if davzucky has ideas. This is a bit beyond us on the Prefect side
h
Any progress on this?
d
@haf Sorry for the delay, and sorry for what I'm about to ask. It will be a lot of questions...
• When the KilledWorker happens, what do you see on the dask scheduler side?
• Do you see the problem as a dask worker being killed, or as a heartbeat problem?
• Why are you not running more pods with 1 CPU each rather than pods with a higher CPU limit?
🙏 1
The first thing we need is a stable run without the HPA; the HPA is another beast we can look at later.
• Can you share the kubernetes conf of your dask worker and dask scheduler setup?
• Is your prefect agent running on the same kubernetes cluster as the dask cluster?
One more comment: I've found running with the Nanny on kubernetes to be unreliable. I prefer having pods with a single one-CPU worker each and letting kubernetes do the management. The Nanny can be noisy on the node.
h
When the KilledWorker happens, what do you see on the dask scheduler side?
distributed.scheduler - INFO - Task infer_quantities-8-56a955e4e6224f5f9822da85821c36f5 marked as failed because 3 workers died while trying to run it
Do you see the problem as a dask worker being killed, or as a heartbeat problem?
I don't know. My guess is that somewhere in the library code there's a code path that gets hit and causes CPU to spike. Since it's using up all 16 cores, it's probably a re-entrant / async bit of code. That effectively makes comms stop and the heartbeat time out.
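If that heartbeat hypothesis is right, one knob worth checking (an assumption, not something discussed in the thread) is the scheduler's worker-ttl, which bounds how long a worker may go without heartbeating before the scheduler declares it dead. A sketch with an illustrative value, to be set where the scheduler process starts:

```python
import dask

# Illustrative value only; give CPU-saturated workers more slack before
# the scheduler removes them for missed heartbeats.
dask.config.set({"distributed.scheduler.worker-ttl": "15 minutes"})
```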
Why are you not running more pods with 1 CPU each rather than pods with a higher CPU limit?
My thinking was that we'd do parallelism both in the small (data level) and in the large (process/task level). When this thread was started we only had it in the large; now we've rebuilt all of this in plain python on four CPUs (we have four sampling chains), and we're in the process of merging xarray support that also parallelises at the data level, getting us to about 90% of 16 cores for one computation.
The first thing we need is a stable run without the HPA; the HPA is another beast we can look at later.
Yes, it's gone.
worker conf
---
# Source: dask/templates/dask-worker-deployment.yaml
apiVersion: apps/v1
kind: Deployment
metadata:
  name: dask-worker

  labels:
    app: dask
    component: worker

spec:
  replicas: 5

  selector:
    matchLabels:
      app: dask
      component: worker

  strategy:
    type: RollingUpdate

  template:
    metadata:
      labels:
        app: dask
        component: worker

      # https://github.com/dask/dask-kubernetes/issues/197
      annotations:
        sidecar.istio.io/inject: "false"

    spec:
      serviceAccountName: dask-worker

      tolerations:
      - key: dedicated
        operator: Equal
        value: dask
        effect: NoSchedule

      affinity:
        nodeAffinity:
          requiredDuringSchedulingIgnoredDuringExecution:
            nodeSelectorTerms:
            - matchExpressions:
              - key: dedicated
                operator: In
                values:
                - dask

      containers:
      - name: dask-worker
        image: europe-docker.pkg.dev/logary-delivery/cd/dask
        # image: daskdev/dask:2021.10.0

        args:
        - dask-worker
        - dask-scheduler.flows.svc:8786
        - --no-dashboard
        - --nthreads
        - '20'
        - --nprocs
        - '2'
        - --dashboard-address
        - "8790"
        - --memory-limit
        - 30GB
        - --death-timeout
        - '60'

        env:
        - name: EXTRA_PIP_PACKAGES
          value: fastparquet murmurhash distributed gcsfs

        - name: PREFECT__LOGGING__LEVEL
          value: DEBUG

        - name: PREFECT__CONTEXT__SECRETS__LOGARY_PG_USER
          valueFrom:
            secretKeyRef:
              name: analytics-pguser-modelruns
              key: user

        - name: PREFECT__CONTEXT__SECRETS__LOGARY_PG_PASSWORD
          valueFrom:
            secretKeyRef:
              name: analytics-pguser-modelruns
              key: password

        - name: K8S_NODE_NAME
          valueFrom:
            fieldRef:
              fieldPath: spec.nodeName

        ports:
        - name: http-dashboard
          containerPort: 8790

        resources:
          requests:
            cpu: "15000m"
            memory: 60G
          limits:
            cpu: "16000m"
            memory: 60G
scheduler conf
---
# Source: dask/templates/dask-scheduler-deployment.yaml
apiVersion: apps/v1
kind: Deployment
metadata:
  name: dask-scheduler

  labels:
    app: dask
    component: scheduler

spec:
  replicas: 1
  selector:
    matchLabels:
      app: dask
      component: scheduler

  strategy:
    type: RollingUpdate

  template:
    metadata:
      labels:
        app: dask
        component: scheduler

      # https://github.com/dask/dask-kubernetes/issues/197
      annotations:
        sidecar.istio.io/inject: "false"

    spec:
      containers:
      - name: dask-scheduler
        image: europe-docker.pkg.dev/logary-delivery/cd/dask
        # image: daskdev/dask:2021.10.0
        
        args:
        - dask-scheduler
        - --port
        - "8786"
        - --bokeh-port
        - "8787"

        ports:
        - name: tcp-scheduler
          containerPort: 8786

        - name: http-webui
          containerPort: 8787

        resources:
          requests:
            memory: 512Mi
            cpu: 500m
          limits:
            memory: 4Gi
            cpu: 1000m
Is your prefect agent running on the same kubernetes cluster as the dask cluster ?
Yes
Dockerfile
# https://github.com/dask/dask-docker/blob/main/base/Dockerfile
# https://docs.dask.org/en/latest/how-to/deploy-dask/docker.html
# https://stackoverflow.com/questions/53835198/integrating-python-poetry-with-docker
#
FROM daskdev/dask:2021.10.0-py3.9

ARG COMMIT_SHA
ARG COMMIT_REF

RUN apt-get update \
    && DEBIAN_FRONTEND=noninteractive apt-get install -y --no-install-recommends tzdata build-essential \
    && apt-get clean \
    && rm -rf /var/lib/apt/lists/*

WORKDIR /opt/app

ENV POETRY_VERSION=1.1.11

RUN pip install "poetry==$POETRY_VERSION"

COPY poetry.lock pyproject.toml postinstall.py ./
COPY --chown=1000:100 infer ./infer

RUN POETRY_VIRTUALENVS_CREATE=false poetry install --no-interaction --no-ansi

RUN python postinstall.py

ENV COMMIT_SHA=${COMMIT_SHA} COMMIT_REF=${COMMIT_REF}

# test 1: import mmm
RUN python -c 'import mmm'

# test 2: import further in
RUN python -c 'from mmm.data.fetching import METRICS_COL_NAMES'
d
Ok, thank you for sharing all of that. Because you are creating workers with 20 threads, the GIL can end up locked and the process can stop responding, depending on your tasks: https://docs.dask.org/en/latest/how-to/deploy-dask/single-machine.html
This is why I'm running all my workers with one thread and one CPU; it maxes everything out at 100% during the run, because the dask scheduler tries to send each task to where its data is.
Could you try running a lot more, smaller workers?
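A rough sketch of the layout being suggested, using a LocalCluster purely as an illustration (the worker count is made up); on Kubernetes the equivalent knobs are the dask-worker --nprocs/--nthreads arguments in the Deployment above:

```python
from dask.distributed import Client, LocalCluster

# The current layout is roughly 2 processes x 20 threads per pod. For GIL-bound,
# pure-Python tasks those 20 threads mostly take turns; many small
# single-threaded worker processes sidestep that:
cluster = LocalCluster(n_workers=16, threads_per_worker=1)
client = Client(cluster)
```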
In our case we have some workers which require high memory, and we use tags to address them for those tasks.
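A sketch of that tagging approach, assuming (if I recall correctly) that Prefect 1.x translates task tags of the form dask-resource:KEY=N into Dask resource requests, matched by workers started with a --resources flag; the HIGHMEM name and the task are hypothetical:

```python
# Workers that should take these tasks would be started with something like:
#   dask-worker ... --resources "HIGHMEM=1"
from prefect import task

@task(tags=["dask-resource:HIGHMEM=1"])
def fit_big_model(df):
    ...  # a memory-hungry model fit, pinned to the HIGHMEM workers
```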