Jackson Maxfield Brown
04/06/2020, 11:55 PM

My flow runs on a DaskExecutor, which in turn connects to a SLURMCluster from dask_jobqueue. There are about 14,000 items to process in a mapped task, and each result in that map returns None, so no worries about memory management there. However, the workflow runs until roughly ~2,200 tasks have completed, then errors and restarts the whole workflow. The errors I see on the main workflow / scheduler thread all follow something like:
distributed.core - ERROR - 'process_fov_row-e7df27e7-ca99-433d-b1e1-c4a3f6b2a683-61d201c4-4edb-4243-9e76-4b90c3aa7835-1474'
Traceback (most recent call last):
File "/home/jacksonb/miniconda3/envs/cb-tools/lib/python3.7/site-packages/distributed/core.py", line 411, in handle_comm
result = await result
File "/home/jacksonb/miniconda3/envs/cb-tools/lib/python3.7/site-packages/distributed/scheduler.py", line 1758, in add_worker
await self.handle_worker(comm=comm, worker=address)
File "/home/jacksonb/miniconda3/envs/cb-tools/lib/python3.7/site-packages/distributed/scheduler.py", line 2714, in handle_worker
await self.handle_stream(comm=comm, extra={"worker": worker})
File "/home/jacksonb/miniconda3/envs/cb-tools/lib/python3.7/site-packages/distributed/core.py", line 472, in handle_stream
handler(**merge(extra, msg))
File "/home/jacksonb/miniconda3/envs/cb-tools/lib/python3.7/site-packages/distributed/scheduler.py", line 2675, in handle_long_running
ts = self.tasks[key]
KeyError: 'process_fov_row-e7df27e7-ca99-433d-b1e1-c4a3f6b2a683-61d201c4-4edb-4243-9e76-4b90c3aa7835-1474'
Any ideas what could possibly be causing this issue? (This seems like a dask question and not so much a prefect one, but I thought someone here might know.)
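For context, a minimal sketch of the kind of setup described at the top of the thread, assuming a Prefect 0.x DaskExecutor and dask_jobqueue's SLURMCluster; the flow name, item count, and resource values are illustrative, not the actual project code:

```python
from dask_jobqueue import SLURMCluster
from prefect import Flow, task
from prefect.engine.executors import DaskExecutor


@task
def process_fov_row(row):
    # heavy per-item work happens here; nothing is returned,
    # so the mapped results add no memory pressure
    return None


with Flow("process-fovs") as flow:
    rows = list(range(14000))   # ~14,000 items to map over
    process_fov_row.map(rows)

# Resource values are placeholders for whatever the SLURM site requires
cluster = SLURMCluster(cores=4, memory="16GB", walltime="04:00:00")
cluster.scale(jobs=20)

flow.run(executor=DaskExecutor(address=cluster.scheduler_address))
```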
Chris White
04/07/2020, 1:31 AM

Jim Crist-Harif
04/07/2020, 2:01 PM

Jackson Maxfield Brown
04/07/2020, 4:07 PM

Jim Crist-Harif
04/08/2020, 3:03 PM

Jackson Maxfield Brown
04/23/2020, 1:18 AM

I came across a discussion around distributed.Client.map that I think may provide room for a feature for prefect.Task.map.
Seeing this issue: https://github.com/dask/distributed/issues/2181
It fits decently close to the situation we found ourselves in: while we aren't mapping millions of tasks, we are mapping thousands of tasks, each of which spawns thousands more, which could result in the same issue. distributed patched this (no release yet) with this PR: https://github.com/dask/distributed/pull/3650
That PR adds a batch_size parameter to Client.map. However, it adds it in a way that simply submits the tasks to the scheduler in batches (which is fine and does work). What we did instead was write our own wrapper with a batch_size that fully processes one batch before moving on to the next, rather than simply submitting in batches. Basically: futures vs. full results per batch.
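A minimal sketch of that kind of wrapper, assuming plain dask.distributed; the function name and default batch size are illustrative and this is not the actual wrapper code:

```python
from typing import Callable, Iterable, List

from distributed import Client


def map_in_batches(client: Client, func: Callable, items: Iterable,
                   batch_size: int = 1000) -> List:
    """Map func over items, fully processing one batch before starting the next.

    Unlike submitting everything up front, this keeps the number of in-flight
    futures bounded by batch_size, because each batch is gathered to completion
    before the next batch is submitted.
    """
    items = list(items)
    results = []
    for start in range(0, len(items), batch_size):
        batch = items[start:start + batch_size]
        futures = client.map(func, batch)
        results.extend(client.gather(futures))  # block until this batch finishes
    return results
```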
My proposal is similar: add a batch_size or similar parameter to prefect.Task.map which fully processes n of the provided iterables at a time, to potentially alleviate this issue. If this makes sense and sounds reasonable, I would be happy to open a GitHub issue as a feature request for better tracking. This is mostly an update in case others hit similar issues, and a request for feedback if you have any.
An Hoang
04/27/2020, 7:00 PM

I also use dask_jobqueue, connecting to an LSFCluster. Do you mind if I PM and ask you some questions?

Jackson Maxfield Brown
04/27/2020, 7:00 PM