Rhys Mansal
resource_manager
cleanup
Kevin Kho
emre
with EMRCluster(...) as cluster:
    pass
downstream_task(upstream_tasks=[cluster.cleanup_task])
with EMRCluster(...) as cluster:
    pass
downstream_task(upstream_tasks=[cluster.cleanup])
EMRCluster
with
cluster_def = EMRCluster(...)
with cluster_def as cluster:
    pass
downstream_task(upstream_tasks=[cluster_def.cleanup_task])
from prefect import resource_manager, Flow, task
import prefect


@resource_manager
class DaskCluster:
    """Minimal resource manager used to show how to run a task after cleanup.

    No real Dask cluster is started: ``setup`` returns ``None`` and
    ``cleanup`` only sleeps and logs, so the ordering of the log messages
    ("abc", "cleanup", "bcd") is easy to observe.

    Args:
        n_workers: Stored on the instance; not otherwise used in this example.
    """

    def __init__(self, n_workers):
        self.n_workers = n_workers

    def setup(self):
        """Create a local dask cluster (placeholder: returns ``None``)."""
        return

    def cleanup(self, client):
        """Cleanup the local dask cluster.

        Args:
            client: Whatever ``setup`` produced (``None`` in this example).
        """
        import time

        # Artificial delay so it is visible that ``bcd`` waits for cleanup.
        time.sleep(10)
        prefect.context.logger.info("cleanup")


@task
def abc():
    """Log "abc"; runs while the resource is active."""
    prefect.context.logger.info("abc")
    return


@task
def bcd():
    """Log "bcd"; scheduled downstream of the resource's cleanup task."""
    prefect.context.logger.info("bcd")
    return


with Flow("...") as flow:
    # Keep a handle on the object returned by calling the resource manager:
    # it is what exposes ``cleanup_task`` (the ``with`` target does not).
    cluster = DaskCluster(2)
    with cluster:
        x = abc()
    # Deliberately outside ``with cluster:`` so that ``bcd`` is not itself
    # treated as a task that uses the resource, only as a successor of cleanup.
    bcd(upstream_tasks=[cluster.cleanup_task])

flow.run()
Bring your towel and join one of the fastest growing data communities. Welcome to our second-generation open source orchestration platform, a completely rethought approach to dataflow automation.