Riley Hun

01/11/2021, 6:43 PM
Hello - sorry if this is a bad question, but we want to use Dask Gateway as the executor for our Prefect flows. I believe the clusters created by Dask Gateway shut themselves down when idle, so if we have a flow that runs maybe once a day, the flow will likely fail the following day because there's no Dask cluster left to execute it. Is there a workaround so that we can tell Prefect to start up a cluster from Dask Gateway each time it runs a flow, or do we have to ensure that the Dask Gateway clusters don't shut themselves down when idle?

Kyle Moon-Wright

01/11/2021, 7:02 PM
Hey @Riley Hun, does the DaskExecutor not meet your needs? AFAIK Dask clusters were designed to be ephemeral and can be spun up dynamically per flow run with the DaskExecutor. In either case, you can certainly use dask_gateway to spin up a cluster manually or for advanced configurations - check out this advanced tutorial demonstrating both methods (the Gateway method is under Advanced Dask Configuration).
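For reference, a minimal sketch of the dynamic, per-run pattern (the gateway address and password here are hypothetical placeholders; cluster_class and cluster_kwargs are DaskExecutor options that accept an import path, so a fresh GatewayCluster is created when the flow run starts and shut down when it finishes):

from dask_gateway import BasicAuth
from prefect.executors import DaskExecutor

# A temporary Gateway cluster is created per flow run and torn down after.
executor = DaskExecutor(
    cluster_class="dask_gateway.GatewayCluster",
    cluster_kwargs={
        "address": "https://dask-gateway.example.com",   # hypothetical gateway address
        "auth": BasicAuth(password="<gateway-password>"),  # hypothetical credential
    },
)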

Riley Hun

01/11/2021, 8:01 PM
Okay, thanks @Kyle Moon-Wright - maybe I'm just second-guessing myself. I did check out the docs, and did something like this in my deploy script. I'm just worried that it will only spin up the Dask cluster once, not each time the flow runs:
import json

from dask_gateway import Gateway, BasicAuth
from google.cloud import secretmanager
from prefect.executors import DaskExecutor

# `args`, `flows`, and `storage` are defined elsewhere in the deploy script.

# Fetch the Dask Gateway credentials from GCP Secret Manager.
secret_client = secretmanager.SecretManagerServiceClient()
response = secret_client.access_secret_version(
    name="<path to secrets>")
secrets = json.loads(response.payload.data.decode('UTF-8'))

# Authenticate to the gateway and create a single cluster at deploy time.
auth = BasicAuth(password=secrets['dask_gateway_secret'])
gateway = Gateway(
    address=args.dask_proxy_address,
    auth=auth
)
options = gateway.cluster_options()
options.image = args.dask_gateway_docker_image
cluster = gateway.new_cluster(options)

# Point every flow's executor at that one cluster's scheduler.
for flow in flows:
    flow.storage = storage
    flow.environment.executor = DaskExecutor(
        address=args.dask_scheduler_address,
        client_kwargs={'security': cluster.security}
    )
    path = storage.add_flow(flow)
    print(f'storing flow {flow.name} at {path} in the image.')

Kyle Moon-Wright

01/11/2021, 8:24 PM
Hmm, this looks good to me, but let us know if you see anything strange. Each flow here will certainly have its own executor to spin up its cluster in this case; however, since they all use the same address, a run might get stuck if the previous cluster wasn't spun down properly - but that might be a later issue.
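For reference, one way to avoid pinning every flow to the same scheduler address is to let each executor create its own Gateway cluster per run, along the lines of the cluster_class sketch above (this reuses the args, secrets, flows, and storage from the deploy script, and assumes the gateway exposes an image cluster option):

from dask_gateway import BasicAuth
from prefect.executors import DaskExecutor

for flow in flows:
    flow.storage = storage
    # Each flow run builds its own GatewayCluster instead of connecting
    # to one pre-created cluster at a fixed scheduler address.
    flow.environment.executor = DaskExecutor(
        cluster_class="dask_gateway.GatewayCluster",
        cluster_kwargs={
            "address": args.dask_proxy_address,
            "auth": BasicAuth(password=secrets["dask_gateway_secret"]),
            "image": args.dask_gateway_docker_image,  # assumes an "image" option on the gateway
        },
    )
    path = storage.add_flow(flow)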