Thread
#prefect-community
    Rhys Mansal
    4 months ago
    Hi everyone, I was wondering if anyone knew a way to set a resource_manager cleanup method upstream of another task?
    Kevin Kho
    4 months ago
    Hey @Rhys Mansal, what are you trying to do specifically?
    Rhys Mansal
    4 months ago
    I want to prevent another task from running until the EMR cluster I am using has been torn down.
    emre
    4 months ago
    Not tested, but I think something like this can work
    with EMRCluster(...) as cluster:
        pass
    downstream_task(upstream_tasks=[cluster.cleanup_task])
    Rhys Mansal
    4 months ago
    Thanks @emre, I will give it a try
    Kevin Kho
    4 months ago
    I think so too, am testing
    with EMRCluster(...) as cluster:
        pass
    downstream_task(upstream_tasks=[cluster.cleanup])
    Doesn’t seem to work for me. I’ll have to dig.
    Rhys Mansal
    4 months ago
    thanks @Kevin Kho
    emre
    4 months ago
    I found this. It's kind of shabby, but EMRCluster only returns the setup task when used with the with keyword, so the cleanup task has to be taken from the object created before the with block:
    cluster_def = EMRCluster(...)
    with cluster_def as cluster:
        pass
    downstream_task(upstream_tasks=[cluster_def.cleanup_task])
    Rhys Mansal
    4 months ago
    It should be fine. Thank you very much @emre and @Kevin Kho for the help.
    Kevin Kho
    4 months ago
    Ah nice! I was trying something similar and it works:
    from prefect import resource_manager, Flow, task
    import prefect
    
    @resource_manager
    class DaskCluster:
        def __init__(self, n_workers):
            self.n_workers = n_workers
    
        def setup(self):
            "Create a local dask cluster"
            return
    
        def cleanup(self, client):
            "Cleanup the local dask cluster"
            import time
            time.sleep(10)
            prefect.context.logger.info("cleanup")
    
    @task
    def abc():
        prefect.context.logger.info("abc")
        return
    
    @task
    def bcd():
        prefect.context.logger.info("bcd")
        return
    
    with Flow("...") as flow:
        cluster = DaskCluster(2)
        with cluster:
            x = abc()
    
        bcd(upstream_tasks=[cluster.cleanup_task])
    
    flow.run()
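
Note on why the first attempts failed: as emre points out in the thread, the name bound by "with ... as cluster" is only the setup task, so it has no cleanup_task attribute. The sketch below reproduces that binding behavior in plain Python. It is not Prefect's actual implementation: FakeResourceContext and its string "tasks" are hypothetical stand-ins, chosen so the example runs without Prefect installed.

```python
# Plain-Python stand-in (NOT Prefect's real classes) for the binding issue
# discussed in the thread. All names here are hypothetical.
class FakeResourceContext:
    """Stand-in for the object you get by calling a resource manager."""

    def __init__(self, name):
        # Strings stand in for the setup and cleanup tasks.
        self.setup_task = f"{name}.setup"
        self.cleanup_task = f"{name}.cleanup"

    def __enter__(self):
        # Mirrors the behavior described in the thread:
        # the `as` target receives only the setup handle.
        return self.setup_task

    def __exit__(self, exc_type, exc, tb):
        # Do not suppress exceptions raised inside the block.
        return False


cluster_def = FakeResourceContext("EMRCluster")
with cluster_def as cluster:
    pass

# `cluster` is just the setup handle, so it has no cleanup_task attribute.
print(hasattr(cluster, "cleanup_task"))  # False
# The object created before the `with` block still exposes it.
print(cluster_def.cleanup_task)  # EMRCluster.cleanup
```

This is why both working snippets in the thread keep a reference to the object created before the with block (cluster_def for emre, cluster for Kevin) and pass its cleanup_task to upstream_tasks.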