I’ve tried using `DaskExecutor` in the past but it...
# prefect-community
a
I’ve tried using `DaskExecutor` in the past, but it was even worse: every now and then the execution froze and I stopped receiving logs, so I left it for the time being. It’s enough for me to run on a strong machine, and I don’t need a cluster yet.
l
Hi @Avi A, I’m taking a look. I agree, it looks like OpenBLAS is trying to spawn new threads but is unable to do so. What versions of prefect and OpenBLAS are you on?
Digging a bit more: `RLIMIT_NPROC` is an OS limit on how many processes/threads a user can spawn, so it makes sense that OpenBLAS can’t create any more if you’ve hit that limit, as the log suggests. It looks like some other people on the internet have debugged this type of issue by limiting the number of threads a single OpenBLAS worker can spawn with some environment variables (https://github.com/xianyi/OpenBLAS#setting-the-number-of-threads-using-environment-variables).
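For reference, a minimal diagnostic sketch for reading that limit and the current thread count from Python (assuming Linux, since `RLIMIT_NPROC` is a Linux limit):

import resource
import threading

# RLIMIT_NPROC caps how many processes/threads the current user may have;
# hitting it is why OpenBLAS can no longer spawn worker threads.
soft, hard = resource.getrlimit(resource.RLIMIT_NPROC)
print("RLIMIT_NPROC soft/hard:", soft, hard)

# Threads visible to Python in this process (native OpenBLAS threads won't
# appear here, but it gives a rough sense of growth over time).
print("threads in this process:", threading.active_count())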
a
prefect v0.12.4. How do I check the OpenBLAS version?
I think I’ve been getting these since I upgraded prefect. Does that make sense? Does something make Prefect open more threads?
l
Not sure — OpenBLAS comes from your task code, so you or someone along the way installed it on your workers; it’s not a dependency of prefect. It looks from the README like it’s a single binary, so hopefully some CLI like `openblas version` or similar would work.
I’ll check the Prefect changelog; I don’t think anything has increased the number of threads needed to run `LocalDaskExecutor` (or any other executor, for that matter), but I’ll take a look!
Ok, here are some hints:
• 2 months ago the `LocalDaskExecutor` scheduler default changed to ‘threads’ from ‘synchronous’
• 10 days ago the threads for that scheduler were changed to be instantiated as a multiprocessing.pool.ThreadPool (https://github.com/PrefectHQ/prefect/commit/b5d85684d46939a55b28cfe626a2930583af2a72#diff-45f9c04cc175792b97145f1f3b8faea1R425) instead of whatever it was before (digging into that side to see what has changed)
a
ok so for the second one - I’m already setting the scheduler to `threads` explicitly, so that change does seem related. It’s not clear, though, why my processes would need that many threads. Is it opening a thread for each task separately and failing to close it? Not sure.
I tried to move to a `DaskScheduler` but I’m getting “Too many open files”; I tried increasing the open-files limit to about 1M and it’s still complaining.
l
If I’m following the line here properly, the `ThreadPool` in Prefect will start with the default for `Pool`: as many ‘workers’ (threads, in the case of ThreadPool) as `cpu_count()` (which definitely doesn’t seem like it should get up to 837332, haha).
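For reference, a minimal sketch of that default using the stdlib directly (not Prefect’s actual code):

import os
from multiprocessing.pool import ThreadPool

# With no argument, Pool/ThreadPool size their worker count from os.cpu_count().
pool = ThreadPool()
print("os.cpu_count():", os.cpu_count())
print("ThreadPool workers:", pool._processes)  # internal attribute, used here only for illustration
pool.close()
pool.join()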
a
when I run the flow with fewer mapped tasks (about 60) it works, but when I run the full thing with no limit I get these errors in the middle of execution
l
Yeah, personally my guess is that OpenBLAS is spawning multiple new threads for every task it’s called in, and it’s called in every mapped task, so depending on how many threads OpenBLAS goes after, we could get up pretty high (837332 still seems extreme, but I don’t know your machine size / flow size / workload very intimately, so it could be possible). If your CPU count is very high, then it’s possible the change 10 days ago to set the Prefect thread pool to cpu_count significantly increased the amount of parallelism you are experiencing.
a
it seems that `numpy` is the one using OpenBLAS 🙂 still not sure why it’s exhausting the thread pool:
>>> import numpy
>>> print(numpy.show_config())
blas_mkl_info:
  NOT AVAILABLE
blis_info:
  NOT AVAILABLE
openblas_info:
    libraries = ['openblas', 'openblas']
    library_dirs = ['/usr/local/lib']
    language = c
    define_macros = [('HAVE_CBLAS', None)]
l
Gotcha, tricky numpy, always throwing in the curveballs haha 🙂 Can I suggest you try limiting OpenBLAS’s thread greediness with `export OPENBLAS_NUM_THREADS=1` where your workers run, to see whether it makes any difference? (Or there’s a runtime technique too: https://github.com/xianyi/OpenBLAS#setting-the-number-of-threads-at-runtime)
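If setting the env var on the workers is awkward, one runtime option from Python is the `threadpoolctl` package (a third-party dependency not mentioned in that README, so treat this as a sketch):

from threadpoolctl import threadpool_limits  # third-party package, assumed installed

import numpy as np

# Cap BLAS (OpenBLAS/MKL/BLIS) worker threads at 1 for everything in this block.
with threadpool_limits(limits=1, user_api="blas"):
    a = np.random.rand(500, 500)
    b = a @ a  # this matmul runs single-threaded inside OpenBLAS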
a
actually I’ve reverted the `prefect` version to keep my stuff running 😞
sometime in the next few days I will make time to upgrade again and see if limiting OpenBLAS helps in any way, and will follow up here
l
Ok, gotcha! Yeah, any experiments you have time to do on that front would be helpful info 🙂
🚀 1
j
Nesting threading environments (threaded python calling threaded blas) is generally frowned upon whether you're using prefect or not: having nested levels of parallelism can lead to resource contention and slower execution in general. For dask we generally recommend users disable threaded blas (e.g. using `OPENBLAS_NUM_THREADS=1`).
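Note that the environment-variable approach only takes effect if it's set before numpy (and therefore OpenBLAS) is imported in the worker process. A minimal sketch of doing that from Python, in case exporting it in the worker environment isn't convenient:

import os

# Must be set before numpy/OpenBLAS is imported for the env var to take effect
# in this process (OpenBLAS reads it when the library initializes).
os.environ["OPENBLAS_NUM_THREADS"] = "1"

import numpy as np  # OpenBLAS now runs with a single worker thread

print(np.show_config())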
IIUC you're saying that your code worked before upgrading prefect, and now no longer works? Nothing in the latest couple of releases should have changed how prefect allocates threads, so I'm surprised by this. If you could provide a reproducible example for us that would be very useful.
Neither dask nor prefect nor openblas should be terribly thread-hungry, so if you're hitting these limits that's a sign of a bug in some codebase (prefect, dask, your code, etc...). A reproducible example would be helpful here, as this isn't an issue I've seen before.
a
following up on this: my original post was using `LocalDaskExecutor`, but I also had the “too many open files” error when using the regular `DaskExecutor`. Here’s a minimal example:
from time import sleep

from prefect import Flow, Parameter, task, unmapped


@task
def extract(max_size=100):
    return list(range(max_size))

@task
def transform(x, c=2):
    sleep(0.5)
    return x*c

@task
def sum_task(l):
    return sum(l)


with Flow("Test Dask too many files") as flow:
    data_size = Parameter("data_size", default=100)
    data = extract(data_size)
    data2 = transform.map(data, unmapped(2))
    final = sum_task(data2)
Running this flow fails after completing 55 tasks, saying `Unexpected error: OSError(24, 'Too many open files')`. Then, if I try to submit another task, it fails immediately with the same error. So it appears that something is preventing Dask from closing the open files when running prefect stuff on it.
More info:
• OS: Ubuntu 18.04.4
• prefect 0.12.4
• dask / distributed: 2.21.0
• Machine has 4 cores (not sure if relevant)
• File limit is extremely high:
root@dask-scheduler1:/home/avi/research# ulimit -n
1048576
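(Side note: `OSError(24)` is EMFILE, the per-process open-file-descriptor limit, so the value that matters is the one the dask scheduler/worker process itself sees; processes started under supervisord don’t necessarily inherit the shell’s ulimit. A minimal sketch for checking it from inside a process:)

import resource

# EMFILE ("Too many open files") means this process hit its RLIMIT_NOFILE,
# so check the limit as seen by the dask scheduler/worker process itself.
soft, hard = resource.getrlimit(resource.RLIMIT_NOFILE)
print("RLIMIT_NOFILE soft/hard:", soft, hard)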
another update: if I run the above example with `max_size=30` it finishes successfully. Then if I submit it again with `max_size=30` without restarting the dask worker, it fails with the “too many files” error! :(
yet another update: the files the Dask scheduler is overflowing with are internal connections:
root@dask-scheduler1:~# lsof -a -p 7610 | tail -2
dask-sche 7610 root  261u     IPv4             342284      0t0     TCP XXXXX:8786->XXXXX:41768 (ESTABLISHED)
dask-sche 7610 root  262u     IPv4             342286      0t0     TCP XXXXX:8786->XXXXX:41770 (ESTABLISHED)
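A quick way to watch that count during a flow run, wrapping the same `lsof` call (7610 is just the scheduler PID from the output above):

import subprocess

SCHEDULER_PID = 7610  # dask-scheduler PID from the lsof output above

# Count the scheduler's established TCP connections via the same lsof call.
out = subprocess.run(
    ["lsof", "-a", "-p", str(SCHEDULER_PID), "-i", "TCP"],
    capture_output=True,
    text=True,
).stdout
established = [line for line in out.splitlines() if "ESTABLISHED" in line]
print("established TCP connections:", len(established))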
I honestly don’t know why it creates hundreds of these connections 😞
yet another update! I’ve reverted to `prefect==0.12.1` and everything is ok now. I’m suspecting @Jeremiah’s change with the edges on the execution flow (Jeremiah, apologies if I’m spamming you for no reason): https://github.com/PrefectHQ/prefect/pull/2898 WDYT?
j
Sorry I haven’t been following this thread - but that PR hasn’t been merged yet
a
oops, I’m sorry. I thought I was looking at the merged code, but I was checked out on that branch… I hope the others will be able to help; I’m pretty sure I won’t be the only one having this issue when upgrading to `0.12.4`.
j
👌 no worries
j
Hmmm, this is odd. Is this running with cloud/server (`flow.register()`) or just with core (`flow.run()`)? Dask can take up a few file descriptors, but not that many.
a
happens on both
try it yourself with the code I provided
j
I'm unable to reproduce.
a
hmmm….
I’ll provide my `supervisord` config
j
Since this is an OS-interaction issue, if you wouldn't mind creating a reproducible example with a docker image (or at least a `Dockerfile` to produce one), that'd help. I don't think anything shy of that would be helpful in reproducing.
a
root@dask-scheduler1:~# cat /etc/supervisor/conf.d/10_dask.conf

[program:dask_scheduler]
command=dask-scheduler
user=root
environment=OPENBLAS_NUM_THREADS=1

[program:dask_worker]
command=dask-worker localhost:8786 --nprocs 1
environment=OPENBLAS_NUM_THREADS=1
directory=/home/avi
user=root
I have no idea how to create a docker image with my config
can you guide me through it?
j
Maybe. One question - it looks like you have a long running dask cluster that you're running flows against. Is that accurate? Do you run into issues immediately, or only if the cluster has been running for a while?
a
immediately
I ran the toy example I provided on the cluster without running my original flow…
j
Can you try without using that dask cluster, and instead use a temporary cluster? Something like:
from prefect.engine.executors import DaskExecutor

...

flow.run(executor=DaskExecutor())
a
is that the same as running with `LocalDaskExecutor`?
j
No. `LocalDaskExecutor` uses dask's local schedulers; `DaskExecutor()` (with no arguments) uses the distributed scheduler running on one machine.
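For a concrete sense of the difference, a minimal sketch of the two options with the Prefect 0.12.x API used in this thread (the tiny flow is just a stand-in):

from prefect import Flow, task
from prefect.engine.executors import DaskExecutor, LocalDaskExecutor

@task
def noop():
    return 1

with Flow("executor-comparison") as flow:
    noop()

# dask's local threaded scheduler, running inside the flow-run process
flow.run(executor=LocalDaskExecutor(scheduler="threads"))

# a temporary distributed scheduler plus workers, started and torn down on this machine
flow.run(executor=DaskExecutor())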
a
ok wait I’m working on reproducing with docker
👍 1
ok @Jim Crist-Harif. I’m trying with docker locally and it’s even worse, because on version 0.12.3 it doesn’t even run 😞 I hope the following helps in reproducing. 1st terminal (scheduler):
docker run -it --name scheduler -e EXTRA_PIP_PACKAGES="prefect==0.12.3" -p 8787:8787 -p 8786:8786 --network dask daskdev/dask dask-scheduler
2nd terminal (worker) - note the prefect version:
docker run -it --network dask -e EXTRA_PIP_PACKAGES="prefect==0.12.3" daskdev/dask dask-worker scheduler:8786
on the 3rd terminal, run the script (from a virtualenv with `prefect==0.12.3` installed):
from time import sleep

import prefect
from prefect import Flow, task, unmapped, Parameter
from prefect.engine.executors import DaskExecutor


@task
def extract(max_size=100):
    prefect.context.get('logger').info("extracting")
    return list(range(max_size))

@task
def transform(x, c=2, sleep_time=0.5):
    prefect.context.get('logger').info("transforming %d", x)
    sleep(sleep_time)
    return x*c

@task
def sum_task(l):
    return sum(l)


if __name__ == '__main__':
    with Flow("Test Dask too many files") as flow:
        data_size = Parameter("data_size", default=100)
        sleep_time = Parameter("sleep_time", default=0.1)
        data = extract(data_size)
        data2 = transform.map(data, unmapped(2), unmapped(sleep_time))
        # data3 = transform.map(data2)
        final = sum_task(data2)

    flow.run(executor=DaskExecutor(address='tcp://localhost:8786'))
If you do the same with `prefect==0.12.2`, you’ll find it works perfectly.
unfortunately I have to leave now and won’t be at my station until tomorrow. I hope you’ll be able to reproduce this!
j
Hmmm, I'm still unable to reproduce - I did as described above exactly, except I also ran the provided flow in a third container with the same image.
Everything ran perfectly fine.
> and it’s even worse because on version 0.12.3 it doesn’t even run
Doesn't run in what way (there are lots of ways something might not run)? My first guess here is a networking issue where your docker networking setup fails to connect the scheduler to the client. Running inside a container with the client connecting to `tcp://scheduler:8786` worked fine for me.
Exact steps taken below. Depending on your setup, you may need to manually create the docker network first:
docker network create dask
Scheduler:
docker run -it --rm --name scheduler --network dask -e EXTRA_PIP_PACKAGES="prefect==0.12.3" -p 8787:8787 -p 8786:8786 daskdev/dask dask-scheduler
Worker:
docker run -it --rm --name worker --network dask -e EXTRA_PIP_PACKAGES="prefect==0.12.3" daskdev/dask dask-worker tcp://scheduler:8786
Client:
cat test.py | docker run -i --rm --name client --network dask -e EXTRA_PIP_PACKAGES="prefect==0.12.3" daskdev/dask python
Where `test.py` is:
from time import sleep
import prefect
from prefect import Flow, task, unmapped, Parameter
from prefect.engine.executors import DaskExecutor


@task
def extract(max_size=100):
    prefect.context.get('logger').info("extracting")
    return list(range(max_size))


@task
def transform(x, c=2, sleep_time=0.5):
    prefect.context.get('logger').info("transforming %d", x)
    sleep(sleep_time)
    return x*c


@task
def sum_task(l):
    return sum(l)


if __name__ == '__main__':
    with Flow("Test Dask too many files") as flow:
        data_size = Parameter("data_size", default=100)
        sleep_time = Parameter("sleep_time", default=0.1)
        data = extract(data_size)
        data2 = transform.map(data, unmapped(2), unmapped(sleep_time))
        final = sum_task(data2)

    flow.run(executor=DaskExecutor(address='tcp://scheduler:8786'))
This did turn up a potential issue: prefect >= 0.12.3, when used with a long-running dask cluster (which happens when an address is passed to `DaskExecutor` instead of letting `DaskExecutor` create and clean up the cluster itself), will create and clean up a new dask connection for every task. This doesn't appear to be leaking connections (at least in the current dask release), but it isn't ideal; we should be able to use one long-running dask connection for all tasks instead. Will fix. This might be the source of your issue, but at least in my tests it doesn't seem to leak file descriptors like you're seeing, it's just less efficient than it could be. Also note that if you use a temporary dask cluster per flow run (i.e. let `DaskExecutor` start up and shut down the cluster itself), this code path goes away. In general I recommend doing this when possible; a long-running dask cluster is generally an antipattern IMO (but should still work if needed).
While fixing the above, I did finally reproduce your issue. It only happens when using a temporary dask cluster on certain systems, and it comes down to a bug in dask that creates multiple clients rather than reusing the existing one (dask is doing the wrong thing here, not prefect; I will also need to push a patch upstream). PR coming soon.
a
Hey Jim, thanks for investigating! That’s awesome! When I said “doesn’t work” I meant that it hangs after a while. In the worker logs I see it reporting timeouts when connecting to the scheduler, and this error propagates back to the prefect agent. But after downgrading to `0.12.2` it worked perfectly (naturally I recreated the docker images with that version, so there’s no “long-running” effect here). That said, I’m not sure how this is a Dask issue, given that I’ve only changed the prefect version, but I hope your fix will fix it. Are you saying that I should be using `DaskCluster()` and let prefect spawn a cluster instead of setting it up on my own? That would create a local cluster, right? If that’s the case, how will I be able to create a multi-node cluster?
j
> Are you saying that I should be using `DaskCluster()` and let prefect spawn a cluster instead of setting it up on my own? That would create a local cluster, right? If that’s the case, how will I be able to create a multi-node cluster?
That's my recommendation, yes. By default `DaskExecutor` will create a local cluster, but you can customize the type of temporary cluster by passing in `cluster_class` and `cluster_kwargs`. See the docs for more information: https://docs.prefect.io/api/latest/engine/executors.html#daskexecutor
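A minimal sketch of that customization against the 0.12.5 `DaskExecutor` described here (`LocalCluster` is purely a stand-in; a multi-node cluster class from e.g. dask-kubernetes or dask-cloudprovider would be passed the same way):

from distributed import LocalCluster
from prefect import Flow, task
from prefect.engine.executors import DaskExecutor

@task
def noop():
    return 1

with Flow("temporary-cluster-example") as flow:
    noop()

# The executor creates the cluster when the flow run starts and tears it down at the end.
executor = DaskExecutor(
    cluster_class=LocalCluster,        # any callable that returns a dask Cluster object
    cluster_kwargs={"n_workers": 4},   # keyword arguments forwarded to cluster_class
)
flow.run(executor=executor)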
🤔 1
> That said, I’m not sure how this is a Dask issue, given that I’ve only changed the prefect version, but I hope your fix will fix it.
There's an upstream bug in dask that only shows up on some deployment setups (yours included). Prefect 0.12.3 updated some code that let you run into that upstream bug. The new release 0.12.5 changes how we use dask to avoid this issue until it's patched upstream. Please try the new 0.12.5 release to see if it resolves your problem.
👏🏼 1