Sam Werbalowsky
10/29/2021, 2:37 PM
Kevin Kho
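from dask_gateway import Gateway
from prefect.executors import DaskExecutor
# Start a cluster on Dask Gateway, then point the executor at its scheduler
gateway = Gateway()
cluster = gateway.new_cluster()
executor = DaskExecutor(
    address=cluster.scheduler_address,
    client_kwargs={"security": cluster.security}
)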
I think I would try passing a callable to the DaskExecutor and wrapping it.
def create_cluster():
    gateway = Gateway()
    cluster = gateway.new_cluster()
    return cluster  # the executor needs the cluster object back
executor = DaskExecutor(cluster_class=create_cluster)
you can use GatewayCluster
Anna Geller
Sam Werbalowsky
10/29/2021, 2:51 PM
This can be either a string specifying the import path to the cluster class (e.g. "dask_cloudprovider.aws.FargateCluster"), the cluster class itself, or a function for creating a custom cluster
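To make the three accepted forms concrete, here is a minimal stand-alone sketch of how such an argument could be normalized into a callable. It is not Prefect's actual implementation; `resolve_cluster_factory`, `FakeCluster`, and `make_fake_cluster` are hypothetical names used only for illustration.

```python
import importlib


def resolve_cluster_factory(cluster_class):
    """Turn an import-path string, a class, or a function into a callable.

    Illustrative only: a stand-in for what an executor might do with
    its cluster_class argument, not Prefect's real code.
    """
    if isinstance(cluster_class, str):
        # "package.module.ClassName" -> import the module, fetch the attribute
        module_path, _, attr = cluster_class.rpartition(".")
        if not module_path:
            raise ValueError(f"Expected a dotted import path, got {cluster_class!r}")
        return getattr(importlib.import_module(module_path), attr)
    if callable(cluster_class):
        # Classes and functions are both callable, so one branch covers both
        return cluster_class
    raise TypeError("cluster_class must be an import path, a class, or a function")


class FakeCluster:
    """Hypothetical placeholder for a real cluster class."""

    def __init__(self, n_workers=1):
        self.n_workers = n_workers


def make_fake_cluster(n_workers=2):
    """Hypothetical factory function that builds a custom cluster."""
    return FakeCluster(n_workers=n_workers)
```

Under this sketch, an already-constructed instance such as `FakeCluster()` is rejected, because it is neither an import path nor something that can be called to build a cluster.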
Kevin Kho
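flow.executor = DaskExecutor(
    cluster_class=GatewayCluster,  # pass the class itself, not an instance
    # pseudo-code: these kwargs must be options your gateway server exposes
    cluster_kwargs={"n_workers": 4, "image": "my-prefect-image"},
)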
Anna Geller
import prefect
from prefect.executors import DaskExecutor
from dask_cloudprovider.aws import FargateCluster  # or ECSCluster
def fargate_cluster(n_workers=4):
    """Start a fargate cluster using the same image as the flow run"""
    return FargateCluster(n_workers=n_workers, image=prefect.context.image)
flow.executor = DaskExecutor(
    cluster_class=fargate_cluster,
    cluster_kwargs={"n_workers": 4}
)
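The same callable pattern could plausibly be adapted to Dask Gateway. The sketch below assumes Prefect 1.x (`prefect.executors.DaskExecutor`) and an installed `dask_gateway`; `gateway_cluster`, `build_executor`, and the bounds in `ADAPT_KWARGS` are hypothetical. Worker counts are expressed through `adapt_kwargs`, which the executor passes to the cluster's `adapt()` method, rather than through an `n_workers` argument.

```python
# Hypothetical adaptive-scaling bounds; tune these for your deployment
ADAPT_KWARGS = {"minimum": 1, "maximum": 4}


def gateway_cluster(**cluster_options):
    """Create a Dask Gateway cluster.

    Any cluster_options must be ones your gateway server exposes.
    """
    # Imported lazily so this module loads even without dask_gateway installed
    from dask_gateway import GatewayCluster

    return GatewayCluster(**cluster_options)


def build_executor():
    """Build a DaskExecutor that creates a gateway cluster per flow run."""
    from prefect.executors import DaskExecutor  # Prefect 1.x import path

    return DaskExecutor(
        cluster_class=gateway_cluster,
        adapt_kwargs=ADAPT_KWARGS,
    )
```

Usage would then be `flow.executor = build_executor()`, assuming the environment running the flow can authenticate against the gateway.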
Sam Werbalowsky
10/29/2021, 2:53 PM
dask_gateway.GatewayCluster or something in this case (pseudo-code)
Kevin Kho
Sam Werbalowsky
10/29/2021, 6:18 PM
dask_gateway.GatewayCluster with proper params works - I don't think there is a way to set the number of workers without scaling it, but I was able to set the adapt pretty easily.
Kevin Kho