Sam Werbalowsky

10/29/2021, 2:37 PM
Hello - I am mostly deployed and things are working well - running prefect server with DaskExecutor. I have created a custom wrapper around the Flow class so that I can easily set the run environment for local development vs production flows. I have a function that is creating the dask scheduler/worker through dask gateway - all works, EXCEPT that the dask cluster is also spun up on registration. Is there an easy way to prevent this? I am wondering if I have to make a custom dask class and use that as part of the executor, rather than a custom function that creates it as part of initiating the flow.
I’m basically following the advanced dask configuration here, except that logic to create the cluster is part of the init of the flow - https://docs.prefect.io/core/advanced_tutorials/dask-cluster.html

Kevin Kho

10/29/2021, 2:44 PM
If it is spun up during registration, are you calling flow.run during registration?
Ohhh my bad. I see what happened
You have this as part of your script right? So the cluster is created just by running the Python script.
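from dask_gateway import Gateway
from prefect.executors import DaskExecutor

# Runs at import time, so the cluster starts whenever the script is executed
gateway = Gateway()
cluster = gateway.new_cluster()
executor = DaskExecutor(
    address=cluster.scheduler_address,
    client_kwargs={"security": cluster.security}
)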
I think I would try passing a callable to the DaskExecutor and wrapping it.
def create_cluster():
    gateway = Gateway()
    cluster = gateway.new_cluster()

executor = DaskExecutor(create_cluster)
Actually this is slightly wrong. It doesn’t seem you can pass a function. It needs to be a class. I think there is a GatewayCluster you can use.
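To illustrate why the cluster appears at registration: anything executed at module level runs whenever the script is run or imported, whereas a class or callable handed to an executor is only invoked once the executor starts. The following is a minimal sketch of that difference using hypothetical stand-ins (`FakeCluster` and `LazyExecutor` are illustrative names, not Prefect or Dask classes), so it should be read as an analogy rather than as Prefect's implementation.

```python
class FakeCluster:
    """Hypothetical stand-in for a real cluster class; counts how often it is created."""

    created = 0

    def __init__(self, **kwargs):
        type(self).created += 1
        self.kwargs = kwargs


class LazyExecutor:
    """Hypothetical stand-in for an executor that defers cluster creation."""

    def __init__(self, cluster_class, cluster_kwargs=None):
        # Only store the factory and its arguments; nothing is started here.
        self.cluster_class = cluster_class
        self.cluster_kwargs = cluster_kwargs or {}
        self.cluster = None

    def start(self):
        # The cluster is created only when the executor actually starts a run.
        self.cluster = self.cluster_class(**self.cluster_kwargs)
        return self.cluster


executor = LazyExecutor(FakeCluster, {"image": "my-image"})
created_at_definition = FakeCluster.created  # nothing has been spun up yet
executor.start()
created_after_start = FakeCluster.created  # the cluster now exists
```

Defining the executor this way costs nothing at registration time, because the cluster class is only called inside `start()`.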

Anna Geller

10/29/2021, 2:49 PM
Just to add: this documentation page may be helpful - it shows how to pass various options depending on your Dask setup https://docs.prefect.io/orchestration/flow_config/executors.html#using-a-temporary-cluster

Sam Werbalowsky

10/29/2021, 2:51 PM
Ah - this seems like the key piece maybe
This can be either a string specifying the import path to the cluster class (e.g. "dask_cloudprovider.aws.FargateCluster"), the cluster class itself, or a function for creating a custom cluster
so I can maybe just try passing the function there
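The three accepted forms quoted above (import-path string, class, or function) can all be reduced to "something callable that returns a cluster". Here is a rough sketch of how such an argument might be normalized. The helper `resolve_cluster_class` and the factory `make_cluster` are hypothetical names for illustration, and `collections.OrderedDict` stands in for a real cluster class so the example runs without Dask installed.

```python
import importlib
from collections import OrderedDict


def resolve_cluster_class(cluster_class):
    """Return a callable from an import-path string, a class, or a function."""
    if isinstance(cluster_class, str):
        # Split "package.module.Name" into module path and attribute name.
        module_path, _, name = cluster_class.rpartition(".")
        return getattr(importlib.import_module(module_path), name)
    if callable(cluster_class):
        # Classes and functions are both callable and can be used as-is.
        return cluster_class
    raise TypeError("cluster_class must be an import path or a callable")


def make_cluster(**kwargs):
    """Hypothetical factory function standing in for a custom cluster builder."""
    return OrderedDict(kwargs)


from_string = resolve_cluster_class("collections.OrderedDict")
from_class = resolve_cluster_class(OrderedDict)
from_function = resolve_cluster_class(make_cluster)
```

In all three cases nothing is instantiated until the returned callable is invoked, which is what keeps the cluster from starting at registration.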

Kevin Kho

10/29/2021, 2:51 PM
Maybe you can try this GatewayCluster
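from dask_gateway import GatewayCluster
from prefect.executors import DaskExecutor

flow.executor = DaskExecutor(
    cluster_class=GatewayCluster,  # pass the class itself, not an instance
    cluster_kwargs={"n_workers": 4, "image": "my-prefect-image"},
)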
Ah ok good catch. I guess the function might work

Anna Geller

10/29/2021, 2:52 PM
yeah I have one example showing this with Fargate:
import prefect
from prefect.executors import DaskExecutor
from dask_cloudprovider.aws import FargateCluster  # or ECSCluster

def fargate_cluster(n_workers=4):
    """Start a fargate cluster using the same image as the flow run"""
    return FargateCluster(n_workers=n_workers, image=prefect.context.image)

# The function is only called when the flow run starts, not at registration
flow.executor = DaskExecutor(
    cluster_class=fargate_cluster,
    cluster_kwargs={"n_workers": 4}
)

Sam Werbalowsky

10/29/2021, 2:53 PM
got it, so I think it’s probably going to be like dask_gateway.GatewayCluster or something in this case (pseudo-code)

Kevin Kho

10/29/2021, 2:55 PM
Yep that’s right.

Sam Werbalowsky

10/29/2021, 6:18 PM
Confirmed that just passing dask_gateway.GatewayCluster with the proper params works - I don’t think there is a way to set the number of workers up front without scaling the cluster after it is created, but I was able to set up adaptive scaling (adapt) pretty easily.

Kevin Kho

10/29/2021, 6:22 PM
Yeah I’m not seeing it either looking at the docs. I think you’re right
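For reference, the setup that was confirmed to work could look roughly like the sketch below. It assumes the Prefect 1.x `DaskExecutor` with its `cluster_class` and `adapt_kwargs` arguments, and that the gateway address and authentication come from the Dask Gateway configuration. The helper names `adapt_bounds` and `gateway_executor` and the bounds 1 and 4 are illustrative assumptions, not values taken from the thread.

```python
def adapt_bounds(minimum=1, maximum=4):
    """Build the adaptive-scaling bounds (hypothetical helper; values are examples)."""
    if minimum > maximum:
        raise ValueError("minimum must not exceed maximum")
    return {"minimum": minimum, "maximum": maximum}


def gateway_executor(minimum=1, maximum=4):
    """Build a DaskExecutor that creates a Dask Gateway cluster for each flow run."""
    # Imported lazily so this sketch can be loaded without prefect installed.
    from prefect.executors import DaskExecutor

    return DaskExecutor(
        # Pass the import path (or the class itself), never an instance,
        # so no cluster is created when the flow is registered.
        cluster_class="dask_gateway.GatewayCluster",
        # Forwarded to the cluster's adapt() call once the cluster is up.
        adapt_kwargs=adapt_bounds(minimum, maximum),
    )


# Usage inside a flow definition (requires prefect and dask_gateway):
# flow.executor = gateway_executor(minimum=1, maximum=4)
```

Adaptive bounds are used here instead of a fixed worker count because, as noted above, there does not appear to be a constructor option for the number of workers.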