# prefect-community
r
Hi everyone, I was wondering if anyone knew a way to set a `resource_manager` `cleanup` method upstream of another task?
k
Hey @Rhys Mansal, what are you trying to do specifically?
r
I want to prevent another task from running until the EMR cluster I am using has been torn down.
e
Not tested, but I think something like this can work
with EMRCluster(...) as cluster:
    pass
downstream_task(upstream_tasks=[cluster.cleanup_task])
r
Thanks @emre, I will give it a try
k
I think so too, am testing
with EMRCluster(...) as cluster:
    pass
downstream_task(upstream_tasks=[cluster.cleanup])
Doesn’t seem to work for me. I’ll have to dig.
r
thanks @Kevin Kho
e
I found this, it's kinda shabby, but `EMRCluster` only returns the setup task when used with the `with` keyword.
cluster_def = EMRCluster(...)
with cluster_def as cluster:
    pass
downstream_task(upstream_tasks=[cluster_def.cleanup_task])
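To see why the snippet above keeps a separate `cluster_def` name, here is a rough toy model in plain Python. `ToyTask` and `ToyResourceContext` are made-up stand-ins, not Prefect classes. The only assumption is the behaviour described in this thread: the object holds both tasks, and `with ... as` hands back just the setup task.

```python
# Toy model only: these classes imitate the behaviour described in the thread.
# They are hypothetical and are not Prefect's implementation.

class ToyTask:
    def __init__(self, name):
        self.name = name


class ToyResourceContext:
    """Holds the setup and cleanup tasks for one resource."""

    def __init__(self, resource_name):
        self.setup_task = ToyTask(f"{resource_name}.setup")
        self.cleanup_task = ToyTask(f"{resource_name}.cleanup")

    def __enter__(self):
        # The `as` target receives whatever __enter__ returns: the setup task.
        return self.setup_task

    def __exit__(self, exc_type, exc_value, traceback):
        return False  # do not swallow exceptions


cluster_def = ToyResourceContext("cluster")
with cluster_def as cluster:
    pass

print(cluster.name)                      # cluster.setup
print(hasattr(cluster, "cleanup_task"))  # False
print(cluster_def.cleanup_task.name)     # cluster.cleanup
```

So in this model the name bound by `as` never exposes the cleanup task, while the object created before the `with` block does, which is why the reference has to go through `cluster_def`.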
r
It should be fine. Thank you very much @emre and @Kevin Kho for the help.
k
Ah nice! I was trying something similar and it works
from prefect import resource_manager, Flow, task
import prefect

@resource_manager
class DaskCluster:
    def __init__(self, n_workers):
        self.n_workers = n_workers

    def setup(self):
        "Create a local dask cluster"
        return

    def cleanup(self, client):
        "Cleanup the local dask cluster"
        import time
        time.sleep(10)
        prefect.context.logger.info("cleanup")

@task
def abc():
    prefect.context.logger.info("abc")
    return

@task
def bcd():
    prefect.context.logger.info("bcd")
    return

with Flow("...") as flow:
    cluster = DaskCluster(2)
    with cluster:
        x = abc()

    bcd(upstream_tasks=[cluster.cleanup_task])

flow.run()