# ask-community
r
I am observing some weird behavior related to some of our flows in the last few days.
project 1: flows run properly
• 700 task concurrency
• 4 GB memory requested for each dask pod on an EKS cluster
• pods are created and torn down adaptively using adapt_kwargs
project 2: another flow fails before really starting
• 100 task concurrency
• 2 GB memory requested
• pods are not always properly torn down after flow termination, neither with adapt_kwargs nor with n_workers in KubernetesRun (see the sketch after the traceback below for how this is wired up)
Errors in project 2 flows:
• Unexpected error: KeyError('data')
• arange errors related to futures
All flows ran properly a couple of days ago and there were only minor changes.
Commonalities:
• we have one Prefect agent running on an EKS cluster with Prefect Cloud
• both flows share the same Prefect agent
• both flows use ECR
Discrepancies:
• both flows have different base images
• the flows are in different projects
Further interesting behavior:
• Prefect jobs get created but error out even when no flow is running
The KeyError did not always occur, but it did many times in the latest run:
Unexpected error: KeyError('data')
Traceback (most recent call last):
  File "/usr/local/lib/python3.8/site-packages/prefect/engine/runner.py", line 48, in inner
    new_state = method(self, state, *args, **kwargs)
  File "/usr/local/lib/python3.8/site-packages/prefect/engine/task_runner.py", line 865, in get_task_run_state
    value = prefect.utilities.executors.run_task_with_timeout(
  File "/usr/local/lib/python3.8/site-packages/prefect/utilities/executors.py", line 323, in run_task_with_timeout
    return task.run(*args, **kwargs)  # type: ignore
  File "...\tasks\analyze.py", line 7, in analysis_route_1
  File "fake_analysis.py", line 17, in fake_analysis
    mat.compute()
  File "/usr/local/lib/python3.8/site-packages/dask/base.py", line 285, in compute
    (result,) = compute(self, traverse=False, **kwargs)
  File "/usr/local/lib/python3.8/site-packages/dask/base.py", line 567, in compute
    results = schedule(dsk, keys, **kwargs)
  File "/usr/local/lib/python3.8/site-packages/distributed/client.py", line 2666, in get
    results = self.gather(packed, asynchronous=asynchronous, direct=direct)
  File "/usr/local/lib/python3.8/site-packages/distributed/client.py", line 1975, in gather
    return self.sync(
  File "/usr/local/lib/python3.8/site-packages/distributed/client.py", line 843, in sync
    return sync(
  File "/usr/local/lib/python3.8/site-packages/distributed/utils.py", line 353, in sync
    raise exc.with_traceback(tb)
  File "/usr/local/lib/python3.8/site-packages/distributed/utils.py", line 336, in f
    result[0] = yield future
  File "/usr/local/lib/python3.8/site-packages/tornado/gen.py", line 762, in run
    value = future.result()
  File "/usr/local/lib/python3.8/site-packages/distributed/client.py", line 1869, in _gather
    response = await future
  File "/usr/local/lib/python3.8/site-packages/distributed/client.py", line 1909, in _gather_remote
    data2, missing_keys, missing_workers = await gather_from_workers(
  File "/usr/local/lib/python3.8/site-packages/distributed/utils_comm.py", line 88, in gather_from_workers
    response.update(r["data"])
KeyError: 'data'
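For reference, the adaptive dask pod setup described above is typically wired up roughly like this; a minimal sketch assuming Prefect's DaskExecutor together with a dask_kubernetes KubeCluster (the ECR image reference, memory values and worker counts are illustrative, not taken from this thread):

from prefect import Flow
from prefect.executors import DaskExecutor
from prefect.run_configs import KubernetesRun
from dask_kubernetes import KubeCluster, make_pod_spec


def dask_cluster(n_workers=None):
    # Worker pod spec; the image reference and memory request are placeholders.
    pod_spec = make_pod_spec(
        image="<account>.dkr.ecr.<region>.amazonaws.com/analytics:latest",
        memory_request="2G",
        memory_limit="2G",
    )
    return KubeCluster(pod_spec, n_workers=n_workers)


with Flow(
    "fake-analysis",
    run_config=KubernetesRun(),                      # flow-run job, picked up by the EKS agent
    executor=DaskExecutor(
        cluster_class=dask_cluster,
        adapt_kwargs={"minimum": 1, "maximum": 10},  # adaptive scale-up and tear-down
        # or a fixed worker pool instead: cluster_kwargs={"n_workers": 10}
    ),
) as flow:
    ...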
I also got some errors related to the arange operation that we perform with dask:
import dask.array as da

# snippet from inside fake_analysis; memory_mb and id are its parameters
mat = da.arange(memory_mb / 800 * 1e8)  # roughly memory_mb MB
mat.compute()
print(f"{id} is using {mat.nbytes / 2 ** 30:.4f} GB")
The idea is to simulate fake_analyses with different memory sizes, random errors, etc. to investigate Prefect's results, caching, etc.
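For concreteness, such a simulated analysis might look roughly like this; a sketch only, assuming the snippet above lives inside a helper called from the analysis_route_1 task seen in the traceback, and with a made-up failure_rate parameter for the random errors:

import random
from datetime import timedelta

import dask.array as da
from prefect import task


def fake_analysis(id: str, memory_mb: int, failure_rate: float = 0.1) -> None:
    """Allocate roughly memory_mb MB as a dask array and fail at random."""
    if random.random() < failure_rate:
        raise RuntimeError(f"simulated failure in {id}")
    mat = da.arange(memory_mb / 800 * 1e8)  # roughly memory_mb MB of int64
    mat.compute()
    print(f"{id} is using {mat.nbytes / 2 ** 30:.4f} GB")


@task(max_retries=2, retry_delay=timedelta(seconds=10))  # retry settings are illustrative
def analysis_route_1(id: str, memory_mb: int) -> None:
    fake_analysis(id, memory_mb)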
I replaced da.arange() with np.arange() and this seems to work again. So I guess there have been some changes in how to use dask? It was hard to get to this error, because there were mostly other errors first. It was also really surprising to me, since the same code worked fine until 2-3 days ago 🤔 Either way, I guess Prefect is working on improving things, so I'm looking forward to the release 🙏
PS: It would be good to get feedback on how to work with dask functionality now, to avoid the above error.
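For reference, the workaround described above boils down to the following swap (a sketch; memory_mb and id come from the earlier snippet). With np.arange the array lives entirely in the task's own process instead of being a distributed dask collection, so nothing has to be gathered back from the dask workers:

import numpy as np

# previously: mat = da.arange(memory_mb / 800 * 1e8); mat.compute()
mat = np.arange(memory_mb / 800 * 1e8)  # allocated locally, roughly memory_mb MB
print(f"{id} is using {mat.nbytes / 2 ** 30:.4f} GB")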
k
Was the KeyError fixed with the arange as well?
Was the stuff in your Docker image versioned?
r
Yes and no: the KeyError does not occur anymore, because I am not using da.arange at all anymore, only np.arange. I'm not sure whether it would come up again if I switched back to da.arange... Does that answer the first question?
k
Yep
FWIW, I didn’t see any recent Dask releases that would have caused this
r
What do you mean by "the stuff"? 🙂 And what do you mean by "versioned"? The base image was created with the following versions:
name: ppoc_analytics
channels:
  - defaults
dependencies:
  - ca-certificates=2021.1.19
  - certifi=2020.12.5
  - openssl=1.1.1k
  - pip=21.0.1
  - python=3.8.8
  - setuptools=52.0.0
  - sqlite=3.35.2
  - wheel=0.36.2
  - pip:
      - -r requirements.txt
      - -e .
      # Adding these requirements later is somewhat equivalent to the docker build pipelines where
      # they are added in a step-wise process as well. Also to show the context separation of
      # production environment, test environment and development environment.
      - -r .requirements.testing.txt
      - -r .requirements.developing.txt
All other dependencies (defined in the different requirements files) are not pinned to specific versions:
jupyter
jupyterlab
black
pytest
pylint
dask
pandas 
numpy 
boto3
pydng
prefect
dask_kubernetes
Maybe some upstream dependencies have changed and are causing the new error?
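If upstream drift is the cause, one way to rule it out would be to pin at least the packages involved in the failure to the versions from the last known-good image, e.g. something like the following in the requirements file (the version numbers here are placeholders, not the actual ones from this image):

dask==2021.3.0
distributed==2021.3.0
prefect==0.14.16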
k
Yeah, the requirements are what I meant. Potentially, though I'm not sure tbh.
r
Hmm ok