# ask-community
c
Hello friends! I am running into some issues using an ECSCluster with the DaskExecutor. A brief overview of my setup: I am using Prefect Cloud tied in with AWS infrastructure (e.g. running the Prefect agent as a service on an ECSCluster). A flow might look something like this:
import prefect
from prefect import Flow, task
from prefect.executors import DaskExecutor
from prefect.run_configs import ECSRun
from prefect.storage import S3
from dask_cloudprovider.aws import ECSCluster

file_list = ['./file_1.txt', './file_2.txt', './file_3.txt', './file_4.txt', './file_5.txt', './file_6.txt', './file_7.txt', ..., './file_100.txt']

@task
def test_task(input_file):
    result = some_function(input_file)
    return result

def ecs_cluster(n_workers=4):
    """Start a ecs cluster using the same image as the flow run"""
    return ECSCluster(
        n_workers=n_workers, image=prefect.context.image, region_name="us-east-1"
    )


with Flow(
    "test flow",
    storage=S3(bucket="storage"),
    run_config=ECSRun(task_definition=task_definition),
) as flow:
    test_task.map(input_file=file_list)

flow.executor = DaskExecutor(
    cluster_class=ecs_cluster, cluster_kwargs={"n_workers": 10}
)
k
Hey @Chris Leber, could you move the traceback to the thread when you get a chance to not crowd the main channel? 👍 About the error, I think your Dask cluster doesn't have the creds to do your task. I assume this works with LocalExecutor? Or is this failing to start the cluster?
c
Sure thing!
TRACEBACK: It feels like the issue may be with how AWS creds are being shared. Running this flow yields this error:
Unexpected error: NoRegionError('You must specify a region.')
Traceback (most recent call last):
  File "/usr/local/lib/python3.8/site-packages/prefect/engine/runner.py", line 48, in inner
    new_state = method(self, state, *args, **kwargs)
  File "/usr/local/lib/python3.8/site-packages/prefect/engine/flow_runner.py", line 442, in get_flow_run_state
    with self.check_for_cancellation(), executor.start():
  File "/usr/local/lib/python3.8/contextlib.py", line 113, in __enter__
    return next(self.gen)
  File "/usr/local/lib/python3.8/site-packages/prefect/executors/dask.py", line 223, in start
    with self.cluster_class(**self.cluster_kwargs) as cluster:
  File "prefect/test_flows.py", line 276, in ecs_cluster
  File "/usr/local/lib/python3.8/site-packages/dask_cloudprovider/aws/ecs.py", line 735, in __init__
    super().__init__(**kwargs)
  File "/usr/local/lib/python3.8/site-packages/distributed/deploy/spec.py", line 283, in __init__
    self.sync(self._start)
  File "/usr/local/lib/python3.8/site-packages/distributed/deploy/cluster.py", line 214, in sync
    return sync(self.loop, func, *args, **kwargs)
  File "/usr/local/lib/python3.8/site-packages/distributed/utils.py", line 326, in sync
    raise exc.with_traceback(tb)
  File "/usr/local/lib/python3.8/site-packages/distributed/utils.py", line 309, in f
    result[0] = yield future
  File "/usr/local/lib/python3.8/site-packages/tornado/gen.py", line 762, in run
    value = future.result()
  File "/usr/local/lib/python3.8/site-packages/dask_cloudprovider/aws/ecs.py", line 761, in _start
    await _cleanup_stale_resources()
  File "/usr/local/lib/python3.8/site-packages/dask_cloudprovider/aws/ecs.py", line 1396, in _cleanup_stale_resources
    async with session.create_client("ecs") as ecs:
  File "/usr/local/lib/python3.8/site-packages/aiobotocore/session.py", line 37, in __aenter__
    self._client = await self._coro
  File "/usr/local/lib/python3.8/site-packages/aiobotocore/session.py", line 121, in _create_client
    client = await client_creator.create_client(
  File "/usr/local/lib/python3.8/site-packages/aiobotocore/client.py", line 32, in create_client
    client_args = self._get_client_args(
  File "/usr/local/lib/python3.8/site-packages/aiobotocore/client.py", line 88, in _get_client_args
    return args_creator.get_client_args(
  File "/usr/local/lib/python3.8/site-packages/aiobotocore/args.py", line 18, in get_client_args
    final_args = self.compute_client_args(
  File "/usr/local/lib/python3.8/site-packages/botocore/args.py", line 148, in compute_client_args
    endpoint_config = self._compute_endpoint_config(
  File "/usr/local/lib/python3.8/site-packages/botocore/args.py", line 220, in _compute_endpoint_config
    return self._resolve_endpoint(**resolve_endpoint_kwargs)
  File "/usr/local/lib/python3.8/site-packages/botocore/args.py", line 302, in _resolve_endpoint
    return endpoint_bridge.resolve(
  File "/usr/local/lib/python3.8/site-packages/botocore/client.py", line 430, in resolve
    resolved = self.endpoint_resolver.construct_endpoint(
  File "/usr/local/lib/python3.8/site-packages/botocore/regions.py", line 133, in construct_endpoint
    result = self._endpoint_for_partition(
  File "/usr/local/lib/python3.8/site-packages/botocore/regions.py", line 148, in _endpoint_for_partition
    raise NoRegionError()
botocore.exceptions.NoRegionError: You must specify a region.
@Kevin Kho Local and LocalDask both work; even DaskExecutor() works if I leave all defaults. Things fall apart when I try to define an ECS or Fargate cluster class. Not sure what else I would need to do to propagate the AWS creds, as my flows interact with other AWS services just fine (and my agent lives in an ECSCluster)
k
Try passing task_role_arn and execution_role_arn to ECSRun? Or you can try passing env variables to the ECSRun or agent for boto3 authentication.
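Roughly something like this (untested sketch, placeholder ARNs, and the region env var name is an assumption; task_definition as in your flow above):
from prefect.run_configs import ECSRun

# Untested sketch: substitute your own account IDs and role names.
run_config = ECSRun(
    task_definition=task_definition,
    task_role_arn="arn:aws:iam::<acctid>:role/<task-role>",
    execution_role_arn="arn:aws:iam::<acctid>:role/<execution-role>",
    env={"AWS_DEFAULT_REGION": "us-east-1"},  # env vars passed to the flow run container
)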
👀 1
You can also try:
from dask_cloudprovider.aws import ECSCluster
cluster = ECSCluster(
    cluster_arn="arn:aws:ecs:<region>:<acctid>:cluster/<gpuclustername>",
    worker_gpu=1)
and I guess the region gets sent this way?
You can check task_role_arn and execution_role_arn in the ECSCluster docs here
👀 1
c
Providing the ARN of an already existing ECSCluster does not solve the issue or change the error. Exploring your other two suggestions.
k
You can also pass the execution role and task role at cluster creation.
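For example, something along these lines (untested sketch with placeholder ARNs, region set explicitly as well):
from dask_cloudprovider.aws import ECSCluster

# Untested sketch: substitute your own account IDs and role names.
cluster = ECSCluster(
    n_workers=4,
    region_name="us-east-1",
    execution_role_arn="arn:aws:iam::<acctid>:role/<execution-role>",
    task_role_arn="arn:aws:iam::<acctid>:role/<task-role>",
)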
Thanks for moving the traceback btw!
๐Ÿ‘ 1
upvote 1
a
@Chris Leber you could also try setting a default boto3 session with your region:
import boto3

boto3.setup_default_session(region_name="us-east-1")
c
Thanks for the ideas @Kevin Kho and @Anna Geller. Unfortunately, no resolution or change in the error trace yet. I have tried setting the region in a number of different ways (boto, os.env, region_name in the cluster class) and have indicated ARNs for the appropriate ECS clusters, execution roles, and task roles.
I also made sure I had a default profile in .aws/config that pointed to the appropriate region. In all cases, the region is failing to propagate 'inside' the DaskExecutor and its creation of a cluster. Quite perplexing.
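E.g. variants along these lines (a sketch of the attempts described above; none of them changed the error):
import os
import boto3
from dask_cloudprovider.aws import ECSCluster

# Setting the region via boto3 and the environment before the executor runs.
boto3.setup_default_session(region_name="us-east-1")
os.environ["AWS_DEFAULT_REGION"] = "us-east-1"

def ecs_cluster(n_workers=4):
    # Region also passed explicitly to the cluster class.
    return ECSCluster(n_workers=n_workers, region_name="us-east-1")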
k
Maybe you can just do it upon cluster creation? Environment variables do not propagate, so you need to pass them explicitly. Dask clusters tend to have an argument for this (environment on ECSCluster), so you can do:
from dask_cloudprovider.aws import ECSCluster

cluster = ECSCluster(
    cluster_arn="arn:aws:ecs:<region>:<acctid>:cluster/<gpuclustername>",
    environment={"AWS_REGION": "us-east-1"},  # ECSCluster's kwarg for extra env vars is "environment"
    worker_gpu=1,
)
So that the cluster gets the environment variable?