Hi! I am running prefect on an on prem HTC cluster...
# prefect-community
s
Hi! I am running prefect on an on prem HTC cluster using 
HTCondor
 as workload manager and using a 
DaskExecutor
 connected to a cluster started with  
dask jobqueue
. I am mapping a function over a large number of images  (20000). I started running a subset of data (5000) using a predefine number of workers and the function is mapped and run correctly using all the workers available. If the cluster is started in adaptive  mode using 
dask jobqueue
  prefect is able to increase the number of processes run as expected by the adaptive mode and monitored in the workload manager however the size of the dask cluster doesn't change and the function isn't mapped, not even to the minimum number of workers that are predefined in the cluster. Interestingly 
HTCondor
 allocates new running processing but it seems to be independent from the dask cluster. It seems that the 
dask jobqueue
  initialised cluster cannot communicate with the processes started by prefect. After few minutes the processes started by prefect die printing out this error:
Copy code
distributed.scheduler - ERROR - Workers don't have promised key: ['<tcp://192.168.0.2:40093>']
# the tcp address change
Any help will be greatly appreciated! Thanks!
j
Hi Simone, are you saying that when run in adaptive mode your cluster starts new jobs for workers to scale up, but those workers never successfully connect? This sounds like a potential bug in distributed/dask-jobqueue. The first thing I'd check is if you can use dask-jobqueue without prefect, and if the cluster still works in adaptive mode. Creating a dask-jobqueue cluster in adaptive mode, creating a
distributed.Client
to connect to it, and then calling
client.map(time.sleep, [30] * 1000)
(or something like that) should be sufficient to test. This would help rule out bugs upstream, or issues with your setup. If that all works appropriately, I'd want to know: • What version of
dask
,
distributed
, and
dask-jobqueue
are you using? • What is your
DaskExecutor
configuration? The log about workers not having a promised key is a bit odd - that appears to be conflating a scheduler address with a key, which isn't a codepath that should ever occur.
s
Hi Jim, thanks a lot for the reply! I agree with you it seems that the scheduler cannot communicate with the workers. dask-jobqueue without prefect works fine. also in adaptive mode. dask version 2.14.0 dask-jobqueue 0.7.1 distributed 2.19.0 cluster parameters: cores: 1 memory: 10GB disk: 0.1GB local_directory: /tmp from dask_jobqueue.htcondor import HTCondorCluster cluster = HTCondorCluster(cores=cores, memory=memory, disk=disk,local_directory=local_directory,death_timeout=2400) cluster.adapt(minimum_jobs=32) for the DaskExecutor I just connect it to the tcp address of the cluster. ´with Flow("filtering-counting",environment=LocalEnvironment(DaskExecutor(address='tcp://193.10.16.58:27069')), storage=Local(directory='/home/simone/tmp_code/flows')) as flow:' I tested to add a maximum_jobs and interestingly enough the number of workers increase and the function get mapped to 1/10 of the images i am processing and run in parallel but then it stops and this time no error.
j
Hmmm, interesting. What version of prefect?
s
prefect version 0.13.14
j
Ok. If you run prefect like described above, what behavior do you see (trying to clarify from the initial report)? • Do the dask workers start correctly and connect to the scheduler? Prefect shouldn't be able to affect that, managing of the dask cluster is entirely in dask's realm • Do the new dask workers get allocated work? You might open the dask dashboard for the cluster and view to see if the new workers are picking up tasks. • You said you saw error messages on the workers about missing promised keys. Do those workers exit (they shouldn't from that error alone)? • Are your tasks memory intensive? Are you seeing workers dieing due to resource usage at all? What happens if you run a lightweight task (e.g.
time.sleep
) instead?
The error message you're seeing indicates an unhealthy dask cluster, but its not something that prefect should be able to cause (the message indicates that a dask worker is providing inaccurate information about its state to the scheduler). It might indicate a bug in upstream distributed or dask-jobqueue (if it has to do with the cluster setup), not sure.
s
To answer your questions: I monitor what is going on by looking at the number of HTCondor running jobs (in theory one job = 1 core in my setup) and the size of the dask cluster. • ´*Do the dask workers start correctly and connect to the scheduler?* I see an increase number of htcondor jobs running but not an increase number of workers in the cluster. If I define a maximum_jobs then the number of workers also increase in the clusters. • Do the new dask workers get allocated work? If I don't define a maximum_jobs no works get done. If I define it some work get done (not all cores) and the stops and number of workers goes down to minimum • You said you saw error messages on the workers about missing promised keys. Do those workers exit (they shouldn't from that error alone)? If I define a maximum_jobs nothing happen but if I don't i get the error and the workers exit. My guess is that the scheduler cannot communicate with the workers and because nothing is happening htcondor take over and reallocate the resouces....just a guess HERE IS THE KEY POINT: • Are your tasks memory intensive? My task are quite memory intensive. If i run a
time.sleep
task things seems to work. I get more workers, the function get mapped to the number of workers and everything is fine. I am just puzzled because I run this jobs before in another setup and had no issue with memory. I run a test doubling the memory per worker to check. This doesn't fully fix the issue.The function get mapped but only to a subset of workers, after processing around 1/10 of the data it stops and number of workers is scaled down (no error). In addition, there is still a mismatch between the dask cluster size and the number of jobs reported in htcondor, I will make sure my settings are correct and if this is the case I may miss something or the issue may be with dask-jobqueue I really appreciate the time you spent on this. Thank you so much for helping me out. Both dask and prefect made a big impact in my daily routing in analysing microscopy images and I am really thankful for all the work you are putting into them. Thanks a lot!