
Alexandru Anghel

12/21/2022, 1:58 PM
Hello guys, I get the following error when trying to run a flow using the DaskTaskRunner using KubeCluster:
Crash detected! Execution was interrupted by an unexpected exception: AttributeError: 'KubeCluster' object has no attribute '__qualname__'
For more context, I'll add some code in the thread. Thanks!
My flow definition looks like this:
@flow(
    name=f'{os.environ["FLOW_NAME"].lower()}', 
    task_runner=DaskTaskRunner(
        cluster_class=KubeCluster(
            scheduler_pod_template=make_pod_spec(
                image='myrepo/prefect-flows/prefect-orion/dask/orion-dask-base:v0.1',
                extra_pod_config=get_extra_specs_dask_pod(),
                extra_container_config=get_extra_specs_dask_container()
                ),
            pod_template=make_pod_spec(
                image='myrepo/prefect-flows/prefect-orion/dask/orion-dask-base:v0.1',
                extra_pod_config=get_extra_specs_dask_pod(),
                extra_container_config=get_extra_specs_dask_container()
                ), 
            deploy_mode="remote",
            scheduler_service_wait_timeout=300,
            name=f'dask-{os.environ["FLOW_NAME"].lower()}'
            ),
        adapt_kwargs={"minimum": int(os.environ["MIN_WORKERS"]), "maximum": int(os.environ["MAX_WORKERS"])}
    ),
    retries=2, 
    retry_delay_seconds=5
)
I'm running on the latest version, 2.7.3.

Ryan Peden

12/21/2022, 2:03 PM
I believe `cluster_class` needs to be just the type, not an instance of the class. You can give the task runner all the args it needs to instantiate the cluster by putting them in a dict and passing that dict to the `cluster_kwargs` parameter.
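For your KubeCluster setup, that might look roughly like this (just a sketch based on your snippet; I'm assuming dask_kubernetes exposes KubeCluster and make_pod_spec the way you're already importing them, and get_extra_specs_dask_pod / get_extra_specs_dask_container are your own helpers):
import os
from dask_kubernetes import KubeCluster, make_pod_spec
from prefect_dask import DaskTaskRunner

# Both templates used the same make_pod_spec call in your snippet,
# so build the spec once and reuse it.
pod_spec = make_pod_spec(
    image='myrepo/prefect-flows/prefect-orion/dask/orion-dask-base:v0.1',
    extra_pod_config=get_extra_specs_dask_pod(),
    extra_container_config=get_extra_specs_dask_container()
)

task_runner = DaskTaskRunner(
    cluster_class=KubeCluster,  # the class itself, or "dask_kubernetes.KubeCluster"
    cluster_kwargs={
        "scheduler_pod_template": pod_spec,
        "pod_template": pod_spec,
        "deploy_mode": "remote",
        "scheduler_service_wait_timeout": 300,
        "name": f'dask-{os.environ["FLOW_NAME"].lower()}',
    },
    adapt_kwargs={
        "minimum": int(os.environ["MIN_WORKERS"]),
        "maximum": int(os.environ["MAX_WORKERS"]),
    },
)
Then pass that task_runner to your @flow decorator instead of the pre-built KubeCluster instance.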

Tim-Oliver

12/21/2022, 2:10 PM
Indeed, you should provide just the type, e.g.:
DaskTaskRunner(
    cluster_class="dask_jobqueue.SLURMCluster",
    cluster_kwargs={
        "account": "dlthings",
        "queue": "cpu_long",
        "cores": 32,
        "processes": 1,
        "memory": "32 GB",
    },
    adapt_kwargs={
        "minimum": 1,
        "maximum": 4,
    },
)

Ryan Peden

12/21/2022, 2:11 PM
One thing to consider is whether you need to spin up a separate Dask cluster for every flow run. That's what your code is doing now, and it will add a fair amount of overhead to every flow run. It might be that your use case requires a dedicated cluster per flow, but you could alternatively spin up a Dask cluster in K8s once and then have all your flows use it. Keeping a Dask cluster around might be too costly if your flows run infrequently, but it could work to your advantage if you expect you'll always have flows running that need Dask.
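If you do go the shared-cluster route, the task runner can just point at the existing scheduler instead of creating a cluster per run, e.g. (the address is only an example and depends on how the scheduler service is exposed in your namespace):
from prefect_dask import DaskTaskRunner

# Connect to an already-running Dask scheduler instead of
# spinning up a new KubeCluster for every flow run.
task_runner = DaskTaskRunner(address="tcp://dask-scheduler:8786")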

Alexandru Anghel

12/21/2022, 2:21 PM
Thanks @Ryan Peden and @Tim-Oliver. That fixed the crash. Now I have another error in my Dask task:
Encountered exception during execution:
Traceback (most recent call last):
  File "/usr/local/lib/python3.9/site-packages/prefect/engine.py", line 1449, in orchestrate_task_run
    result = await run_sync(task.fn, *args, **kwargs)
  File "/usr/local/lib/python3.9/site-packages/prefect/utilities/asyncutils.py", line 69, in run_sync_in_worker_thread
    return await anyio.to_thread.run_sync(call, cancellable=True)
  File "/usr/local/lib/python3.9/site-packages/anyio/to_thread.py", line 31, in run_sync
    return await get_asynclib().run_sync_in_worker_thread(
  File "/usr/local/lib/python3.9/site-packages/anyio/_backends/_asyncio.py", line 937, in run_sync_in_worker_thread
    return await future
  File "/usr/local/lib/python3.9/site-packages/anyio/_backends/_asyncio.py", line 867, in run
    result = context.run(func, *args)
  File "batch.py", line 586, in load_to_gcs
    with get_dask_client(timeout='30s') as client:
  File "/usr/local/lib/python3.9/contextlib.py", line 119, in __enter__
    return next(self.gen)
  File "/usr/local/lib/python3.9/site-packages/prefect_dask/utils.py", line 100, in get_dask_client
    client_kwargs = _generate_client_kwargs(
  File "/usr/local/lib/python3.9/site-packages/prefect_dask/utils.py", line 28, in _generate_client_kwargs
    address = get_client().scheduler.address
  File "/usr/local/lib/python3.9/site-packages/distributed/worker.py", line 2768, in get_client
    raise ValueError("No global client found and no address provided")
ValueError: No global client found and no address provided
My Dask task looks like this:
@task(name='Load to GCS', retries=2, retry_delay_seconds=10)
def load_to_gcs(flow_name: str, urls: list, distinct_dates: list, schema: list, columns_to_encrypt: list):
    logger = get_run_logger()
    urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)

    now = datetime.now().strftime('%Y-%m-%dT%H:%M:%S.%f')[:-3]

    gcs_logs = []


    with get_dask_client(timeout='30s') as client:
        for date in distinct_dates:
            print(date)
            dfs = []
            for metric in urls[date].keys():

                logger.info(f"For date {date} and metric {metric}:")
                logger.info(urls[date][metric])

                dfs = dfs + [dask.delayed(parser)(url, 'range', schema) for url in urls[date][metric]]

            # print(dfs)
            df = dd.from_delayed(dfs, verify_meta=False) #meta=metadf

            df['insert_date'] = now
            df['insert_date'] = df['insert_date'].astype('datetime64[ns]')
            
            # pseudonymize
            if columns_to_encrypt:
                df = df.map_partitions(pseudonymise_job, columns_to_encrypt, meta=df)
            
            gcs_path = f'gs://{os.environ["GOOGLE_CLOUD_BUCKET_DATA_LAKE"]}/prometheus/{flow_name}/{date[0:4]}/{date[5:7]}/{date[8:10]}/{flow_name}.parquet'
            
            df = client.persist(df)
            
            df.to_parquet(
                    path = gcs_path,
                    engine='pyarrow',
                    compression='lz4',
                    schema='infer',
                    write_index=False, 
                    overwrite=True, 
                    append=False,
                    storage_options={

                        "session_kwargs": {"trust_env": True}
                    }
                    )
            lst = [flow_name, date, len(df.index), True, gcs_path, False, None]
            gcs_logs.append(lst)
        
    logger.info(gcs_logs)
    return gcs_logs
My guess is the problem is with this line:
with get_dask_client(timeout='30s') as client:
Like you mentioned, @Ryan Peden, I also have a standing K8s Dask cluster, and the task runs fine against it with:
with Client(address="tcp://dask-scheduler:8786") as client:
My understanding is that we need to use get_dask_client (for scenarios where we spin up a cluster for each flow) in order to use all the Dask workers; otherwise the flow runs on only one worker.

Ryan Peden

12/21/2022, 3:05 PM
Here's a workaround that might help. When creating your `DaskTaskRunner`:
task_runner=DaskTaskRunner(
    ...other args
    client_kwargs={"set_as_default": True}
)
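Putting it together with the earlier change, the runner would look roughly like this (a sketch; kube_cluster_kwargs stands for the pod template / deploy_mode dict from above):
import os
from prefect_dask import DaskTaskRunner

task_runner = DaskTaskRunner(
    cluster_class="dask_kubernetes.KubeCluster",
    cluster_kwargs=kube_cluster_kwargs,  # pod templates, deploy_mode, etc.
    adapt_kwargs={
        "minimum": int(os.environ["MIN_WORKERS"]),
        "maximum": int(os.environ["MAX_WORKERS"]),
    },
    # Make the flow run's Dask client the default client so that
    # get_dask_client() inside tasks can find the scheduler address.
    client_kwargs={"set_as_default": True},
)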

Alexandru Anghel

12/21/2022, 3:20 PM
Yes, that was the missing piece of the puzzle 👏 Thank you very much, @Ryan Peden!

Ryan Peden

12/21/2022, 3:22 PM
You're welcome! 😄 I'm happy it helped.