Avi A
07/16/2020, 12:27 PM
I tried DaskExecutor in the past, but it was even worse: every now and then the execution froze and I stopped receiving logs, so I left it for the time being. It's enough for me to run on a strong machine and I don't need a cluster yet.
Laura Lorenz (she/her)
07/16/2020, 1:32 PM
RLIMIT_NPROC is an OS limit on how many threads/processes can be spawned, so it makes sense that openBLAS can't create any more if you've hit the OS limit, as the log suggests. It looks like some other people on the internet have debugged this type of issue by limiting the number of threads a single openBLAS worker can spawn with some environment variables (https://github.com/xianyi/OpenBLAS#setting-the-number-of-threads-using-environment-variables).
Avi A
07/16/2020, 1:42 PM
Laura Lorenz (she/her)
07/16/2020, 1:49 PM
openblas version or similar would work
Avi A
07/16/2020, 1:57 PM
I'll try setting the number of threads explicitly. The second issue seems to be related, though it's not clear why my processes need that many threads. Is it opening a thread for each task separately and failing to close it? Not sure. On the DaskScheduler I'm getting "Too many open files"; I tried increasing the open files limit to about 1M and it's still complaining.
Laura Lorenz (she/her)
07/16/2020, 2:04 PM
The ThreadPool in Prefect will start with the default for Pool: as many 'workers' (threads, in the case of ThreadPool) as cpu_count() (which definitely doesn't seem like it should get up to 837332, haha)
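(For reference, a minimal sketch of the default Laura describes, using Python's multiprocessing.pool.ThreadPool directly; nothing here is Prefect-specific:)
import os
from multiprocessing.pool import ThreadPool

# With no argument, ThreadPool (like Pool) sizes itself to os.cpu_count() workers.
pool = ThreadPool()
print("cpu_count:", os.cpu_count())  # e.g. 4 on the machine discussed later in this thread
pool.close()
pool.join()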
Avi A
07/16/2020, 2:09 PM
Laura Lorenz (she/her)
07/16/2020, 2:14 PM
Avi A
07/16/2020, 2:20 PM
numpy is the one using openBLAS 🙂. Still not sure why it's exhausting the thread pool:
>>> import numpy
>>> print(numpy.show_config())
blas_mkl_info:
NOT AVAILABLE
blis_info:
NOT AVAILABLE
openblas_info:
libraries = ['openblas', 'openblas']
library_dirs = ['/usr/local/lib']
language = c
define_macros = [('HAVE_CBLAS', None)]
Laura Lorenz (she/her)
07/16/2020, 2:22 PM
Can you try export OPENBLAS_NUM_THREADS=1 where your workers are, to see if we can see any difference or not? (Or there's some sort of runtime technique too: https://github.com/xianyi/OpenBLAS#setting-the-number-of-threads-at-runtime)
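(A minimal sketch of the environment-variable approach, assuming the workers run Python code that imports numpy; the variables come from the OpenBLAS README linked above and must be set before numpy loads the BLAS library:)
import os

# Limit BLAS/OpenMP thread pools; this must happen before numpy is imported.
os.environ["OPENBLAS_NUM_THREADS"] = "1"
os.environ["OMP_NUM_THREADS"] = "1"

import numpy as np  # openBLAS will now use a single thread per process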
Avi A
07/16/2020, 2:27 PM
Meanwhile I had to go back to an older prefect version to have my stuff running 😞
Laura Lorenz (she/her)
07/16/2020, 2:29 PM
Jim Crist-Harif
07/16/2020, 10:33 PM
(you can limit this by setting OPENBLAS_NUM_THREADS=1).
Avi A
07/21/2020, 9:39 AM
This happens with LocalDaskExecutor, but I also had the "too many open files" error when using the regular DaskExecutor. Here's a minimal example:
from time import sleep
from prefect import Flow, task, unmapped, Parameter
@task
def extract(max_size=100):
    return list(range(max_size))
@task
def transform(x, c=2):
    sleep(0.5)
    return x*c
@task
def sum_task(l):
    return sum(l)
with Flow("Test Dask too many files") as flow:
    data_size = Parameter("data_size", default=100)
    data = extract(data_size)
    data2 = transform.map(data, unmapped(2))
    final = sum_task(data2)
Running this flow fails after completing 55 tasks, saying Unexpected error: OSError(24, 'Too many open files').
Then, if I try to submit another task, it fails immediately with the same error. So it appears that something is preventing Dask from closing the open files when running prefect stuff on it. More info:
OS: Ubuntu 18.04.4
prefect 0.12.4
dask / distributed: 2.21.0
Machine has 4 cores (not sure if relevant)
File limit is extremely high:
root@dask-scheduler1:/home/avi/research# ulimit -n
1048576
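(Side note: the shell's ulimit is not necessarily what the dask processes see; a quick sketch for checking the limit from inside the worker or scheduler process itself:)
import resource

# Soft and hard limits on open file descriptors for *this* process; a process
# launched by supervisord or inside a container may see different values.
soft, hard = resource.getrlimit(resource.RLIMIT_NOFILE)
print(soft, hard)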
When I run it with max_size=30 it finishes successfully. Then if I submit it again with max_size=30 without restarting the dask worker, it fails with the "too many files" error! :(
root@dask-scheduler1:~# lsof -a -p 7610 | tail -2
dask-sche 7610 root 261u IPv4 342284 0t0 TCP XXXXX:8786->XXXXX:41768 (ESTABLISHED)
dask-sche 7610 root 262u IPv4 342286 0t0 TCP XXXXX:8786->XXXXX:41770 (ESTABLISHED)
I honestly don't know why it creates hundreds of these connections 😞
I went back to prefect==0.12.1 and everything is ok now. I'm suspecting @Jeremiah's change with the edges on the execution flow (Jeremiah, apologies if I'm spamming you for no reason):
https://github.com/PrefectHQ/prefect/pull/2898
WDYT?
Jeremiah
07/21/2020, 1:42 PM
Avi A
07/21/2020, 1:46 PM
0.12.4.
Jeremiah
07/21/2020, 1:51 PM
Jim Crist-Harif
07/21/2020, 2:53 PM
Are you registering the flow (flow.register()) or just running it with core flow.run()? Dask can take up a few file descriptors, but not that many.
Avi A
07/21/2020, 2:55 PM
Jim Crist-Harif
07/21/2020, 2:56 PM
Avi A
07/21/2020, 2:56 PM
supervisord config
Jim Crist-Harif
07/21/2020, 2:57 PM
If you can share a reproducible setup (or a Dockerfile to produce one), that'd help. I don't think anything shy of that would be helpful in reproducing.
Avi A
07/21/2020, 2:57 PM
root@dask-scheduler1:~# cat /etc/supervisor/conf.d/10_dask.conf
[program:dask_scheduler]
command=dask-scheduler
user=root
environment=OPENBLAS_NUM_THREADS=1
[program:dask_worker]
command=dask-worker localhost:8786 --nprocs 1
environment=OPENBLAS_NUM_THREADS=1
directory=/home/avi
user=root
Jim Crist-Harif
07/21/2020, 3:00 PM
Avi A
07/21/2020, 3:01 PM
Jim Crist-Harif
07/21/2020, 3:02 PM
from prefect.engine.executors import DaskExecutor
...
flow.run(executor=DaskExecutor())
Avi A
07/21/2020, 3:04 PM
LocalDaskExecutor?
Jim Crist-Harif
07/21/2020, 3:05 PM
LocalDaskExecutor uses dask's local schedulers; DaskExecutor() (with no arguments) uses the distributed scheduler running on one machine.
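(To make the distinction concrete, a rough sketch of the options in the prefect 0.12.x API used in this thread; flow is assumed to be a Flow object like the ones above, and the address is just the one used in this setup:)
from prefect.engine.executors import DaskExecutor, LocalDaskExecutor

# dask's local scheduler (threads by default), no separate cluster
flow.run(executor=LocalDaskExecutor(scheduler="threads"))

# a temporary distributed cluster, created and torn down for this flow run
flow.run(executor=DaskExecutor())

# connect to an existing, long-running cluster (the setup used in this thread)
flow.run(executor=DaskExecutor(address="tcp://localhost:8786"))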
Avi A
07/21/2020, 3:07 PM
1st terminal (scheduler):
docker run -it --name scheduler -e EXTRA_PIP_PACKAGES="prefect==0.12.3" -p 8787:8787 -p 8786:8786 --network dask daskdev/dask dask-scheduler
2nd terminal (worker) - note the prefect version:
docker run -it --network dask -e EXTRA_PIP_PACKAGES="prefect==0.12.3" daskdev/dask dask-worker scheduler:8786
On the 3rd terminal, run the script (from a virtualenv with prefect==0.12.3 installed):
from time import sleep
import prefect
from prefect import Flow, task, unmapped, Parameter
from prefect.engine.executors import DaskExecutor
@task
def extract(max_size=100):
    prefect.context.get('logger').info("extracting")
    return list(range(max_size))
@task
def transform(x, c=2, sleep_time=0.5):
    prefect.context.get('logger').info("transforming %d", x)
    sleep(sleep_time)
    return x*c
@task
def sum_task(l):
    return sum(l)
if __name__ == '__main__':
    with Flow("Test Dask too many files") as flow:
        data_size = Parameter("data_size", default=100)
        sleep_time = Parameter("sleep_time", default=0.1)
        data = extract(data_size)
        data2 = transform.map(data, unmapped(2), unmapped(sleep_time))
        # data3 = transform.map(data2)
        final = sum_task(data2)
    flow.run(executor=DaskExecutor(address='tcp://localhost:8786'))
If you do the same with prefect==0.12.2, then you'll find it works perfectly.
Jim Crist-Harif
07/21/2020, 4:07 PM
> and it's even worse because on version 0.12.3 it doesn't even run
Doesn't run in what way (there are lots of ways something might not run)? My first guess here is a networking issue where your docker networking setup fails to connect the scheduler to the client. Running inside a container with the client connecting to tcp://scheduler:8786 worked fine for me.
docker network create dask
Scheduler
docker run -it --rm --name scheduler --network dask -e EXTRA_PIP_PACKAGES="prefect==0.12.3" -p 8787:8787 -p 8786:8786 daskdev/dask dask-scheduler
Worker
docker run -it --rm --name worker --network dask -e EXTRA_PIP_PACKAGES="prefect==0.12.3" daskdev/dask dask-worker tcp://scheduler:8786
Client
cat test.py | docker run -i --rm --name client --network dask -e EXTRA_PIP_PACKAGES="prefect==0.12.3" daskdev/dask python
Where test.py is:
from time import sleep
import prefect
from prefect import Flow, task, unmapped, Parameter
from prefect.engine.executors import DaskExecutor
@task
def extract(max_size=100):
    prefect.context.get('logger').info("extracting")
    return list(range(max_size))
@task
def transform(x, c=2, sleep_time=0.5):
    prefect.context.get('logger').info("transforming %d", x)
    sleep(sleep_time)
    return x*c
@task
def sum_task(l):
    return sum(l)
if __name__ == '__main__':
    with Flow("Test Dask too many files") as flow:
        data_size = Parameter("data_size", default=100)
        sleep_time = Parameter("sleep_time", default=0.1)
        data = extract(data_size)
        data2 = transform.map(data, unmapped(2), unmapped(sleep_time))
        final = sum_task(data2)
    flow.run(executor=DaskExecutor(address='tcp://scheduler:8786'))
Connecting to an existing cluster (passing an address to DaskExecutor instead of letting DaskExecutor create and cleanup the cluster itself) will create and cleanup a new dask connection for every task. This doesn't appear to be leaking connections (at least in the current dask release), but isn't ideal - we should be able to use a long running dask connection for all tasks instead. Will fix.
This might be the source of your issue, but at least in my tests it doesn't seem to leak file descriptors like you're seeing; it's just less efficient than it could be.
Also note that if you use a temporary dask cluster per flow run (let DaskExecutor startup/shutdown the cluster itself), this code path goes away. In general I recommend doing this when possible; a long running dask cluster is generally an antipattern IMO (but should still work if needed).
Avi A
07/22/2020, 8:26 AM
With 0.12.2 it worked perfectly (naturally I recreated the docker images with that version, so no "long-running" effect here).
That said, I'm not sure how this is a Dask issue, given I've only changed the prefect version, but I hope your fix will fix it.
Are you saying that I should be using DaskCluster() and let prefect spawn a cluster instead of setting it up on my own? That would create a local cluster, right? If that's the case then how will I be able to create a multi-node cluster?
Jim Crist-Harif
07/22/2020, 1:40 PM
> Are you saying that I should be using DaskCluster() and let prefect spawn a cluster instead of setting it up on my own? That would create a local cluster, right? If that's the case then how will I be able to create a multi-node cluster?
That's my recommendation, yes. By default DaskCluster will create a local cluster, but you can customize the type of temporary cluster by passing in cluster_class and cluster_kwargs. See the docs for more information: https://docs.prefect.io/api/latest/engine/executors.html#daskexecutor
> That said, I'm not sure how this is a Dask issue, given I've only changed the prefect version, but I hope your fix will fix it.
There's an upstream bug in dask that only shows up on some deployment setups (yours included). Prefect 0.12.3 updated some code that let you run into that upstream bug. The new release 0.12.5 changes how we use dask to avoid this issue until it's patched upstream. Please try the new 0.12.5 release to see if it resolves your problem.
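(To illustrate the recommendation, a sketch of a temporary cluster per flow run with DaskExecutor, assuming the 0.12.5 cluster_class/cluster_kwargs options described in the linked docs; the dask_kubernetes example is hypothetical and requires that package plus a Kubernetes environment:)
from prefect.engine.executors import DaskExecutor

# Default: a temporary local distributed cluster is created for the flow run
# and shut down when the run finishes.
executor = DaskExecutor(cluster_kwargs={"n_workers": 4, "threads_per_worker": 1})

# Multi-node: pass a different cluster class (hypothetical example; any dask
# cluster implementation can be plugged in the same way).
# executor = DaskExecutor(
#     cluster_class="dask_kubernetes.KubeCluster",
#     cluster_kwargs={"n_workers": 8},
# )

flow.run(executor=executor)  # assumes flow is the Flow defined earlier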