Rhys Mansal
05/09/2022, 1:54 PMresource_managercleanupKevin Kho
Rhys Mansal
05/09/2022, 1:59 PMemre
05/09/2022, 2:05 PMwith EMRCluster(...) as cluster:
    pass
downstream_task(upstream_tasks=[cluster.cleanup_task])Rhys Mansal
05/09/2022, 2:05 PMKevin Kho
with EMRCluster(...) as cluster:
    pass
downstream_task(upstream_tasks=[cluster.cleanup])Kevin Kho
Rhys Mansal
05/09/2022, 2:12 PMemre
05/09/2022, 2:22 PMEMRClusterwithcluster_def = EMRCluster(...)
with cluster_def as cluster:
    pass
downstream_task(upstream_tasks=[cluster_def.cleanup_task])Rhys Mansal
05/09/2022, 2:24 PMKevin Kho
from prefect import resource_manager, Flow, task
import prefect
@resource_manager
class DaskCluster:
    """Example resource manager standing in for a local Dask cluster.

    This is a demonstration stub: no cluster is actually created. It exists
    to show that a task can be scheduled after the resource manager's
    cleanup step.
    """

    def __init__(self, n_workers):
        """Remember the requested number of workers; nothing is started here."""
        self.n_workers = n_workers

    def setup(self):
        """Create a local dask cluster.

        Placeholder: nothing is created and ``None`` is returned.
        """
        return None

    def cleanup(self, client):
        """Cleanup the local dask cluster.

        ``client`` is not used by this stub.
        NOTE(review): per the Prefect 1.x resource_manager docs this should be
        the value returned by ``setup`` (``None`` here) -- confirm.
        """
        import time

        # Artificial delay so the run log makes it obvious whether a
        # downstream task waited for cleanup to finish.
        time.sleep(10)
        # The original line was wrapped in Slack auto-link markup
        # (<http://...|...>), which is not valid Python.
        prefect.context.logger.info("cleanup")
@task
def abc():
    """Log "abc" through the logger on the Prefect context; returns None."""
    # The original line was wrapped in Slack auto-link markup
    # (<http://...|...>), which is not valid Python.
    prefect.context.logger.info("abc")
@task
def bcd():
    """Log "bcd" through the logger on the Prefect context; returns None."""
    # The original line was wrapped in Slack auto-link markup
    # (<http://...|...>), which is not valid Python.
    prefect.context.logger.info("bcd")
# Assemble the example flow and run it in-process.
with Flow("...") as flow:
    # Keep a handle on the resource manager instance (rather than creating it
    # inline in the `with` statement) so that its `cleanup_task` attribute can
    # be referenced after the block below.
    cluster = DaskCluster(2)
    # abc() is called inside the resource manager's context.
    with cluster:
        x = abc()
    # bcd lists the cluster's cleanup task as an upstream task; the intent is
    # that bcd only starts once cleanup has finished.
    # NOTE(review): relies on Prefect 1.x exposing `cleanup_task` on the
    # resource manager context object -- confirm against the Prefect docs.
    bcd(upstream_tasks=[cluster.cleanup_task])
# Execute the flow immediately.
flow.run()