# prefect-community
a
Hi guys! I'm running a flow using the Orion Dask executor on Kubernetes, but I get an error on very large datasets:
distributed.scheduler.KilledWorker: Attempted to run task prometheus_to_gcs-02d33e7b-92a6-4fe8-8258-ba9efaa3d609 on 3 different workers, but all those workers died while running it.
What could be the problem here? Other, smaller datasets are running fine. I am running it with adaptive workers (min 4, max 10). I am not sure, though, whether I configured Dask correctly for what I am trying to do. Can you please have a look at my code in the thread? I am able to run the same flow in Prefect 1.4. Thanks!
@task(name='Load to GCS', retries=2, retry_delay_seconds=10)
def load_to_gcs(flow_name: str, urls: list, distinct_dates: list):
    logger = get_run_logger()

    with get_dask_client(timeout="30s") as client:
        for date in distinct_dates:
            urls_filtered = [url for url in urls if f"start={date}" in url]
            logger.info(urls_filtered)
            logger.info(f"Loading data to GCS for {date}")
            dfs = [dask.delayed(prometheus_to_gcs)(url, f'gs://data-lake/prometheus/{flow_name}/{date[0:4]}/{date[5:7]}/{date[8:10]}/{flow_name}{ind}.parquet') for ind, url in enumerate(urls_filtered)]

            logger.info(f"delayed created")
            results_dask = client.compute(dfs, sync=True)

            logger.info(f"upload completed")

            wait(results_dask)

            logger.info(results_dask)
@flow(name=f'{os.environ["FLOW_NAME"].lower()}', task_runner=DaskTaskRunner(
            cluster_class=KubeCluster,
            cluster_kwargs={"pod_template": dask_pod_spec(os.environ["PREFECT_ORION_DASK_BASE"])},
            adapt_kwargs={"minimum": int(os.environ["MIN_WORKERS"]), "maximum": int(os.environ["MAX_WORKERS"])}
    )
)
...
gcs = load_to_gcs.submit(flow_name, urls, dates_to_ingest_gcs, wait_for=[dates_to_ingest_gcs, dates_to_ingest_bq, gcs_uris])
To give a bit of context: I'm creating a list of delayed prometheus_to_gcs calls (each one makes REST calls to the Prometheus API, does some parsing, converts the result to a Dask dataframe, and finally writes the dataframe to GCS as Parquet). I am not sure whether it runs on all workers; I only see logs on one of the workers.
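One way to check whether the delayed calls are actually spread out is to ask the scheduler which worker ended up holding each result. A minimal sketch (it reuses the client, dfs and logger objects from the task above, and drops sync=True so that compute returns futures):

from distributed import wait

# Compute without sync=True so we get futures back instead of concrete results
futures = client.compute(dfs)
wait(futures)

# who_has maps each task key to the worker(s) holding its result;
# a healthy run should list several distinct worker addresses here.
for key, workers in client.who_has(futures).items():
    logger.info(f"{key} ran on {workers}")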
b
Hello Alexandru. At first glance it appears that there may be resource contention here (given that smaller datasets are processed successfully). I did a bit of web surfing and found this Stack Overflow article.
Tinkering with the adaptive scaling could be a good first step. Maybe increasing the maximum number of workers will help?
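Something along these lines could be a starting point. This is only a rough sketch: it uses dask_kubernetes' make_pod_spec instead of your dask_pod_spec helper, and the image name, resource values and worker ceiling are made up, so adjust them to your cluster:

import os

from dask_kubernetes import KubeCluster, make_pod_spec
from prefect_dask import DaskTaskRunner

# Give each worker pod more memory so a single large partition does not get
# the worker OOM-killed, and raise the ceiling for adaptive scaling.
pod_spec = make_pod_spec(
    image=os.environ["PREFECT_ORION_DASK_BASE"],  # assuming this env var holds the worker image
    memory_request="8G",
    memory_limit="8G",
    cpu_request="2",
    cpu_limit="2",
)

task_runner = DaskTaskRunner(
    cluster_class=KubeCluster,
    cluster_kwargs={"pod_template": pod_spec},
    adapt_kwargs={"minimum": 4, "maximum": 20},
)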
Just for clarification, you don't encounter this issue in Prefect 1.4, even when processing larger datasets?
a
Hey @Bianca Hoch, indeed, the issue is not present in Prefect 1.4. Thanks for the tips, I will try more scenarios today.
@Bianca Hoch I managed to run the heavy workload, but only by using a standalone Dask cluster deployed in K8s together with the SequentialTaskRunner. Basically, I am only sending the heavy stuff to Dask while the rest runs in the flow pod. Not sure if it's the best approach, but it's the only one that has worked for me so far. I couldn't find a way to make the DaskTaskRunner + KubeCluster handle the large datasets. My task looks like this for the moment:
@task(name='Load to GCS', retries=2, retry_delay_seconds=10)
def load_to_gcs(flow_name: str, urls: list, distinct_dates: list):
    logger = get_run_logger()
    urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
    # with get_dask_client(timeout="30s") as client:
    with Client(address="tcp://dask-scheduler:8786") as client:
        for date in distinct_dates:

            # date = '2022-12-01'
            urls_filtered = [url for url in urls if f"start={date}" in url]
            logger.info(urls_filtered)

            logger.info(f"Loading data to GCS for {date}")

            dfs = [dask.delayed(prometheus_to_gcs)(url, f'gs://data-lake/prometheus/{flow_name}/{date[0:4]}/{date[5:7]}/{date[8:10]}/{flow_name}{ind}.parquet') for ind, url in enumerate(urls_filtered)]

            logger.info(f"delayed created")

            results_dask = client.compute(dfs)

            logger.info(f"upload completed")

            wait(results_dask)

            logger.info(results_dask)
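For reference, the flow side now looks roughly like this (a sketch: the function name and signature are placeholders, and SequentialTaskRunner is the standard Prefect 2 import; only load_to_gcs hands work to the external Dask cluster through its own Client):

import os

from prefect import flow
from prefect.task_runners import SequentialTaskRunner

# Tasks run one after another inside the flow pod; the heavy lifting is
# delegated to the standalone Dask cluster from inside load_to_gcs.
@flow(name=os.environ["FLOW_NAME"].lower(), task_runner=SequentialTaskRunner())
def prometheus_ingest(flow_name: str, urls: list, distinct_dates: list):
    load_to_gcs(flow_name, urls, distinct_dates)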
One strange behavior is that my flow run is shown as Crashed while it is running, and even after all tasks have completed successfully. Any ideas what might be happening here?