# ask-community
I am trying to run a Prefect flow on Kubernetes. I have a large dataset and want to train multiple estimators in parallel. See the code below for details.
from sklearn.datasets import make_regression
from sklearn.model_selection import ShuffleSplit
from sklearn.ensemble import RandomForestRegressor
from sklearn.model_selection import RandomizedSearchCV
from xgboost import XGBRegressor

from dask_kubernetes.operator import KubeCluster
from dask.distributed import Client
from prefect_dask import DaskTaskRunner
from prefect import flow, task

from dask import config
dask_config = {
    "distributed.worker.daemon": False,
    "distributed.comm.timeouts.connect": 120,
    "distributed.comm.timeouts.tcp": 120,
}
config.set(dask_config)

n_workers = 4
threads_per_worker = 100
memory_per_worker = 600
docker_tag = "docker_tag"

            
@task(name="Generate big data")
def generate_big_data():
    return make_regression(n_samples=50_000, n_features=9_000, n_informative=10, random_state=42)


@task(name="Generate outer folds")
def generate_outer_folds():
    return ShuffleSplit(n_splits=5, test_size=0.2, random_state=42)


@task(name="Get model per fold")
def get_model_per_fold(dataset, train_index, estimator):

    X = dataset[0][train_index]
    y = dataset[1][train_index]

    cv_estimator = RandomizedSearchCV(
        estimator=estimator,
        n_iter=2,
        param_distributions={'n_estimators': [100, 1000], 'max_depth': [2, 10]},
        scoring="neg_mean_squared_error",
        cv=5,
        refit=True,
        n_jobs=5,
        verbose=50,
    )

    cv_estimator.fit(X, y)

    return cv_estimator


@flow(name="Main flow")
def main_flow():

    # These tasks have other dependent tasks inside them, which are not shown here.
    big_data = generate_big_data.submit()
    outer_folds = generate_outer_folds.submit()

    # I have to call result() here to get the actual value of the future
    big_data = big_data.result()
    outer_folds = outer_folds.result()

    for estimator in [RandomForestRegressor(n_jobs=-1), XGBRegressor(n_jobs=-1)]:

        for (train_index, test_index) in outer_folds.split(big_data[0]):

            # This actually submits all the tasks to a single worker, which is not what I want.
            # I want to submit these tasks to different workers. How?
            get_model_per_fold.submit(big_data, train_index, estimator)

if __name__ == "__main__":

    spec = get_cluster_spec(
        docker_tag,
        n_workers,
        memory_per_worker,
        threads_per_worker,
    )

    cluster = KubeCluster(
        # `spec` includes the docker image and credentials to connect to the
        # kubernetes cluster.
        custom_cluster_spec=spec,
        namespace="dask-operator",
        scheduler_forward_port=65000,
        # There is a bug in the current version of dask_kubernetes that
        # requires the user to specify the resource limits, even if the
        # resources are specified in the `custom_cluster_spec`.
        resources=spec["spec"]["worker"]["spec"]["containers"][0]["resources"],
        resource_timeout=120,
    )

    client = Client(cluster)
    client.wait_for_workers(n_workers)

    print("Dask dashboard:", client.dashboard_link)

    runner = DaskTaskRunner(
        address=client.scheduler.address,
    )
    
    main_flow.with_options(task_runner=runner)()

    # Close the client before tearing down the cluster it is connected to.
    client.close()
    cluster.close()
My problem is that as soon as the flow submits the `get_model_per_fold` tasks on Kubernetes, it fails with the errors below:
14:38:36.681 | INFO    | Task run 'Get model per fold-0' - Crash detected! Execution was interrupted by an unexpected exception: CancelledError: Get model per fold-0-0f337d648b904d0f8d8c95bcf80f10df-1
14:38:36.726 | INFO    | Task run 'Get model per fold-1' - Crash detected! Execution was interrupted by an unexpected exception: CancelledError: Get model per fold-1-cd2cc36bd45246fb9c9a0f82dcb5778f-1
14:38:36.765 | INFO    | Task run 'Get model per fold-2' - Crash detected! Execution was interrupted by an unexpected exception: CancelledError: Get model per fold-2-e89cd46267534dc4b7dcb135394fdf7b-1
14:38:36.799 | INFO    | Task run 'Get model per fold-3' - Crash detected! Execution was interrupted by an unexpected exception: CancelledError: Get model per fold-3-122304a4fff54c6dadd915bec66160e3-1
14:38:36.837 | INFO    | Task run 'Get model per fold-4' - Crash detected! Execution was interrupted by an unexpected exception: CancelledError: Get model per fold-4-6865629824604e7eb3f0ff7860613d7f-1
14:38:36.878 | INFO    | Task run 'Get model per fold-5' - Crash detected! Execution was interrupted by an unexpected exception: CancelledError: Get model per fold-5-aa9cf7aa188e4ea9a05a63ef36a0796b-1
14:38:36.914 | INFO    | Task run 'Get model per fold-6' - Crash detected! Execution was interrupted by an unexpected exception: CancelledError: Get model per fold-6-99d78810d0604020bd06c84392a80f71-1
14:38:36.953 | INFO    | Task run 'Get model per fold-7' - Crash detected! Execution was interrupted by an unexpected exception: CancelledError: Get model per fold-7-6bc631fbc9074bc98a8b6e74db3f0bea-1
14:38:36.991 | INFO    | Task run 'Get model per fold-8' - Crash detected! Execution was interrupted by an unexpected exception: CancelledError: Get model per fold-8-16a023e17fc24bab9d484f75fc4eaabc-1
14:38:37.027 | INFO    | Task run 'Get model per fold-9' - Crash detected! Execution was interrupted by an unexpected exception: CancelledError: Get model per fold-9-3d49df0789454912909e43ded665ea04-1
14:38:37.194 | ERROR   | Flow run 'psychedelic-hoatzin' - Finished in state Failed('10/12 states failed.')
CancelledError: Get model per fold-0-fbb826dfd8d548bf8de2bae1351b401e-1

aiohttp.client_exceptions.ClientOSError: [Errno 104] Connection reset by peer
I got the above error with the smaller setting `n_estimators = [10, 100]`, but when I increase it to `n_estimators = [100, 1000]`, I get the error below instead:
ValueError: 3600000000 exceeds max_bin_len(2147483647)
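A quick sanity check on that number: it matches the size in bytes of the full feature matrix returned by `generate_big_data` (50,000 rows by 9,000 float64 columns). That suggests, as an assumption I have not confirmed, that the whole array is being serialized as one binary payload when it is passed as a task argument, and that payload is larger than the limit quoted in the message. The constant names below are only for illustration.

```python
# Size of the feature matrix X produced by make_regression in the flow above.
N_SAMPLES = 50_000
N_FEATURES = 9_000
BYTES_PER_FLOAT64 = 8  # make_regression returns float64 arrays

# Limit quoted in the error message (2**31 - 1).
MAX_BIN_LEN = 2**31 - 1

x_bytes = N_SAMPLES * N_FEATURES * BYTES_PER_FLOAT64

print(x_bytes)                # 3600000000
print(MAX_BIN_LEN)            # 2147483647
print(x_bytes > MAX_BIN_LEN)  # True
```

If that reading is right, the limit is hit by the dataset itself rather than by the choice of `n_estimators`.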
• How can I solve this issue when I have multiple estimators and want to train them in parallel? I have 256 threads per worker and 1 TB of memory.
• Also, how can I submit, let's say, 5 Prefect tasks to worker-1 and the other 5 tasks to worker-2?
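To make the second question concrete, here is a minimal sketch of the task-to-worker split I have in mind. The helper `assign_in_blocks` and the worker names are hypothetical. The commented-out part at the end uses Dask's `dask.annotate(workers=...)` annotation, but whether that annotation reaches the scheduler through `DaskTaskRunner` in my setup is an untested assumption.

```python
def assign_in_blocks(task_keys, worker_names):
    """Split task keys into contiguous blocks, one block per worker."""
    task_keys = list(task_keys)
    if not worker_names:
        raise ValueError("need at least one worker name")
    n_tasks = len(task_keys)
    n_workers = len(worker_names)
    # Task i goes to worker floor(i * n_workers / n_tasks).
    return {
        key: worker_names[i * n_workers // n_tasks]
        for i, key in enumerate(task_keys)
    }


# Hypothetical worker names; the real addresses could be read from
# client.scheduler_info()["workers"].
plan = assign_in_blocks(range(10), ["worker-1", "worker-2"])

for task_index, worker in plan.items():
    print(task_index, "->", worker)

# Inside the flow, each submission might then be wrapped like this
# (assumption, not verified against my cluster):
#
#     with dask.annotate(workers=[plan[i]]):
#         get_model_per_fold.submit(big_data, train_index, estimator)
```

With 10 tasks and 2 workers, tasks 0 to 4 map to worker-1 and tasks 5 to 9 map to worker-2.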