Daniel Katz
11/09/2021, 8:17 PM
DaskExecutor and pass a parameter like num_dask_workers, which would be used when initializing the DaskExecutor at run-time. Is it possible to parametrize a flow's Executor properties?
Kevin Kho
Zanie
flow.executor then peek at parameters in the context from there
Zanie
Zanie
import prefect
from prefect import Flow, Parameter
from prefect.executors import DaskExecutor
def dynamic_executor():
    return DaskExecutor(n_workers=prefect.context.parameters["n_workers"])
with Flow(..., executor=dynamic_executor) as flow:
    n_workers = Parameter("n_workers", default=5)
Daniel Katz
11/09/2021, 8:23 PM
Kevin Kho
Daniel Katz
11/09/2021, 8:35 PM
Unexpected error: AttributeError("'FunctionTask' object has no attribute 'start'")
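A minimal sketch of why an error of this shape can appear, assuming the runner calls .start() on whatever object it was given as the executor. StandInExecutor, make_executor, and run_with are hypothetical names used only for illustration; none of them is Prefect's own code.

```python
from contextlib import contextmanager


class StandInExecutor:
    """Hypothetical stand-in for an executor object (not Prefect's class)."""

    @contextmanager
    def start(self):
        # A real executor would set up and tear down its resources here.
        yield self


def make_executor():
    """Factory function that returns an executor when it is called."""
    return StandInExecutor()


def run_with(executor):
    """Mimic a runner that expects an object exposing .start()."""
    with executor.start():
        return "ran"


# Passing the factory itself fails: a plain function has no .start attribute.
try:
    run_with(make_executor)
    outcome = "ran"
except AttributeError as exc:
    outcome = str(exc)

# Calling the factory first hands the runner an actual executor object.
result = run_with(make_executor())

print(outcome)
print(result)
```

Under that assumption, calling the factory when the flow is defined avoids the AttributeError, but the call then happens before any run-time parameters exist.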
Kevin Kho
Daniel Katz
11/09/2021, 8:42 PM
Daniel Katz
11/09/2021, 8:43 PM
Kevin Kho
executor=get_dask_executor()
Kevin Kho
Kevin Kho
Daniel Katz
11/09/2021, 8:52 PM
Daniel Katz
11/09/2021, 8:52 PM
Kevin Kho
Zanie
Zanie
Kevin Kho
Zanie
Zanie
Zanie
Anna Geller
import prefect
from prefect import Flow, Parameter
from prefect.executors import DaskExecutor
def dynamic_executor():
    from distributed import LocalCluster
    # could instead be some other class, e.g. from dask_cloudprovider.aws import FargateCluster
    return LocalCluster(n_workers=prefect.context.parameters["n_workers"])
with Flow(
    "dynamic_n_workers", executor=DaskExecutor(cluster_class=dynamic_executor)
) as flow:
    flow.add_task(Parameter("n_workers", default=5))
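The snippet above relies on deferred construction: the executor only stores the callable, and the callable reads the parameters when the executor starts. A minimal sketch of that mechanism follows, assuming the context is populated before start is called. run_context, StandInCluster, and StandInDaskExecutor are hypothetical stand-ins, not Prefect or Dask classes.

```python
# Stand-in for prefect.context.parameters; filled in only once a run begins.
run_context = {"parameters": {}}


class StandInCluster:
    """Hypothetical cluster that only records its requested size."""

    def __init__(self, n_workers):
        self.n_workers = n_workers


class StandInDaskExecutor:
    """Hypothetical executor that stores a cluster factory and calls it on start."""

    def __init__(self, cluster_class):
        self.cluster_class = cluster_class
        self.cluster = None

    def start(self):
        # The factory runs here, after the run context has been populated.
        self.cluster = self.cluster_class()
        return self.cluster


def dynamic_cluster():
    # Looked up at call time, not at definition time.
    return StandInCluster(n_workers=run_context["parameters"]["n_workers"])


# Defining the executor does not touch the parameters yet.
executor = StandInDaskExecutor(cluster_class=dynamic_cluster)

# Later, the run begins and the parameters become available.
run_context["parameters"] = {"n_workers": 5}
cluster = executor.start()

print(cluster.n_workers)
```

Because nothing is read until start, the same executor object can be attached to the flow at definition time and still pick up a different n_workers on each run.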
Daniel Katz
11/09/2021, 10:31 PM
An Hoang
11/11/2021, 5:34 PM
DaskExecutor, LocalDaskExecutor and LocalExecutor?
I did:
def dynamic_executor():  # executor_type: Literal["local_dask", "cluster_dask", "default"] = "default"):
    executor_type = context.parameters["executor"]
    if executor_type == "local_dask":
        executor = LocalDaskExecutor()
    elif executor_type == "cluster_dask":
        executor = DaskExecutor('tcp://172.18.102.9:8789')
    elif executor_type == "default":
        executor = None
    return executor
with Flow("OVP_step2", executor=dynamic_executor) as dev_flow:
    dev_flow.add_task(Parameter("executor", default="default"))
And the error was: AttributeError: 'function' object has no attribute 'start'
Like @Kevin Kho said, I think this is because parameters are populated after the executor is set up. Any way around this?
Anna Geller
Zanie
Zanie
cluster_class of a DaskExecutor.
Zanie
Chris Arderne
11/12/2021, 4:04 PM
address parameter of DaskExecutor?
Zanie
Zanie
An Hoang
01/06/2022, 1:46 AM
cluster.scale(n_workers) inside dynamic_LSF_dask_executor, but it also didn't work.
The same configuration works when I create a Dask cluster separately and then provide the address at flow-creation time. Any clue as to why?
What I have for the Dask executor:
#export
import dask
import dask_jobqueue
print(f"{dask.__version__=}")
print(f"{dask_jobqueue.__version__=}")
#export
from dask_jobqueue import LSFCluster
import prefect
from prefect.executors import DaskExecutor
#export
DEFAULT_DASK_CLUSTER_CORES_PER_MACHINE = 45
DEFAULT_DASK_CLUSTER_PROCESSES_PER_WORKER = 5
DEFAULT_QUEUE = "all_corradin"
DEFAULT_MAXIMUM_WORKERS = 47 * 17
#export
def dynamic_LSF_dask_executor():
    #cores_per_machine = 45
    cores_per_machine = prefect.context.parameters["cores_per_machine"]
    #processes_per_worker = 5
    processes_per_worker = prefect.context.parameters["processes_per_worker"]
    queue = prefect.context.parameters["queue"]
    n_workers = prefect.context.parameters["n_workers"]
    workers_per_machine = round(cores_per_machine / processes_per_worker)
    cluster = LSFCluster(
        queue=queue,
        cores=cores_per_machine,
        processes=workers_per_machine,
        n_workers=n_workers,
        walltime='9000:00',
        memory='500GB',
        local_directory="/tmp",
        job_extra=['-o /dev/null', '-e /dev/null'],
    )
    logger = prefect.context.get("logger")
    logger.debug("Dask cluster started")
    logger.debug(f"\n{cores_per_machine=}\n{processes_per_worker=}\n{queue=}\n{n_workers=}\n")
    logger.debug(f"Go to this link after running the above to see the dashboard {cluster.dashboard_link}")
    return cluster
LSF_dask_executor = DaskExecutor(cluster_class=dynamic_LSF_dask_executor, adapt_kwargs={"maximum": DEFAULT_MAXIMUM_WORKERS})
In my flow:
with Flow("OVP_step2", executor=LSF_dask_executor) as step2_flow:
    #other params...
    ...
    #parameters used in dynamic executor functions
    step2_flow.add_task(Parameter("cores_per_machine", default=DEFAULT_DASK_CLUSTER_CORES_PER_MACHINE))
    step2_flow.add_task(Parameter("processes_per_worker", default=DEFAULT_DASK_CLUSTER_PROCESSES_PER_WORKER))
    step2_flow.add_task(Parameter("n_workers", default=DEFAULT_MAXIMUM_WORKERS))
    step2_flow.add_task(Parameter("queue", default=DEFAULT_QUEUE))
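As a quick check of the defaults above, the arithmetic can be worked through in plain Python. The 45 / 6 case and the floor-division alternative are illustrative assumptions, not something the thread itself discusses.

```python
# Default values copied from the snippet above.
DEFAULT_DASK_CLUSTER_CORES_PER_MACHINE = 45
DEFAULT_DASK_CLUSTER_PROCESSES_PER_WORKER = 5
DEFAULT_MAXIMUM_WORKERS = 47 * 17

# Value passed as `processes` with the default parameters.
workers_per_machine = round(
    DEFAULT_DASK_CLUSTER_CORES_PER_MACHINE / DEFAULT_DASK_CLUSTER_PROCESSES_PER_WORKER
)

# Hypothetical non-divisible case: round() can round up, floor division never does.
rounded = round(45 / 6)   # 45 / 6 == 7.5
floored = 45 // 6

print(workers_per_machine)       # 9
print(DEFAULT_MAXIMUM_WORKERS)   # 799
print(rounded, floored)          # 8 7
```

With the defaults, the division is exact, so round has no effect. If the two values stop dividing evenly, floor division is one conservative option, assuming the goal is to avoid requesting more processes than the cores can support.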
Kevin Kho
An Hoang
01/06/2022, 1:58 AM
n_workers in the LSFCluster creation is 799. I don't see any dask-worker jobs submitted in my LSF HPC like it would normally do either.
Kevin Kho
cluster = LSFCluster(...) when you do it yourself and pass the address?
An Hoang
01/06/2022, 2:09 AM
cluster = dynamic_LSF_dask_executor(); cluster.scale(799)
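The manual build-then-scale step above can also be folded into a single factory. The sketch below assumes the cluster class exposes a scale(n) method, as Dask cluster objects do. StandInCluster and scaled_cluster_factory are hypothetical names, and the stand-in only records the request instead of submitting any jobs.

```python
class StandInCluster:
    """Hypothetical cluster that records constructor arguments and scale requests."""

    def __init__(self, **kwargs):
        self.kwargs = kwargs
        self.requested_workers = 0

    def scale(self, n):
        # A real cluster would submit or cancel worker jobs here.
        self.requested_workers = n


def scaled_cluster_factory(cluster_cls, n_workers, **cluster_kwargs):
    """Build a cluster, request n_workers workers, and return the cluster."""
    cluster = cluster_cls(**cluster_kwargs)
    cluster.scale(n_workers)
    return cluster


cluster = scaled_cluster_factory(StandInCluster, 799, queue="all_corradin")

print(cluster.requested_workers)
```

In real use, cluster_cls would be something like LSFCluster and n_workers would come from prefect.context.parameters. Whether an explicit scale call interacts well with adaptive scaling is not something this sketch checks.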
Kevin Kho
An Hoang
01/06/2022, 2:22 AM
An Hoang
01/06/2022, 2:28 AM
adaptive_kwargs, I can see my Parameters tasks on the dashboard now; before, it was empty.
An Hoang
01/06/2022, 2:28 AM
Kevin Kho
An Hoang
01/06/2022, 2:43 AM
cluster.scale from within the callable. Thank you so much for the help Kevin! Especially at this hour!!!
Kevin Kho
An Hoang
01/06/2022, 2:56 AM
flow.run() is stuck forever... This never happened with the premade Dask cluster. Problem for tomorrow... 😅
Kevin Kho
An Hoang
01/06/2022, 2:58 AM
flow.run() being done
Kevin Kho