Alexandru Anghel
12/21/2022, 1:58 PM
Crash detected! Execution was interrupted by an unexpected exception: AttributeError: 'KubeCluster' object has no attribute '__qualname__'
For more context, I'll add some code in the thread.
Thanks!

@flow(
    name=f'{os.environ["FLOW_NAME"].lower()}',
    task_runner=DaskTaskRunner(
        cluster_class=KubeCluster(
            scheduler_pod_template=make_pod_spec(
                image='myrepo/prefect-flows/prefect-orion/dask/orion-dask-base:v0.1',
                extra_pod_config=get_extra_specs_dask_pod(),
                extra_container_config=get_extra_specs_dask_container()
            ),
            pod_template=make_pod_spec(
                image='myrepo/prefect-flows/prefect-orion/dask/orion-dask-base:v0.1',
                extra_pod_config=get_extra_specs_dask_pod(),
                extra_container_config=get_extra_specs_dask_container()
            ),
            deploy_mode="remote",
            scheduler_service_wait_timeout=300,
            name=f'dask-{os.environ["FLOW_NAME"].lower()}'
        ),
        adapt_kwargs={"minimum": int(os.environ["MIN_WORKERS"]), "maximum": int(os.environ["MAX_WORKERS"])}
    ),
    retries=2,
    retry_delay_seconds=5
)
Ryan Peden
12/21/2022, 2:03 PM
cluster_class needs to be just the type, not an instantiated instance of the class. You can give the task runner all the args it will need to instantiate a cluster by putting them in a dict and passing the dict to the cluster_kwargs parameter.

Tim-Oliver
12/21/2022, 2:10 PM
DaskTaskRunner(
    cluster_class="dask_jobqueue.SLURMCluster",
    cluster_kwargs={
        "account": "dlthings",
        "queue": "cpu_long",
        "cores": 32,
        "processes": 1,
        "memory": "32 GB",
    },
    adapt_kwargs={
        "minimum": 1,
        "maximum": 4,
    },
)
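For reference, the same pattern applied to the original KubeCluster setup would look roughly like the sketch below. It reuses the image, pod-spec helpers, and environment variables from the first snippet in this thread, so treat it as a starting point rather than a verified config:

import os
from dask_kubernetes import make_pod_spec
from prefect_dask import DaskTaskRunner

# cluster_class is an import path (or the class itself), not an instance;
# everything KubeCluster needs to be constructed with goes into cluster_kwargs.
task_runner = DaskTaskRunner(
    cluster_class="dask_kubernetes.KubeCluster",
    cluster_kwargs={
        "scheduler_pod_template": make_pod_spec(
            image='myrepo/prefect-flows/prefect-orion/dask/orion-dask-base:v0.1',
            extra_pod_config=get_extra_specs_dask_pod(),
            extra_container_config=get_extra_specs_dask_container(),
        ),
        "pod_template": make_pod_spec(
            image='myrepo/prefect-flows/prefect-orion/dask/orion-dask-base:v0.1',
            extra_pod_config=get_extra_specs_dask_pod(),
            extra_container_config=get_extra_specs_dask_container(),
        ),
        "deploy_mode": "remote",
        "scheduler_service_wait_timeout": 300,
        "name": f'dask-{os.environ["FLOW_NAME"].lower()}',
    },
    adapt_kwargs={
        "minimum": int(os.environ["MIN_WORKERS"]),
        "maximum": int(os.environ["MAX_WORKERS"]),
    },
)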
Ryan Peden
12/21/2022, 2:11 PM

Alexandru Anghel
12/21/2022, 2:21 PM
Encountered exception during execution:
Traceback (most recent call last):
  File "/usr/local/lib/python3.9/site-packages/prefect/engine.py", line 1449, in orchestrate_task_run
    result = await run_sync(task.fn, *args, **kwargs)
  File "/usr/local/lib/python3.9/site-packages/prefect/utilities/asyncutils.py", line 69, in run_sync_in_worker_thread
    return await anyio.to_thread.run_sync(call, cancellable=True)
  File "/usr/local/lib/python3.9/site-packages/anyio/to_thread.py", line 31, in run_sync
    return await get_asynclib().run_sync_in_worker_thread(
  File "/usr/local/lib/python3.9/site-packages/anyio/_backends/_asyncio.py", line 937, in run_sync_in_worker_thread
    return await future
  File "/usr/local/lib/python3.9/site-packages/anyio/_backends/_asyncio.py", line 867, in run
    result = context.run(func, *args)
  File "batch.py", line 586, in load_to_gcs
    with get_dask_client(timeout='30s') as client:
  File "/usr/local/lib/python3.9/contextlib.py", line 119, in __enter__
    return next(self.gen)
  File "/usr/local/lib/python3.9/site-packages/prefect_dask/utils.py", line 100, in get_dask_client
    client_kwargs = _generate_client_kwargs(
  File "/usr/local/lib/python3.9/site-packages/prefect_dask/utils.py", line 28, in _generate_client_kwargs
    address = get_client().scheduler.address
  File "/usr/local/lib/python3.9/site-packages/distributed/worker.py", line 2768, in get_client
    raise ValueError("No global client found and no address provided")
ValueError: No global client found and no address provided
@task(name='Load to GCS', retries=2, retry_delay_seconds=10)
def load_to_gcs(flow_name: str, urls: list, distinct_dates: list, schema: list, columns_to_encrypt: list):
    logger = get_run_logger()
    urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
    now = datetime.now().strftime('%Y-%m-%dT%H:%M:%S.%f')[:-3]
    gcs_logs = []
    with get_dask_client(timeout='30s') as client:
        for date in distinct_dates:
            print(date)
            dfs = []
            for metric in urls[date].keys():
                logger.info(f"For date {date} and metric {metric}:")
                logger.info(urls[date][metric])
                dfs = dfs + [dask.delayed(parser)(url, 'range', schema) for url in urls[date][metric]]
            # print(dfs)
            df = dd.from_delayed(dfs, verify_meta=False)  # meta=metadf
            df['insert_date'] = now
            df['insert_date'] = df['insert_date'].astype('datetime64[ns]')
            # pseudonymize
            if columns_to_encrypt:
                df = df.map_partitions(pseudonymise_job, columns_to_encrypt, meta=df)
            gcs_path = f'gs://{os.environ["GOOGLE_CLOUD_BUCKET_DATA_LAKE"]}/prometheus/{flow_name}/{date[0:4]}/{date[5:7]}/{date[8:10]}/{flow_name}.parquet'
            df = client.persist(df)
            df.to_parquet(
                path=gcs_path,
                engine='pyarrow',
                compression='lz4',
                schema='infer',
                write_index=False,
                overwrite=True,
                append=False,
                storage_options={
                    "session_kwargs": {"trust_env": True}
                }
            )
            lst = [flow_name, date, len(df.index), True, gcs_path, False, None]
            gcs_logs.append(lst)
    logger.info(gcs_logs)
    return gcs_logs
with get_dask_client(timeout='30s') as client:
Like you mentioned @Ryan Peden, I also have a K8s Dask cluster that I use, and the task runs fine with it:
with Client(address="tcp://dask-scheduler:8786") as client:
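(That working variant amounts to opening a plain distributed Client against the standalone scheduler inside the task, along these lines; the scheduler address is the one quoted above, and the toy dataframe is just for illustration:)

import dask.dataframe as dd
import pandas as pd
from distributed import Client

# Connect by an explicit scheduler address instead of relying on a default
# client being registered for the flow run.
with Client(address="tcp://dask-scheduler:8786") as client:
    df = dd.from_pandas(pd.DataFrame({"x": range(10)}), npartitions=2)
    df = client.persist(df)
    print(df.x.sum().compute())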
Ryan Peden
12/21/2022, 3:05 PM
task_runner=DaskTaskRunner(
    ...other args
    client_kwargs={"set_as_default": True}
)
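Slotted into the earlier SLURM example, that suggestion would sit alongside the other arguments, roughly like this sketch:

from prefect_dask import DaskTaskRunner

# client_kwargs is forwarded to distributed.Client, so set_as_default
# registers the flow run's client as the default one, letting get_client()
# / get_dask_client() inside tasks find it without an explicit address.
task_runner = DaskTaskRunner(
    cluster_class="dask_jobqueue.SLURMCluster",
    cluster_kwargs={
        "account": "dlthings",
        "queue": "cpu_long",
        "cores": 32,
        "processes": 1,
        "memory": "32 GB",
    },
    adapt_kwargs={"minimum": 1, "maximum": 4},
    client_kwargs={"set_as_default": True},
)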
Alexandru Anghel
12/21/2022, 3:20 PM

Ryan Peden
12/21/2022, 3:22 PM