# prefect-community
j
Looking for potential insight that others may have. We are running a Prefect Core workflow using the `DaskExecutor`, which in turn connects to a `SLURMCluster` from `dask_jobqueue`. There are about 14000 items to process in a mapped task, and each result in that map returns `None`, so no worries on memory management there. However, the workflow runs until about 2200 tasks are completed, then errors and restarts the whole workflow. The errors that I see on the main workflow / scheduler thread all follow something like:
```
distributed.core - ERROR - 'process_fov_row-e7df27e7-ca99-433d-b1e1-c4a3f6b2a683-61d201c4-4edb-4243-9e76-4b90c3aa7835-1474'
Traceback (most recent call last):
  File "/home/jacksonb/miniconda3/envs/cb-tools/lib/python3.7/site-packages/distributed/core.py", line 411, in handle_comm
    result = await result
  File "/home/jacksonb/miniconda3/envs/cb-tools/lib/python3.7/site-packages/distributed/scheduler.py", line 1758, in add_worker
    await self.handle_worker(comm=comm, worker=address)
  File "/home/jacksonb/miniconda3/envs/cb-tools/lib/python3.7/site-packages/distributed/scheduler.py", line 2714, in handle_worker
    await self.handle_stream(comm=comm, extra={"worker": worker})
  File "/home/jacksonb/miniconda3/envs/cb-tools/lib/python3.7/site-packages/distributed/core.py", line 472, in handle_stream
    handler(**merge(extra, msg))
  File "/home/jacksonb/miniconda3/envs/cb-tools/lib/python3.7/site-packages/distributed/scheduler.py", line 2675, in handle_long_running
    ts = self.tasks[key]
KeyError: 'process_fov_row-e7df27e7-ca99-433d-b1e1-c4a3f6b2a683-61d201c4-4edb-4243-9e76-4b90c3aa7835-1474'
```
Any ideas what could possibly be causing this issue? (This seems like a `dask` question and not so much a `prefect` one, but thought someone here may know.)
Specifically, I am wondering if this bug is back: https://github.com/PrefectHQ/prefect/issues/943#issuecomment-484566721 due to this: https://github.com/dask/distributed/issues/3386#issuecomment-597645377. 14000 tasks isn't that many, but note that each task usually generates about 600 more tasks.
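For context, the setup looks roughly like this (a sketch based on the description above; the SLURM queue / resource values are placeholders, not our real configuration):

```python
from dask_jobqueue import SLURMCluster
from prefect import Flow, task
from prefect.engine.executors import DaskExecutor

@task
def process_fov_row(row):
    # heavy per-item processing; returns None so no large results accumulate
    return None

with Flow("process-fovs") as flow:
    rows = list(range(14000))   # ~14000 items to map over
    process_fov_row.map(rows)

# placeholder SLURM settings -- the real job script options differ
cluster = SLURMCluster(queue="general", cores=4, memory="16GB", walltime="02:00:00")
cluster.scale(jobs=50)

executor = DaskExecutor(address=cluster.scheduler_address)
flow.run(executor=executor)
```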
c
@Jim Crist-Harif might have some insight here
j
Hmmm. There's no reason this should be specific to the Dask cluster backend (i.e. the same issue should be reproducible on a local cluster as well). The error you're seeing indicates that a key is missing in the scheduler, which could be due to a few things (the client dropping futures so GC collects them, improper graph structure, bugs in distributed, etc.). I'd need a reproducible example to help debug more.
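To illustrate the "client dropping futures" case (a contrived sketch, not a reproduction of your bug): once the last reference to a future is gone, the client releases its key and the scheduler forgets the task.

```python
import gc
from dask.distributed import Client

client = Client(processes=False)  # small local cluster, just for illustration

fut = client.submit(sum, [1, 2, 3])
key = fut.key

del fut        # drop the only reference to the future
gc.collect()   # the client releases the key and the scheduler forgets the task

# Anything that later refers to `key` (a long-running task reporting back,
# a dependent task, etc.) can then hit a missing-key error in the scheduler,
# similar to the KeyError in the traceback above.
```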
j
Will try to create a minimal reproducible one today šŸ™‚
šŸ‘ 1
Still debugging this, but the current leading theory is that something is going wrong with our network / SLURMCluster / number of open files on shared storage. Will update if we keep debugging and find it's a dask / prefect issue, but for now it looks like it's on our side.
j
Sounds good. If you get stuck I recommend opening an issue in dask/dask or dask/distributed - the collective community may have run into the same problem at some point. If you ping me (jcrist on github) I'll be sure to look at it sooner.
j
Oof, it's been 16 days since my original posting of this question. I have some updates that may be of interest to the Prefect team. There were a lot of things wrong related to our infrastructure that I won't go into, simply because they are our issues and not dask or prefect issues. But there is an issue with `distributed.Client.map` that I think may provide room for a feature for `prefect.Task.map`. See this issue: https://github.com/dask/distributed/issues/2181 It fits decently close to the situation we found ourselves in; while we aren't mapping millions of tasks, we are mapping thousands of tasks, each of which spawns thousands more, which could result in the same issue.
`distributed` patched this (no release yet) with this PR: https://github.com/dask/distributed/pull/3650 which adds a `batch_size` parameter to `Client.map`. However, it adds it in a way that simply submits the tasks to the scheduler in batches. (Which is fine and does work.) But what we did was write our own wrapper with a `batch_size` that fully processes each batch before moving on to the next, rather than simply submitting in batches. Basically: futures vs. full results per batch. My proposal is similar: add a `batch_size` (or similar) parameter to `prefect.Task.map` which would fully process `n` items of the provided iterables at a time, to potentially alleviate this issue. If this makes sense and sounds somewhat good, I would be happy to post a GitHub issue as a feature request for better tracking. This is mostly an update in case others hit similar issues, and a request for feedback if you have any.
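Roughly, the difference between the two approaches looks like this (a sketch of the idea, not our actual wrapper; the helper name is made up):

```python
from dask.distributed import Client

# What the distributed PR adds: submit in batches, but all futures stay live:
#   futures = client.map(func, items, batch_size=1000)

def map_in_full_batches(client: Client, func, items, batch_size=1000):
    """Fully compute one batch (gather results, drop futures) before
    submitting the next, so the scheduler only tracks ~batch_size keys
    from this map at any one time."""
    results = []
    for start in range(0, len(items), batch_size):
        batch = items[start:start + batch_size]
        futures = client.map(func, batch)
        results.extend(client.gather(futures))  # block until this batch is done
        del futures                             # allow the keys to be released
    return results
```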
a
hey @Jackson Maxfield Brown, I'm also encountering obstacles using Dask via `dask_jobqueue` to connect to an `LSFCluster`. Do you mind if I PM you and ask some questions?
j
Feel free to ping me. May not have the best answers but happy to try to help