# ask-community
d
Hello Prefect community! I would like to be able to run a flow using the `DaskExecutor` and pass a parameter like `num_dask_workers` that would be used when initializing the `DaskExecutor` at run-time. Is it possible to parametrize a flow's executor properties?
k
Hey @Daniel Katz, unfortunately not. It's quite tricky. There is a hack outlined in this GitHub issue.
z
Hey @Daniel Katz -- you can pass a callable that returns an executor as the `flow.executor`, then peek at the parameters in the context from there.
Kevin has put doubt in my heart 😄 but I believe I've recommended this successfully before. Something like...
```python
import prefect
from prefect import Flow, Parameter
from prefect.executors import DaskExecutor

def dynamic_executor():
    return DaskExecutor(n_workers=prefect.context.parameters["n_workers"])

with Flow(..., executor=dynamic_executor) as flow:
    n_workers = Parameter("n_workers", default=5)
```
d
I'll give it a shot right now, thx both @Zanie @Kevin Kho
k
I believe Michael’s suggestion will work
d
Hmm, I'm getting this error:
```
Unexpected error: AttributeError("'FunctionTask' object has no attribute 'start'")
```
k
Could you share your code snippet?
d
(attached: `flow.py`)
@Kevin Kho ^
k
You need to call it:
```python
executor=get_dask_executor()
```
But it's not working for other reasons
Yeah, it looks like `parameters` is populated after the executor is set up
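For illustration, a minimal sketch of why the eager call can't see them: outside a flow run, `prefect.context` has no `parameters` entry yet.
```python
import prefect

# "parameters" is only added to the context during a flow run,
# so at definition time the lookup comes back empty
print(prefect.context.get("parameters"))  # None outside a run
```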
d
ok, got it
thx @Kevin Kho
k
@Zanie could confirm
z
You shouldn't call it; we'll call it at flow runtime, and the parameters should be populated if run with an agent, I think
I'm very confused by your error and code. It looks like you had a task decorator on your executor function.
k
I replicated with the same code snippet. There was no task decorator above the executor function
z
Weird. I'm ooo at the moment. I can take a look when I'm back.
Sorry I gave you the wrong syntax 🙃
a
@Daniel Katz translating Michael’s example into your use case would be:
```python
import prefect
from prefect import Flow, Parameter
from prefect.executors import DaskExecutor


def dynamic_executor():
    from distributed import LocalCluster

    # could instead be some other cluster class, e.g. dask_cloudprovider.aws.FargateCluster
    return LocalCluster(n_workers=prefect.context.parameters["n_workers"])


with Flow(
    "dynamic_n_workers", executor=DaskExecutor(cluster_class=dynamic_executor)
) as flow:
    flow.add_task(Parameter("n_workers", default=5))
```
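For a quick local test, you could then do something like this (a sketch; the value here is just an example, and the parameters should land in the context before the executor callable fires):
```python
# override the default n_workers for this run
flow.run(parameters={"n_workers": 2})
```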
d
worked! thx @Anna Geller @Zanie
a
@Anna Geller @Zanie what if I want to switch between `DaskExecutor`, `LocalDaskExecutor`, and `LocalExecutor`? I did:
```python
from prefect import Flow, Parameter, context
from prefect.executors import DaskExecutor, LocalDaskExecutor

def dynamic_executor():
    executor_type = context.parameters["executor"]
    if executor_type == "local_dask":
        executor = LocalDaskExecutor()
    elif executor_type == "cluster_dask":
        executor = DaskExecutor("tcp://172.18.102.9:8789")
    elif executor_type == "default":
        executor = None
    return executor

with Flow("OVP_step2", executor=dynamic_executor) as dev_flow:
    dev_flow.add_task(Parameter("executor", default="default"))
```
And the error was:
```
AttributeError: 'function' object has no attribute 'start'
```
Like @Kevin Kho said, I think this is because `parameters` is populated after the executor is set up. Any way around this?
a
@An Hoang this `dynamic_executor` function should return a Dask cluster class, not a `DaskExecutor` -- your function currently returns an executor
z
Unfortunately, you can't do this.
The syntax there is not supported; you can only pass a callable to the `cluster_class` of a `DaskExecutor`.
We'll support this in Orion in the next release.
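To illustrate the distinction with the snippets above (a sketch):
```python
# supported: the callable returns a *cluster*, and DaskExecutor consumes it
flow.executor = DaskExecutor(cluster_class=dynamic_executor)

# not supported: a callable as the flow's executor itself
flow.executor = dynamic_executor  # -> AttributeError: 'function' object has no attribute 'start'
```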
c
I take it this pattern isn't supported for the `address` parameter of `DaskExecutor`?
z
Nope.
I think you could hack it, but it'd be pretty goofy.
a
Hey, just following up on this. It's not working for me. The cluster-creation logs print and I can access the dashboard and see the scheduler, but no workers start up even after I wait a long time. I even added `cluster.scale(n_workers)` inside `dynamic_LSF_dask_executor`, but that didn't work either. The same configuration works when I create a Dask cluster separately and then provide the address at flow-creation time. Any clue as to why? What I have for the Dask executor:
```python
#export
import dask
import dask_jobqueue
print(f"{dask.__version__=}")
print(f"{dask_jobqueue.__version__=}")

#export
from dask_jobqueue import LSFCluster
import prefect
from prefect.executors import DaskExecutor

#export
DEFAULT_DASK_CLUSTER_CORES_PER_MACHINE = 45
DEFAULT_DASK_CLUSTER_PROCESSES_PER_WORKER = 5
DEFAULT_QUEUE = "all_corradin"
DEFAULT_MAXIMUM_WORKERS = 47 * 17

#export
def dynamic_LSF_dask_executor():
    cores_per_machine = prefect.context.parameters["cores_per_machine"]
    processes_per_worker = prefect.context.parameters["processes_per_worker"]
    queue = prefect.context.parameters["queue"]
    n_workers = prefect.context.parameters["n_workers"]
    workers_per_machine = round(cores_per_machine / processes_per_worker)

    cluster = LSFCluster(
        queue=queue,
        cores=cores_per_machine,
        processes=workers_per_machine,
        n_workers=n_workers,
        walltime="9000:00",
        memory="500GB",
        local_directory="/tmp",
        job_extra=["-o /dev/null", "-e /dev/null"],
    )
    logger = prefect.context.get("logger")
    logger.debug("Dask cluster started")
    logger.debug(f"\n{cores_per_machine=}\n{processes_per_worker=}\n{queue=}\n{n_workers=}\n")
    logger.debug(f"Go to this link to see the dashboard: {cluster.dashboard_link}")
    return cluster


LSF_dask_executor = DaskExecutor(
    cluster_class=dynamic_LSF_dask_executor,
    adapt_kwargs={"maximum": DEFAULT_MAXIMUM_WORKERS},
)
```
In my flow:
```python
with Flow("OVP_step2", executor=LSF_dask_executor) as step2_flow:
    # other params...
    ...
    # parameters used in the dynamic executor function
    step2_flow.add_task(Parameter("cores_per_machine", default=DEFAULT_DASK_CLUSTER_CORES_PER_MACHINE))
    step2_flow.add_task(Parameter("processes_per_worker", default=DEFAULT_DASK_CLUSTER_PROCESSES_PER_WORKER))
    step2_flow.add_task(Parameter("n_workers", default=DEFAULT_MAXIMUM_WORKERS))
    step2_flow.add_task(Parameter("queue", default=DEFAULT_QUEUE))
```
k
How many workers are you adding first before the adapt?
a
`n_workers` in the `LSFCluster` creation is 799. I don't see any `dask-worker` jobs submitted on my LSF HPC like it normally would either
k
I can't see any reason why it would be different, but it's also the first time someone has come back to me about an HPC cluster. Do you just start it with:
```python
cluster = LSFCluster(...)
```
when you do it yourself and pass the address?
a
yep! basically `cluster = dynamic_LSF_dask_executor(); cluster.scale(799)`
k
It's hard for me to isolate whether it's because of the LSF cluster or because of the callable. Let me verify the parameter approach works later. Do your Prefect logs show anything? Do you hit this line?
a
Yep, I do
When I remove the `adapt_kwargs`, I can see my `Parameter` tasks on the dashboard now; before, it was empty
k
Ah, I see. Then I guess just use `cluster.scale` directly?
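i.e. something like this (a sketch adapting the callable above; it drops `adapt_kwargs` in favor of an explicit scale):
```python
from dask_jobqueue import LSFCluster
import prefect
from prefect.executors import DaskExecutor

def dynamic_LSF_dask_executor():
    cluster = LSFCluster(
        queue=prefect.context.parameters["queue"],
        cores=prefect.context.parameters["cores_per_machine"],
    )  # plus the other kwargs from the snippet above
    # scale explicitly instead of relying on adaptive scaling
    cluster.scale(prefect.context.parameters["n_workers"])
    return cluster

# note: no adapt_kwargs here
LSF_dask_executor = DaskExecutor(cluster_class=dynamic_LSF_dask_executor)
```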
a
yay it works now! I didn't have to use `cluster.scale` from within the callable. Thank you so much for the help Kevin! Especially at this hour!!!
k
Of course! I'm happy to finally have proof that HPC clusters will work with the DaskExecutor.
a
~Oh wait, maybe I spoke too soon. Now all the tasks are completed on the Dask dashboard but `flow.run()` is stuck forever... This never happened with the premade Dask cluster. Problem for tomorrow...~ 😅
k
Ah ok
a
Nevermind! Double spoke too soon. It took 15-ish minutes from the tasks finishing on the Dask dashboard to `flow.run()` being done
k
Ah lol