Alexandru Anghel
12/21/2022, 1:58 PM
Crash detected! Execution was interrupted by an unexpected exception: AttributeError: 'KubeCluster' object has no attribute '__qualname__'

Alexandru Anghel
12/21/2022, 1:59 PM
@flow(
    name=f'{os.environ["FLOW_NAME"].lower()}', 
    task_runner=DaskTaskRunner(
        cluster_class=KubeCluster(
            scheduler_pod_template=make_pod_spec(
                image='myrepo/prefect-flows/prefect-orion/dask/orion-dask-base:v0.1',
                extra_pod_config=get_extra_specs_dask_pod(),
                extra_container_config=get_extra_specs_dask_container()
                ),
            pod_template=make_pod_spec(
                image='myrepo/prefect-flows/prefect-orion/dask/orion-dask-base:v0.1',
                extra_pod_config=get_extra_specs_dask_pod(),
                extra_container_config=get_extra_specs_dask_container()
                ), 
            deploy_mode="remote",
            scheduler_service_wait_timeout=300,
            name=f'dask-{os.environ["FLOW_NAME"].lower()}'
            ),
        adapt_kwargs={"minimum": int(os.environ["MIN_WORKERS"]), "maximum": int(os.environ["MAX_WORKERS"])}
    ),
    retries=2, 
    retry_delay_seconds=5
)

Alexandru Anghel
12/21/2022, 2:02 PM

Ryan Peden
12/21/2022, 2:03 PM
Try passing the cluster class (or its import path) via cluster_class and its constructor arguments via a separate cluster_kwargs dict, instead of instantiating KubeCluster yourself.
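
[Editor's note: the AttributeError above fits this diagnosis: classes have a __qualname__ attribute but instances do not, so DaskTaskRunner evidently expects a class or import string in cluster_class. A minimal sketch of the config rewritten that way, reusing the image and the get_extra_specs_* helpers from Alexandru's snippet above:]

import os

from dask_kubernetes import make_pod_spec
from prefect_dask import DaskTaskRunner

# Same pod spec for the scheduler and the workers, as in the original snippet
pod_spec = make_pod_spec(
    image='myrepo/prefect-flows/prefect-orion/dask/orion-dask-base:v0.1',
    extra_pod_config=get_extra_specs_dask_pod(),
    extra_container_config=get_extra_specs_dask_container(),
)

task_runner = DaskTaskRunner(
    cluster_class="dask_kubernetes.KubeCluster",  # import path, not an instance
    cluster_kwargs={
        "scheduler_pod_template": pod_spec,
        "pod_template": pod_spec,
        "deploy_mode": "remote",
        "scheduler_service_wait_timeout": 300,
        "name": f'dask-{os.environ["FLOW_NAME"].lower()}',
    },
    adapt_kwargs={
        "minimum": int(os.environ["MIN_WORKERS"]),
        "maximum": int(os.environ["MAX_WORKERS"]),
    },
)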

Tim-Oliver
12/21/2022, 2:10 PM
DaskTaskRunner(
        cluster_class="dask_jobqueue.SLURMCluster",
        cluster_kwargs={
            "account": "dlthings",
            "queue": "cpu_long",
            "cores": 32,
            "processes": 1,
            "memory": "32 GB",
        },
        adapt_kwargs={
            "minimum": 1,
            "maximum": 4,
        },
    )

Ryan Peden
12/21/2022, 2:11 PM

Alexandru Anghel
12/21/2022, 2:21 PM
Encountered exception during execution:
Traceback (most recent call last):
  File "/usr/local/lib/python3.9/site-packages/prefect/engine.py", line 1449, in orchestrate_task_run
    result = await run_sync(task.fn, *args, **kwargs)
  File "/usr/local/lib/python3.9/site-packages/prefect/utilities/asyncutils.py", line 69, in run_sync_in_worker_thread
    return await anyio.to_thread.run_sync(call, cancellable=True)
  File "/usr/local/lib/python3.9/site-packages/anyio/to_thread.py", line 31, in run_sync
    return await get_asynclib().run_sync_in_worker_thread(
  File "/usr/local/lib/python3.9/site-packages/anyio/_backends/_asyncio.py", line 937, in run_sync_in_worker_thread
    return await future
  File "/usr/local/lib/python3.9/site-packages/anyio/_backends/_asyncio.py", line 867, in run
    result = context.run(func, *args)
  File "batch.py", line 586, in load_to_gcs
    with get_dask_client(timeout='30s') as client:
  File "/usr/local/lib/python3.9/contextlib.py", line 119, in __enter__
    return next(self.gen)
  File "/usr/local/lib/python3.9/site-packages/prefect_dask/utils.py", line 100, in get_dask_client
    client_kwargs = _generate_client_kwargs(
  File "/usr/local/lib/python3.9/site-packages/prefect_dask/utils.py", line 28, in _generate_client_kwargs
    address = get_client().scheduler.address
  File "/usr/local/lib/python3.9/site-packages/distributed/worker.py", line 2768, in get_client
    raise ValueError("No global client found and no address provided")
ValueError: No global client found and no address provided
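
[Editor's note: the failing call at the bottom of the stack is distributed's get_client(), which prefect_dask uses here to discover the scheduler address. It raises this exact ValueError whenever no Dask client is registered as the default in the current process, which is easy to reproduce in isolation:]

# Minimal reproduction: no Dask client exists in this process yet
from distributed import get_client

get_client()  # ValueError: No global client found and no address provided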

Alexandru Anghel
12/21/2022, 2:22 PM
@task(name='Load to GCS', retries=2, retry_delay_seconds=10)
def load_to_gcs(flow_name: str, urls: list, distinct_dates: list, schema: list, columns_to_encrypt: list):
    logger = get_run_logger()
    urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
    now = datetime.now().strftime('%Y-%m-%dT%H:%M:%S.%f')[:-3]
    gcs_logs = []
    with get_dask_client(timeout='30s') as client:
        for date in distinct_dates:
            print(date)
            dfs = []
            for metric in urls[date].keys():
                logger.info(f"For date {date} and metric {metric}:")
                logger.info(urls[date][metric])
                dfs = dfs + [dask.delayed(parser)(url, 'range', schema) for url in urls[date][metric]]
            # print(dfs)
            df = dd.from_delayed(dfs, verify_meta=False) #meta=metadf
            df['insert_date'] = now
            df['insert_date'] = df['insert_date'].astype('datetime64[ns]')
            
            # pseudonymize
            if columns_to_encrypt:
                df = df.map_partitions(pseudonymise_job, columns_to_encrypt, meta=df)
            
            gcs_path = f'gs://{os.environ["GOOGLE_CLOUD_BUCKET_DATA_LAKE"]}/prometheus/{flow_name}/{date[0:4]}/{date[5:7]}/{date[8:10]}/{flow_name}.parquet'
            
            df = client.persist(df)
            
            df.to_parquet(
                    path = gcs_path,
                    engine='pyarrow',
                    compression='lz4',
                    schema='infer',
                    write_index=False, 
                    overwrite=True, 
                    append=False,
                    storage_options={
                        "session_kwargs": {"trust_env": True}
                    }
                    )
            lst = [flow_name, date, len(df.index), True, gcs_path, False, None]
            gcs_logs.append(lst)
        
    logger.info(gcs_logs)
    return gcs_logs

Alexandru Anghel
12/21/2022, 2:25 PM
The error goes away if I replace

with get_dask_client(timeout='30s') as client:

with an explicit connection to the scheduler:

with Client(address="tcp://dask-scheduler:8786") as client:
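
[Editor's note: this workaround likely succeeds because distributed.Client registers itself as the process-wide default on creation (its set_as_default parameter defaults to True), which is exactly the lookup that failed above. A sketch, assuming the scheduler Service is reachable as dask-scheduler in-cluster:]

from distributed import Client, get_client

with Client(address="tcp://dask-scheduler:8786") as client:
    # The lookup that previously raised ValueError now resolves to this client
    assert get_client() is client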

Alexandru Anghel
12/21/2022, 2:27 PM

Ryan Peden
12/21/2022, 3:05 PM
task_runner=DaskTaskRunner(
    ...other args
    client_kwargs={"set_as_default": True}
)
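
[Editor's note: a sketch of Ryan's fix end to end, with hypothetical task and flow names. With set_as_default passed through client_kwargs, the client that DaskTaskRunner creates becomes the process default, so get_dask_client inside a task can find the scheduler again. cluster_class and cluster_kwargs are omitted here, so this sketch falls back to a temporary local cluster:]

from prefect import flow, task
from prefect_dask import DaskTaskRunner, get_dask_client

@task
def compute_sum():
    # get_dask_client resolves the scheduler address from the default client
    with get_dask_client(timeout="30s") as client:
        return client.submit(sum, [1, 2, 3]).result()

@flow(
    task_runner=DaskTaskRunner(
        # add cluster_class / cluster_kwargs as sketched earlier for KubeCluster
        client_kwargs={"set_as_default": True},
    )
)
def demo_flow():
    return compute_sum()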

Alexandru Anghel
12/21/2022, 3:20 PM

Ryan Peden
12/21/2022, 3:22 PM