    Sam Werbalowsky

    10 months ago
    Hello - I am mostly deployed and things are working well - running prefect server with DaskExecutor. I have created a custom wrapper around the Flow class so that I can easily set the run environment for local development vs production flows. I have a function that is creating the dask scheduler/worker through dask gateway - all works, EXCEPT that the dask cluster is also spun up on registration. Is there an easy way to prevent this? I am wondering if I have to make a custom dask class and use that as part of the executor, rather than a custom function that creates it as part of initiating the flow.
    I’m basically following the advanced dask configuration here, except that logic to create the cluster is part of the init of the flow - https://docs.prefect.io/core/advanced_tutorials/dask-cluster.html
    Kevin Kho

    10 months ago
    If it is spun up during registration, are you calling flow.run during registration?
    Ohhh my bad. I see what happened
    You have this as part of your script right? So the cluster is created just by running the Python script.
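    from dask_gateway import Gateway
    from prefect.executors import DaskExecutor

    # Module-level code: this runs (and starts a cluster) whenever the script is executed.
    gateway = Gateway()
    cluster = gateway.new_cluster()
    executor = DaskExecutor(
        address=cluster.scheduler_address,
        client_kwargs={"security": cluster.security}
    )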
    I think I would try passing a callable to the DaskExecutor and wrapping it.
    def create_cluster():
        gateway = Gateway()
        cluster = gateway.new_cluster()
    
    executor = DaskExecutor(create_cluster)
    Actually this is slightly wrong. It doesn’t seem you can pass a function. It needs to be a class. I think there is a GatewayCluster you can use.
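To illustrate the difference being described here, below is a minimal sketch of eager versus deferred cluster creation. All three classes (FakeCluster, EagerExecutor, LazyExecutor) are hypothetical stand-ins written for this example; they are not the Prefect or dask-gateway APIs.

```python
# Stand-in classes for illustration only; NOT the Prefect or dask-gateway APIs.
class FakeCluster:
    created = 0  # counts how many clusters have been started

    def __init__(self):
        FakeCluster.created += 1


class EagerExecutor:
    """Receives an already-running cluster, built when the script is loaded."""

    def __init__(self, cluster):
        self.cluster = cluster


class LazyExecutor:
    """Receives a factory and only calls it when the flow actually runs."""

    def __init__(self, cluster_factory):
        self.cluster_factory = cluster_factory
        self.cluster = None

    def start(self):
        # The cluster is created here, at run time.
        self.cluster = self.cluster_factory()
        return self.cluster


# Eager: the cluster exists as soon as this line executes (e.g. at registration).
eager = EagerExecutor(FakeCluster())
count_after_eager = FakeCluster.created

# Lazy: constructing the executor creates nothing.
lazy = LazyExecutor(FakeCluster)
count_after_lazy_init = FakeCluster.created

# Only start() triggers cluster creation.
lazy.start()
count_after_lazy_start = FakeCluster.created
```

The point of the sketch is only that handing over a class or function, rather than a constructed object, moves cluster creation from script load time to flow run time.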
    Anna Geller

    10 months ago
    Just to add: this documentation page may be helpful - it shows how to pass various options depending on your Dask setup https://docs.prefect.io/orchestration/flow_config/executors.html#using-a-temporary-cluster
    Sam Werbalowsky

    10 months ago
    Ah - this seems like the key piece maybe
    This can be either a string specifying the import path to the cluster class (e.g. "dask_cloudprovider.aws.FargateCluster"), the cluster class itself, or a function for creating a custom cluster
    so I can maybe just try passing the function there
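As a rough illustration of how an option that accepts "an import path string, a class, or a function" could be resolved into something callable, here is a small sketch. The helper name resolve_cluster_factory is hypothetical and this is not Prefect's actual implementation; it only shows the general idea using the standard library.

```python
import importlib
from typing import Any, Callable, Union


def resolve_cluster_factory(spec: Union[str, Callable[..., Any]]) -> Callable[..., Any]:
    """Return a callable from a dotted import path, a class, or a function."""
    if callable(spec):
        # Classes and functions are already usable as factories.
        return spec
    if isinstance(spec, str):
        module_path, _, attr = spec.rpartition(".")
        if not module_path:
            raise ValueError(f"expected a dotted import path, got {spec!r}")
        module = importlib.import_module(module_path)
        return getattr(module, attr)
    raise TypeError(f"unsupported cluster spec: {type(spec).__name__}")


# Usage with standard-library names standing in for a real cluster class.
factory_from_string = resolve_cluster_factory("collections.OrderedDict")
factory_from_class = resolve_cluster_factory(dict)
```

In all three cases nothing is instantiated until the returned factory is called, which is why this style of option avoids starting a cluster at registration.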
    Kevin Kho

    10 months ago
    Maybe you can try this GatewayCluster
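    flow.executor = DaskExecutor(
        # Pass the class itself, not an instance, so the cluster is only created at run time.
        cluster_class=GatewayCluster,
        # Placeholder kwargs: they must match options that your gateway actually exposes.
        cluster_kwargs={"n_workers": 4, "image": "my-prefect-image"},
    )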
    Ah ok good catch. I guess the function might work
    Anna Geller

    10 months ago
    yeah I have one example showing this with Fargate:
    import prefect
    from dask_cloudprovider.aws import FargateCluster  # or ECSCluster
    from prefect.executors import DaskExecutor
    
    def fargate_cluster(n_workers=4):
        """Start a fargate cluster using the same image as the flow run"""
        return FargateCluster(n_workers=n_workers, image=prefect.context.image)
    
    flow.executor = DaskExecutor(
        cluster_class=fargate_cluster,
        cluster_kwargs={"n_workers": 4}
    )
    Sam Werbalowsky

    10 months ago
    got it, so I think it’s probably going to be like dask_gateway.GatewayCluster or something in this case (pseudo-code)
    Kevin Kho

    10 months ago
    Yep that’s right.
    Sam Werbalowsky

    10 months ago
    Confirmed that just passing dask_gateway.GatewayCluster with proper params works - I don’t think there is a way to set the number of workers without scaling it, but I was able to set up adaptive scaling (adapt) pretty easily.
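A minimal sketch of what that setup might look like, assuming Prefect 1.x, where DaskExecutor accepts cluster_class (including an import path string) and adapt_kwargs. The helper name gateway_executor_kwargs and the minimum/maximum values are hypothetical; the exact parameters used in this thread were not shared.

```python
def gateway_executor_kwargs(minimum=1, maximum=4):
    """Build keyword arguments for DaskExecutor (hypothetical helper).

    The worker bounds are illustrative values, not taken from the thread.
    """
    return {
        # An import path string, so nothing is created when the script is loaded.
        "cluster_class": "dask_gateway.GatewayCluster",
        # Adaptive scaling bounds, in place of a fixed worker count.
        "adapt_kwargs": {"minimum": minimum, "maximum": maximum},
    }


executor_kwargs = gateway_executor_kwargs()

# Assumed usage with Prefect 1.x installed:
# from prefect.executors import DaskExecutor
# flow.executor = DaskExecutor(**executor_kwargs)
```

Because the cluster class is given by name and the worker range goes through adapt_kwargs, registration only stores the configuration; the cluster itself would be started when the flow runs.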
    Kevin Kho

    10 months ago
    Yeah I’m not seeing it either looking at the docs. I think you’re right