Hey, just following up on this. It's not working for me. The cluster-creation logs print, and I can access the dashboard and see the scheduler, but no workers start up even after I wait for a long time. I even added cluster.scale(n_workers) inside dynamic_LSF_dask_executor, but that didn't work either.
The same configuration works when I create a Dask cluster separately and then provide its address at flow-creation time. Any clue as to why?
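For reference, the working variant looks roughly like this (paraphrased from my other script; the kwargs are the same ones used in the executor function below):

from dask_jobqueue import LSFCluster
from prefect.executors import DaskExecutor

# start the LSF cluster myself, before building the flow
cluster = LSFCluster(queue="all_corradin",
                     cores=45,              # cores per machine
                     processes=9,           # round(45 / 5) workers per machine
                     n_workers=47 * 17,
                     walltime='9000:00',
                     memory='500GB',
                     local_directory="/tmp",
                     job_extra=['-o /dev/null', '-e /dev/null'])

# hand the already-running scheduler's address to the executor at flow-creation time
LSF_dask_executor = DaskExecutor(address=cluster.scheduler_address)

With that, workers come up fine; it's only the cluster_class version below that never gets any workers.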
What I have for the Dask executor:
#export
import dask
import dask_jobqueue
print(f"{dask.__version__=}")
print(f"{dask_jobqueue.__version__=}")

#export
from dask_jobqueue import LSFCluster
import prefect
from prefect.executors import DaskExecutor

#export
DEFAULT_DASK_CLUSTER_CORES_PER_MACHINE = 45
DEFAULT_DASK_CLUSTER_PROCESSES_PER_WORKER = 5
DEFAULT_QUEUE = "all_corradin"
DEFAULT_MAXIMUM_WORKERS = 47 * 17

#export
def dynamic_LSF_dask_executor():
    #cores_per_machine = 45
    cores_per_machine = prefect.context.parameters["cores_per_machine"]
    #processes_per_worker = 5
    processes_per_worker = prefect.context.parameters["processes_per_worker"]
    queue = prefect.context.parameters["queue"]
    n_workers = prefect.context.parameters["n_workers"]

    workers_per_machine = round(cores_per_machine / processes_per_worker)

    cluster = LSFCluster(queue=queue,
                         cores=cores_per_machine,
                         processes=workers_per_machine,
                         n_workers=n_workers,
                         walltime='9000:00',
                         memory='500GB',
                         local_directory="/tmp",
                         job_extra=['-o /dev/null', '-e /dev/null'],
                         )

    logging = prefect.context.get("logger")
    logging.debug(f"Dask cluster started")
    logging.debug(f"\n{cores_per_machine=}\n{processes_per_worker=}\n{queue=}\n{n_workers=}\n")
    logging.debug(f"Go to this link after running the above to see the dashboard {cluster.dashboard_link}")
    return cluster

LSF_dask_executor = DaskExecutor(cluster_class=dynamic_LSF_dask_executor,
                                 adapt_kwargs={"maximum": DEFAULT_MAXIMUM_WORKERS})
In my flow:
with Flow("OVP_step2", executor = LSF_dask_executor) as step2_flow:
#other params...
....
#parameters used in dynamic executor functions
step2_flow.add_task(Parameter("cores_per_machine", default = DEFAULT_DASK_CLUSTER_CORES_PER_MACHINE))
step2_flow.add_task(Parameter("processes_per_worker", default = DEFAULT_DASK_CLUSTER_PROCESSES_PER_WORKER))
step2_flow.add_task(Parameter("n_workers", default = DEFAULT_MAXIMUM_WORKERS))
step2_flow.add_task(Parameter("queue", default = DEFAULT_QUEUE))