Alexandru Anghel
12/06/2022, 4:08 PM
distributed.scheduler.KilledWorker: Attempted to run task prometheus_to_gcs-02d33e7b-92a6-4fe8-8258-ba9efaa3d609 on 3 different workers, but all those workers died while running it.
What could be the problem here? Other smaller datasets are running fine.
I am running it with adaptive workers (min 4, max 10). I am not sure, though, whether I have configured Dask correctly for what I am trying to do. Can you please have a look at my code in the thread?
I am able to run the same flow in Prefect 1.4.
Thanks!
Alexandru Anghel
12/06/2022, 4:11 PM
@task(name='Load to GCS', retries=2, retry_delay_seconds=10)
def load_to_gcs(flow_name: str, urls: list, distinct_dates: list):
    logger = get_run_logger()
    with get_dask_client(timeout="30s") as client:
        for date in distinct_dates:
            urls_filtered = [url for url in urls if f"start={date}" in url]
            logger.info(urls_filtered)
            logger.info(f"Loading data to GCS for {date}")
            dfs = [dask.delayed(prometheus_to_gcs)(url, f'gs://data-lake/prometheus/{flow_name}/{date[0:4]}/{date[5:7]}/{date[8:10]}/{flow_name}{ind}.parquet') for ind, url in enumerate(urls_filtered)]
            logger.info("delayed created")
            results_dask = client.compute(dfs, sync=True)
            logger.info("upload completed")
            wait(results_dask)
            logger.info(results_dask)
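A KilledWorker that only shows up on the larger dataset often means the workers are running out of memory while too many uploads run at once. One way to narrow that down (a sketch, not part of the original thread) is to compute the delayed objects in smaller batches instead of all at once per date; prometheus_to_gcs is the user's own function, and batch_size is an illustrative tuning knob:

from dask.distributed import wait

def compute_in_batches(client, delayed_objs, batch_size=4):
    """Compute delayed objects a few at a time to limit peak memory per worker."""
    results = []
    for i in range(0, len(delayed_objs), batch_size):
        futures = client.compute(delayed_objs[i:i + batch_size])
        wait(futures)
        for fut in futures:
            if fut.status == "error":
                # re-raise the remote exception locally instead of losing it
                raise fut.exception()
        results.extend(f.result() for f in futures)
    return results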
Alexandru Anghel
12/06/2022, 4:12 PM
@flow(name=f'{os.environ["FLOW_NAME"].lower()}', task_runner=DaskTaskRunner(
    cluster_class=KubeCluster,
    cluster_kwargs={"pod_template": dask_pod_spec(os.environ["PREFECT_ORION_DASK_BASE"])},
    adapt_kwargs={"minimum": int(os.environ["MIN_WORKERS"]), "maximum": int(os.environ["MAX_WORKERS"])}
))
...
gcs = load_to_gcs.submit(flow_name, urls, dates_to_ingest_gcs, wait_for=[dates_to_ingest_gcs, dates_to_ingest_bq, gcs_uris])
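dask_pod_spec is the user's own helper and its contents are not shown in the thread. The sketch below is only a guess at the kind of pod template such a helper might return for KubeCluster, with explicit memory requests/limits and a matching --memory-limit on the dask-worker command, which is a common first thing to check when adaptive workers keep dying. All names and sizes here are illustrative:

def dask_pod_spec(image: str) -> dict:
    # Hypothetical pod template; the real dask_pod_spec is not shown in the thread.
    return {
        "spec": {
            "restartPolicy": "Never",
            "containers": [
                {
                    "name": "dask-worker",
                    "image": image,
                    "args": [
                        "dask-worker",
                        "--nthreads", "2",
                        # keep Dask's memory limit in line with the container limit,
                        # otherwise the kubelet can OOM-kill workers before Dask can spill
                        "--memory-limit", "4GB",
                        "--death-timeout", "60",
                    ],
                    "resources": {
                        "requests": {"cpu": "1", "memory": "4Gi"},
                        "limits": {"cpu": "2", "memory": "4Gi"},
                    },
                }
            ],
        },
    }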
Alexandru Anghel
12/06/2022, 4:16 PM
Bianca Hoch
12/06/2022, 10:59 PM
Bianca Hoch
12/06/2022, 11:01 PM
Bianca Hoch
12/06/2022, 11:02 PM
Alexandru Anghel
12/07/2022, 7:38 AM
Alexandru Anghel
12/07/2022, 3:49 PM
@task(name='Load to GCS', retries=2, retry_delay_seconds=10)
def load_to_gcs(flow_name: str, urls: list, distinct_dates: list):
    logger = get_run_logger()
    urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
    # with get_dask_client(timeout="30s") as client:
    with Client(address="tcp://dask-scheduler:8786") as client:
        for date in distinct_dates:
            # date = '2022-12-01'
            urls_filtered = [url for url in urls if f"start={date}" in url]
            logger.info(urls_filtered)
            logger.info(f"Loading data to GCS for {date}")
            dfs = [dask.delayed(prometheus_to_gcs)(url, f'gs://data-lake/prometheus/{flow_name}/{date[0:4]}/{date[5:7]}/{date[8:10]}/{flow_name}{ind}.parquet') for ind, url in enumerate(urls_filtered)]
            logger.info("delayed created")
            results_dask = client.compute(dfs)
            logger.info("upload completed")
            wait(results_dask)
            logger.info(results_dask)
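With sync=True removed, client.compute(dfs) now returns futures, so wait() by itself will not re-raise anything that failed on a worker. A small follow-up along these lines (a sketch, not part of the original thread) would surface worker-side exceptions in the task's own logs:

# Sketch only: gather after the wait so remote errors re-raise in the Prefect task.
futures = client.compute(dfs)          # one future per delayed upload
wait(futures)                          # block until everything finishes or errors
try:
    outputs = client.gather(futures)   # re-raises the first worker-side exception, if any
    logger.info(f"upload completed: {len(outputs)} objects written")
except Exception:
    logger.exception("one or more prometheus_to_gcs uploads failed")
    raise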
Alexandru Anghel
12/07/2022, 3:52 PM