# ask-community
a
Hello guys, I get the following error when trying to run a flow using the DaskTaskRunner with KubeCluster:
Crash detected! Execution was interrupted by an unexpected exception: AttributeError: 'KubeCluster' object has no attribute '__qualname__'
For more context, I'll add some code in the thread. Thanks!
My flow definition looks like this:
@flow(
    name=f'{os.environ["FLOW_NAME"].lower()}', 
    task_runner=DaskTaskRunner(
        cluster_class=KubeCluster(
            scheduler_pod_template=make_pod_spec(
                image='myrepo/prefect-flows/prefect-orion/dask/orion-dask-base:v0.1',
                extra_pod_config=get_extra_specs_dask_pod(),
                extra_container_config=get_extra_specs_dask_container()
                ),
            pod_template=make_pod_spec(
                image='myrepo/prefect-flows/prefect-orion/dask/orion-dask-base:v0.1',
                extra_pod_config=get_extra_specs_dask_pod(),
                extra_container_config=get_extra_specs_dask_container()
                ), 
            deploy_mode="remote",
            scheduler_service_wait_timeout=300,
            name=f'dask-{os.environ["FLOW_NAME"].lower()}'
            ),
        adapt_kwargs={"minimum": int(os.environ["MIN_WORKERS"]), "maximum": int(os.environ["MAX_WORKERS"])}
    ),
    retries=2, 
    retry_delay_seconds=5
)
I'm running on the latest version, 2.7.3.
Ryan Peden
I believe `cluster_class` needs to be just the type, not an instantiated instance of the class. You can give the task runner all the args it will need to instantiate a cluster by putting them in a dict and passing the dict to the `cluster_kwargs` parameter.
Tim-Oliver
Indeed, you should provide just the type, e.g.:
DaskTaskRunner(
    cluster_class="dask_jobqueue.SLURMCluster",
    cluster_kwargs={
        "account": "dlthings",
        "queue": "cpu_long",
        "cores": 32,
        "processes": 1,
        "memory": "32 GB",
    },
    adapt_kwargs={
        "minimum": 1,
        "maximum": 4,
    },
)
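Applied to the KubeCluster setup above, the same pattern might look roughly like this (a sketch, not tested; it reuses the make_pod_spec arguments, helper functions, and environment variables from the original flow definition, and assumes the classic dask_kubernetes.KubeCluster import path):
import os

from dask_kubernetes import make_pod_spec  # classic dask-kubernetes API, as used in the original flow
from prefect_dask import DaskTaskRunner

# Sketch: pass the class path as a string and move the constructor
# arguments into cluster_kwargs so the task runner can instantiate the
# cluster itself. get_extra_specs_dask_pod / get_extra_specs_dask_container
# are the user's own helpers from the original flow definition.
DaskTaskRunner(
    cluster_class="dask_kubernetes.KubeCluster",
    cluster_kwargs={
        "scheduler_pod_template": make_pod_spec(
            image='myrepo/prefect-flows/prefect-orion/dask/orion-dask-base:v0.1',
            extra_pod_config=get_extra_specs_dask_pod(),
            extra_container_config=get_extra_specs_dask_container(),
        ),
        "pod_template": make_pod_spec(
            image='myrepo/prefect-flows/prefect-orion/dask/orion-dask-base:v0.1',
            extra_pod_config=get_extra_specs_dask_pod(),
            extra_container_config=get_extra_specs_dask_container(),
        ),
        "deploy_mode": "remote",
        "scheduler_service_wait_timeout": 300,
        "name": f'dask-{os.environ["FLOW_NAME"].lower()}',
    },
    adapt_kwargs={
        "minimum": int(os.environ["MIN_WORKERS"]),
        "maximum": int(os.environ["MAX_WORKERS"]),
    },
)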
Ryan Peden
One thing to consider is whether you need to spin up a separate Dask cluster for every flow run. That's what your code is doing now, and it will add a fair amount of overhead to every flow run. It might be that your use case requires a single cluster per flow. But you could alternatively spin up a Dask cluster in K8s once and then have all your flows use it. Keeping a Dask cluster around might be too costly if your flows run infrequently, but it could work to your advantage if you expect you'll always have flows running that need to use Dask.
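For the shared-cluster option, a minimal sketch (assuming a long-lived scheduler reachable at the in-cluster address mentioned later in this thread) would just point the task runner at the existing scheduler:
# Sketch: connect to an already-running Dask scheduler instead of
# creating a new cluster for every flow run.
DaskTaskRunner(address="tcp://dask-scheduler:8786")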
a
Thanks @Ryan Peden and @Tim-Oliver. That fixed the crash. Now I have another error in my Dask task:
Encountered exception during execution:
Traceback (most recent call last):
  File "/usr/local/lib/python3.9/site-packages/prefect/engine.py", line 1449, in orchestrate_task_run
    result = await run_sync(task.fn, *args, **kwargs)
  File "/usr/local/lib/python3.9/site-packages/prefect/utilities/asyncutils.py", line 69, in run_sync_in_worker_thread
    return await anyio.to_thread.run_sync(call, cancellable=True)
  File "/usr/local/lib/python3.9/site-packages/anyio/to_thread.py", line 31, in run_sync
    return await get_asynclib().run_sync_in_worker_thread(
  File "/usr/local/lib/python3.9/site-packages/anyio/_backends/_asyncio.py", line 937, in run_sync_in_worker_thread
    return await future
  File "/usr/local/lib/python3.9/site-packages/anyio/_backends/_asyncio.py", line 867, in run
    result = context.run(func, *args)
  File "batch.py", line 586, in load_to_gcs
    with get_dask_client(timeout='30s') as client:
  File "/usr/local/lib/python3.9/contextlib.py", line 119, in __enter__
    return next(self.gen)
  File "/usr/local/lib/python3.9/site-packages/prefect_dask/utils.py", line 100, in get_dask_client
    client_kwargs = _generate_client_kwargs(
  File "/usr/local/lib/python3.9/site-packages/prefect_dask/utils.py", line 28, in _generate_client_kwargs
    address = get_client().scheduler.address
  File "/usr/local/lib/python3.9/site-packages/distributed/worker.py", line 2768, in get_client
    raise ValueError("No global client found and no address provided")
ValueError: No global client found and no address provided
My Dask task looks like this:
@task(name='Load to GCS', retries=2, retry_delay_seconds=10)
def load_to_gcs(flow_name: str, urls: list, distinct_dates: list, schema: list, columns_to_encrypt: list):
    logger = get_run_logger()
    urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)

    now = datetime.now().strftime('%Y-%m-%dT%H:%M:%S.%f')[:-3]

    gcs_logs = []


    with get_dask_client(timeout='30s') as client:
        for date in distinct_dates:
            print(date)
            dfs = []
            for metric in urls[date].keys():

                logger.info(f"For date {date} and metric {metric}:")
                logger.info(urls[date][metric])

                dfs = dfs + [dask.delayed(parser)(url, 'range', schema) for url in urls[date][metric]]

            # print(dfs)
            df = dd.from_delayed(dfs, verify_meta=False) #meta=metadf

            df['insert_date'] = now
            df['insert_date'] = df['insert_date'].astype('datetime64[ns]')
            
            # pseudonymize
            if columns_to_encrypt:
                df = df.map_partitions(pseudonymise_job, columns_to_encrypt, meta=df)
            
            gcs_path = f'gs://{os.environ["GOOGLE_CLOUD_BUCKET_DATA_LAKE"]}/prometheus/{flow_name}/{date[0:4]}/{date[5:7]}/{date[8:10]}/{flow_name}.parquet'
            
            df = client.persist(df)
            
            df.to_parquet(
                path=gcs_path,
                engine='pyarrow',
                compression='lz4',
                schema='infer',
                write_index=False,
                overwrite=True,
                append=False,
                storage_options={
                    "session_kwargs": {"trust_env": True}
                }
            )
            lst = [flow_name, date, len(df.index), True, gcs_path, False, None]
            gcs_logs.append(lst)
        
    logger.info(gcs_logs)
    return gcs_logs
My guess is the problem is with this line:
with get_dask_client(timeout='30s') as client:
Like you mentioned, @Ryan Peden, I also have a K8s Dask cluster that I use, and the task runs fine with it:
with Client(address="tcp://dask-scheduler:8786") as client:
My understanding is that we need to use `get_dask_client` (for scenarios where we spin up a cluster for each flow) in order to use all Dask workers; otherwise, the flow runs on only one worker.
Ryan Peden
Here's a workaround that might help. When creating your `DaskTaskRunner`:
task_runner=DaskTaskRunner(
    ...other args
    client_kwargs={"set_as_default": True}
)
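Putting the pieces from this thread together, the task runner might end up roughly like this (a sketch; kube_cluster_kwargs is a hypothetical name for the cluster_kwargs dict shown earlier):
# Sketch: set_as_default=True registers the flow run's Dask client as
# the global default, so get_dask_client() inside tasks can discover
# the scheduler address instead of raising "No global client found".
DaskTaskRunner(
    cluster_class="dask_kubernetes.KubeCluster",
    cluster_kwargs=kube_cluster_kwargs,  # hypothetical placeholder for the dict shown earlier
    adapt_kwargs={
        "minimum": int(os.environ["MIN_WORKERS"]),
        "maximum": int(os.environ["MAX_WORKERS"]),
    },
    client_kwargs={"set_as_default": True},
)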
a
Yes, that was the missing piece of the puzzle 👏 Thank you very much, @Ryan Peden!
Ryan Peden
You're welcome! 😄 I'm happy it helped.