Is there a way to pass an existing dask client to ...
# ask-community
s
Is there a way to pass an existing Dask client to Prefect (instead of passing a DaskExecutor that launches a cluster on its own)? I'm asking in particular so that I can run Coiled cluster commands to launch the cluster, register some worker plugins, trigger a scale, and then pass it to Prefect for execution.
k
Hi @Sébastien Arnaud! You can use Coiled like this
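import coiled
from prefect.executors import DaskExecutor

# Prefect creates the Coiled cluster when the flow run starts
executor = DaskExecutor(
    cluster_class=coiled.Cluster,
    cluster_kwargs={
        "software": "kvnkho/prefect",
        "shutdown_on_close": True,
        "name": "prefect-cluster",
    },
)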
This spins up a cluster for the flow run and spins it down afterwards. For a cluster that already exists, you can pass the address.
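A minimal sketch of the "pass the address" route. It assumes Prefect 1.x, where `DaskExecutor` accepts `address` and `client_kwargs`, and a cluster object that exposes `scheduler_address` and `security` the way Dask cluster objects generally do. Both helper names are hypothetical.

```python
def existing_cluster_executor_kwargs(scheduler_address, security=None):
    """Keyword arguments for DaskExecutor that target an already-running scheduler."""
    kwargs = {"address": scheduler_address}
    if security is not None:
        # client_kwargs is forwarded to distributed.Client by the executor
        kwargs["client_kwargs"] = {"security": security}
    return kwargs


def executor_for(cluster):
    """Build a DaskExecutor for a cluster created elsewhere (e.g. a coiled.Cluster)."""
    # Imported lazily so the helper above works without Prefect installed
    from prefect.executors import DaskExecutor

    return DaskExecutor(
        **existing_cluster_executor_kwargs(
            cluster.scheduler_address,
            getattr(cluster, "security", None),  # assumption: attribute may be absent
        )
    )
```

With this shape the cluster's lifetime is managed outside the flow, so Prefect only connects to it and does not shut it down.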
s
Thank you @Kevin Kho, I already use it like this with no problem. The trick, though, is that I want to be able to launch the cluster with a small number of workers and then tell it to scale afterwards (so as not to wait for all 200-300 workers to come online before starting the work).
k
I think you want this
s
Perfect, I was also looking at reusing the same cluster for multiple workflows. Thank you!
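The workflow described in this thread (start small, register worker plugins, scale, then hand the scheduler address to Prefect) could be sketched as below. The function name is hypothetical; it assumes `client` behaves like a `distributed.Client` (with `register_worker_plugin`) and `cluster` like a Dask cluster object (with `scale` and `scheduler_address`).

```python
def prepare_cluster(cluster, client, plugins, target_workers):
    """Register plugins on a small running cluster, request more workers,
    and return the scheduler address to give to the executor."""
    for plugin in plugins:
        # Plugins registered here also apply to workers that join later
        client.register_worker_plugin(plugin)
    # scale() only requests the workers; it does not wait for them to arrive
    cluster.scale(target_workers)
    return cluster.scheduler_address


# Usage sketch (not executed here; the argument values are assumptions):
# cluster = coiled.Cluster(name="prefect-cluster", n_workers=10, software="kvnkho/prefect")
# client = distributed.Client(cluster)
# address = prepare_cluster(cluster, client, [my_plugin], 250)
```

Because the scale request returns immediately, flow runs that connect to the returned address can begin on the initial workers while the rest come online, and several flows can point at the same address.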
b
Hi @Sébastien Arnaud / @Kevin Kho - have either of you had issues connecting to an existing coiled cluster? I’m seeing the following TLS errors
TypeError: TLS expects a `ssl_context` argument of type ssl.SSLContext (perhaps check your TLS configuration?)  Instead got None
full stacktrace:
Traceback (most recent call last):
  File "/home/brad/.cache/pypoetry/virtualenvs/betting-data-xyxHMm3k-py3.9/lib/python3.9/site-packages/prefect/engine/runner.py", line 48, in inner
    new_state = method(self, state, *args, **kwargs)
  File "/home/brad/.cache/pypoetry/virtualenvs/betting-data-xyxHMm3k-py3.9/lib/python3.9/site-packages/prefect/engine/flow_runner.py", line 442, in get_flow_run_state
    with self.check_for_cancellation(), executor.start():
  File "/usr/lib/python3.9/contextlib.py", line 117, in __enter__
    return next(self.gen)
  File "/home/brad/.cache/pypoetry/virtualenvs/betting-data-xyxHMm3k-py3.9/lib/python3.9/site-packages/prefect/executors/dask.py", line 208, in start
    with Client(self.address, **self.client_kwargs) as client:
  File "/home/brad/.cache/pypoetry/virtualenvs/betting-data-xyxHMm3k-py3.9/lib/python3.9/site-packages/distributed/client.py", line 760, in __init__
    self.start(timeout=timeout)
  File "/home/brad/.cache/pypoetry/virtualenvs/betting-data-xyxHMm3k-py3.9/lib/python3.9/site-packages/distributed/client.py", line 1006, in start
    sync(self.loop, self._start, **kwargs)
  File "/home/brad/.cache/pypoetry/virtualenvs/betting-data-xyxHMm3k-py3.9/lib/python3.9/site-packages/distributed/utils.py", line 338, in sync
    raise exc.with_traceback(tb)
  File "/home/brad/.cache/pypoetry/virtualenvs/betting-data-xyxHMm3k-py3.9/lib/python3.9/site-packages/distributed/utils.py", line 321, in f
    result[0] = yield future
  File "/home/brad/.cache/pypoetry/virtualenvs/betting-data-xyxHMm3k-py3.9/lib/python3.9/site-packages/tornado/gen.py", line 762, in run
    value = future.result()
  File "/home/brad/.cache/pypoetry/virtualenvs/betting-data-xyxHMm3k-py3.9/lib/python3.9/site-packages/distributed/client.py", line 1096, in _start
    await self._ensure_connected(timeout=timeout)
  File "/home/brad/.cache/pypoetry/virtualenvs/betting-data-xyxHMm3k-py3.9/lib/python3.9/site-packages/distributed/client.py", line 1153, in _ensure_connected
    comm = await connect(
  File "/home/brad/.cache/pypoetry/virtualenvs/betting-data-xyxHMm3k-py3.9/lib/python3.9/site-packages/distributed/comm/core.py", line 285, in connect
    comm = await asyncio.wait_for(
  File "/usr/lib/python3.9/asyncio/tasks.py", line 481, in wait_for
    return fut.result()
  File "/home/brad/.cache/pypoetry/virtualenvs/betting-data-xyxHMm3k-py3.9/lib/python3.9/site-packages/distributed/comm/tcp.py", line 376, in connect
    kwargs = self._get_connect_args(**connection_args)
  File "/home/brad/.cache/pypoetry/virtualenvs/betting-data-xyxHMm3k-py3.9/lib/python3.9/site-packages/distributed/comm/tcp.py", line 418, in _get_connect_args
    ctx = _expect_tls_context(connection_args)
  File "/home/brad/.cache/pypoetry/virtualenvs/betting-data-xyxHMm3k-py3.9/lib/python3.9/site-packages/distributed/comm/tcp.py", line 350, in _expect_tls_context
    raise TypeError(
TypeError: TLS expects a `ssl_context` argument of type ssl.SSLContext (perhaps check your TLS configuration?)  Instead got None
The executor.start() / distributed self.start() path looks suspect:
File "/home/brad/.cache/pypoetry/virtualenvs/betting-data-xyxHMm3k-py3.9/lib/python3.9/site-packages/prefect/engine/flow_runner.py", line 442, in get_flow_run_state
    with self.check_for_cancellation(), executor.start():
  File "/usr/lib/python3.9/contextlib.py", line 117, in __enter__
    return next(self.gen)
  File "/home/brad/.cache/pypoetry/virtualenvs/betting-data-xyxHMm3k-py3.9/lib/python3.9/site-packages/prefect/executors/dask.py", line 208, in start
    with Client(self.address, **self.client_kwargs) as client:
  File "/home/brad/.cache/pypoetry/virtualenvs/betting-data-xyxHMm3k-py3.9/lib/python3.9/site-packages/distributed/client.py", line 760, in __init__
    self.start(timeout=timeout)
k
Hey @Brad, I haven't experienced this. Could you show how you use the Executor? Any settings on your Coiled cluster? I can try on Python 3.9 a bit later
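One thing worth checking in the meantime: the traceback ends in `_expect_tls_context`, reached from `Client(self.address, **self.client_kwargs)`, which suggests the client was asked to open a TLS connection without any TLS settings. A small pre-flight check along these lines may help narrow it down. The function name is hypothetical, and it only inspects `client_kwargs`; TLS settings supplied through Dask configuration are not visible to it.

```python
def check_tls_config(address, client_kwargs=None):
    """Return a warning string when a tls:// address has no security object
    in client_kwargs, otherwise return None."""
    client_kwargs = client_kwargs or {}
    if address.startswith("tls://") and client_kwargs.get("security") is None:
        return (
            "address uses tls:// but client_kwargs has no 'security'; "
            "distributed will have no ssl_context unless TLS is set in Dask config"
        )
    return None
```

If the check reports a problem, passing the cluster's security object through the executor's `client_kwargs` is one thing to try.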