haf  11/27/2021, 12:05 PM
KilledWorker error which seemingly fails the whole flow. Despite this, the workers are alive and fine (more in thread).

haf  11/27/2021, 12:05 PM
dask-worker-d4cdcb698-n9w86 1/1 Running 0 11m
dask-worker-d4cdcb698-p7f4c 1/1 Running 0 13m
dask-worker-d4cdcb698-qljjm 1/1 Running 0 5m30s
dask-worker-d4cdcb698-qpbpp 1/1 Running 0 8m9s
dask-worker-d4cdcb698-rrsf2 1/1 Running 0 14m
haf  11/27/2021, 12:05 PM

haf  11/27/2021, 12:06 PM

haf  11/27/2021, 12:12 PM

haf  11/27/2021, 12:21 PM
prefect-job-9c43910a-v6pqj flow [2021-11-27 12:19:29+0000] DEBUG - prefect.DaskExecutor | Worker <tcp://10.4.6.3:42033> removed
prefect-job-9c43910a-v6pqj flow [2021-11-27 12:19:33+0000] DEBUG - prefect.CloudFlowRunner | Checking flow run state...
prefect-job-9c43910a-v6pqj flow [2021-11-27 12:19:35+0000] DEBUG - prefect.DaskExecutor | Worker <tcp://10.4.10.4:44069> removed
prefect-job-9c43910a-v6pqj flow [2021-11-27 12:19:48+0000] DEBUG - prefect.CloudFlowRunner | Checking flow run state...
prefect-job-9c43910a-v6pqj flow [2021-11-27 12:20:04+0000] DEBUG - prefect.CloudFlowRunner | Checking flow run state...
prefect-job-9c43910a-v6pqj flow [2021-11-27 12:20:12+0000] DEBUG - prefect.DaskExecutor | Worker <tcp://10.4.10.4:44007> added
prefect-job-9c43910a-v6pqj flow [2021-11-27 12:20:19+0000] DEBUG - prefect.CloudFlowRunner | Checking flow run state...
prefect-job-9c43910a-v6pqj flow [2021-11-27 12:20:25+0000] DEBUG - prefect.DaskExecutor | Worker <tcp://10.4.6.3:41491> added
prefect-job-9c43910a-v6pqj flow [2021-11-27 12:20:34+0000] DEBUG - prefect.CloudFlowRunner | Checking flow run state...
prefect-job-9c43910a-v6pqj flow [2021-11-27 12:20:49+0000] DEBUG - prefect.CloudFlowRunner | Checking flow run state...
later:
prefect-job-9c43910a-v6pqj flow [2021-11-27 12:31:52+0000] ERROR - prefect.CloudFlowRunner | Unexpected error: KilledWorker('fit_spend2txs_model-4-2e39f830c95848798566c200821d6e9a', <WorkerState '<tcp://10.4.6.3:38793>', name: <tcp://10.4.6.3:38793>, status: closed, memory: 0, processing: 11>)
prefect-job-9c43910a-v6pqj flow Traceback (most recent call last):
prefect-job-9c43910a-v6pqj flow File "/usr/local/lib/python3.9/site-packages/prefect/engine/runner.py", line 48, in inner
prefect-job-9c43910a-v6pqj flow new_state = method(self, state, *args, **kwargs)
prefect-job-9c43910a-v6pqj flow File "/usr/local/lib/python3.9/site-packages/prefect/engine/flow_runner.py", line 643, in get_flow_run_state
prefect-job-9c43910a-v6pqj flow final_states = executor.wait(
prefect-job-9c43910a-v6pqj flow File "/usr/local/lib/python3.9/site-packages/prefect/executors/dask.py", line 440, in wait
prefect-job-9c43910a-v6pqj flow return self.client.gather(futures)
prefect-job-9c43910a-v6pqj flow File "/usr/local/lib/python3.9/site-packages/distributed/client.py", line 1969, in gather
prefect-job-9c43910a-v6pqj flow return self.sync(
prefect-job-9c43910a-v6pqj flow File "/usr/local/lib/python3.9/site-packages/distributed/client.py", line 865, in sync
prefect-job-9c43910a-v6pqj flow return sync(
prefect-job-9c43910a-v6pqj flow File "/usr/local/lib/python3.9/site-packages/distributed/utils.py", line 327, in sync
prefect-job-9c43910a-v6pqj flow raise exc.with_traceback(tb)
prefect-job-9c43910a-v6pqj flow File "/usr/local/lib/python3.9/site-packages/distributed/utils.py", line 310, in f
prefect-job-9c43910a-v6pqj flow result[0] = yield future
prefect-job-9c43910a-v6pqj flow File "/usr/local/lib/python3.9/site-packages/tornado/gen.py", line 762, in run
prefect-job-9c43910a-v6pqj flow value = future.result()
prefect-job-9c43910a-v6pqj flow File "/usr/local/lib/python3.9/site-packages/distributed/client.py", line 1834, in _gather
prefect-job-9c43910a-v6pqj flow raise exception.with_traceback(traceback)
prefect-job-9c43910a-v6pqj flow distributed.scheduler.KilledWorker: ('fit_spend2txs_model-4-2e39f830c95848798566c200821d6e9a', <WorkerState '<tcp://10.4.6.3:38793>', name: <tcp://10.4.6.3:38793>, status: closed, memory: 0, processing: 11>)
prefect-job-9c43910a-v6pqj flow [2021-11-27 12:31:52+0000] DEBUG - prefect.CloudFlowRunner | Flow 'run_mmm': Handling state change from Running to Failed
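For reference: the scheduler marks a task as erred with KilledWorker after the task has been on a dying worker more times than distributed.scheduler.allowed-failures (3 by default), so a few worker deaths fail the whole flow even though replacement workers come up fine. A minimal sketch of two knobs sometimes used to tolerate transient worker deaths; the values are illustrative, and the dask config must be active in the scheduler's own process (e.g. a dask config file or DASK_* env vars on the scheduler pod):

# Sketch only: let a task survive more worker deaths before KilledWorker is raised.
# Must run where the scheduler runs, not just in the flow/client process.
import dask
dask.config.set({"distributed.scheduler.allowed-failures": 10})

# Prefect 1.x tasks can also retry after a failure:
from datetime import timedelta
from prefect import task

@task(max_retries=3, retry_delay=timedelta(minutes=1))
def fit_spend2txs_model(data):  # task name taken from the log above; signature assumed
    ...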
Anna Geller

Anna Geller

haf  11/29/2021, 4:34 PM
The autoscaler was set to minReplicas: 3, maxReplicas: 9, but I've removed it and made the cluster a permanent 5 replicas for now.
I honestly don't know where this problem might be: whether it's Dask or Prefect or something else. Perhaps when Orion is finished and it can ship traces (and possibly Dask too?), this will be easier to debug.
Bumping the thread count on the Dask worker (nthreads) and making the workers permanent made it more stable for now, and I got a couple of successful runs (2h 30m).
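For reference, a minimal sketch of pointing a Prefect 1.x flow at such a permanent cluster; the flow name is taken from the log above and the scheduler address is assumed from the worker args shown later in the thread:

from prefect import Flow
from prefect.executors import DaskExecutor

with Flow("run_mmm") as flow:
    ...  # tasks elided

# Connect to the static, in-cluster Dask scheduler instead of spinning up a temporary cluster.
flow.executor = DaskExecutor(address="tcp://dask-scheduler.flows.svc:8786")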
Anna Geller

haf  11/29/2021, 4:57 PM

haf  11/30/2021, 8:52 AM
distributed.scheduler - INFO - Task infer_quantities-8-56a955e4e6224f5f9822da85821c36f5 marked as failed because 3 workers died while trying to run it
The only problem is that I can't find any logs from the worker about this, and most runs succeed. Hmm.
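One way sometimes used to pull logs from the currently connected workers (and their nannies) through the scheduler instead of chasing pods with kubectl; a sketch, with the address assumed from the deployment later in the thread, and with the caveat that it only covers workers that are still connected:

from distributed import Client

client = Client("tcp://dask-scheduler.flows.svc:8786")

worker_logs = client.get_worker_logs(n=100)              # last 100 lines per worker
nanny_logs = client.get_worker_logs(n=100, nanny=True)   # the nanny processes instead
for addr, lines in worker_logs.items():
    print(addr)
    for level, message in lines:
        print(" ", level, message)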
haf  11/30/2021, 9:01 AM

haf  11/30/2021, 9:01 AM

haf  11/30/2021, 9:03 AM

Anna Geller

haf  11/30/2021, 10:02 AM

haf  11/30/2021, 10:04 AM
ies-8 yielded no results

haf  12/12/2021, 5:46 PM

haf  12/15/2021, 8:58 AM

Kevin Kho

haf  12/15/2021, 2:29 PM

haf  12/15/2021, 2:30 PM
haf  12/15/2021, 2:30 PM

Kevin Kho

haf  12/15/2021, 2:38 PM

Kevin Kho

haf  12/15/2021, 3:10 PM

haf  12/15/2021, 3:11 PM

Kevin Kho

haf  12/15/2021, 3:18 PM

Kevin Kho

Kevin Kho
scatter to move it to the workers ahead of time
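For context, a minimal sketch of the scatter pattern being suggested, with made-up data and a placeholder fit function; the scheduler address is assumed from the deployment below:

import numpy as np
from distributed import Client

client = Client("tcp://dask-scheduler.flows.svc:8786")

# Stand-in for a large shared input that every task needs.
big_array = np.random.default_rng(0).normal(size=(1_000_000, 8))

# Scatter once; broadcast=True places a copy on every worker up front,
# instead of shipping the data inside every task submission.
big_future = client.scatter(big_array, broadcast=True)

def fit_chain(data, chain_id):
    # placeholder for the real model fit
    return chain_id, float(data.mean())

# The scattered future is passed like a normal argument; Dask resolves it worker-side.
futures = [client.submit(fit_chain, big_future, chain_id) for chain_id in range(4)]
print(client.gather(futures))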
haf  12/15/2021, 3:24 PM

haf  12/15/2021, 3:24 PM

haf  12/15/2021, 3:27 PM

haf  12/15/2021, 3:28 PM

haf  12/15/2021, 3:29 PM

Kevin Kho

haf  12/15/2021, 3:35 PM

George Coyne  12/15/2021, 7:40 PM

davzucky  12/15/2021, 9:25 PM

haf  12/16/2021, 10:04 AM
FROM daskdev/dask:2021.10.0-py3.9
RUN apt-get update \
&& DEBIAN_FRONTEND=noninteractive apt-get install -y --no-install-recommends tzdata build-essential \
&& apt-get clean \
&& rm -rf /var/lib/apt/lists/*
WORKDIR /opt/app
ENV POETRY_VERSION=1.1.11
RUN pip install "poetry==$POETRY_VERSION"
COPY poetry.lock pyproject.toml postinstall.py ./
COPY --chown=1000:100 infer ./infer
RUN POETRY_VIRTUALENVS_CREATE=false poetry install --no-interaction --no-ansi
RUN python postinstall.py
We run with nprocs=2 per pod, with nthreads=16, and we have five pods, pinned to five VMs as the only workload pods running (monitoring pods for extracting system metrics run side by side).

haf  12/16/2021, 10:36 AM
dask-worker-85784599b8-2z79p dask-worker distributed.nanny - INFO - Worker process 2772 was killed by signal 11
dask-worker-85784599b8-2z79p dask-worker distributed.nanny - WARNING - Restarting worker
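Signal 11 is a segfault in native code, so the Python-level worker log typically just stops. One debugging aid, not something from the thread itself: a worker preload module that enables faulthandler, so a fatal signal at least dumps the Python stacks to stderr before the process dies.

# segfault_preload.py (hypothetical file name)
# Start workers with:  dask-worker ... --preload /opt/app/segfault_preload.py
import sys
import faulthandler

def dask_setup(worker):
    # Called by distributed when the worker starts; on SIGSEGV/SIGABRT/etc.
    # all thread stacks are written to stderr, which ends up in the pod logs.
    faulthandler.enable(file=sys.stderr, all_threads=True)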
haf  12/17/2021, 8:36 AM

Kevin Kho

haf  12/30/2021, 5:53 AM

davzucky  01/03/2022, 11:47 PM

davzucky  01/03/2022, 11:49 PM

davzucky  01/03/2022, 11:50 PM

davzucky  01/03/2022, 11:54 PM
haf  01/17/2022, 7:55 AM
> When the killed worker happens, what do you see on the dask scheduler side?
distributed.scheduler - INFO - Task infer_quantities-8-56a955e4e6224f5f9822da85821c36f5 marked as failed because 3 workers died while trying to run it
> Do you see the problem as a dask worker being killed, or a heartbeat problem?
I don't know. My guess is that somewhere in the library code there's a code path that gets exercised and causes CPU to spike. Since it uses up all 16 cores, it's probably a re-entrant / async bit of code. This effectively makes comms stop and the heartbeat time out.
> Why are you not running multiple pods with 1 CPU rather than pods with a higher CPU limit?
My thinking was that we'd have parallelism both in the small (data level) and in the large (process/task level). When this thread was started we only had it in the large, but now we've rebuilt all of this in plain Python with four CPUs (we have four sampling chains), and we're in the process of merging xarray support that also parallelises at the data level, getting us to about 90% of 16 cores for one computation.
> The first thing we will need is a stable run without HPA. The HPA is another beast we can look at later.
Yes, it's gone.
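If the spike really is a native thread pool (OpenMP/MKL/OpenBLAS) fanning out across all 16 cores, one common mitigation is to cap those pools so the worker's event loop still gets CPU time for heartbeats. A sketch with illustrative values; the variables must be set before the numeric libraries are imported, e.g. in the worker container's env:

import os

# Hypothetical caps: leave headroom for dask's event loop and comms.
os.environ.setdefault("OMP_NUM_THREADS", "4")
os.environ.setdefault("MKL_NUM_THREADS", "4")
os.environ.setdefault("OPENBLAS_NUM_THREADS", "4")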
haf  01/17/2022, 7:56 AM
---
# Source: dask/templates/dask-worker-deployment.yaml
apiVersion: apps/v1
kind: Deployment
metadata:
  name: dask-worker
  labels:
    app: dask
    component: worker
spec:
  replicas: 5
  selector:
    matchLabels:
      app: dask
      component: worker
  strategy:
    type: RollingUpdate
  template:
    metadata:
      labels:
        app: dask
        component: worker
      # https://github.com/dask/dask-kubernetes/issues/197
      annotations:
        sidecar.istio.io/inject: "false"
    spec:
      serviceAccountName: dask-worker
      tolerations:
        - key: dedicated
          operator: Equal
          value: dask
          effect: NoSchedule
      affinity:
        nodeAffinity:
          requiredDuringSchedulingIgnoredDuringExecution:
            nodeSelectorTerms:
              - matchExpressions:
                  - key: dedicated
                    operator: In
                    values:
                      - dask
      containers:
        - name: dask-worker
          image: europe-docker.pkg.dev/logary-delivery/cd/dask
          # image: daskdev/dask:2021.10.0
          args:
            - dask-worker
            - dask-scheduler.flows.svc:8786
            - --no-dashboard
            - --nthreads
            - '20'
            - --nprocs
            - '2'
            - --dashboard-address
            - "8790"
            - --memory-limit
            - 30GB
            - --death-timeout
            - '60'
          env:
            - name: EXTRA_PIP_PACKAGES
              value: fastparquet murmurhash distributed gcsfs
            - name: PREFECT__LOGGING__LEVEL
              value: DEBUG
            - name: PREFECT__CONTEXT__SECRETS__LOGARY_PG_USER
              valueFrom:
                secretKeyRef:
                  name: analytics-pguser-modelruns
                  key: user
            - name: PREFECT__CONTEXT__SECRETS__LOGARY_PG_PASSWORD
              valueFrom:
                secretKeyRef:
                  name: analytics-pguser-modelruns
                  key: password
            - name: K8S_NODE_NAME
              valueFrom:
                fieldRef:
                  fieldPath: spec.nodeName
          ports:
            - name: http-dashboard
              containerPort: 8790
          resources:
            requests:
              cpu: "15000m"
              memory: 60G
            limits:
              cpu: "16000m"
              memory: 60G
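A small sanity-check sketch, assuming only the service name and port from the args above, to confirm what the scheduler actually sees after a deployment change (worker count, threads, memory limits):

from distributed import Client

client = Client("tcp://dask-scheduler.flows.svc:8786")

info = client.scheduler_info()
for addr, worker in info["workers"].items():
    print(addr, "nthreads:", worker["nthreads"], "memory_limit:", worker["memory_limit"])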
haf  01/17/2022, 7:56 AM
---
# Source: dask/templates/dask-scheduler-deployment.yaml
apiVersion: apps/v1
kind: Deployment
metadata:
  name: dask-scheduler
  labels:
    app: dask
    component: scheduler
spec:
  replicas: 1
  selector:
    matchLabels:
      app: dask
      component: scheduler
  strategy:
    type: RollingUpdate
  template:
    metadata:
      labels:
        app: dask
        component: scheduler
      # https://github.com/dask/dask-kubernetes/issues/197
      annotations:
        sidecar.istio.io/inject: "false"
    spec:
      containers:
        - name: dask-scheduler
          image: europe-docker.pkg.dev/logary-delivery/cd/dask
          # image: daskdev/dask:2021.10.0
          args:
            - dask-scheduler
            - --port
            - "8786"
            - --bokeh-port
            - "8787"
          ports:
            - name: tcp-scheduler
              containerPort: 8786
            - name: http-webui
              containerPort: 8787
          resources:
            requests:
              memory: 512Mi
              cpu: 500m
            limits:
              memory: 4Gi
              cpu: 1000m
haf  01/17/2022, 7:56 AM
> Is your prefect agent running on the same kubernetes cluster as the dask cluster?
Yes.

haf  01/17/2022, 7:57 AM
# https://github.com/dask/dask-docker/blob/main/base/Dockerfile
# https://docs.dask.org/en/latest/how-to/deploy-dask/docker.html
# https://stackoverflow.com/questions/53835198/integrating-python-poetry-with-docker
#
FROM daskdev/dask:2021.10.0-py3.9
ARG COMMIT_SHA
ARG COMMIT_REF
RUN apt-get update \
&& DEBIAN_FRONTEND=noninteractive apt-get install -y --no-install-recommends tzdata build-essential \
&& apt-get clean \
&& rm -rf /var/lib/apt/lists/*
WORKDIR /opt/app
ENV POETRY_VERSION=1.1.11
RUN pip install "poetry==$POETRY_VERSION"
COPY poetry.lock pyproject.toml postinstall.py ./
COPY --chown=1000:100 infer ./infer
RUN POETRY_VIRTUALENVS_CREATE=false poetry install --no-interaction --no-ansi
RUN python postinstall.py
ENV COMMIT_SHA=${COMMIT_SHA} COMMIT_REF=${COMMIT_REF}
# test 1: import mmm
RUN python -c 'import mmm'
# test 2: import further in
RUN python -c 'from mmm.data.fetching import METRICS_COL_NAMES'
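Since the same image serves both the flow run and the workers, a quick consistency check sometimes run from the flow environment, as a sketch:

from distributed import Client

client = Client("tcp://dask-scheduler.flows.svc:8786")

# Compares Python and package versions across this client, the scheduler and all
# workers, and raises if they disagree; mismatches are a classic source of
# hard-to-trace worker failures.
client.get_versions(check=True)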
davzucky  01/17/2022, 9:07 AM

davzucky  01/17/2022, 9:09 AM

davzucky  01/17/2022, 9:09 AM

davzucky  01/17/2022, 9:10 AM