Thread
#prefect-community
    Sébastien Arnaud

    1 year ago
    Is there a way to pass an existing Dask client to Prefect (instead of passing a DaskExecutor, which launches a cluster on its own)? I'm asking in particular so that I can run Coiled cluster commands to launch the cluster, register some worker plugins, and trigger a scale, then pass it to Prefect for execution.
    Kevin Kho

    1 year ago
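    Hi @Sébastien Arnaud! You can use Coiled like this:
    import coiled
    from prefect.executors import DaskExecutor

    # Prefect creates the Coiled cluster when the flow run starts.
    executor = DaskExecutor(
        cluster_class=coiled.Cluster,
        cluster_kwargs={
            "software": "kvnkho/prefect",
            "shutdown_on_close": True,
            "name": "prefect-cluster",
        },
    )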
    This spins up a cluster for the flow run and shuts it down afterwards. For a cluster that already exists, you can pass its address to the executor instead.
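    A minimal sketch of the existing-cluster option, assuming Prefect 1.x, where DaskExecutor takes an address argument. The scheduler address is a placeholder and the helper names are made up for illustration; they are not part of Prefect.

```python
from typing import Any, Dict


def executor_kwargs_for_address(scheduler_address: str) -> Dict[str, Any]:
    """Keyword arguments for a DaskExecutor that connects to a running scheduler.

    Hypothetical helper for illustration; not part of Prefect.
    """
    if "://" not in scheduler_address:
        raise ValueError("expected an address such as 'tcp://host:8786'")
    # With `address` set, the executor connects to the scheduler instead of
    # creating (and later shutting down) a temporary cluster.
    return {"address": scheduler_address}


def build_executor(scheduler_address: str):
    # Imported here so the helper above can be used without Prefect installed.
    from prefect.executors import DaskExecutor

    return DaskExecutor(**executor_kwargs_for_address(scheduler_address))


# Placeholder address; replace it with the address of your own scheduler.
print(executor_kwargs_for_address("tcp://10.0.0.1:8786"))
```

    Because the executor only connects to the scheduler in this mode, the cluster's lifetime stays under your control, which is what makes reuse across flows possible.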
    Sébastien Arnaud

    1 year ago
    Thank you @Kevin Kho, I already use it like this with no problem. The trick, though, is that I want to be able to launch the cluster with a small number of workers and then tell it to scale afterwards (so as not to wait for all 200-300 workers to come online before starting the work).
    Kevin Kho

    1 year ago
    I think you want this
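    A rough sketch of the start-small-then-scale pattern described above. It assumes the cluster class accepts n_workers and exposes a scale(n) method, as Dask cluster managers generally do. The helper and the FakeCluster stand-in are hypothetical and exist only so the example runs without real infrastructure.

```python
from typing import Any, Callable


def launch_then_scale(
    cluster_class: Callable[..., Any],
    initial_workers: int,
    target_workers: int,
    **cluster_kwargs: Any,
) -> Any:
    """Start a cluster small, then request more workers.

    Hypothetical helper: `cluster_class` is assumed to accept `n_workers` and to
    return an object with a `scale(n)` method.
    """
    if target_workers < initial_workers:
        raise ValueError("target_workers must be >= initial_workers")
    cluster = cluster_class(n_workers=initial_workers, **cluster_kwargs)
    # Request the full size; the intent is to begin work on the initial workers.
    cluster.scale(target_workers)
    return cluster


class FakeCluster:
    """Stand-in used only to exercise the helper without real infrastructure."""

    def __init__(self, n_workers: int, **kwargs: Any) -> None:
        self.n_workers = n_workers
        self.requested = n_workers
        self.kwargs = kwargs

    def scale(self, n: int) -> None:
        self.requested = n


cluster = launch_then_scale(FakeCluster, 10, 300, name="prefect-cluster")
print(cluster.n_workers, cluster.requested)
```

    With Coiled, the idea would be to pass coiled.Cluster as cluster_class and then give the running cluster's scheduler address to the executor. Whether scale returns before the new workers have joined depends on the Coiled version, so treat that as something to verify.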
    Sébastien Arnaud

    1 year ago
    Perfect, I was also looking at reusing the same cluster for multiple workflows. Thank you!
    Brad

    1 year ago
    Hi @Sébastien Arnaud / @Kevin Kho - have either of you had issues connecting to an existing Coiled cluster? I’m seeing the following TLS error:
    TypeError: TLS expects a `ssl_context` argument of type ssl.SSLContext (perhaps check your TLS configuration?)  Instead got None
    full stacktrace:
    Traceback (most recent call last):
      File "/home/brad/.cache/pypoetry/virtualenvs/betting-data-xyxHMm3k-py3.9/lib/python3.9/site-packages/prefect/engine/runner.py", line 48, in inner
        new_state = method(self, state, *args, **kwargs)
      File "/home/brad/.cache/pypoetry/virtualenvs/betting-data-xyxHMm3k-py3.9/lib/python3.9/site-packages/prefect/engine/flow_runner.py", line 442, in get_flow_run_state
        with self.check_for_cancellation(), executor.start():
      File "/usr/lib/python3.9/contextlib.py", line 117, in __enter__
        return next(self.gen)
      File "/home/brad/.cache/pypoetry/virtualenvs/betting-data-xyxHMm3k-py3.9/lib/python3.9/site-packages/prefect/executors/dask.py", line 208, in start
        with Client(self.address, **self.client_kwargs) as client:
      File "/home/brad/.cache/pypoetry/virtualenvs/betting-data-xyxHMm3k-py3.9/lib/python3.9/site-packages/distributed/client.py", line 760, in __init__
        self.start(timeout=timeout)
      File "/home/brad/.cache/pypoetry/virtualenvs/betting-data-xyxHMm3k-py3.9/lib/python3.9/site-packages/distributed/client.py", line 1006, in start
        sync(self.loop, self._start, **kwargs)
      File "/home/brad/.cache/pypoetry/virtualenvs/betting-data-xyxHMm3k-py3.9/lib/python3.9/site-packages/distributed/utils.py", line 338, in sync
        raise exc.with_traceback(tb)
      File "/home/brad/.cache/pypoetry/virtualenvs/betting-data-xyxHMm3k-py3.9/lib/python3.9/site-packages/distributed/utils.py", line 321, in f
        result[0] = yield future
      File "/home/brad/.cache/pypoetry/virtualenvs/betting-data-xyxHMm3k-py3.9/lib/python3.9/site-packages/tornado/gen.py", line 762, in run
        value = future.result()
      File "/home/brad/.cache/pypoetry/virtualenvs/betting-data-xyxHMm3k-py3.9/lib/python3.9/site-packages/distributed/client.py", line 1096, in _start
        await self._ensure_connected(timeout=timeout)
      File "/home/brad/.cache/pypoetry/virtualenvs/betting-data-xyxHMm3k-py3.9/lib/python3.9/site-packages/distributed/client.py", line 1153, in _ensure_connected
        comm = await connect(
      File "/home/brad/.cache/pypoetry/virtualenvs/betting-data-xyxHMm3k-py3.9/lib/python3.9/site-packages/distributed/comm/core.py", line 285, in connect
        comm = await asyncio.wait_for(
      File "/usr/lib/python3.9/asyncio/tasks.py", line 481, in wait_for
        return fut.result()
      File "/home/brad/.cache/pypoetry/virtualenvs/betting-data-xyxHMm3k-py3.9/lib/python3.9/site-packages/distributed/comm/tcp.py", line 376, in connect
        kwargs = self._get_connect_args(**connection_args)
      File "/home/brad/.cache/pypoetry/virtualenvs/betting-data-xyxHMm3k-py3.9/lib/python3.9/site-packages/distributed/comm/tcp.py", line 418, in _get_connect_args
        ctx = _expect_tls_context(connection_args)
      File "/home/brad/.cache/pypoetry/virtualenvs/betting-data-xyxHMm3k-py3.9/lib/python3.9/site-packages/distributed/comm/tcp.py", line 350, in _expect_tls_context
        raise TypeError(
    TypeError: TLS expects a `ssl_context` argument of type ssl.SSLContext (perhaps check your TLS configuration?)  Instead got None
    the executor.start() / distributed self.start() looks suspect:
    File "/home/brad/.cache/pypoetry/virtualenvs/betting-data-xyxHMm3k-py3.9/lib/python3.9/site-packages/prefect/engine/flow_runner.py", line 442, in get_flow_run_state
        with self.check_for_cancellation(), executor.start():
      File "/usr/lib/python3.9/contextlib.py", line 117, in __enter__
        return next(self.gen)
      File "/home/brad/.cache/pypoetry/virtualenvs/betting-data-xyxHMm3k-py3.9/lib/python3.9/site-packages/prefect/executors/dask.py", line 208, in start
        with Client(self.address, **self.client_kwargs) as client:
      File "/home/brad/.cache/pypoetry/virtualenvs/betting-data-xyxHMm3k-py3.9/lib/python3.9/site-packages/distributed/client.py", line 760, in __init__
        self.start(timeout=timeout)
    Kevin Kho

    1 year ago
    Hey @Brad, I haven't experienced this. Could you show how you use the Executor? Any settings on your Coiled cluster? I can try on Python 3.9 a bit later
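    One possible cause, offered as an assumption rather than a confirmed diagnosis: the traceback shows the executor calling Client(self.address, **self.client_kwargs), so if the address uses the tls:// scheme and client_kwargs carries no security object, distributed has no SSL context to work with. A sketch of a guard that makes this explicit follows. The helper names are hypothetical, and the security object is assumed to be a distributed Security instance, typically taken from the cluster object itself.

```python
from typing import Any, Dict, Optional


def client_kwargs_for(address: str, security: Optional[Any] = None) -> Dict[str, Any]:
    """Build `client_kwargs` for DaskExecutor, insisting on TLS credentials.

    Hypothetical helper; `security` is assumed to be a
    distributed.security.Security instance (or equivalent).
    """
    if address.startswith("tls://") and security is None:
        raise ValueError(
            "tls:// scheduler address given without a security object; "
            "the Dask client would have no SSL context"
        )
    # These kwargs are forwarded to distributed.Client by the executor.
    return {} if security is None else {"security": security}


def build_executor(address: str, security: Optional[Any] = None):
    # Imported lazily so the helper above works without Prefect installed.
    from prefect.executors import DaskExecutor

    return DaskExecutor(
        address=address,
        client_kwargs=client_kwargs_for(address, security),
    )
```

    Failing early with a clear message is a design choice here: it surfaces the missing credentials before the flow run starts instead of deep inside the connection code.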