# ask-community
a
Hi, I have a Prefect 2 flow that uses the Dask task runner. The flow calls 3 subflows, and I want the parent flow and the subflows to reuse the same Dask cluster. This is especially important when I set up an ephemeral Dask cluster on Fargate, for example: I want to create it once and have all the subflows reuse it. How do I do that?
d
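One possibility is to create the cluster yourself (DaskTaskRunner otherwise starts a temporary one for you) outside your flows or in an outer flow. Then pass its scheduler address as an argument to any flow that needs to hand the cluster down a level. Each subflow uses its own task runner rather than inheriting the parent's, so the address has to be passed down explicitly. It would look something like this: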
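```python
from distributed import LocalCluster
from prefect import flow
from prefect_dask import DaskTaskRunner

@flow
def subflow():
    pass

@flow
def parent_flow(cluster_address):
    # Point the subflow's task runner at the existing scheduler (address=)
    # instead of letting it start a new temporary cluster.
    subflow.with_options(task_runner=DaskTaskRunner(address=cluster_address))()

@flow
def outer_flow():
    # Create the cluster once; the context manager shuts it down afterwards.
    with LocalCluster() as cluster:
        parent_flow.with_options(
            task_runner=DaskTaskRunner(address=cluster.scheduler_address)
        )(cluster.scheduler_address)

if __name__ == "__main__":
    outer_flow()
```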