Rhys Mansal
05/09/2022, 1:54 PM
resource_manager cleanup method upstream of another task?
Kevin Kho
05/09/2022, 1:58 PM
Rhys Mansal
05/09/2022, 1:59 PM
emre
05/09/2022, 2:05 PM
with EMRCluster(...) as cluster:
    pass
downstream_task(upstream_tasks=[cluster.cleanup_task])
Rhys Mansal
05/09/2022, 2:05 PM
Kevin Kho
05/09/2022, 2:06 PM
with EMRCluster(...) as cluster:
    pass
downstream_task(upstream_tasks=[cluster.cleanup])
Rhys Mansal
05/09/2022, 2:12 PM
emre
05/09/2022, 2:22 PM
EMRCluster only returns the setup task when used with the with keyword.
cluster_def = EMRCluster(...)
with cluster_def as cluster:
    pass
downstream_task(upstream_tasks=[cluster_def.cleanup_task])
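The point about the with keyword can be seen in plain Python, without Prefect. The sketch below is a toy stand-in and not Prefect's implementation: ResourceContextSketch and SetupHandle are made-up names, and the cleanup_task value is only a placeholder. It shows that the name bound by "as" is whatever __enter__ returns, so the cleanup handle has to be read from the object created before the with block.

```python
class SetupHandle:
    """Hypothetical stand-in for whatever a resource's setup step produces."""


class ResourceContextSketch:
    """Toy context manager for illustration only, not Prefect's code."""

    def __init__(self):
        self.setup_task = SetupHandle()
        self.cleanup_task = "cleanup"  # placeholder for a cleanup task object

    def __enter__(self):
        # `with ... as x` binds x to this return value,
        # not to the context manager object itself.
        return self.setup_task

    def __exit__(self, exc_type, exc, tb):
        return False  # do not suppress exceptions


cluster_def = ResourceContextSketch()
with cluster_def as cluster:
    pass

# `cluster` is the setup handle, so it carries no cleanup_task attribute.
has_cleanup_on_as_target = hasattr(cluster, "cleanup_task")
# The object kept in `cluster_def` still exposes it.
has_cleanup_on_context = hasattr(cluster_def, "cleanup_task")
```

Under these assumptions, keeping a reference such as cluster_def outside the with block is what makes the cleanup handle reachable afterwards.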
Rhys Mansal
05/09/2022, 2:24 PM
Kevin Kho
05/09/2022, 2:25 PM
from prefect import resource_manager, Flow, task
import prefect

@resource_manager
class DaskCluster:
    def __init__(self, n_workers):
        self.n_workers = n_workers

    def setup(self):
        "Create a local dask cluster"
        return

    def cleanup(self, client):
        "Cleanup the local dask cluster"
        import time
        time.sleep(10)
        prefect.context.logger.info("cleanup")

@task
def abc():
    prefect.context.logger.info("abc")
    return

@task
def bcd():
    prefect.context.logger.info("bcd")
    return

with Flow("...") as flow:
    cluster = DaskCluster(2)
    with cluster:
        x = abc()
    bcd(upstream_tasks=[cluster.cleanup_task])

flow.run()