
    Riley Hun

    1 year ago
    Hello - Sorry if this is a bad question, but we want to use Dask Gateway as the executor for our Prefect flows. I believe the clusters created by Dask Gateway shut themselves down when idle, so if we have a flow that runs maybe once a day, the flow will likely fail the following day because there's no Dask cluster left to execute it (it shut down when not in use). Is there a workaround so that we can tell Prefect to start up a cluster from Dask Gateway each time it runs its flow, or do we have to ensure that the clusters from Dask Gateway don't shut themselves down when idle?
    Kyle Moon-Wright

    1 year ago
    Hey @Riley Hun, does the DaskExecutor not meet your needs? AFAIK Dask clusters were designed to be ephemeral and can be spun up dynamically per Flow Run with the DaskExecutor. In either case, you can certainly use dask_gateway to spin up a cluster manually or for advanced configurations - check out this advanced tutorial demonstrating both methods (the Gateway method is under Advanced Dask Configuration).
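    For reference, a minimal sketch of the dynamic per-flow-run approach described here, assuming a Prefect release whose DaskExecutor accepts cluster_class/cluster_kwargs and a gateway that exposes an image cluster option; the address, password, and image values are placeholders:
    from dask_gateway import BasicAuth
    from prefect.engine.executors import DaskExecutor

    # The executor creates a fresh GatewayCluster when the flow run starts
    # and shuts it down when the run finishes, so no long-lived idle
    # cluster is needed.
    executor = DaskExecutor(
        cluster_class="dask_gateway.GatewayCluster",
        cluster_kwargs={
            "address": "<gateway proxy address>",
            "auth": BasicAuth(password="<gateway password>"),
            "image": "<dask worker image>",
        },
    )
    # Attach to each flow, e.g. flow.environment.executor = executor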

    Riley Hun

    1 year ago
    Okay thanks @Kyle Moon-Wright - maybe I'm just second-guessing myself. I did check out the docs and did something like this in my deploy script. I'm just fearful that it will only spin up the Dask cluster once and not each time the flow runs:
    import json

    from dask_gateway import BasicAuth, Gateway
    from google.cloud import secretmanager
    from prefect.engine.executors import DaskExecutor  # prefect.executors in newer releases

    # Fetch the Dask Gateway password from Secret Manager
    secret_client = secretmanager.SecretManagerServiceClient()
    response = secret_client.access_secret_version(
        name="<path to secrets>")
    secrets = json.loads(response.payload.data.decode('UTF-8'))

    # Connect to the gateway and create a cluster up front
    auth = BasicAuth(password=secrets['dask_gateway_secret'])
    gateway = Gateway(
        address=args.dask_proxy_address,
        auth=auth
    )
    options = gateway.cluster_options()
    options.image = args.dask_gateway_docker_image
    cluster = gateway.new_cluster(options)

    # Point each flow's executor at the cluster created above
    for flow in flows:
        flow.storage = storage
        flow.environment.executor = DaskExecutor(
            address=args.dask_scheduler_address,
            client_kwargs={'security': cluster.security}
        )
        path = storage.add_flow(flow)
        print(f'storing flow {flow.name} at {path} in the image.')
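    If the goal is to defer cluster creation to run time, one possible variation on the loop above - sketched under the assumption that the installed DaskExecutor accepts a callable cluster_class, and with make_gateway_cluster as a hypothetical helper name - is to wrap the Gateway call in a function so a fresh cluster is created for each flow run rather than once at deploy time:
    def make_gateway_cluster():
        # Called when the flow run starts, so each run gets its own cluster;
        # assumes args and auth defined above are available (or serialize
        # with the flow) at run time.
        gateway = Gateway(address=args.dask_proxy_address, auth=auth)
        options = gateway.cluster_options()
        options.image = args.dask_gateway_docker_image
        return gateway.new_cluster(options)

    for flow in flows:
        flow.storage = storage
        flow.environment.executor = DaskExecutor(cluster_class=make_gateway_cluster)
        path = storage.add_flow(flow)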
    Kyle Moon-Wright

    1 year ago
    Hmm, this looks good to me, but let us know if you see anything strange. Each flow here will certainly have its own executor to spin up its cluster in this case. However, it might get stuck if the previous cluster wasn't spun down properly (since they use the same address), but that might be a later issue.