Vadym Dytyniak
05/26/2022, 8:47 AM
Kevin Kho
Vadym Dytyniak
05/26/2022, 2:13 PM
Kevin Kho
Vadym Dytyniak
05/26/2022, 2:24 PM
Kevin Kho
Vadym Dytyniak
05/26/2022, 2:28 PM
from typing import Any

from prefect import Flow, Task

class CreateClusterTask(Task):
    # pylint: disable=arguments-differ
    def run(
        self,
        context: Flow,
        name: str,
        dependencies: list[str] | None = None,
        **cluster_kwargs: Any,
    ) -> str:
        # ClusterManager is the thread author's own helper; its import is not shown here.
        manager = ClusterManager(dependencies)
        cluster = manager(name, **cluster_kwargs)
        if context.terminal_state_handler is not None:
            # Chain onto an existing terminal state handler so both run when the flow finishes.
            current_terminal_state_handler = context.terminal_state_handler

            def terminal_state_handler(*args: Any) -> None:
                current_terminal_state_handler(*args)
                manager.teardown(*args)

            context.terminal_state_handler = terminal_state_handler
        else:
            context.terminal_state_handler = manager.teardown
        self.logger.info('Dashboard link: %s', cluster.dashboard_link)
        return cluster.scheduler_address  # type: ignore
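As a hedged sketch (not from the thread): one way the returned scheduler address might be consumed downstream in Prefect 1.x. The flow wiring, the run_on_cluster task, and the toy workload are assumptions, not the thread author's code.

from dask.distributed import Client
from prefect import Flow, task

@task
def run_on_cluster(scheduler_address: str) -> int:
    # Connect to the cluster that CreateClusterTask stood up and run a toy computation on it.
    with Client(scheduler_address) as client:
        return client.submit(sum, range(100)).result()

with Flow('create-cluster-example') as flow:
    create_cluster = CreateClusterTask()
    address = create_cluster(context=flow, name='demo-cluster', n_workers=2)
    run_on_cluster(address)

Because CreateClusterTask.run registers manager.teardown as the flow's terminal_state_handler, the cluster is torn down when the flow run reaches a terminal state rather than after any single task.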
Kevin Kho
Vadym Dytyniak
05/26/2022, 3:08 PM
Kevin Kho
Vadym Dytyniak
05/26/2022, 3:13 PM
Kevin Kho
Vadym Dytyniak
05/26/2022, 3:18 PM
Kevin Kho
Vadym Dytyniak
05/26/2022, 4:13 PM
Kevin Kho
with DaskCluster(n_workers=n_workers) as client:
    # These tasks rely on a dask cluster to run, so we create them inside
    # the `DaskCluster` resource manager
    df = load_data()
    summary = summarize(df, client)
Pass it down and the tasks inside can use the client. I believe load_data should also use the cluster.
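A minimal sketch of that suggestion, assuming DaskCluster is a Prefect 1.x resource manager along the lines of the docs example; the task bodies and the data path are illustrative assumptions.

import dask.dataframe as dd
from dask.distributed import Client
from prefect import task

@task
def load_data(client: Client) -> dd.DataFrame:
    # Read lazily, then persist on the shared cluster so loading also runs there.
    df = dd.read_csv('data/*.csv')  # hypothetical path
    return client.persist(df)

@task
def summarize(df: dd.DataFrame, client: Client):
    # Route the aggregation through the same cluster's client.
    return client.compute(df.describe()).result()

With these signatures the snippet above becomes df = load_data(client), so both tasks run their work on the shared cluster.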