
Thomas Fredriksen

11/16/2022, 7:29 PM
Hi there, I am trying to use the `DaskTaskRunner` with the `KubeCluster` type from `dask-kubernetes`; however, I am encountering the following error:
```
Crash details:
Traceback (most recent call last):
  File "/usr/local/lib/python3.10/site-packages/prefect/task_runners.py", line 161, in start
    await self._start(exit_stack)
  File "/usr/local/lib/python3.10/site-packages/prefect_dask/task_runners.py", line 300, in _start
    self._client = await exit_stack.enter_async_context(
  File "/usr/local/lib/python3.10/contextlib.py", line 619, in enter_async_context
    result = await _cm_type.__aenter__(cm)
  File "/usr/local/lib/python3.10/site-packages/distributed/client.py", line 1398, in __aenter__
    await self
  File "/usr/local/lib/python3.10/site-packages/distributed/client.py", line 1213, in _start
    await self._ensure_connected(timeout=timeout)
  File "/usr/local/lib/python3.10/site-packages/distributed/client.py", line 1276, in _ensure_connected
    comm = await connect(
  File "/usr/local/lib/python3.10/site-packages/distributed/comm/core.py", line 291, in connect
    comm = await asyncio.wait_for(
  File "/usr/local/lib/python3.10/asyncio/tasks.py", line 445, in wait_for
    return fut.result()
  File "/usr/local/lib/python3.10/site-packages/distributed/comm/tcp.py", line 487, in connect
    ip, port = parse_host_port(address)
  File "/usr/local/lib/python3.10/site-packages/distributed/comm/addressing.py", line 95, in parse_host_port
    port = _default()
  File "/usr/local/lib/python3.10/site-packages/distributed/comm/addressing.py", line 73, in _default
    raise ValueError(f"missing port number in address {address!r}")
ValueError: missing port number in address '<Not Connected>'
```
The task runner is set dynamically:
```python
flow.task_runner = DaskTaskRunner(
    cluster_class=KubeCluster,
    cluster_kwargs={
        "name": f"{flow.name}",
        "namespace": "prefect",
        "image": self._storage.get_name(),
        "n_workers": 1,
        "resources": {
            "limits": {
                "cpu": resources.cpu,
                "memory": resources.mem,
            }
        },
    },
    adapt_kwargs={
        "minimum": self._min_workers,
        "maximum": self._max_workers,
    },
    client_kwargs={
        "set_as_default": True
    }
)
```
Does anyone know what might be causing this?

Anna Geller

11/16/2022, 9:03 PM
Could you give more details? What's your `prefect` version?
This code looks like a method from a class; it's hard to see how it's defined and used.

Sander

11/16/2022, 9:46 PM
Running into the same issue on my side.
Just posted my stack trace as well.
Prefect version on my side is 2.6.7

Ryan Peden

11/16/2022, 9:49 PM
Out of curiosity, what versions of the `dask` and `dask-kubernetes` packages do you have installed?
The reason I ask: there was a new release of Dask yesterday (which uses a new release of the `distributed` package seen in the stack trace). So depending on which version of Dask you're running on the machine with Prefect and which `image` you are passing to `KubeCluster`, you might have a Dask version mismatch between the two locations. I'm not yet sure if that's the cause of the issue, but it is worth a quick look.
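One way to check for such a mismatch is to compare package versions on both sides. Once a client connects, `distributed`'s `Client.get_versions(check=True)` can compare client, scheduler and worker versions, but here the client never connects. The stdlib-only sketch below instead compares the locally installed versions with versions you collect from the worker image yourself (for example by running `python -c` inside the container). `local_versions`, `find_mismatches` and the `in_image` values are hypothetical, for illustration only.

```python
from importlib import metadata
from typing import Dict, Iterable, Optional, Tuple


def local_versions(packages: Iterable[str]) -> Dict[str, Optional[str]]:
    """Return the installed version of each package, or None if it is not installed."""
    versions: Dict[str, Optional[str]] = {}
    for name in packages:
        try:
            versions[name] = metadata.version(name)
        except metadata.PackageNotFoundError:
            versions[name] = None
    return versions


def find_mismatches(
    local: Dict[str, Optional[str]], remote: Dict[str, Optional[str]]
) -> Dict[str, Tuple[Optional[str], Optional[str]]]:
    """Return {package: (local_version, remote_version)} for every package whose versions differ."""
    return {
        name: (local.get(name), remote.get(name))
        for name in sorted(set(local) | set(remote))
        if local.get(name) != remote.get(name)
    }


if __name__ == "__main__":
    here = local_versions(["dask", "distributed", "dask-kubernetes"])
    # Hypothetical versions reported from inside the worker image.
    in_image = {"dask": "2022.10.0", "distributed": "2022.10.0", "dask-kubernetes": "2022.10.1"}
    print(find_mismatches(here, in_image))
```

An empty result means both environments agree on every listed package; any entry points at a package to pin to the same version locally and in the image.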

Thomas Fredriksen

11/17/2022, 9:17 AM
I am running Prefect server version 2.6.7. I have `dask` version `2022.11.0`, `distributed` version `2022.11.0` and `dask_kubernetes` version `2022.10.1`, so a version mismatch seems likely. The dask operator is version `2022.11.0`, btw.
It seems that `2022.10.1` is the most recent version of `dask_kubernetes` 😕
I downgraded `dask`, `distributed` and `dask_kubernetes` to version `2022.10.0`, but no luck.
I also downgraded the dask operator to version `2022.10.0`, but still no luck.

Sander

11/17/2022, 10:29 AM
@Thomas Fredriksen, to double-check: did you try the classic `KubeCluster` as well? That is still on my to-do list to figure out, but if you have already tried it, that would be good to know.

Thomas Fredriksen

11/17/2022, 10:49 AM
@Sander No, I have not tested that yet
๐Ÿ‘ 1
s

Sander

11/17/2022, 10:52 AM
I think that may work, as the operator setup is quite recent and may not yet be supported by all dependent libraries. Once I have tested it, I'll share the results.
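For anyone trying the classic interface, here is a minimal sketch of the keyword arguments one might pass to `DaskTaskRunner`. It assumes that `dask_kubernetes.KubeCluster` refers to the classic implementation (as it did in the late-2022 releases discussed here), that the classic class accepts its worker pod template as a plain dict, and that `DaskTaskRunner` accepts `cluster_class` as an import-path string. `classic_kube_runner_kwargs` is a hypothetical helper, not part of either library.

```python
from typing import Any, Dict


def classic_kube_runner_kwargs(
    pod_template: Dict[str, Any],
    namespace: str,
    min_workers: int,
    max_workers: int,
) -> Dict[str, Any]:
    """Build keyword arguments for prefect_dask.DaskTaskRunner targeting the classic KubeCluster."""
    if min_workers < 0 or max_workers < min_workers:
        raise ValueError("expected 0 <= min_workers <= max_workers")
    return {
        # Import path of the classic implementation (assumption: late-2022 dask-kubernetes).
        "cluster_class": "dask_kubernetes.KubeCluster",
        "cluster_kwargs": {
            "pod_template": pod_template,  # worker pod spec as a plain dict
            "namespace": namespace,
        },
        # Forwarded by the task runner to the cluster's adapt() call.
        "adapt_kwargs": {"minimum": min_workers, "maximum": max_workers},
    }


# Usage (assumes prefect-dask and dask-kubernetes are installed):
# from prefect_dask import DaskTaskRunner
# runner = DaskTaskRunner(**classic_kube_runner_kwargs(template, "prefect", 1, 4))
```

Keeping the arguments in a plain dict makes it easy to inspect or log what the runner will receive before any cluster is created.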

Thomas Fredriksen

11/17/2022, 10:52 AM
awesome, thank you!

Sander

11/17/2022, 9:56 PM
@Thomas Fredriksen, it seems that the classic interface works better. I was spinning up the cluster through a `KubernetesJob` and need to fine-tune some RBAC settings in the cluster.
However, I think there is a bug somewhere in Prefect 2.6.7. I get the following stack trace at the end, after the cluster is torn down:
```
[…]
10:01:24 PM  INFO   Executing 'load-6fb3f87d-0' immediately...
10:01:24 PM  ERROR  Crash detected! Execution was interrupted by an unexpected exception: AttributeError: 'coroutine' object has no attribute 'is_completed'
10:01:30 PM  load-6fb3f87d-0  ERROR  Encountered exception during execution:
Traceback (most recent call last):
  File "/usr/local/lib/python3.10/site-packages/prefect/engine.py", line 610, in orchestrate_flow_run
    result = await run_sync(flow_call)
  File "/usr/local/lib/python3.10/site-packages/prefect/utilities/asyncutils.py", line 68, in run_sync_in_worker_thread
    return await anyio.to_thread.run_sync(call, cancellable=True)
  File "/usr/local/lib/python3.10/site-packages/anyio/to_thread.py", line 31, in run_sync
    return await get_asynclib().run_sync_in_worker_thread(
  File "/usr/local/lib/python3.10/site-packages/anyio/_backends/_asyncio.py", line 937, in run_sync_in_worker_thread
    return await future
  File "/usr/local/lib/python3.10/site-packages/anyio/_backends/_asyncio.py", line 867, in run
    result = context.run(func, *args)
  File "/opt/prefect/ephemeral_dask.py", line 46, in main
    result = load(numbers=numbers_twice)
  File "/usr/local/lib/python3.10/site-packages/prefect/tasks.py", line 360, in __call__
    return enter_task_run_engine(
  File "/usr/local/lib/python3.10/site-packages/prefect/engine.py", line 733, in enter_task_run_engine
    return run_async_from_worker_thread(begin_run)
  File "/usr/local/lib/python3.10/site-packages/prefect/utilities/asyncutils.py", line 148, in run_async_from_worker_thread
    return anyio.from_thread.run(call)
  File "/usr/local/lib/python3.10/site-packages/anyio/from_thread.py", line 49, in run
    return asynclib.run_async_from_thread(func, *args)
  File "/usr/local/lib/python3.10/site-packages/anyio/_backends/_asyncio.py", line 970, in run_async_from_thread
    return f.result()
  File "/usr/local/lib/python3.10/concurrent/futures/_base.py", line 458, in result
    return self.__get_result()
  File "/usr/local/lib/python3.10/concurrent/futures/_base.py", line 403, in __get_result
    raise self._exception
  File "/usr/local/lib/python3.10/site-packages/prefect/engine.py", line 874, in get_task_call_return_value
    return await future._result()
  File "/usr/local/lib/python3.10/site-packages/prefect/futures.py", line 237, in _result
    return await final_state.result(raise_on_failure=raise_on_failure, fetch=True)
  File "/usr/local/lib/python3.10/site-packages/prefect/states.py", line 74, in _get_state_result
    raise await get_state_exception(state)
  File "/usr/local/lib/python3.10/site-packages/prefect/task_runners.py", line 204, in submit
    result = await call()
  File "/usr/local/lib/python3.10/site-packages/prefect/engine.py", line 1108, in begin_task_run
    return await orchestrate_task_run(
  File "/usr/local/lib/python3.10/site-packages/prefect/engine.py", line 1172, in orchestrate_task_run
    resolved_parameters = await resolve_inputs(parameters)
  File "/usr/local/lib/python3.10/site-packages/prefect/engine.py", line 1463, in resolve_inputs
    return await run_sync_in_worker_thread(
  File "/usr/local/lib/python3.10/site-packages/prefect/utilities/asyncutils.py", line 68, in run_sync_in_worker_thread
    return await anyio.to_thread.run_sync(call, cancellable=True)
  File "/usr/local/lib/python3.10/site-packages/anyio/to_thread.py", line 31, in run_sync
    return await get_asynclib().run_sync_in_worker_thread(
  File "/usr/local/lib/python3.10/site-packages/anyio/_backends/_asyncio.py", line 937, in run_sync_in_worker_thread
    return await future
  File "/usr/local/lib/python3.10/site-packages/anyio/_backends/_asyncio.py", line 867, in run
    result = context.run(func, *args)
  File "/usr/local/lib/python3.10/site-packages/prefect/utilities/collections.py", line 285, in visit_collection
    items = [(visit_nested(k), visit_nested(v)) for k, v in expr.items()]
  File "/usr/local/lib/python3.10/site-packages/prefect/utilities/collections.py", line 285, in <listcomp>
    items = [(visit_nested(k), visit_nested(v)) for k, v in expr.items()]
  File "/usr/local/lib/python3.10/site-packages/prefect/utilities/collections.py", line 251, in visit_nested
    return visit_collection(
  File "/usr/local/lib/python3.10/site-packages/prefect/utilities/collections.py", line 280, in visit_collection
    items = [visit_nested(o) for o in expr]
  File "/usr/local/lib/python3.10/site-packages/prefect/utilities/collections.py", line 280, in <listcomp>
    items = [visit_nested(o) for o in expr]
  File "/usr/local/lib/python3.10/site-packages/prefect/utilities/collections.py", line 251, in visit_nested
    return visit_collection(
  File "/usr/local/lib/python3.10/site-packages/prefect/utilities/collections.py", line 259, in visit_collection
    result = visit_fn(expr)
  File "/usr/local/lib/python3.10/site-packages/prefect/engine.py", line 1453, in resolve_input
    if not state.is_completed() and not (
AttributeError: 'coroutine' object has no attribute 'is_completed'
10:01:30 PM  ERROR  Crash detected! Execution was interrupted by an unexpected exception: AttributeError: 'coroutine' object has no attribute 'type'
```
Btw, this is copy-pasted from the Prefect Orion log screen.

Thomas Fredriksen

12/07/2022, 12:16 PM
@Sander Did you by any chance retry this in any of the new releases? Have you found a functional workaround?

Sander

12/07/2022, 12:17 PM
Definitely. The issue was that I didn't install Prefect on the Dask workers. Let me forward the thread.
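Since the fix was installing Prefect on the Dask workers, here is a minimal sketch of a worker pod template that does that. It assumes the classic `KubeCluster` (which accepts a dict pod template) and a worker image based on the official Dask Docker images, which pip-install the packages listed in the `EXTRA_PIP_PACKAGES` environment variable when the container starts. `worker_pod_template` and the `app: dask-worker` label are hypothetical.

```python
from typing import Any, Dict, Iterable


def worker_pod_template(
    image: str,
    cpu: str,
    memory: str,
    extra_pip_packages: Iterable[str] = ("prefect", "prefect-dask"),
) -> Dict[str, Any]:
    """Return a classic-KubeCluster worker pod template as a plain dict."""
    packages = " ".join(extra_pip_packages)
    container: Dict[str, Any] = {
        "name": "dask",
        "image": image,
        "imagePullPolicy": "IfNotPresent",
        # Classic KubeCluster starts workers from the container args.
        "args": ["dask-worker", "--nthreads", "1", "--no-dashboard",
                 "--memory-limit", memory, "--death-timeout", "60"],
        "resources": {
            "limits": {"cpu": cpu, "memory": memory},
            "requests": {"cpu": cpu, "memory": memory},
        },
    }
    if packages:
        # Assumption: the image honors EXTRA_PIP_PACKAGES, like the official Dask images.
        container["env"] = [{"name": "EXTRA_PIP_PACKAGES", "value": packages}]
    return {
        "kind": "Pod",
        "metadata": {"labels": {"app": "dask-worker"}},  # hypothetical label
        "spec": {"restartPolicy": "Never", "containers": [container]},
    }
```

Installing at container start is convenient for experiments; baking Prefect into the worker image, pinned to the same version as the flow-run environment, avoids the startup delay and keeps both sides consistent.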

Thomas Fredriksen

12/07/2022, 12:20 PM
Wow, this looks super promising, thank you so much!

Sander

12/07/2022, 12:20 PM
It worked like a charm. They have also fixed the incorrect handling of Dask worker errors.

Thomas Fredriksen

12/07/2022, 12:21 PM
Fantastic! I will give this another try then 😛
@Sander What versions of Dask Gateway, `KubeCluster` and `prefect-dask` did you end up using?

Sander

12/08/2022, 8:32 AM
I used the latest classic `KubeCluster`, and I think the latest Prefect versions should be fine.
I'm planning on setting everything up again in the coming days in a dev k8s cluster on my Mac.

Thomas Fredriksen

12/08/2022, 10:02 AM
Thanks for the feedback. I was not aware that `KubeCluster` provided the classic implementation. I ran into an attribute error, `'V1PodSpec' object has no attribute '_host_users'`; I'm not sure why this happens.

Sander

12/08/2022, 10:04 AM
The operator model is quite new, and I don't think it is supported yet.
I don't know what this error means :)

Thomas Fredriksen

12/08/2022, 10:11 AM
When the flow is loaded, dask appears to try to load the cluster client from the environment, which results in a `ValueError`:
```
Flow could not be retrieved from deployment.
Traceback (most recent call last):
  File "<frozen importlib._bootstrap_external>", line 883, in exec_module
  File "<frozen importlib._bootstrap>", line 241, in _call_with_frames_removed
  File "/opt/prefect/flows/example_flow_dask_runtime.py", line 6, in <module>
    example_flow_dask_runtime = cloudpickle.loads(open("example_flow_dask_runtime.pickle", "rb").read())
  File "/usr/local/lib/python3.10/site-packages/prefect_dask/task_runners.py", line 326, in __setstate__
    self._client = distributed.get_client()
  File "/usr/local/lib/python3.10/site-packages/distributed/worker.py", line 2737, in get_client
    raise ValueError("No global client found and no address provided")
ValueError: No global client found and no address provided
```
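I have been working around this by subclassing `DaskTaskRunner` and overriding `__setstate__`:
```python
from prefect_dask import DaskTaskRunner


class _DaskTaskRunnerWrapper(DaskTaskRunner):
    @property
    def name(self):
        return "ephemeralDask"

    def __setstate__(self, state: dict):
        try:
            super().__setstate__(state)
        except ValueError:
            # Ignore the ValueError raised by `distributed.get_client()` when
            # no Dask client is running in the process that unpickles the flow
            pass
```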
Did you encounter something similar?
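The traceback shows `DaskTaskRunner.__setstate__` calling `distributed.get_client()`, so unpickling the runner in a process with no running Dask client raises. The stdlib-only mock below illustrates why a `try/except ValueError` around `super().__setstate__` lets unpickling succeed. `Runner`, `TolerantRunner` and the stand-in `get_client` are hypothetical and only mirror the pattern visible in the traceback; they are not prefect-dask's actual code.

```python
import pickle
from typing import Any, Dict, Optional


def get_client() -> object:
    """Hypothetical stand-in for distributed.get_client() in a process with no Dask client."""
    raise ValueError("No global client found and no address provided")


class Runner:
    """Hypothetical mock of a task runner that re-attaches to a Dask client when unpickled."""

    def __init__(self) -> None:
        self._client: Optional[object] = None

    def __getstate__(self) -> Dict[str, Any]:
        state = self.__dict__.copy()
        state["_client"] = None  # a live client connection is never pickled
        return state

    def __setstate__(self, state: Dict[str, Any]) -> None:
        self.__dict__.update(state)
        self._client = get_client()  # raises ValueError when no client is running


class TolerantRunner(Runner):
    """Mirrors the workaround: tolerate the missing client during unpickling."""

    def __setstate__(self, state: Dict[str, Any]) -> None:
        try:
            super().__setstate__(state)
        except ValueError:
            # The base class already restored the state; keep _client unset (None).
            self._client = None


if __name__ == "__main__":
    restored = pickle.loads(pickle.dumps(TolerantRunner()))
    print(type(restored).__name__, restored._client)
```

Unpickling a plain `Runner` fails with the same `ValueError` as in the traceback, while `TolerantRunner` restores its state and leaves the client to be created later, when the runner actually starts a cluster.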
Correction: `prefect-dask` is attempting to load the Dask client.

Sander

12/08/2022, 10:18 AM
Not really

Thomas Fredriksen

12/08/2022, 12:22 PM
The `AttributeError` was attributed (pun intended) to how I pickle my flows before executing them.
The classic `KubeCluster` was indeed the secret to making this work. Thank you so much!