
Johnathan Nguyen

08/29/2022, 1:33 PM
Hi Community, after starting a dask cluster, I registered my flow by executing `python my_flow.py`. I got scheduler_address = gateway://dask-gateway-server:8000/74c3991658104aa68665c11ac52e767a and after that I can execute the flow without problems. If I recreate a new dask cluster, I get a new scheduler_address gateway://dask-gateway-server:8000/fb67ee2be228422eafb66d2e73344e29, but when I execute the flow, it does not recreate a DaskExecutor with the new scheduler_address. Is there any way to refresh the DaskExecutor on a running flow? In my project, I don't want to re-register every time I start a new dask cluster. Thank you for your help. The script my_flow.py is in the thread:

Jeff Hale

08/29/2022, 3:38 PM
Hi Johnathan. Good question. I’ll look around a bit on Slack and GitHub issues, as I think other folks have mentioned this issue. In the meantime, could you please move your code into this thread to help keep the channel tidy?

Johnathan Nguyen

08/29/2022, 4:22 PM
Ok, thank you for your help! Here is the code:
import os

from dask_gateway import Gateway, auth
from prefect import Flow
from prefect.executors import DaskExecutor
from prefect.run_configs import DockerRun
from prefect.storage import S3


def generate_dask_executor(address_of_gateway: str):
    gateway = Gateway(
        address=address_of_gateway,
        auth=auth.BasicAuth(username="username", password="password"),
    )

    list_clusters = gateway.list_clusters()
    cluster_name = list_clusters[0].name
    cluster = gateway.get_cluster(cluster_name=cluster_name)

    # returns something like:
    # gateway://dask-gateway-server:8000/74c3991658104aa68665c11ac52e767a
    scheduler_address = cluster.scheduler_address
    security = cluster.security

    return DaskExecutor(
        address=scheduler_address,
        client_kwargs={"security": security},
    )


with Flow(
    "my_flow_name",
    storage=S3(
            bucket=s3_params.get("bucket"),
            client_options=s3_params.get("client_options"),
        ),
    executor=generate_dask_executor(address_of_gateway=os.getenv("address_of_gateway")),  # http://dask-gateway-server:8000
    run_config=DockerRun(image=prefect_flow_run_image),
) as flow:
    hw = hello_world()
    hw2 = hello_world2()

if __name__ == "__main__":
    flow_id = flow.register(project_name="my_dask_project")

Anna Geller

08/29/2022, 10:53 PM
If you couldn't find the solution yet, it might be worth asking in the Dask forum: https://dask.discourse.group/

Johnathan Nguyen

08/30/2022, 6:06 AM
I can do that, thanks, but I think it is more of a Prefect issue, because it seems that the DaskExecutor's information is registered in the Storage. In the following screenshot, the id of the dask cluster corresponds to the id at registration time, but it no longer exists since I stopped that cluster and created another one. In other words, how can I use a new dask executor at run time without re-registering?
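One way around the executor being pickled at registration (a sketch, assuming Prefect 1.x and that my_flow.py is uploaded to the bucket; the bucket name and key below are placeholders) is to store the flow as a script, so `generate_dask_executor` runs again at flow-run time and picks up the current cluster address:

```python
from prefect.storage import S3

# With stored_as_script=True, Prefect re-executes my_flow.py when the
# flow run starts, so the executor (and its scheduler address) is
# rebuilt at run time rather than frozen into a pickle at registration.
storage = S3(
    bucket="my-bucket",              # placeholder
    key="flows/my_flow.py",          # placeholder
    stored_as_script=True,
    local_script_path="my_flow.py",
)
```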

Anna Geller

08/30/2022, 11:13 AM
You have two options: use an on-demand Dask cluster that gets terminated once the flow run completes, or use your self-hosted cluster, in which case you can provide the same address to several flows that need it.
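For the first option, a minimal sketch in Prefect 1.x (the gateway address and worker bounds are placeholders; pass whatever auth your gateway requires via cluster_kwargs):

```python
from prefect.executors import DaskExecutor

# cluster_class + cluster_kwargs tell Prefect to create a fresh
# dask_gateway.GatewayCluster when the flow run starts and tear it
# down when the run finishes, so no scheduler address is baked in
# at registration time.
executor = DaskExecutor(
    cluster_class="dask_gateway.GatewayCluster",
    cluster_kwargs={"address": "http://dask-gateway-server:8000"},  # placeholder
    adapt_kwargs={"minimum": 1, "maximum": 4},
)
```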

Johnathan Nguyen

08/30/2022, 11:47 AM
Ok thank you for your help, I will go for the first option.

Anna Geller

08/30/2022, 1:02 PM
Nice 👍