Ajay
01/29/2024, 3:19 PMfrom sklearn.datasets import make_regression
from sklearn.model_selection import ShuffleSplit
from sklearn.ensemble import RandomForestRegressor
from sklearn.model_selection import RandomizedSearchCV
from xgboost import XGBRegressor
from dask_kubernetes.operator import KubeCluster
from dask.distributed import Client
from prefect_dask import DaskTaskRunner
from prefect import flow, task
from dask import config
dask_config = {
"distributed.worker.daemon": False,
"distributed.comm.timeouts.connect": 120,
"distributed.comm.timeouts.tcp": 120,
}
config.set(dask_config)
n_workers = 4
threads_per_worker = 100
memory_per_worker = 600
docker_tag = "docker_tag"
@task(name="Generate big data")
def generate_big_data():
return make_regression(n_samples=50_000, n_features=9_000, n_informative=10, random_state=42)
@task(name="Generate outer folds")
def generate_outer_folds():
return ShuffleSplit(n_splits=5, test_size=0.2, random_state=42)
@task(name="Get model per fold")
def get_model_per_fold(dataset, train_index, estimator):
X = dataset[0][train_index]
y = dataset[1][train_index]
cv_estimator = RandomizedSearchCV(
estimator=estimator,
n_iter=2,
param_distributions={'n_estimators': [100, 1000], 'max_depth': [2, 10]},
scoring="neg_mean_squared_error",
cv=5,
refit=True,
n_jobs=5,
verbose=50,
)
cv_estimator.fit(X, y)
return cv_estimator
@flow(name="Main flow")
def main_flow():
# There are other depended tasks inside these tasks, but I am not showing them here.
big_data = generate_big_data.submit()
outer_folds = generate_outer_folds.submit()
# I have to call result() here to get the actual value of the future
big_data = big_data.result()
outer_folds = outer_folds.result()
for estimator in [RandomForestRegressor(n_jobs=-1), XGBRegressor(n_jobs=-1)]:
for (train_index, test_index) in outer_folds.split(big_data[0]):
# This actually submit all the tasks to a single worker, which is not what I want.
# I want to submit these tasks to different workers. How?
get_model_per_fold.submit(big_data, train_index, estimator)
if __name__ == "__main__":
spec = get_cluster_spec(
docker_tag,
n_workers,
memory_per_worker,
threads_per_worker,
)
cluster = KubeCluster(
# `spec` includes the docker image and credentials to connect to the
# kubernetes cluster.
custom_cluster_spec=spec,
namespace="dask-operator",
scheduler_forward_port=65000,
# There is a bug in the current version of dask_kubernetes that
# requires the user to specify the resource limits, even if the
# resources are specified in the `custom_cluster_spec`.
resources=spec["spec"]["worker"]["spec"]["containers"][0]["resources"],
resource_timeout=120,
)
client = Client(cluster)
client.wait_for_workers(n_workers)
print("Dask dashboard:", client.dashboard_link)
runner = DaskTaskRunner(
address=client.scheduler.address,
)
main_flow.with_options(task_runner=runner)()
cluster.close()
client.close()
Now, my problem is, as soon as it submits get_model_per_fold
task on Kubernetes, it fails showing the bellow errors,
14:38:36.681 | INFO | Task run 'Get model per fold-0' - Crash detected! Execution was interrupted by an unexpected exception: CancelledError: Get model per fold-0-0f337d648b904d0f8d8c95bcf80f10df-1
14:38:36.726 | INFO | Task run 'Get model per fold-1' - Crash detected! Execution was interrupted by an unexpected exception: CancelledError: Get model per fold-1-cd2cc36bd45246fb9c9a0f82dcb5778f-1
14:38:36.765 | INFO | Task run 'Get model per fold-2' - Crash detected! Execution was interrupted by an unexpected exception: CancelledError: Get model per fold-2-e89cd46267534dc4b7dcb135394fdf7b-1
14:38:36.799 | INFO | Task run 'Get model per fold-3' - Crash detected! Execution was interrupted by an unexpected exception: CancelledError: Get model per fold-3-122304a4fff54c6dadd915bec66160e3-1
14:38:36.837 | INFO | Task run 'Get model per fold-4' - Crash detected! Execution was interrupted by an unexpected exception: CancelledError: Get model per fold-4-6865629824604e7eb3f0ff7860613d7f-1
14:38:36.878 | INFO | Task run 'Get model per fold-5' - Crash detected! Execution was interrupted by an unexpected exception: CancelledError: Get model per fold-5-aa9cf7aa188e4ea9a05a63ef36a0796b-1
14:38:36.914 | INFO | Task run 'Get model per fold-6' - Crash detected! Execution was interrupted by an unexpected exception: CancelledError: Get model per fold-6-99d78810d0604020bd06c84392a80f71-1
14:38:36.953 | INFO | Task run 'Get model per fold-7' - Crash detected! Execution was interrupted by an unexpected exception: CancelledError: Get model per fold-7-6bc631fbc9074bc98a8b6e74db3f0bea-1
14:38:36.991 | INFO | Task run 'Get model per fold-8' - Crash detected! Execution was interrupted by an unexpected exception: CancelledError: Get model per fold-8-16a023e17fc24bab9d484f75fc4eaabc-1
14:38:37.027 | INFO | Task run 'Get model per fold-9' - Crash detected! Execution was interrupted by an unexpected exception: CancelledError: Get model per fold-9-3d49df0789454912909e43ded665ea04-1
14:38:37.194 | ERROR | Flow run 'psychedelic-hoatzin' - Finished in state Failed('10/12 states failed.')
CancelledError: Get model per fold-0-fbb826dfd8d548bf8de2bae1351b401e-1
aiohttp.client_exceptions.ClientOSError: [Errno 104] Connection reset by peer
The above error I got when I set lower number of n_estimators = [10, 100]
, but when I increase it n_estimators = [100, 1000]
, it shows the below error,
ValueError: 3600000000 exceeds max_bin_len(2147483647)
• How can I solve this issue when I have multiple estimators and I want them to train in parallel? I have 256 threads per worker and 1TB memory.
• Also, how can I submit, let say 5 prefect tasks to worker-1 and the other 5 tasks to worker-2?