Thread
#prefect-community
    Tom Forbes

    1 year ago
    In the documentation here it shows how you would configure Dask using a gateway rather than using a temporary cluster:
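    from dask_gateway import Gateway
    from prefect.executors import DaskExecutor  # import path assumes Prefect 0.14+ / 1.x

    gateway = Gateway()
    cluster = gateway.new_cluster()  # starts a cluster as soon as this line runs
    executor = DaskExecutor(
        address=cluster.scheduler_address,
        client_kwargs={"security": cluster.security}
    )
    flow.run(executor=executor)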
    How is this supposed to work with flows that use Docker storage? The specific executor needs to be resolved at import time, and using the example code in the docs it would mean creating a cluster at import time.
    Kevin Kho

    1 year ago
    You might be able to pass the dask_gateway.GatewayCluster callable into DaskExecutor with something like:
    executor = DaskExecutor(
        cluster_class="dask_gateway.GatewayCluster",
        cluster_kwargs={
            "image": "prefecthq/prefect:latest",
            "n_workers": 5,
            ...
        },
    )
    Or as a callable (with dask_gateway imported):
    executor = DaskExecutor(
        cluster_class=dask_gateway.GatewayCluster,
        cluster_kwargs={
            "image": "prefecthq/prefect:latest",
            "n_workers": 5,
            ...
        },
    )
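To illustrate why handing over the class (or its dotted path) instead of a live scheduler address avoids creating a cluster at import time, here is a minimal, dependency-free sketch. FakeCluster, LazyExecutor, and resolve_class are hypothetical stand-ins invented for this example. They are not Prefect's or dask-gateway's actual implementation; they only mimic the general idea of storing a recipe and building the cluster when the flow runs.

```python
import importlib


def resolve_class(cluster_class):
    """Return a class from either a dotted-path string or the class itself."""
    if isinstance(cluster_class, str):
        module_name, _, attr = cluster_class.rpartition(".")
        return getattr(importlib.import_module(module_name), attr)
    return cluster_class


class FakeCluster:
    """Hypothetical stand-in for a cluster class such as GatewayCluster."""

    created = 0  # how many clusters have been started so far

    def __init__(self, **kwargs):
        FakeCluster.created += 1
        self.kwargs = kwargs
        self.scheduler_address = "tcp://fake-scheduler:8786"

    def close(self):
        pass


class LazyExecutor:
    """Hypothetical executor: keeps only the recipe, builds the cluster in run()."""

    def __init__(self, cluster_class, cluster_kwargs=None):
        self.cluster_class = cluster_class
        self.cluster_kwargs = cluster_kwargs or {}

    def run(self):
        # The cluster is created here, at run time, and closed afterwards.
        cluster = resolve_class(self.cluster_class)(**self.cluster_kwargs)
        try:
            return cluster.scheduler_address
        finally:
            cluster.close()


# "Import time": building the executor starts nothing.
executor = LazyExecutor(
    cluster_class=FakeCluster,
    cluster_kwargs={"image": "example/image:tag"},  # placeholder value
)
clusters_after_import = FakeCluster.created

# "Run time": the cluster is only created now.
address = executor.run()
clusters_after_run = FakeCluster.created
```

With this shape, a module that defines a flow can be imported (for example while building Docker storage) without any cluster being started, because only the class and its keyword arguments are stored until the run begins.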
    Tom Forbes

    1 year ago
    Thank you! 💪