Johnathan Nguyen
08/29/2022, 1:33 PMJeff Hale
08/29/2022, 3:38 PMJohnathan Nguyen
08/29/2022, 4:22 PMdef generate_dask_executor(address_of_gateway: str):
gateway = Gateway(
address=address_of_gateway,
auth=auth.BasicAuth(username="username", password="password"),
)
list_clusters = gateway.list_clusters()
cluster_name = list_clusters[0].name
cluster = gateway.get_cluster(cluster_name=cluster_name)
# it returns something like this : <gateway://dask-gateway-server:8000/74c3991658104aa68665c11ac52e767a>
scheduler_address = cluster.scheduler_address
security = cluster.security
return DaskExecutor(
address=scheduler_address,
client_kwargs={"security": security,
)
with Flow(
"my_flow_name",
storage=S3(
bucket=s3_params.get("bucket"),
client_options=s3_params.get("client_options"),
),
executor=generate_dask_executor(address_of_gateway=os.getenv("address_of_gateway")), # <http://dask-gateway-server:8000>
run_config=DockerRun(image=prefect_flow_run_image),
) as flow:
hw = hello_world()
hw2 = hello_world2()
if __name__ == "__main__":
flow_id = flow.register(project_name="my_dask_project")
Anna Geller
08/29/2022, 10:53 PMJohnathan Nguyen
08/30/2022, 6:06 AMAnna Geller
08/30/2022, 11:13 AMJohnathan Nguyen
08/30/2022, 11:47 AMAnna Geller
08/30/2022, 1:02 PM