Thread
#prefect-community
    Chris Leber

    11 months ago
    Hello friends! I am running into some issues using an ECSCluster with the DaskExecutor. A brief overview of my setup: Prefect Cloud tied in with AWS infrastructure (e.g. running prefect-agent as a service on an ECSCluster). A flow might look something like this:
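    import prefect
    from dask_cloudprovider.aws import ECSCluster
    from prefect import Flow, task
    from prefect.executors import DaskExecutor
    from prefect.run_configs import ECSRun
    from prefect.storage import S3
    
    file_list = ['./file_1.txt', './file_2.txt', './file_3.txt', './file_4.txt', './file_5.txt', './file_6.txt', './file_7.txt', ..., './file_100.txt']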
    
    @task
    def test_task(input_file):
        result = some_function(input_file)
        return result
    
    def ecs_cluster(n_workers=4):
        """Start an ECS cluster using the same image as the flow run"""
        return ECSCluster(
            n_workers=n_workers, image=prefect.context.image, region_name="us-east-1"
        )
    
    
    with Flow(
        "test flow",
        storage=S3(bucket="storage"),
        run_config=ECSRun(task_definition=task_definition),
    ) as flow:
        test_task.map(input_file=file_list)
    
    flow.executor = DaskExecutor(
        cluster_class=ecs_cluster, cluster_kwargs={"n_workers": 10}
    )
    Kevin Kho

    11 months ago
    Hey @Chris Leber, could you move the traceback to the thread when you get a chance to not crowd the main channel? 👍 About the error, I think your Dask cluster doesn't have the creds to do your task. I assume this works with LocalExecutor? Or is this failing to start the cluster?
    Chris Leber

    11 months ago
    Sure thing!
    TRACEBACK: It feels like the issue may be with how AWS creds are being shared. Running this flow yields this error:
    Unexpected error: NoRegionError('You must specify a region.')
    Traceback (most recent call last):
      File "/usr/local/lib/python3.8/site-packages/prefect/engine/runner.py", line 48, in inner
        new_state = method(self, state, *args, **kwargs)
      File "/usr/local/lib/python3.8/site-packages/prefect/engine/flow_runner.py", line 442, in get_flow_run_state
        with self.check_for_cancellation(), executor.start():
      File "/usr/local/lib/python3.8/contextlib.py", line 113, in __enter__
        return next(self.gen)
      File "/usr/local/lib/python3.8/site-packages/prefect/executors/dask.py", line 223, in start
        with self.cluster_class(**self.cluster_kwargs) as cluster:
      File "prefect/test_flows.py", line 276, in ecs_cluster
      File "/usr/local/lib/python3.8/site-packages/dask_cloudprovider/aws/ecs.py", line 735, in __init__
        super().__init__(**kwargs)
      File "/usr/local/lib/python3.8/site-packages/distributed/deploy/spec.py", line 283, in __init__
        self.sync(self._start)
      File "/usr/local/lib/python3.8/site-packages/distributed/deploy/cluster.py", line 214, in sync
        return sync(self.loop, func, *args, **kwargs)
      File "/usr/local/lib/python3.8/site-packages/distributed/utils.py", line 326, in sync
        raise exc.with_traceback(tb)
      File "/usr/local/lib/python3.8/site-packages/distributed/utils.py", line 309, in f
        result[0] = yield future
      File "/usr/local/lib/python3.8/site-packages/tornado/gen.py", line 762, in run
        value = future.result()
      File "/usr/local/lib/python3.8/site-packages/dask_cloudprovider/aws/ecs.py", line 761, in _start
        await _cleanup_stale_resources()
      File "/usr/local/lib/python3.8/site-packages/dask_cloudprovider/aws/ecs.py", line 1396, in _cleanup_stale_resources
        async with session.create_client("ecs") as ecs:
      File "/usr/local/lib/python3.8/site-packages/aiobotocore/session.py", line 37, in __aenter__
        self._client = await self._coro
      File "/usr/local/lib/python3.8/site-packages/aiobotocore/session.py", line 121, in _create_client
        client = await client_creator.create_client(
      File "/usr/local/lib/python3.8/site-packages/aiobotocore/client.py", line 32, in create_client
        client_args = self._get_client_args(
      File "/usr/local/lib/python3.8/site-packages/aiobotocore/client.py", line 88, in _get_client_args
        return args_creator.get_client_args(
      File "/usr/local/lib/python3.8/site-packages/aiobotocore/args.py", line 18, in get_client_args
        final_args = self.compute_client_args(
      File "/usr/local/lib/python3.8/site-packages/botocore/args.py", line 148, in compute_client_args
        endpoint_config = self._compute_endpoint_config(
      File "/usr/local/lib/python3.8/site-packages/botocore/args.py", line 220, in _compute_endpoint_config
        return self._resolve_endpoint(**resolve_endpoint_kwargs)
      File "/usr/local/lib/python3.8/site-packages/botocore/args.py", line 302, in _resolve_endpoint
        return endpoint_bridge.resolve(
      File "/usr/local/lib/python3.8/site-packages/botocore/client.py", line 430, in resolve
        resolved = self.endpoint_resolver.construct_endpoint(
      File "/usr/local/lib/python3.8/site-packages/botocore/regions.py", line 133, in construct_endpoint
        result = self._endpoint_for_partition(
      File "/usr/local/lib/python3.8/site-packages/botocore/regions.py", line 148, in _endpoint_for_partition
        raise NoRegionError()
    botocore.exceptions.NoRegionError: You must specify a region.
    @Kevin Kho Local and LocalDask both work; even DaskExecutor() works if I leave all defaults. Things fall apart when I try to define an ECS or Fargate cluster class. Not sure what else I would need to do to propagate the AWS creds, as my flows interact with other AWS services just fine (and my agent lives in an ECSCluster)
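The traceback fails inside `session.create_client("ecs")`, reached through `flow_runner.py` and `executors/dask.py`, so the region is missing in the process that builds the cluster rather than on a Dask worker. A minimal diagnostic sketch for that process follows. It assumes botocore picks up a region from the `AWS_DEFAULT_REGION` environment variable (not `AWS_REGION`) or from the `region` key of the active profile in the shared config file; `find_region_sources` is a hypothetical helper name, not part of any library.

```python
import configparser
import os


def find_region_sources(environ=None, config_path=None):
    """Return a dict of places where a region is visible to this process.

    Assumption: botocore checks AWS_DEFAULT_REGION and the `region` key of
    the active profile in the shared AWS config file.
    """
    environ = os.environ if environ is None else environ
    sources = {}

    # Environment variable that botocore is assumed to read.
    if environ.get("AWS_DEFAULT_REGION"):
        sources["AWS_DEFAULT_REGION"] = environ["AWS_DEFAULT_REGION"]

    # Shared config file, honouring AWS_CONFIG_FILE and AWS_PROFILE overrides.
    if config_path is None:
        config_path = os.path.expanduser(
            environ.get("AWS_CONFIG_FILE", "~/.aws/config")
        )
    profile = environ.get("AWS_PROFILE", "default")
    section = "default" if profile == "default" else f"profile {profile}"

    parser = configparser.ConfigParser(interpolation=None)
    parser.read(config_path)  # a missing file is silently ignored
    if parser.has_option(section, "region"):
        sources[f"{config_path} [{section}]"] = parser.get(section, "region")

    return sources
```

Calling `find_region_sources()` at the top of `ecs_cluster` and logging the result would show whether the flow-run container sees any region at all; an empty dict would be consistent with the `NoRegionError` above.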
    Kevin Kho

    11 months ago
    Try passing task role arn and execution role arn to ECSRun? Or you can try passing env variables to the ECSRun or Agent for authentication of boto3?
    You can also try:
    from dask_cloudprovider.aws import ECSCluster
    cluster = ECSCluster(
        cluster_arn="arn:aws:ecs:<region>:<acctid>:cluster/<gpuclustername>",
        worker_gpu=1)
    and I guess the region gets sent this way?
    You can check task_role_arn and execution_role_arn in the ECSCluster docs here
    Chris Leber

    11 months ago
    Providing the arn of an already existing ECSCluster does not solve the issue, or change the error. Exploring your other 2 suggestions.
    Kevin Kho

    11 months ago
    You can do execution role and task role in the Cluster creation also
    Thanks for moving the traceback btw!
    Anna Geller

    11 months ago
    @Chris Leber you could also try setting a default boto3 session with your region:
    import boto3
    
    boto3.setup_default_session(region_name="us-east-1")
    Chris Leber

    11 months ago
    Thanks for the ideas @Kevin Kho and @Anna Geller! Unfortunately, no resolution or change in error trace yet. I have tried asserting the region in a number of different ways (boto, os.env, region_name in cluster class) and have indicated arns for appropriate ecs clusters, execution roles, and task roles.
    I also made sure I had a default profile in .aws/config that pointed to the appropriate region. In all cases, the region fails to propagate 'inside' the DaskExecutor and its creation of a cluster. Quite perplexing.
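One thing that could be tried here is setting the region from inside the cluster factory itself, since `DaskExecutor` calls `cluster_class` in the flow-run process at run time. The sketch below assumes botocore reads `AWS_DEFAULT_REGION`; `ensure_region_env` is a hypothetical helper, and nothing in this thread confirms that this resolves the error.

```python
import os


def ensure_region_env(region, environ=None):
    """Set AWS_DEFAULT_REGION unless it is already set; return the value in effect."""
    environ = os.environ if environ is None else environ
    environ.setdefault("AWS_DEFAULT_REGION", region)
    return environ["AWS_DEFAULT_REGION"]


def ecs_cluster(n_workers=4, region="us-east-1"):
    """Cluster factory intended for DaskExecutor(cluster_class=ecs_cluster, ...)."""
    # Imported lazily so the helper above is usable without these packages.
    import prefect
    from dask_cloudprovider.aws import ECSCluster

    # Must run before any AWS client is created in this process.
    ensure_region_env(region)
    return ECSCluster(
        n_workers=n_workers,
        image=prefect.context.image,
        region_name=region,
    )
```

Because the variable is set in the same process and immediately before `ECSCluster(...)` is constructed, it does not depend on the agent's environment or on a config file being present in the image.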
    Kevin Kho

    11 months ago
    Maybe you can just do it upon cluster creation? Environment variables do not propagate so you need to pass it explicitly. Dask clusters tend to have the env argument so you can do:
    from dask_cloudprovider.aws import ECSCluster
    cluster = ECSCluster(
        cluster_arn="arn:aws:ecs:<region>:<acctid>:cluster/<gpuclustername>",
        env={"AWS_REGION": "us-east-1"},
        worker_gpu=1)
    So that the cluster gets the environment variable?
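A declarative variant of the same idea, as a sketch under assumptions: the variable name is `AWS_DEFAULT_REGION`, the flow-run container receives it through the `env` argument of `ECSRun`, and the scheduler and worker tasks receive it through the `environment` argument of dask-cloudprovider's `ECSCluster` (that keyword appears to be `environment` rather than `env`). `region_env` and `build_flow_settings` are hypothetical helper names.

```python
REGION = "us-east-1"


def region_env(region):
    """Environment variables that tell botocore which region to use (assumed name)."""
    return {"AWS_DEFAULT_REGION": region}


def build_flow_settings(task_definition, region=REGION, n_workers=10):
    """Return (run_config, executor) with the region passed to both layers."""
    # Imported lazily so region_env() is usable without these packages.
    from dask_cloudprovider.aws import ECSCluster
    from prefect.executors import DaskExecutor
    from prefect.run_configs import ECSRun

    # Flow-run container: this is where the cluster object gets created.
    run_config = ECSRun(task_definition=task_definition, env=region_env(region))

    # Scheduler and worker tasks started by dask-cloudprovider.
    # Note: no image is passed here, so the library default image would be used.
    executor = DaskExecutor(
        cluster_class=ECSCluster,
        cluster_kwargs={
            "n_workers": n_workers,
            "region_name": region,
            "environment": region_env(region),
        },
    )
    return run_config, executor
```

The two values would then be attached as `flow.run_config` and `flow.executor`. Since the traceback points at the flow-run process, the `ECSRun` part is the one most likely to matter for this particular error.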