Tsang Yong

10/10/2020, 4:20 AM
Hi, is there a proper way to launch an on-demand k8s Dask cluster (i.e. KubeCluster) and then run Prefect tasks on it using DaskExecutor? I tried doing it but I'm getting:
```
ERROR - prefect.FlowRunner | Unexpected error: TypeError('code() takes at most 15 arguments (16 given)')
```
not sure if it’s a supported mode of operation.
Chris White

10/10/2020, 4:25 AM
Hi Tsang, yes it should work as you expect if you pass a KubeCluster to the Dask executor (see executor docs here: https://docs.prefect.io/api/latest/engine/executors.html#daskexecutor). What did you try?
Tsang Yong

10/10/2020, 4:30 AM
Yeah, so I have some simple tasks set up as a flow and then tried the following to run it. I was expecting some pods to spin up, but I ended up with a pickle error:
```python
import dask
from dask_kubernetes import KubeCluster
from prefect.engine.executors import DaskExecutor

dask.config.set({'kubernetes.worker_template_path': 'my-prefect-dask-worker.yml'})

dask_executor = DaskExecutor(
    cluster_class=KubeCluster,
    cluster_kwargs={
        'name': 'dask-worker-{JUPYTERHUB_USER}-{uuid}',
        'n_workers': 3
    },
)
flow.run(executor=dask_executor)
```
The error traces back to:
```
Traceback (most recent call last):
  File "/opt/conda/lib/python3.8/site-packages/prefect/engine/runner.py", line 48, in inner
    new_state = method(self, state, *args, **kwargs)
  File "/opt/conda/lib/python3.8/site-packages/prefect/engine/flow_runner.py", line 600, in get_flow_run_state
    final_states = executor.wait(
  File "/opt/conda/lib/python3.8/site-packages/prefect/engine/executors/dask.py", line 451, in wait
    return self.client.gather(futures)
  File "/opt/conda/lib/python3.8/site-packages/distributed/client.py", line 1987, in gather
    return self.sync(
  File "/opt/conda/lib/python3.8/site-packages/distributed/client.py", line 833, in sync
    return sync(
  File "/opt/conda/lib/python3.8/site-packages/distributed/utils.py", line 339, in sync
    raise exc.with_traceback(tb)
  File "/opt/conda/lib/python3.8/site-packages/distributed/utils.py", line 323, in f
    result[0] = yield future
  File "/opt/conda/lib/python3.8/site-packages/tornado/gen.py", line 735, in run
    value = future.result()
  File "/opt/conda/lib/python3.8/site-packages/distributed/client.py", line 1852, in _gather
    raise exception.with_traceback(traceback)
  File "/usr/local/lib/python3.7/site-packages/distributed/protocol/pickle.py", line 75, in loads
TypeError: code() takes at most 15 arguments (16 given)
```
Perhaps I have a bad version?
```
distributed==2.28.0
prefect==0.13.4
```
Chris White

10/10/2020, 4:34 AM
Yeah, this seems like a dask / dask-kubernetes issue; perhaps search their repo for this error message?
Actually, maybe try upgrading cloudpickle.
Tsang Yong

10/10/2020, 4:39 AM
I'm already on:
```
cloudpickle==1.6.0
```
(hope it's ok to post the full trace) and thanks for the pointer. I'll have to keep looking…
👍 1
Jim Crist-Harif

10/10/2020, 1:45 PM
That's a serialization error; I suspect there's a version difference between your local process and the dask worker nodes. Things to check:
• python version
• cloudpickle version
• prefect version
• distributed version
The versions of these in the dask image your workers are running should match those you're using locally.
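(Editor's note: the checklist above can be run programmatically; distributed can even do the client/worker comparison itself via `Client.get_versions(check=True)`. A minimal sketch, where the helper name and package list are illustrative, not Prefect API:)
```python
# Sketch: collect the local versions of the packages that must match
# between the Prefect/Dask client and the Dask worker image.
import sys
from importlib.metadata import version, PackageNotFoundError

def local_versions(packages=("cloudpickle", "distributed", "prefect")):
    """Return a dict of locally installed package versions (None if missing)."""
    found = {}
    for pkg in packages:
        try:
            found[pkg] = version(pkg)
        except PackageNotFoundError:
            found[pkg] = None
    found["python"] = sys.version.split()[0]  # e.g. "3.8.5"
    return found

print(local_versions())

# Against a live cluster, distributed can do the comparison itself:
# from distributed import Client
# client = Client(...)             # connect to the KubeCluster's scheduler
# client.get_versions(check=True)  # raises if client/worker versions differ
```
Note the mixed paths in the traceback above (`python3.8` locally, `python3.7` in `distributed/protocol/pickle.py`) are exactly the kind of mismatch this check surfaces.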
Tsang Yong

10/10/2020, 4:55 PM
Just recreated the environment with matching versions. It worked. Thanks, guys.
💯 1
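(Editor's note: a common way to prevent this mismatch recurring is to pin the same versions in one file and install it both locally and in the Dask worker image. A sketch, with a hypothetical shared file; the pins are the versions from this thread:)
```
# requirements.txt — shared by the local environment and the
# Dask worker image (the Python minor version must match too)
cloudpickle==1.6.0
distributed==2.28.0
prefect==0.13.4
```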