Hi there, I am trying to use the `DaskTaskRunner` ...
# prefect-community
t
Hi there, I am trying to use the DaskTaskRunner with the KubeCluster type from dask-kubernetes; however, I am encountering the following error:
Copy code
Crash details:
Traceback (most recent call last):
  File "/usr/local/lib/python3.10/site-packages/prefect/task_runners.py", line 161, in start
    await self._start(exit_stack)
  File "/usr/local/lib/python3.10/site-packages/prefect_dask/task_runners.py", line 300, in _start
    self._client = await exit_stack.enter_async_context(
  File "/usr/local/lib/python3.10/contextlib.py", line 619, in enter_async_context
    result = await _cm_type.__aenter__(cm)
  File "/usr/local/lib/python3.10/site-packages/distributed/client.py", line 1398, in __aenter__
    await self
  File "/usr/local/lib/python3.10/site-packages/distributed/client.py", line 1213, in _start
    await self._ensure_connected(timeout=timeout)
  File "/usr/local/lib/python3.10/site-packages/distributed/client.py", line 1276, in _ensure_connected
    comm = await connect(
  File "/usr/local/lib/python3.10/site-packages/distributed/comm/core.py", line 291, in connect
    comm = await asyncio.wait_for(
  File "/usr/local/lib/python3.10/asyncio/tasks.py", line 445, in wait_for
    return fut.result()
  File "/usr/local/lib/python3.10/site-packages/distributed/comm/tcp.py", line 487, in connect
    ip, port = parse_host_port(address)
  File "/usr/local/lib/python3.10/site-packages/distributed/comm/addressing.py", line 95, in parse_host_port
    port = _default()
  File "/usr/local/lib/python3.10/site-packages/distributed/comm/addressing.py", line 73, in _default
    raise ValueError(f"missing port number in address {address!r}")
ValueError: missing port number in address '<Not Connected>'
The task runner is set dynamically:
Copy code
flow.task_runner = DaskTaskRunner(
    cluster_class=KubeCluster,
    cluster_kwargs={
        "name": f"{flow.name}",
        "namespace": "prefect",
        "image": self._storage.get_name(),
        "n_workers": 1,
        "resources": {
            "limits": {
                "cpu": resources.cpu,
                "memory": resources.mem,
            }
        },
    },
    adapt_kwargs={
        "minimum": self._min_workers,
        "maximum": self._max_workers,
    },
    client_kwargs={
        "set_as_default": True
    },
)
Does anyone know what might be causing this?
a
could you give more details? what's your prefect version?
this code looks like a method from a class, hard to see how it's defined and used
s
Running into the same issue on my side.
Just posted my stack trace as well.
Prefect version on my side is 2.6.7
r
Out of curiosity, what versions of the dask and dask-kubernetes packages do you have installed?
The reason I ask: there was a new release of Dask yesterday (which uses a new release of the distributed package shown in the stack trace). So depending on which version of Dask you're running on the machine with Prefect and what image you are passing to KubeCluster, you might have a Dask version mismatch between the two locations. I'm not yet sure if that's the cause of the issue, but it is worth a quick look.
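If it helps, a quick way to compare is to print the versions on both sides, i.e. run something like this locally and again inside the image you pass to KubeCluster (rough sketch, assuming all three packages are installed in both places):
Copy code
# Print the Dask-related versions in the current environment; running the
# same lines inside the worker image makes a mismatch easy to spot.
import dask
import dask_kubernetes
import distributed

print("dask:           ", dask.__version__)
print("distributed:    ", distributed.__version__)
print("dask-kubernetes:", dask_kubernetes.__version__)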
gratitude thank you 2
t
I am running Prefect server version 2.6.7. I have dask version 2022.11.0, distributed version 2022.11.0 and dask_kubernetes version 2022.10.1, so a version mismatch could be likely. The dask operator is version 2022.11.0, btw.
It seems that 2022.10.1 is the most recent version of dask_kubernetes 😕
downgraded dask, distributed and dask_kubernetes to version 2022.10.0, but no luck
also downgraded the dask operator to version 2022.10.0, but still no luck
s
@Thomas Fredriksen double-check for me: did you try the classic KubeCluster as well? That is still on my to-do list to figure out, but if you have already, that would be good to know.
t
@Sander No, I have not tested that yet
👍 1
s
I think that may work, as the operator setup is quite recent and may not have been picked up by all dependent libraries yet. Once I have tested that I’ll share the results.
gratitude thank you 2
🙌 2
t
awesome, thank you!
s
@Thomas Fredriksen it seems that the classic interface works better. I was spinning up the cluster through a KubernetesJob and need to fine-tune some RBAC stuff in the cluster.
However, I think there is some bug somewhere in Prefect 2.6.7. I get the following stack trace at the end, after the cluster is torn down.
[…]
Copy code
10:01:24 PM  INFO   Executing 'load-6fb3f87d-0' immediately...
10:01:24 PM  ERROR  Crash detected! Execution was interrupted by an unexpected exception: AttributeError: 'coroutine' object has no attribute 'is_completed'
10:01:30 PM  load-6fb3f87d-0  ERROR  Encountered exception during execution:
Traceback (most recent call last):
  File "/usr/local/lib/python3.10/site-packages/prefect/engine.py", line 610, in orchestrate_flow_run
    result = await run_sync(flow_call)
  File "/usr/local/lib/python3.10/site-packages/prefect/utilities/asyncutils.py", line 68, in run_sync_in_worker_thread
    return await anyio.to_thread.run_sync(call, cancellable=True)
  File "/usr/local/lib/python3.10/site-packages/anyio/to_thread.py", line 31, in run_sync
    return await get_asynclib().run_sync_in_worker_thread(
  File "/usr/local/lib/python3.10/site-packages/anyio/_backends/_asyncio.py", line 937, in run_sync_in_worker_thread
    return await future
  File "/usr/local/lib/python3.10/site-packages/anyio/_backends/_asyncio.py", line 867, in run
    result = context.run(func, *args)
  File "/opt/prefect/ephemeral_dask.py", line 46, in main
    result = load(numbers=numbers_twice)
  File "/usr/local/lib/python3.10/site-packages/prefect/tasks.py", line 360, in __call__
    return enter_task_run_engine(
  File "/usr/local/lib/python3.10/site-packages/prefect/engine.py", line 733, in enter_task_run_engine
    return run_async_from_worker_thread(begin_run)
  File "/usr/local/lib/python3.10/site-packages/prefect/utilities/asyncutils.py", line 148, in run_async_from_worker_thread
    return anyio.from_thread.run(call)
  File "/usr/local/lib/python3.10/site-packages/anyio/from_thread.py", line 49, in run
    return asynclib.run_async_from_thread(func, *args)
  File "/usr/local/lib/python3.10/site-packages/anyio/_backends/_asyncio.py", line 970, in run_async_from_thread
    return f.result()
  File "/usr/local/lib/python3.10/concurrent/futures/_base.py", line 458, in result
    return self.__get_result()
  File "/usr/local/lib/python3.10/concurrent/futures/_base.py", line 403, in __get_result
    raise self._exception
  File "/usr/local/lib/python3.10/site-packages/prefect/engine.py", line 874, in get_task_call_return_value
    return await future._result()
  File "/usr/local/lib/python3.10/site-packages/prefect/futures.py", line 237, in _result
    return await final_state.result(raise_on_failure=raise_on_failure, fetch=True)
  File "/usr/local/lib/python3.10/site-packages/prefect/states.py", line 74, in _get_state_result
    raise await get_state_exception(state)
  File "/usr/local/lib/python3.10/site-packages/prefect/task_runners.py", line 204, in submit
    result = await call()
  File "/usr/local/lib/python3.10/site-packages/prefect/engine.py", line 1108, in begin_task_run
    return await orchestrate_task_run(
  File "/usr/local/lib/python3.10/site-packages/prefect/engine.py", line 1172, in orchestrate_task_run
    resolved_parameters = await resolve_inputs(parameters)
  File "/usr/local/lib/python3.10/site-packages/prefect/engine.py", line 1463, in resolve_inputs
    return await run_sync_in_worker_thread(
  File "/usr/local/lib/python3.10/site-packages/prefect/utilities/asyncutils.py", line 68, in run_sync_in_worker_thread
    return await anyio.to_thread.run_sync(call, cancellable=True)
  File "/usr/local/lib/python3.10/site-packages/anyio/to_thread.py", line 31, in run_sync
    return await get_asynclib().run_sync_in_worker_thread(
  File "/usr/local/lib/python3.10/site-packages/anyio/_backends/_asyncio.py", line 937, in run_sync_in_worker_thread
    return await future
  File "/usr/local/lib/python3.10/site-packages/anyio/_backends/_asyncio.py", line 867, in run
    result = context.run(func, *args)
  File "/usr/local/lib/python3.10/site-packages/prefect/utilities/collections.py", line 285, in visit_collection
    items = [(visit_nested(k), visit_nested(v)) for k, v in expr.items()]
  File "/usr/local/lib/python3.10/site-packages/prefect/utilities/collections.py", line 285, in <listcomp>
    items = [(visit_nested(k), visit_nested(v)) for k, v in expr.items()]
  File "/usr/local/lib/python3.10/site-packages/prefect/utilities/collections.py", line 251, in visit_nested
    return visit_collection(
  File "/usr/local/lib/python3.10/site-packages/prefect/utilities/collections.py", line 280, in visit_collection
    items = [visit_nested(o) for o in expr]
  File "/usr/local/lib/python3.10/site-packages/prefect/utilities/collections.py", line 280, in <listcomp>
    items = [visit_nested(o) for o in expr]
  File "/usr/local/lib/python3.10/site-packages/prefect/utilities/collections.py", line 251, in visit_nested
    return visit_collection(
  File "/usr/local/lib/python3.10/site-packages/prefect/utilities/collections.py", line 259, in visit_collection
    result = visit_fn(expr)
  File "/usr/local/lib/python3.10/site-packages/prefect/engine.py", line 1453, in resolve_input
    if not state.is_completed() and not (
AttributeError: 'coroutine' object has no attribute 'is_completed'
10:01:30 PM  ERROR  Crash detected! Execution was interrupted by an unexpected exception: AttributeError: 'coroutine' object has no attribute 'type'
Btw, this is a copy-paste from the Prefect Orion log screen.
t
@Sander Did you by any chance retry this in any of the new releases? Have you found a functional workaround?
s
Definitely. The issue was that I didn’t install prefect on the dask workers. Let me forward the thread.
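For anyone landing here later, a rough sketch of one way to get prefect onto the workers. It assumes the stock daskdev/dask image, whose startup script pip-installs whatever is listed in EXTRA_PIP_PACKAGES; the image tag and version pins below are illustrative, not the exact setup used here:
Copy code
# Sketch: the Dask workers need the same Prefect bits as the flow-run environment.
# The default daskdev/dask images install anything listed in EXTRA_PIP_PACKAGES at
# startup; alternatively, bake prefect and prefect-dask into a custom worker image.
from dask_kubernetes import make_pod_spec  # classic pod-spec helper

pod_template = make_pod_spec(
    image="daskdev/dask:2022.10.0",  # illustrative tag; match your local Dask version
    env={"EXTRA_PIP_PACKAGES": "prefect==2.6.7 prefect-dask"},
)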
t
Wow, this looks super promising, thank you so much!
s
It worked like a charm. They also fixed the incorrect handling of the dask worker errors now as well.
🙌 1
t
Fantastic! I will give this another try then 😛
🚀 2
@Sander What version of the dask gateway, KubeCluster and prefect-dask did you end up using?
s
I used the latest classic KubeCluster and I think latest prefect versions should be fine.
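A minimal sketch of what that wiring looks like with prefect-dask and the classic (non-operator) KubeCluster; the image, namespace and worker counts below are placeholders rather than my exact setup:
Copy code
# Sketch only: DaskTaskRunner pointed at the classic KubeCluster. In dask-kubernetes
# 2022.10.x the top-level import still resolves to the classic implementation;
# the operator-based one lives under dask_kubernetes.operator.
from dask_kubernetes import KubeCluster, make_pod_spec
from prefect_dask import DaskTaskRunner

task_runner = DaskTaskRunner(
    cluster_class=KubeCluster,
    cluster_kwargs={
        "pod_template": make_pod_spec(
            image="<worker image with prefect and prefect-dask installed>",
        ),
        "namespace": "prefect",
        "n_workers": 1,
    },
    adapt_kwargs={"minimum": 1, "maximum": 3},
)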
I’m planning on setting everything up again in the coming days in a dev k8s cluster on my Mac.
t
Thanks for the feedback. I was not aware KubeCluster provided the classic implementation. I ran into an attribute error, 'V1PodSpec' object has no attribute '_host_users'; not sure why this happens.
s
The operator model is quite new and I don’t think that’s supported yet.
Don’t know what this error means :)
t
When the flow is loaded, dask appears to try to load the cluster client from the environment, which results in a `ValueError`:
Copy code
Flow could not be retrieved from deployment.
Traceback (most recent call last):
  File "<frozen importlib._bootstrap_external>", line 883, in exec_module
  File "<frozen importlib._bootstrap>", line 241, in _call_with_frames_removed
  File "/opt/prefect/flows/example_flow_dask_runtime.py", line 6, in <module>
    example_flow_dask_runtime = cloudpickle.loads(open("example_flow_dask_runtime.pickle", "rb").read())
  File "/usr/local/lib/python3.10/site-packages/prefect_dask/task_runners.py", line 326, in __setstate__
    self._client = distributed.get_client()
  File "/usr/local/lib/python3.10/site-packages/distributed/worker.py", line 2737, in get_client
    raise ValueError("No global client found and no address provided")
ValueError: No global client found and no address provided
I have been working around this by subclassing `DaskTaskRunner`:
Copy code
from prefect_dask import DaskTaskRunner


class _DaskTaskRunnerWrapper(DaskTaskRunner):
    @property
    def name(self):
        return "ephemeralDask"

    def __setstate__(self, state: dict):
        try:
            super().__setstate__(state)
        except ValueError:
            # Ignore the ValueError raised by `distributed.get_client()` when
            # no Dask client is available in the unpickling environment
            pass
Did you encounter something similar?
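(For completeness, the wrapper is just a drop-in replacement wherever DaskTaskRunner was assigned before; a hypothetical usage sketch, reusing the names from the earlier snippets:)
Copy code
# Hypothetical usage sketch: same constructor arguments as DaskTaskRunner,
# only the class (and its unpickling behaviour) changes. Assumes `flow`,
# `KubeCluster` and `_DaskTaskRunnerWrapper` from the snippets above.
flow.task_runner = _DaskTaskRunnerWrapper(
    cluster_class=KubeCluster,
    cluster_kwargs={"namespace": "prefect"},  # remaining kwargs as in the earlier snippet
)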
correction: prefect-dask is attempting to load the dask client
s
Not really
t
The AttributeError was attributed (pun intended) to how I pickle my flows before executing them.
😂 1
💯 1
The classic KubeCluster was indeed the secret to making this work. Thank you so much!
🙌 2
🎉 1