Hi Community, After starting a dask cluster, I ha...
# prefect-community
j
Hi Community, after starting a Dask cluster, I registered my flow by executing 'python my_flow.py'. I got scheduler_address = gateway://dask-gateway-server:8000/74c3991658104aa68665c11ac52e767a. After that I can execute the flow without any problem. If I create a new Dask cluster, I get a new scheduler_address, gateway://dask-gateway-server:8000/fb67ee2be228422eafb66d2e73344e29. But when I execute the flow, it does not recreate a DaskExecutor with the new scheduler_address. Is there any way to refresh the DaskExecutor of a registered flow at run time? In my project, I don't want to re-register every time I start a new Dask cluster. Thank you for your help. The script my_flow.py is in the thread:
j
Hi Jonathan. Good question. I’ll look around a bit on Slack and GitHub issues as I think other folks have mentioned this issue. In the meantime, are you able to please move your code to this thread to help keep the channel tidy?
j
Ok, thank you for your help! Here is the code:
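import os

from dask_gateway import Gateway, auth
from prefect import Flow
from prefect.executors import DaskExecutor
from prefect.run_configs import DockerRun
from prefect.storage import S3


def generate_dask_executor(address_of_gateway: str):
    # Connect to the Dask Gateway server
    gateway = Gateway(
        address=address_of_gateway,
        auth=auth.BasicAuth(username="username", password="password"),
    )

    # Pick the first running cluster known to the gateway
    list_clusters = gateway.list_clusters()
    cluster_name = list_clusters[0].name
    cluster = gateway.get_cluster(cluster_name=cluster_name)

    # Returns something like: gateway://dask-gateway-server:8000/74c3991658104aa68665c11ac52e767a
    scheduler_address = cluster.scheduler_address
    security = cluster.security

    # Note: the address is resolved here, i.e. when this script is executed
    return DaskExecutor(
        address=scheduler_address,
        client_kwargs={"security": security},
    )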


with Flow(
    "my_flow_name",
    storage=S3(
            bucket=s3_params.get("bucket"),
            client_options=s3_params.get("client_options"),
        ),
    executor=generate_dask_executor(address_of_gateway=os.getenv("address_of_gateway")),  # e.g. http://dask-gateway-server:8000
    run_config=DockerRun(image=prefect_flow_run_image),
) as flow:
    hw = hello_world()
    hw2 = hello_world2()

if __name__ == "__main__":
    flow_id = flow.register(project_name="my_dask_project")
a
If you haven't found a solution yet, it might be worth asking on the Dask forum: https://dask.discourse.group/
j
I can do that, thanks, but I think it is more of a Prefect issue, because it seems that the DaskExecutor's information is stored in the Storage at registration. In the following screenshot, the ID of the Dask cluster is the one from registration time. But that cluster no longer exists, since I stopped it and created another one. In other words, how can I use a new Dask executor at run time without re-registering?
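To illustrate the suspicion above: if the flow (with its executor settings) is serialized once at registration, any value resolved at that moment is frozen. The sketch below is not Prefect code. It uses only the standard `pickle` module, and `current_cluster`, `lookup_address` and the shortened cluster IDs are hypothetical stand-ins.

```python
import pickle

# Hypothetical stand-in for the gateway: the address of the "current" cluster
current_cluster = {"address": "gateway://dask-gateway-server:8000/old-cluster-id"}


def lookup_address() -> str:
    """Return the scheduler address of whichever cluster exists right now."""
    return current_cluster["address"]


# "Registration": the address is resolved once and the settings are serialized
stored = pickle.dumps({"address": lookup_address()})

# The old cluster is stopped and a new one is created
current_cluster["address"] = "gateway://dask-gateway-server:8000/new-cluster-id"

# "Flow run": the deserialized settings still carry the address from registration
restored = pickle.loads(stored)
print(restored["address"])
print(lookup_address())
```

The stored settings never call `lookup_address()` again, which would match the stale ID seen at run time.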
a
You have two options: use an on-demand Dask cluster, which is terminated once the flow run completes, or use your self-hosted cluster, in which case you can provide the same address to every flow that needs it.
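A minimal sketch of the first option, assuming Prefect 1.x and dask-gateway are installed: instead of a fixed `address`, the executor gets a `cluster_class` and `cluster_kwargs`, so a temporary cluster is created for each flow run. The helper names `gateway_cluster_kwargs` and `make_on_demand_executor` are hypothetical, and the credentials are the same placeholders as in the script above.

```python
def gateway_cluster_kwargs(address_of_gateway: str) -> dict:
    """Keyword arguments passed to dask_gateway.GatewayCluster for each flow run."""
    return {
        "address": address_of_gateway,
        # Ask the gateway to stop the cluster when it is closed after the run
        "shutdown_on_close": True,
    }


def make_on_demand_executor(address_of_gateway: str):
    """Build a DaskExecutor that starts its own temporary gateway cluster."""
    # Imported here so the helper above can be used without these packages
    from dask_gateway import auth
    from prefect.executors import DaskExecutor

    cluster_kwargs = gateway_cluster_kwargs(address_of_gateway)
    # Placeholder credentials, as in the original script
    cluster_kwargs["auth"] = auth.BasicAuth(username="username", password="password")

    return DaskExecutor(
        cluster_class="dask_gateway.GatewayCluster",
        cluster_kwargs=cluster_kwargs,
    )
```

Passing `executor=make_on_demand_executor(os.getenv("address_of_gateway"))` to the `Flow` would then store only the gateway address at registration, not the ID of a specific cluster.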
j
Ok thank you for your help, I will go for the first option.
a
Nice 👍