Thomas Fredriksen
11/16/2022, 7:29 PM
… `DaskTaskRunner` with the `KubeCluster` type from `dask-kubernetes`, however I am encountering the following error:
Crash details:
Traceback (most recent call last):
  File "/usr/local/lib/python3.10/site-packages/prefect/task_runners.py", line 161, in start
    await self._start(exit_stack)
  File "/usr/local/lib/python3.10/site-packages/prefect_dask/task_runners.py", line 300, in _start
    self._client = await exit_stack.enter_async_context(
  File "/usr/local/lib/python3.10/contextlib.py", line 619, in enter_async_context
    result = await _cm_type.__aenter__(cm)
  File "/usr/local/lib/python3.10/site-packages/distributed/client.py", line 1398, in __aenter__
    await self
  File "/usr/local/lib/python3.10/site-packages/distributed/client.py", line 1213, in _start
    await self._ensure_connected(timeout=timeout)
  File "/usr/local/lib/python3.10/site-packages/distributed/client.py", line 1276, in _ensure_connected
    comm = await connect(
  File "/usr/local/lib/python3.10/site-packages/distributed/comm/core.py", line 291, in connect
    comm = await asyncio.wait_for(
  File "/usr/local/lib/python3.10/asyncio/tasks.py", line 445, in wait_for
    return fut.result()
  File "/usr/local/lib/python3.10/site-packages/distributed/comm/tcp.py", line 487, in connect
    ip, port = parse_host_port(address)
  File "/usr/local/lib/python3.10/site-packages/distributed/comm/addressing.py", line 95, in parse_host_port
    port = _default()
  File "/usr/local/lib/python3.10/site-packages/distributed/comm/addressing.py", line 73, in _default
    raise ValueError(f"missing port number in address {address!r}")
ValueError: missing port number in address '<Not Connected>'
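The bottom frames show where this fails: `parse_host_port` in `distributed.comm.addressing` expects an address that includes a port, such as `tcp://10.0.0.5:8786`, but the client received the string `'<Not Connected>'`. That string appears to be the placeholder `distributed` reports as a cluster's scheduler address when the cluster has no live scheduler connection, so the error most likely means the cluster was not connected to its scheduler when the client tried to attach. The snippet below is a simplified, stdlib-only re-creation of the port check for illustration; `split_host_port` is a hypothetical name and this is not `distributed`'s actual implementation (for instance, it ignores IPv6 addresses).

```python
def split_host_port(address: str) -> tuple[str, int]:
    """Simplified host:port check (illustrative only; ignores IPv6)."""
    # Strip an optional protocol prefix such as "tcp://".
    if "://" in address:
        address = address.split("://", 1)[1]
    host, sep, port = address.rpartition(":")
    if not sep:
        # No colon at all, e.g. the placeholder "<Not Connected>".
        raise ValueError(f"missing port number in address {address!r}")
    return host, int(port)


print(split_host_port("tcp://10.0.0.5:8786"))  # ('10.0.0.5', 8786)

try:
    split_host_port("<Not Connected>")
except ValueError as exc:
    print(exc)  # missing port number in address '<Not Connected>'
```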
The task runner is set dynamically:
flow.task_runner = DaskTaskRunner(
    cluster_class=KubeCluster,
    cluster_kwargs={
        "name": f"{flow.name}",
        "namespace": "prefect",
        "image": self._storage.get_name(),
        "n_workers": 1,
        "resources": {
            "limits": {
                "cpu": resources.cpu,
                "memory": resources.mem,
            }
        },
    },
    adapt_kwargs={
        "minimum": self._min_workers,
        "maximum": self._max_workers,
    },
    client_kwargs={
        "set_as_default": True
    }
)
Does anyone know what might be causing this?
Anna Geller
11/16/2022, 9:03 PM
`prefect version`?
Sander
11/16/2022, 9:46 PM
Ryan Peden
11/16/2022, 9:49 PM
… `dask` and `dask-kubernetes` packages do you have installed?
… `distributed` package in the stacktrace).
So depending on which version of Dask you're running on the machine with Prefect and what image you are passing to `KubeCluster`, you might have a Dask version mismatch between the two locations. I'm not yet sure whether that's the cause of the issue, but it is worth a quick look.
Thomas Fredriksen
11/17/2022, 9:17 AM
`dask` version `2022.11.0`, `distributed` version `2022.11.0` and `dask_kubernetes` version `2022.10.1`, so a version mismatch seems likely. The dask operator is version `2022.11.0`, btw.
`2022.10.1` is the most recent version of `dask_kubernetes` 😕
… `dask`, `distributed` and `dask_kubernetes` to version `2022.10.0`, but no luck
… `2022.10.0`, but still no luck
Sander
11/17/2022, 10:29 AM
Thomas Fredriksen
11/17/2022, 10:49 AM
Sander
11/17/2022, 10:52 AM
Thomas Fredriksen
11/17/2022, 10:52 AM
Sander
11/17/2022, 9:56 PM
10:01:24 PM INFO Executing 'load-6fb3f87d-0' immediately...
10:01:24 PM ERROR Crash detected! Execution was interrupted by an unexpected exception: AttributeError: 'coroutine' object has no attribute 'is_completed'
10:01:30 PM load-6fb3f87d-0 ERROR Encountered exception during execution:
Traceback (most recent call last):
  File "/usr/local/lib/python3.10/site-packages/prefect/engine.py", line 610, in orchestrate_flow_run
    result = await run_sync(flow_call)
  File "/usr/local/lib/python3.10/site-packages/prefect/utilities/asyncutils.py", line 68, in run_sync_in_worker_thread
    return await anyio.to_thread.run_sync(call, cancellable=True)
  File "/usr/local/lib/python3.10/site-packages/anyio/to_thread.py", line 31, in run_sync
    return await get_asynclib().run_sync_in_worker_thread(
  File "/usr/local/lib/python3.10/site-packages/anyio/_backends/_asyncio.py", line 937, in run_sync_in_worker_thread
    return await future
  File "/usr/local/lib/python3.10/site-packages/anyio/_backends/_asyncio.py", line 867, in run
    result = context.run(func, *args)
  File "/opt/prefect/ephemeral_dask.py", line 46, in main
    result = load(numbers=numbers_twice)
  File "/usr/local/lib/python3.10/site-packages/prefect/tasks.py", line 360, in __call__
    return enter_task_run_engine(
  File "/usr/local/lib/python3.10/site-packages/prefect/engine.py", line 733, in enter_task_run_engine
    return run_async_from_worker_thread(begin_run)
  File "/usr/local/lib/python3.10/site-packages/prefect/utilities/asyncutils.py", line 148, in run_async_from_worker_thread
    return anyio.from_thread.run(call)
  File "/usr/local/lib/python3.10/site-packages/anyio/from_thread.py", line 49, in run
    return asynclib.run_async_from_thread(func, *args)
  File "/usr/local/lib/python3.10/site-packages/anyio/_backends/_asyncio.py", line 970, in run_async_from_thread
    return f.result()
  File "/usr/local/lib/python3.10/concurrent/futures/_base.py", line 458, in result
    return self.__get_result()
  File "/usr/local/lib/python3.10/concurrent/futures/_base.py", line 403, in __get_result
    raise self._exception
  File "/usr/local/lib/python3.10/site-packages/prefect/engine.py", line 874, in get_task_call_return_value
    return await future._result()
  File "/usr/local/lib/python3.10/site-packages/prefect/futures.py", line 237, in _result
    return await final_state.result(raise_on_failure=raise_on_failure, fetch=True)
  File "/usr/local/lib/python3.10/site-packages/prefect/states.py", line 74, in _get_state_result
    raise await get_state_exception(state)
  File "/usr/local/lib/python3.10/site-packages/prefect/task_runners.py", line 204, in submit
    result = await call()
  File "/usr/local/lib/python3.10/site-packages/prefect/engine.py", line 1108, in begin_task_run
    return await orchestrate_task_run(
  File "/usr/local/lib/python3.10/site-packages/prefect/engine.py", line 1172, in orchestrate_task_run
    resolved_parameters = await resolve_inputs(parameters)
  File "/usr/local/lib/python3.10/site-packages/prefect/engine.py", line 1463, in resolve_inputs
    return await run_sync_in_worker_thread(
  File "/usr/local/lib/python3.10/site-packages/prefect/utilities/asyncutils.py", line 68, in run_sync_in_worker_thread
    return await anyio.to_thread.run_sync(call, cancellable=True)
  File "/usr/local/lib/python3.10/site-packages/anyio/to_thread.py", line 31, in run_sync
    return await get_asynclib().run_sync_in_worker_thread(
  File "/usr/local/lib/python3.10/site-packages/anyio/_backends/_asyncio.py", line 937, in run_sync_in_worker_thread
    return await future
  File "/usr/local/lib/python3.10/site-packages/anyio/_backends/_asyncio.py", line 867, in run
    result = context.run(func, *args)
  File "/usr/local/lib/python3.10/site-packages/prefect/utilities/collections.py", line 285, in visit_collection
    items = [(visit_nested(k), visit_nested(v)) for k, v in expr.items()]
  File "/usr/local/lib/python3.10/site-packages/prefect/utilities/collections.py", line 285, in <listcomp>
    items = [(visit_nested(k), visit_nested(v)) for k, v in expr.items()]
  File "/usr/local/lib/python3.10/site-packages/prefect/utilities/collections.py", line 251, in visit_nested
    return visit_collection(
  File "/usr/local/lib/python3.10/site-packages/prefect/utilities/collections.py", line 280, in visit_collection
    items = [visit_nested(o) for o in expr]
  File "/usr/local/lib/python3.10/site-packages/prefect/utilities/collections.py", line 280, in <listcomp>
    items = [visit_nested(o) for o in expr]
  File "/usr/local/lib/python3.10/site-packages/prefect/utilities/collections.py", line 251, in visit_nested
    return visit_collection(
  File "/usr/local/lib/python3.10/site-packages/prefect/utilities/collections.py", line 259, in visit_collection
    result = visit_fn(expr)
  File "/usr/local/lib/python3.10/site-packages/prefect/engine.py", line 1453, in resolve_input
    if not state.is_completed() and not (
AttributeError: 'coroutine' object has no attribute 'is_completed'
10:01:30 PM ERROR Crash detected! Execution was interrupted by an unexpected exception: AttributeError: 'coroutine' object has no attribute 'type'
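Both crashes report a `coroutine` object where Prefect expected a state object with `is_completed()` and `type`. In plain Python, that is what happens when an `async def` function is called but never awaited: the call returns a coroutine object rather than the result. The stdlib-only sketch below reproduces the same message; `State`, `fetch_state`, and `check` are hypothetical stand-ins, and it shows the general mechanism, not where in this setup the `await` was skipped.

```python
import asyncio


class State:
    """Hypothetical stand-in for an orchestration state object."""

    def is_completed(self) -> bool:
        return True


async def fetch_state() -> State:
    # Hypothetical async API that produces a State.
    await asyncio.sleep(0)
    return State()


def check(state) -> bool:
    # Sync code that assumes it was handed a State, not a coroutine.
    return state.is_completed()


def buggy() -> str:
    coro = fetch_state()  # missing await: this is a coroutine object
    try:
        check(coro)
    except AttributeError as exc:
        return str(exc)
    finally:
        coro.close()  # avoid a "coroutine was never awaited" warning
    return "no error"


async def fixed() -> bool:
    state = await fetch_state()  # awaited: this is a real State
    return check(state)


print(buggy())  # 'coroutine' object has no attribute 'is_completed'
print(asyncio.run(fixed()))  # True
```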
Thomas Fredriksen
12/07/2022, 12:16 PM
Sander
12/07/2022, 12:17 PM
Thomas Fredriksen
12/07/2022, 12:20 PM
Sander
12/07/2022, 12:20 PM
Thomas Fredriksen
12/07/2022, 12:21 PM
… `kubecluster` and `prefect-dask` did you end up using?
Sander
12/08/2022, 8:32 AM
Thomas Fredriksen
12/08/2022, 10:02 AM
`'V1PodSpec' object has no attribute '_host_users'`, not sure why this happens.
Sander
12/08/2022, 10:04 AM
Thomas Fredriksen
12/08/2022, 10:11 AM
Flow could not be retrieved from deployment.
Traceback (most recent call last):
  File "<frozen importlib._bootstrap_external>", line 883, in exec_module
  File "<frozen importlib._bootstrap>", line 241, in _call_with_frames_removed
  File "/opt/prefect/flows/example_flow_dask_runtime.py", line 6, in <module>
    example_flow_dask_runtime = cloudpickle.loads(open("example_flow_dask_runtime.pickle", "rb").read())
  File "/usr/local/lib/python3.10/site-packages/prefect_dask/task_runners.py", line 326, in __setstate__
    self._client = distributed.get_client()
  File "/usr/local/lib/python3.10/site-packages/distributed/worker.py", line 2737, in get_client
    raise ValueError("No global client found and no address provided")
ValueError: No global client found and no address provided
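The traceback shows `prefect_dask`'s `DaskTaskRunner.__setstate__` calling `distributed.get_client()` while the pickled flow is being loaded, and that call raises `ValueError` because no Dask client exists in the process yet. The stdlib-only sketch below reproduces that failure and a `try`/`except` guard in `__setstate__` like the one in the workaround that follows; `Runner`, `SafeRunner`, and `_get_global_client` are hypothetical stand-ins, and `pickle` takes the place of `cloudpickle`.

```python
import pickle

_GLOBAL_CLIENT = None  # hypothetical stand-in for a process-wide Dask client


def _get_global_client():
    # Stand-in that fails the way distributed.get_client() does with no client.
    if _GLOBAL_CLIENT is None:
        raise ValueError("No global client found and no address provided")
    return _GLOBAL_CLIENT


class Runner:
    def __init__(self, name: str):
        self.name = name
        self._client = None

    def __getstate__(self):
        # Never pickle a live client; it is re-acquired on load.
        state = self.__dict__.copy()
        state["_client"] = None
        return state

    def __setstate__(self, state: dict):
        self.__dict__.update(state)
        self._client = _get_global_client()  # fails when no client exists


class SafeRunner(Runner):
    def __setstate__(self, state: dict):
        try:
            super().__setstate__(state)
        except ValueError:
            # Attributes were already restored; leave _client as None.
            pass


restored = pickle.loads(pickle.dumps(SafeRunner("ephemeralDask")))
print(restored.name, restored._client)  # ephemeralDask None

try:
    pickle.loads(pickle.dumps(Runner("plain")))
except ValueError as exc:
    print(exc)  # No global client found and no address provided
```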
I have been working around this by subclassing `DaskTaskRunner`:
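from prefect_dask import DaskTaskRunner


class _DaskTaskRunnerWrapper(DaskTaskRunner):
    @property
    def name(self):
        return "ephemeralDask"

    def __setstate__(self, state: dict):
        try:
            super().__setstate__(state)
        except ValueError:
            # Ignore the ValueError raised by `distributed.get_client()`
            # when no global Dask client exists yet (e.g. while the flow
            # is being unpickled before the cluster has started).
            pass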
Did you encounter something similar?
`prefect-dask` is attempting to load the dask client.
Sander
12/08/2022, 10:18 AM
Thomas Fredriksen
12/08/2022, 12:22 PM
The `AttributeError` was attributed (pun intended) to how I pickle my flows before executing them.
`KubeCluster` was indeed the secret to making this work. Thank you so much!
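Earlier in the thread, Ryan suggested checking for a Dask version mismatch between the machine running Prefect and the image passed to `KubeCluster`. A minimal stdlib-only sketch of that comparison follows: `installed_versions` reads local versions with `importlib.metadata`, and `find_mismatches` diffs them against versions collected from inside the image (for example by running the same lookup there). Both helper names and the example version numbers are hypothetical. With a running cluster, `distributed`'s `Client.get_versions(check=True)` performs a similar check across the client, scheduler, and workers.

```python
from importlib import metadata

# Packages whose versions should match between client and cluster.
PACKAGES = ("dask", "distributed", "dask-kubernetes")


def installed_versions(packages=PACKAGES) -> dict:
    """Return {package: version} for this environment (None if not installed)."""
    versions = {}
    for name in packages:
        try:
            versions[name] = metadata.version(name)
        except metadata.PackageNotFoundError:
            versions[name] = None
    return versions


def find_mismatches(local: dict, remote: dict) -> dict:
    """Return {package: (local, remote)} for every package whose versions differ."""
    return {
        name: (local.get(name), remote.get(name))
        for name in sorted(set(local) | set(remote))
        if local.get(name) != remote.get(name)
    }


# Hypothetical versions: one set from this machine, one reported from the image.
local = {"dask": "2022.11.0", "distributed": "2022.11.0"}
image = {"dask": "2022.10.0", "distributed": "2022.11.0"}
print(find_mismatches(local, image))  # {'dask': ('2022.11.0', '2022.10.0')}

# Against the real environment (output depends on what is installed):
print(installed_versions())
```

A non-empty result means the two sides disagree; pinning the image and the Prefect environment to the same `dask`/`distributed` versions is the usual remedy.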