Robin

    1 year ago
    I have been observing some weird behavior in some of our flows over the last few days.
    Project 1: flows run properly
    • with 700 task concurrency
    • 4 GB memory requested for each dask pod on an EKS cluster
    • pods are created and torn down adaptively using adapt_kwargs
    Project 2: another flow fails before really starting
    • with 100 task concurrency
    • 2 GB memory requested
    • pods are not always properly torn down after flow termination, neither with adapt_kwargs nor with n_workers in KubernetesRun
    Errors in project 2 flows:
    • Unexpected error: KeyError('data')
    • arange error
    • futures
    All flows ran properly a couple of days ago, and there have been only minor changes since.
    Commonalities:
    • We have one Prefect agent running on an EKS cluster with Prefect Cloud
    • both flows share the same Prefect agent
    • both flows use ECR
    Discrepancies:
    • the flows have different base images
    • the flows are in different projects
    Further interesting behavior:
    • Prefect jobs get created but error out even when no flow is running
    The KeyError did not always occur, but it did many times in the latest run:
    Unexpected error: KeyError('data')
    Traceback (most recent call last):
      File "/usr/local/lib/python3.8/site-packages/prefect/engine/runner.py", line 48, in inner
        new_state = method(self, state, *args, **kwargs)
      File "/usr/local/lib/python3.8/site-packages/prefect/engine/task_runner.py", line 865, in get_task_run_state
        value = prefect.utilities.executors.run_task_with_timeout(
      File "/usr/local/lib/python3.8/site-packages/prefect/utilities/executors.py", line 323, in run_task_with_timeout
        return task.run(*args, **kwargs)  # type: ignore
      File "...\tasks\analyze.py", line 7, in analysis_route_1
      File "fake_analysis.py", line 17, in fake_analysis
        mat.compute()
      File "/usr/local/lib/python3.8/site-packages/dask/base.py", line 285, in compute
        (result,) = compute(self, traverse=False, **kwargs)
      File "/usr/local/lib/python3.8/site-packages/dask/base.py", line 567, in compute
        results = schedule(dsk, keys, **kwargs)
      File "/usr/local/lib/python3.8/site-packages/distributed/client.py", line 2666, in get
        results = self.gather(packed, asynchronous=asynchronous, direct=direct)
      File "/usr/local/lib/python3.8/site-packages/distributed/client.py", line 1975, in gather
        return self.sync(
      File "/usr/local/lib/python3.8/site-packages/distributed/client.py", line 843, in sync
        return sync(
      File "/usr/local/lib/python3.8/site-packages/distributed/utils.py", line 353, in sync
        raise exc.with_traceback(tb)
      File "/usr/local/lib/python3.8/site-packages/distributed/utils.py", line 336, in f
        result[0] = yield future
      File "/usr/local/lib/python3.8/site-packages/tornado/gen.py", line 762, in run
        value = future.result()
      File "/usr/local/lib/python3.8/site-packages/distributed/client.py", line 1869, in _gather
        response = await future
      File "/usr/local/lib/python3.8/site-packages/distributed/client.py", line 1909, in _gather_remote
        data2, missing_keys, missing_workers = await gather_from_workers(
      File "/usr/local/lib/python3.8/site-packages/distributed/utils_comm.py", line 88, in gather_from_workers
        response.update(r["data"])
    KeyError: 'data'
    I also got some errors related to the arange operation that we perform with dask:
    mat = da.arange(memory_mb / 800 * 1e8)  # roughly memory_mb MB
    mat.compute()
    print(f"{id} is using {mat.nbytes / 2 ** 30:.4f} GB")
    The idea is to simulate fake_analyses with different memory sizes, random errors, etc. to investigate Prefect's results, caching, etc.
    I replaced da.arange() with np.arange() and this seems to work again. So I guess there have been some changes in how to use dask? It was hard to get to this error, because mostly other errors came up first. It was also really surprising to me, since the same code worked fine until 2-3 days ago 🤔 Either way, I guess Prefect is working on improving things, so looking forward to the release 🙏
    PS: It would be good to get feedback on how to work with dask functionality now, to avoid the above error.
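For reference, the sizing expression in the snippet above (memory_mb / 800 * 1e8) produces a float. The following is a minimal sketch of the same arithmetic that returns an integer length, assuming 8-byte elements. The helper names arange_length_for_mb and size_in_gib are hypothetical and do not come from the thread, and nothing in the thread establishes that the float argument is related to the error.

```python
def arange_length_for_mb(memory_mb: float, itemsize: int = 8) -> int:
    """Return the number of elements that occupy roughly memory_mb megabytes.

    Assumes 8-byte elements (int64 / float64) unless itemsize says otherwise.
    memory_mb / 800 * 1e8 elements * 8 bytes == memory_mb * 1e6 bytes.
    """
    return int(memory_mb * 1_000_000 // itemsize)


def size_in_gib(n_elements: int, itemsize: int = 8) -> float:
    """Size of n_elements items in units of 2**30 bytes, as in the print statement."""
    return n_elements * itemsize / 2 ** 30


if __name__ == "__main__":
    n = arange_length_for_mb(800)
    print(f"{n} elements use {size_in_gib(n):.4f} GB")
```

The integer result can be passed as the length to either np.arange or da.arange, which keeps the requested size explicit.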
    Kevin Kho

    1 year ago
    Was the KeyError fixed with the arange change as well?
    Was the stuff in your Docker image versioned?
    Robin

    1 year ago
    Yes and no: the KeyError does not occur anymore, because I am not using da.arange anymore at all, only np.arange. Not sure whether it would come up again if I switched back to da.arange... Does that answer the first question?
    Kevin Kho

    1 year ago
    FWIW, I didn’t see any recent Dask releases that would have caused this
    Robin

    1 year ago
    What do you mean by "the stuff"? 🙂 And what do you mean by "versioned"? The base image was created with the following versions:
    name: ppoc_analytics
    channels:
      - defaults
    dependencies:
      - ca-certificates=2021.1.19
      - certifi=2020.12.5
      - openssl=1.1.1k
      - pip=21.0.1
      - python=3.8.8
      - setuptools=52.0.0
      - sqlite=3.35.2
      - wheel=0.36.2
      - pip:
          - -r requirements.txt
          - -e .
          # Adding these requirements later is somewhat equivalent to the docker build pipelines where
          # they are added in a step-wise process as well. Also to show the context separation of
          # production environment, test environment and development environment.
          - -r .requirements.testing.txt
          - -r .requirements.developing.txt
    All other dependencies (defined in the different requirements files) are not pinned to specific versions:
    jupyter
    jupyterlab
    black
    pytest
    pylint
    dask
    pandas 
    numpy 
    boto3
    pydng
    prefect
    dask_kubernetes
    Maybe some upstream dependencies have changed and cause the new error?
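One way to check whether unpinned dependencies have drifted is to record the versions that are actually installed in a working image and pin them. The following is a minimal sketch using the standard-library importlib.metadata module. The helper name pinned_requirements is hypothetical, and the package list is just the one from the message above.

```python
from importlib import metadata


def pinned_requirements(packages):
    """Return 'name==version' lines for the installed packages in `packages`.

    Packages that are not installed in the current environment are reported
    as comment lines instead of being pinned.
    """
    lines = []
    for name in packages:
        try:
            lines.append(f"{name}=={metadata.version(name)}")
        except metadata.PackageNotFoundError:
            lines.append(f"# {name} is not installed in this environment")
    return lines


if __name__ == "__main__":
    # Package names taken from the unpinned list in the thread.
    for line in pinned_requirements(["dask", "pandas", "numpy", "prefect", "dask_kubernetes"]):
        print(line)
```

Running this inside an image that still works and inside a freshly built one, then comparing the two outputs, would show which upstream versions changed between builds.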
    Kevin Kho

    1 year ago
    Yeah, the requirements are what I meant. Potentially, though I’m not sure tbh.
    Robin

    1 year ago
    Hmm ok