# ask-community
ciaran
cc. @Kevin Kho I'm trying to lock down our CDK deployment so that our Prefect Agent, Flow Runner, Executor and workers are in security groups, i.e. the Agent and Runner in Sec1 and the Executor and workers in Sec2. Sec1 can talk to Sec2 and both can talk to the outside world, but neither allows external ingress. The problem is that Prefect seems to connect to the public IP of the Dask Scheduler task in ECS rather than its local (within the VPC) IP. Is there a way to get Prefect/the Dask Executor to use the local (AWS private) IP of the Scheduler? It would really help us make our infra a bit more secure.
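The intended topology can be written down as a small ingress table. This is a minimal sketch, assuming the group names `sec1`/`sec2`/`internet` and the helper `ingress_allowed` (all hypothetical, not CDK or AWS APIs); egress is assumed open for both groups, so only ingress is modelled.

```python
from typing import Set, Tuple

# Hypothetical group names: Sec1 holds the Prefect Agent and Flow Runner,
# Sec2 holds the Dask scheduler and workers.
SEC1 = "sec1"
SEC2 = "sec2"
INTERNET = "internet"

# Ingress rules as (source, destination) pairs. Security groups are stateful,
# so return traffic on an allowed connection needs no extra rule.
INGRESS_RULES: Set[Tuple[str, str]] = {
    (SEC1, SEC2),  # Agent/Runner -> Dask scheduler (default port 8786)
    (SEC2, SEC2),  # scheduler <-> workers inside the Dask group
}


def ingress_allowed(source: str, destination: str) -> bool:
    """Return True if a connection initiated by `source` may enter `destination`."""
    return (source, destination) in INGRESS_RULES


print(ingress_allowed(SEC1, SEC2))      # runner dialing the scheduler
print(ingress_allowed(INTERNET, SEC2))  # anything from outside the VPC
```

AWS documents that a rule naming a source security group matches traffic by the private IP addresses of that group's members, not their public IPs. That is consistent with the timeout reported further down: a runner in Sec1 dialing the scheduler's public IP is not matched by a Sec1 -> Sec2 rule.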
Zanie
Hey @ciaran -- can you show:
• How you're configuring your Dask executor
• Logs indicating the connection you're seeing

It seems likely that this is something on the Dask side -- we mostly just pass through config options here.
ciaran
@Zanie here's the DaskExecutor definition:
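```python
from prefect.executors import DaskExecutor

# `worker_image`, `outputs` (the CDK stack outputs) and `tags` are defined
# elsewhere in the deployment code.
executor = DaskExecutor(
    cluster_class="dask_cloudprovider.aws.FargateCluster",
    cluster_kwargs={
        "image": worker_image,
        # Networking and IAM, taken from the CDK stack outputs
        "vpc": outputs["vpc_output"],
        "cluster_arn": outputs["cluster_arn_output"],
        "task_role_arn": outputs["task_role_arn_output"],
        "execution_role_arn": outputs["task_execution_role_arn_output"],
        # Security group applied to the scheduler and worker tasks (Sec2)
        "security_groups": [outputs["dask_security_group_output"]],
        # Sizing: CPU in 1/1024 vCPU units, memory in MB
        "n_workers": 1,
        "scheduler_cpu": 256,
        "scheduler_mem": 512,
        "worker_cpu": 1024,
        "worker_mem": 2048,
        "scheduler_timeout": "15 minutes",
        "tags": tags["tag_dict"],
    },
)
```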
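The traceback below shows `DaskExecutor.start` calling `self.cluster_class(**self.cluster_kwargs)`, so any networking behaviour comes from what `FargateCluster` itself accepts. The following is a simplified, hypothetical illustration of that pass-through (the names `resolve_cluster_class` and `start_cluster` are invented here, and this is not Prefect's actual code).

```python
import importlib
from typing import Any, Callable, Dict, Union


def resolve_cluster_class(cluster_class: Union[str, Callable[..., Any]]) -> Callable[..., Any]:
    """Turn a dotted path such as "pkg.module.Class" into the object it names."""
    if isinstance(cluster_class, str):
        module_name, _, attr_name = cluster_class.rpartition(".")
        module = importlib.import_module(module_name)
        return getattr(module, attr_name)
    return cluster_class


def start_cluster(cluster_class: Union[str, Callable[..., Any]], cluster_kwargs: Dict[str, Any]) -> Any:
    """Instantiate the cluster class with the kwargs passed through unchanged."""
    cls = resolve_cluster_class(cluster_class)
    return cls(**cluster_kwargs)


# types.SimpleNamespace stands in for FargateCluster so the sketch runs anywhere.
cluster = start_cluster("types.SimpleNamespace", {"n_workers": 1, "scheduler_cpu": 256})
print(cluster.n_workers, cluster.scheduler_cpu)
```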
The log from Prefect I'm seeing is:
```
Unexpected error: OSError('Timed out trying to connect to tcp://34.219.0.113:8786 after 10 s')
Traceback (most recent call last):
  File "/usr/local/lib/python3.8/site-packages/distributed/comm/core.py", line 286, in connect
    comm = await asyncio.wait_for(
  File "/usr/local/lib/python3.8/asyncio/tasks.py", line 501, in wait_for
    raise exceptions.TimeoutError()
asyncio.exceptions.TimeoutError

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "/usr/local/lib/python3.8/site-packages/distributed/deploy/spec.py", line 317, in _start
    await super()._start()
  File "/usr/local/lib/python3.8/site-packages/distributed/deploy/cluster.py", line 73, in _start
    comm = await self.scheduler_comm.live_comm()
  File "/usr/local/lib/python3.8/site-packages/distributed/core.py", line 747, in live_comm
    comm = await connect(
  File "/usr/local/lib/python3.8/site-packages/distributed/comm/core.py", line 308, in connect
    raise IOError(
OSError: Timed out trying to connect to tcp://34.219.0.113:8786 after 10 s

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/usr/local/lib/python3.8/site-packages/distributed/comm/core.py", line 286, in connect
    comm = await asyncio.wait_for(
  File "/usr/local/lib/python3.8/asyncio/tasks.py", line 501, in wait_for
    raise exceptions.TimeoutError()
asyncio.exceptions.TimeoutError

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "/usr/local/lib/python3.8/site-packages/prefect/engine/runner.py", line 48, in inner
    new_state = method(self, state, *args, **kwargs)
  File "/usr/local/lib/python3.8/site-packages/prefect/engine/flow_runner.py", line 421, in get_flow_run_state
    with self.check_for_cancellation(), executor.start():
  File "/usr/local/lib/python3.8/contextlib.py", line 113, in __enter__
    return next(self.gen)
  File "/usr/local/lib/python3.8/site-packages/prefect/executors/dask.py", line 213, in start
    with self.cluster_class(**self.cluster_kwargs) as cluster:  # type: ignore
  File "/usr/local/lib/python3.8/site-packages/dask_cloudprovider/aws/ecs.py", line 1367, in __init__
    super().__init__(fargate_scheduler=True, fargate_workers=True, **kwargs)
  File "/usr/local/lib/python3.8/site-packages/dask_cloudprovider/aws/ecs.py", line 733, in __init__
    super().__init__(**kwargs)
  File "/usr/local/lib/python3.8/site-packages/distributed/deploy/spec.py", line 282, in __init__
    self.sync(self._start)
  File "/usr/local/lib/python3.8/site-packages/distributed/deploy/cluster.py", line 189, in sync
    return sync(self.loop, func, *args, **kwargs)
  File "/usr/local/lib/python3.8/site-packages/distributed/utils.py", line 353, in sync
    raise exc.with_traceback(tb)
  File "/usr/local/lib/python3.8/site-packages/distributed/utils.py", line 336, in f
    result[0] = yield future
  File "/usr/local/lib/python3.8/site-packages/tornado/gen.py", line 762, in run
    value = future.result()
  File "/usr/local/lib/python3.8/site-packages/dask_cloudprovider/aws/ecs.py", line 930, in _start
    await super()._start()
  File "/usr/local/lib/python3.8/site-packages/distributed/deploy/spec.py", line 320, in _start
    await self._close()
  File "/usr/local/lib/python3.8/site-packages/distributed/deploy/spec.py", line 419, in _close
    await self.scheduler_comm.close(close_workers=True)
  File "/usr/local/lib/python3.8/site-packages/distributed/core.py", line 789, in send_recv_from_rpc
    comm = await self.live_comm()
  File "/usr/local/lib/python3.8/site-packages/distributed/core.py", line 747, in live_comm
    comm = await connect(
  File "/usr/local/lib/python3.8/site-packages/distributed/comm/core.py", line 308, in connect
    raise IOError(
OSError: Timed out trying to connect to tcp://34.219.0.113:8786 after 10 s
```
Essentially, I think I need Dask to advertise its private IP to Prefect rather than its public IP.
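One quick way to confirm which address the scheduler advertised is to classify it with the standard library. This is a minimal sketch: the helper names are hypothetical, and the input is a Dask-style address string such as the one in the log above (or, for example, the value of `cluster.scheduler_address`).

```python
import ipaddress
from typing import Tuple
from urllib.parse import urlsplit


def parse_scheduler_address(address: str) -> Tuple[str, int]:
    """Split a Dask-style address such as 'tcp://host:8786' into (host, port)."""
    parts = urlsplit(address)
    if parts.hostname is None or parts.port is None:
        raise ValueError(f"not a host:port address: {address!r}")
    return parts.hostname, parts.port


def is_private_address(address: str) -> bool:
    """True if the host part is a private (e.g. in-VPC RFC 1918) IP address."""
    host, _ = parse_scheduler_address(address)
    return ipaddress.ip_address(host).is_private


print(is_private_address("tcp://34.219.0.113:8786"))  # the address from the log
print(is_private_address("tcp://10.0.1.23:8786"))     # a hypothetical VPC address
```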
Zanie
Have you tried setting `fargate_use_private_ip`?
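Enabling that option is a one-key change to the `cluster_kwargs` above. This is a minimal sketch with a hypothetical helper, `private_ip_kwargs`. The dask-cloudprovider docs describe `fargate_use_private_ip` as choosing the private rather than the public IP with Fargate. As the reply below reports, in this setup it also left the tasks unable to pull from ECR.

```python
from typing import Any, Dict


def private_ip_kwargs(cluster_kwargs: Dict[str, Any]) -> Dict[str, Any]:
    """Return a copy of cluster_kwargs with FargateCluster's private-IP option enabled."""
    updated = dict(cluster_kwargs)  # leave the caller's dict untouched
    updated["fargate_use_private_ip"] = True
    return updated


base = {"n_workers": 1, "scheduler_cpu": 256}
print(private_ip_kwargs(base))
```

The result would then be passed as `cluster_kwargs=private_ip_kwargs(...)` to the same `DaskExecutor`.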
ciaran
Yep, but that breaks because the instance still needs a public IP to be able to communicate with AWS Secrets Manager.
Actually sorry, not Secrets Manager, ECR. If I flip to private I see:
```
ResourceInitializationError: unable to pull secrets or registry auth: execution resource retrieval failed: unable to retrieve ecr registry auth: service call has been retried 1 time(s): RequestError: send request failed caused by: Post https://api.ecr....
```
But we have a case where we want public IPs (so that we can get to the dashboard), and we just want the communication between Prefect and Dask to go over the private IPs.
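The behaviour being asked for amounts to two different endpoints for one scheduler task. The sketch below only describes that desired split: `SchedulerEndpoints` and `split_endpoints` are hypothetical, and the thread did not find a dask-cloudprovider option that does this. Ports 8786 and 8787 are Dask's defaults for the scheduler and the dashboard.

```python
from dataclasses import dataclass

DASK_SCHEDULER_PORT = 8786  # Dask's default scheduler port
DASK_DASHBOARD_PORT = 8787  # Dask's default dashboard port


@dataclass(frozen=True)
class SchedulerEndpoints:
    comm_address: str   # what the Prefect flow runner should dial (stays in the VPC)
    dashboard_url: str  # what a person opens in a browser


def split_endpoints(private_ip: str, public_ip: str) -> SchedulerEndpoints:
    """Hypothetical policy: client traffic on the private IP, dashboard on the public IP."""
    return SchedulerEndpoints(
        comm_address=f"tcp://{private_ip}:{DASK_SCHEDULER_PORT}",
        dashboard_url=f"http://{public_ip}:{DASK_DASHBOARD_PORT}/status",
    )


print(split_endpoints("10.0.1.23", "34.219.0.113"))
```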
Zanie
I think you'll have to open an issue in the dask-cloudprovider repo; it's beyond my knowledge to give any real advice here -- I'd just be reading the Dask docs.
ciaran
@Zanie Okay, we'll regroup internally and try to get an issue up!
Carter Kwon
@ciaran Did you ever find a solution to this? We are in the same situation.
ciaran
Hey @Carter Kwon, we didn't. For expediency we've just gone without them for now. We're certainly going to come back to it, though, so we'd appreciate any insight you find! We believe a change is required in Dask so that you can tell it to use private IPs internally while still exposing its public IP.