# ask-community
m:
Hi folks, question about the `ResourceManager` and using `set_upstream`. How can I ensure that a task is run after the resource manager cleanup? (Please see the thread for more details.)
This is the canonical example provided in the docs:
```python
with Flow("dask-example") as flow:
    n_workers = Parameter("n_workers", default=None)
    out_path = Parameter("out_path", default="summary.csv")

    with DaskCluster(n_workers=n_workers) as client:
        # These tasks rely on a dask cluster to run, so we create them inside
        # the `DaskCluster` resource manager
        df = load_data()
        summary = summarize(df, client)

    # This task doesn't rely on the dask cluster to run, so it doesn't need to
    # be under the `DaskCluster` context
    write_csv(summary, out_path)
```
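For reference, `DaskCluster` in that example is a resource manager the docs define alongside the flow. A minimal sketch of what that looks like, assuming Prefect 1.x and `dask.distributed` (from memory, not verbatim from the docs):
```python
from dask.distributed import Client, LocalCluster
from prefect import resource_manager


@resource_manager
class DaskCluster:
    """Spin up a temporary dask cluster for the duration of the flow run."""

    def __init__(self, n_workers=None):
        self.n_workers = n_workers

    def setup(self):
        # Whatever `setup` returns is what `with DaskCluster(...) as client:` yields
        return Client(LocalCluster(n_workers=self.n_workers))

    def cleanup(self, client):
        # Runs as its own task after the tasks inside the block complete
        client.close()
```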
I would like `write_csv` to run only after `summarize` and, more importantly, after `DaskCluster.cleanup` has run. Using `set_upstream` against `summary` is not sufficient:
```python
with Flow("dask-example") as flow:
    n_workers = Parameter("n_workers", default=None)
    out_path = Parameter("out_path", default="summary.csv")

    with DaskCluster(n_workers=n_workers) as client:
        # These tasks rely on a dask cluster to run, so we create them inside
        # the `DaskCluster` resource manager
        df = load_data()
        summary = summarize(df, client)

    # This task doesn't rely on the dask cluster to run, so it doesn't need to
    # be under the `DaskCluster` context
    written = write_csv(summary, out_path)
    written.set_upstream(summary)
```
Given this won't set the `cleanup_task` as an upstream of `written`, it looks to me that one solution is:
```python
with Flow("dask-example") as flow:
    n_workers = Parameter("n_workers", default=None)
    out_path = Parameter("out_path", default="summary.csv")
    
    client = DaskCluster(n_workers=n_workers)
    with client:
        # These tasks rely on a dask cluster to run, so we create them inside
        # the `DaskCluster` resource manager
        df = load_data()
        summary = summarize(df, client)

    # This task doesn't rely on the dask cluster to run, so it doesn't need to
    # be under the `DaskCluster` context
    written = write_csv(summary, out_path)
    written.set_upstream([summary, client.cleanup_task])
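As a sanity check (a sketch, assuming Prefect 1.x; `flow.visualize()` needs graphviz installed), the ordering can be confirmed before running:
```python
# Render the DAG: the cleanup task should sit upstream of write_csv
# (possibly indirectly, since set_upstream was given a list).
flow.visualize()

# Or check the topological order programmatically:
order = flow.sorted_tasks()
assert order.index(cluster.cleanup_task) < order.index(written)
```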
Is this the recommended approach?
k:
Did that work for you? I think that looks good to me.
m:
Apologies for the very late reply - yes, it worked, thank you!