Rhys Mansal

05/09/2022, 1:54 PM
Hi everyone, I was wondering if anyone knew a way to set a `resource_manager` `cleanup` method upstream of another task?

Kevin Kho

05/09/2022, 1:58 PM
Hey @Rhys Mansal, what are you trying to do specifically?

Rhys Mansal

05/09/2022, 1:59 PM
I want to prevent another task from running until the EMR cluster I am using has been torn down.

emre

05/09/2022, 2:05 PM
Not tested, but I think something like this can work:
with EMRCluster(...) as cluster:
    pass
downstream_task(upstream_tasks=[cluster.cleanup_task])

Rhys Mansal

05/09/2022, 2:05 PM
Thanks @emre, I will give it a try

Kevin Kho

05/09/2022, 2:06 PM
I think so too, am testing
with EMRCluster(...) as cluster:
    pass
downstream_task(upstream_tasks=[cluster.cleanup])
Doesn't seem to work for me. I'll have to dig.

Rhys Mansal

05/09/2022, 2:12 PM
thanks @Kevin Kho

emre

05/09/2022, 2:22 PM
I found this. It's kinda shabby, but `EMRCluster` only returns the setup task when used with the `with` keyword.
cluster_def = EMRCluster(...)
with cluster_def as cluster:
    pass
downstream_task(upstream_tasks=[cluster_def.cleanup_task])
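
For anyone wondering why the `as` name has no `cleanup_task`: in plain Python, `with X as y` binds `y` to whatever `X.__enter__()` returns, which need not be `X` itself. The sketch below is a toy illustration of that rule only. `ResourceManagerSketch` and the string placeholders are hypothetical stand-ins, not Prefect's actual classes or tasks.

```python
class ResourceManagerSketch:
    """Hypothetical stand-in for the object a resource manager call returns."""

    def __init__(self, name):
        self.name = name
        # Placeholder for the handle that other tasks could depend on.
        self.cleanup_task = f"{name}.cleanup"

    def __enter__(self):
        # Mirrors the behavior described in the thread: the value bound by
        # `as` is the setup result, NOT the manager object itself.
        return f"{self.name}.setup"

    def __exit__(self, exc_type, exc, tb):
        return False  # do not swallow exceptions


cluster_def = ResourceManagerSketch("emr")
with cluster_def as cluster:
    pass

# `cluster` is only the setup result, so it has no cleanup handle.
print(hasattr(cluster, "cleanup_task"))
# The handle lives on the object created before the `with` statement.
print(cluster_def.cleanup_task)
```

This is why keeping a separate reference (`cluster_def`) before entering the `with` block makes the cleanup handle reachable afterwards.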

Rhys Mansal

05/09/2022, 2:24 PM
It should be fine. Thank you very much @emre and @Kevin Kho for the help.

Kevin Kho

05/09/2022, 2:25 PM
Ah nice! I was trying something similar and it works
from prefect import resource_manager, Flow, task
import prefect

@resource_manager
class DaskCluster:
    def __init__(self, n_workers):
        self.n_workers = n_workers

    def setup(self):
        "Create a local dask cluster"
        return

    def cleanup(self, client):
        "Cleanup the local dask cluster"
        import time
        time.sleep(10)
        prefect.context.logger.info("cleanup")

@task
def abc():
    prefect.context.logger.info("abc")
    return

@task
def bcd():
    prefect.context.logger.info("bcd")
    return

with Flow("...") as flow:
    cluster = DaskCluster(2)
    with cluster:
        x = abc()

    bcd(upstream_tasks=[cluster.cleanup_task])

flow.run()
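
A rough way to picture the ordering this should produce, as a toy model rather than Prefect's scheduler: the task names come from the example above, and the dependency edges are an assumption (that cleanup waits on setup and on the tasks inside the `with` block, and that `bcd` waits on cleanup through `upstream_tasks`). The standard-library `graphlib` module (Python 3.9+) then gives an order that respects those edges.

```python
from graphlib import TopologicalSorter

# Each key maps to the set of tasks that must finish before it starts.
# These edges are assumed for illustration, not read from a real flow.
dependencies = {
    "abc": {"setup"},             # task inside the `with` block
    "cleanup": {"setup", "abc"},  # cleanup waits for the block to finish
    "bcd": {"cleanup"},           # upstream_tasks=[cluster.cleanup_task]
}

order = list(TopologicalSorter(dependencies).static_order())
print(order)
```

Under those assumptions, `bcd` can only start once `cleanup` has finished, which is the behavior asked for at the top of the thread.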