# ask-community
f
Hello, I am currently trying to use a DaskExecutor in an ECSRun to parallelize a flow. I’m following the documentation to create a temporary cluster with a specified worker image. My flow currently looks something like this:
Copy code
import prefect
from prefect import Flow
from prefect.executors import DaskExecutor
from prefect.storage import S3
from dask_cloudprovider.aws import FargateCluster


def fargate_cluster(n_workers=4):
    """Start a Fargate cluster using the same image as the flow run"""
    return FargateCluster(n_workers=n_workers, image=prefect.context.image)


class LeoFlow(PrefectFlow):  # PrefectFlow is an internal base class defined elsewhere in this project

    def generate_flow(self):
        with Flow(name=self.name, storage=S3(bucket="raptormaps-prefect-flows")) as flow:
            ...
        flow.executor = DaskExecutor(
            cluster_class=fargate_cluster,
            cluster_kwargs={"n_workers": 4}
        )
        return flow
In the Dockerfile for the image that I’m specifying in the ECSRun, I have included the following line to install dask-cloudprovider:
Copy code
RUN pip install dask-cloudprovider[aws]
However, when I execute the flow, I am hitting the following error:
Copy code
Unexpected error: AttributeError("module 'aiobotocore' has no attribute 'get_session'",)
Traceback (most recent call last):
  File "/usr/local/lib/python3.6/site-packages/prefect/engine/runner.py", line 48, in inner
    new_state = method(self, state, *args, **kwargs)
  File "/usr/local/lib/python3.6/site-packages/prefect/engine/flow_runner.py", line 442, in get_flow_run_state
    with self.check_for_cancellation(), executor.start():
  File "/usr/local/lib/python3.6/contextlib.py", line 81, in __enter__
    return next(self.gen)
  File "/usr/local/lib/python3.6/site-packages/prefect/executors/dask.py", line 238, in start
    with self.cluster_class(**self.cluster_kwargs) as cluster:
  File "/rprefect/leo_flow.py", line 58, in fargate_cluster
    return FargateCluster(n_workers=n_workers, image=prefect.context.image)
  File "/usr/local/lib/python3.6/site-packages/dask_cloudprovider/aws/ecs.py", line 1361, in __init__
    super().__init__(fargate_scheduler=True, fargate_workers=True, **kwargs)
  File "/usr/local/lib/python3.6/site-packages/dask_cloudprovider/aws/ecs.py", line 726, in __init__
    self.session = aiobotocore.get_session()
AttributeError: module 'aiobotocore' has no attribute 'get_session'
Is there a specific version of dask_cloudprovider that Prefect requires?
k
I know aiobotocore==1.4.0 caused a few issues for Dask when it was released, so it was advised to pin to 1.3.3. What version are you on?
f
Looks like I’m on 2.0.0. Let me try pinning it to 1.3.3.
@Kevin Kho is there a recommended version of boto3 and botocore as well? Your help cleared up the above issue but now I’m seeing the following error:
Copy code
Unexpected error: ClientError('An error occurred (ThrottlingException) when calling the DeregisterTaskDefinition operation (reached max retries: 4): Rate exceeded',)
Traceback (most recent call last):
  File "/usr/local/lib/python3.6/site-packages/prefect/engine/runner.py", line 48, in inner
    new_state = method(self, state, *args, **kwargs)
  File "/usr/local/lib/python3.6/site-packages/prefect/engine/flow_runner.py", line 442, in get_flow_run_state
    with self.check_for_cancellation(), executor.start():
  File "/usr/local/lib/python3.6/contextlib.py", line 81, in __enter__
    return next(self.gen)
  File "/usr/local/lib/python3.6/site-packages/prefect/executors/dask.py", line 238, in start
    with self.cluster_class(**self.cluster_kwargs) as cluster:
  File "/rprefect/leo_flow.py", line 58, in fargate_cluster
    return FargateCluster(n_workers=n_workers, image=prefect.context.image)
  File "/usr/local/lib/python3.6/site-packages/dask_cloudprovider/aws/ecs.py", line 1361, in __init__
    super().__init__(fargate_scheduler=True, fargate_workers=True, **kwargs)
  File "/usr/local/lib/python3.6/site-packages/dask_cloudprovider/aws/ecs.py", line 727, in __init__
    super().__init__(**kwargs)
  File "/usr/local/lib/python3.6/site-packages/distributed/deploy/spec.py", line 281, in __init__
    self.sync(self._start)
  File "/usr/local/lib/python3.6/site-packages/distributed/deploy/cluster.py", line 189, in sync
    return sync(self.loop, func, *args, **kwargs)
  File "/usr/local/lib/python3.6/site-packages/distributed/utils.py", line 351, in sync
    raise exc.with_traceback(tb)
  File "/usr/local/lib/python3.6/site-packages/distributed/utils.py", line 334, in f
    result[0] = yield future
  File "/usr/local/lib/python3.6/site-packages/tornado/gen.py", line 762, in run
    value = future.result()
  File "/usr/local/lib/python3.6/site-packages/dask_cloudprovider/aws/ecs.py", line 753, in _start
    await _cleanup_stale_resources()
  File "/usr/local/lib/python3.6/site-packages/dask_cloudprovider/aws/ecs.py", line 1414, in _cleanup_stale_resources
    taskDefinition=task_definition_arn
  File "/usr/local/lib/python3.6/site-packages/aiobotocore/client.py", line 155, in _make_api_call
    raise error_class(parsed_response, operation_name)
botocore.exceptions.ClientError: An error occurred (ThrottlingException) when calling the DeregisterTaskDefinition operation (reached max retries: 4): Rate exceeded
Some quick googling says this is related to my boto3 version. Currently my requirements look as follows:
Copy code
aiobotocore==1.3.3
boto3==1.17.55
botocore==1.20.106
k
I did see this one once. I don’t think we have recommended versions beyond "dask_cloudprovider[aws] >= 0.2.0". Unrelated, but I think support for Python 3.6 will be dropped in a few months. The general rule here is that Prefect just spins up a cluster with your versions, so as long as you are able to spin up a cluster without Prefect and it’s stable, that should work for Prefect.
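For example, a minimal standalone sketch (not from this thread; the image name and worker count are placeholders) to check that the cluster itself is stable with your pinned versions before involving Prefect:
Copy code
from dask.distributed import Client
from dask_cloudprovider.aws import FargateCluster

# Spin up the same kind of temporary Fargate cluster directly, outside Prefect.
cluster = FargateCluster(n_workers=2, image="<your-flow-image>")
client = Client(cluster)

# Submit a trivial task to confirm the scheduler and workers come up and stay healthy.
print(client.submit(sum, [1, 2, 3]).result())

client.close()
cluster.close()
If that standalone run is stable, the same package versions should behave the same way when Prefect creates the cluster.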
Have you seen this issue? That’s what you are running into, right?
f
Looks similar, but on different AWS calls (mine is DeregisterTaskDefinition, theirs is DescribeLogStreams). I’m not entirely convinced that I’ve done everything necessary to make this work on the AWS side. Is there anything more that I need to do to parallelize within an ECSRun aside from the simple executor change I show above?
Just for context, my flow executes successfully using an ECS fargate agent and ECSRun without the Dask executor
k
Oh my bad I didn’t see it was ECS. What is your Prefect version?
This might help. Available in 0.15.7.
upvote 1
a
@Frank Oplinger good question. You already configured the Dask executor and installed the dask-cloudprovider package, so another thing to do would be to ensure that your ECS agent has all the necessary permissions on the IAM task role to create a new ECS cluster for Dask; this includes IAM and VPC permissions. For testing, you could add full access on those two services. Also, I’m not sure where these would need to be set on the dask-cloudprovider side, but in general, to handle AWS request throttling you can configure retries, e.g. via environment variables on the machine on which you start the agent, as described here: https://docs.prefect.io/orchestration/agents/ecs.html#throttling-errors-on-flow-submission. You can set them on the ECS agent before starting it and check if that helps.
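As an illustration only (the values are placeholders, and a plain shell export before prefect agent ecs start works just as well), the two variables from the linked docs could be set like this in the environment that launches the agent:
Copy code
import os
import subprocess

# Illustrative retry settings; tune for your account's ECS API rate limits.
os.environ["AWS_RETRY_MODE"] = "adaptive"
os.environ["AWS_MAX_ATTEMPTS"] = "10"

# Start the ECS agent from the same environment so botocore picks up the settings.
subprocess.run(["prefect", "agent", "ecs", "start"], check=True)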
f
prefect==0.15.9. Thank you both! Will give that a shot
Unfortunately no luck here. I’ve tried setting AWS_RETRY_MODE and AWS_MAX_ATTEMPTS on the ECS agent before starting it. I also confirmed that my IAM task role is permissioned correctly. One thing that is really strange: even after setting the max retry attempts to 10, I am still seeing the same error saying it only tried 4 times:
Copy code
botocore.exceptions.ClientError: An error occurred (ThrottlingException) when calling the DeregisterTaskDefinition operation (reached max retries: 4): Rate exceeded
I’m setting those env vars in the same way described in the linked docs. Do you guys have any other suggestions for me to try?
a
Looking at the dask-cloudprovider code, you get this throttling error when the cleanup step (_cleanup_stale_resources) is executed, and the cluster class has a flag to disable this cleanup. Maybe you could add skip_cleanup=True to your FargateCluster, just to eliminate this deregistering of the task definition:
Copy code
def fargate_cluster(n_workers=4):
    """Start a fargate cluster using the same image as the flow run"""
    return FargateCluster(n_workers=n_workers, image=prefect.context.image, skip_cleanup=True)
f
Will give that a shot, really appreciate your help
👍 1
Looks like that got me past my last issue. Hopefully one last question for you: I’m now seeing a permission error indicating that my agent IAM user is missing a permission:
Copy code
botocore.exceptions.ClientError: An error occurred (AccessDenied) when calling the CreateRole operation: User: arn:aws:iam::[user]:user/perfect_io is not authorized to perform: iam:CreateRole on resource: arn:aws:iam::[user]:role/dask-e4022a2e-6-execution-role
Is it typical for the agent to require an administrator role in AWS? I feel like that indicates something still isn’t right.
k
I think dask_cloudprovider uses it to create roles and pass them to workers so they have access to other AWS resources, but I’m not 100% sure. You can check the link though and verify it is needed.
a
@Frank Oplinger the IAM permissions, incl. iam:CreateRole, are required for dask-cloudprovider to create an entirely new ECS cluster and several ECS tasks for the Dask resources (the Dask scheduler and workers). In the same source code I sent you before, the docstring lists the IAM policy that Dask needs. I mentioned before that for testing you could set IAM + VPC + ECS full access just to test the flow, but for production you would have to limit the permissions, e.g. as in this docstring:
Copy code
{
  "Statement": [
    {
      "Action": [
        "ec2:AuthorizeSecurityGroupIngress",
        "ec2:CreateSecurityGroup",
        "ec2:CreateTags",
        "ec2:DescribeNetworkInterfaces",
        "ec2:DescribeSecurityGroups",
        "ec2:DescribeSubnets",
        "ec2:DescribeVpcs",
        "ec2:DeleteSecurityGroup",
        "ecs:CreateCluster",
        "ecs:DescribeTasks",
        "ecs:ListAccountSettings",
        "ecs:RegisterTaskDefinition",
        "ecs:RunTask",
        "ecs:StopTask",
        "ecs:ListClusters",
        "ecs:DescribeClusters",
        "ecs:DeleteCluster",
        "ecs:ListTaskDefinitions",
        "ecs:DescribeTaskDefinition",
        "ecs:DeregisterTaskDefinition",
        "iam:AttachRolePolicy",
        "iam:CreateRole",
        "iam:TagRole",
        "iam:PassRole",
        "iam:DeleteRole",
        "iam:ListRoles",
        "iam:ListRoleTags",
        "iam:ListAttachedRolePolicies",
        "iam:DetachRolePolicy",
        "logs:DescribeLogGroups",
        "logs:GetLogEvents",
        "logs:CreateLogGroup",
        "logs:PutRetentionPolicy"
      ],
      "Effect": "Allow",
      "Resource": [
        "*"
      ]
    }
  ],
  "Version": "2012-10-17"
}
f
Perfect, thanks again to you both
👍 1
Hi again, unfortunately I have to follow up here with another issue. I was able to get dask-cloudprovider permissioned correctly and can see it creating ECS clusters in my AWS console. However, my flows continue to fail and show the following message:
Copy code
Unexpected error: RuntimeError('Scheduler exited unexpectedly!',)
Traceback (most recent call last):
  File "/usr/local/lib/python3.6/site-packages/prefect/engine/runner.py", line 48, in inner
    new_state = method(self, state, *args, **kwargs)
  File "/usr/local/lib/python3.6/site-packages/prefect/engine/flow_runner.py", line 442, in get_flow_run_state
    with self.check_for_cancellation(), executor.start():
  File "/usr/local/lib/python3.6/contextlib.py", line 81, in __enter__
    return next(self.gen)
  File "/usr/local/lib/python3.6/site-packages/prefect/executors/dask.py", line 238, in start
    with self.cluster_class(**self.cluster_kwargs) as cluster:
  File "/rprefect/leo_flow.py", line 63, in fargate_cluster
    return FargateCluster(n_workers=n_workers, image=prefect.context.image, skip_cleanup=True)
  File "/usr/local/lib/python3.6/site-packages/dask_cloudprovider/aws/ecs.py", line 1361, in __init__
    super().__init__(fargate_scheduler=True, fargate_workers=True, **kwargs)
  File "/usr/local/lib/python3.6/site-packages/dask_cloudprovider/aws/ecs.py", line 727, in __init__
    super().__init__(**kwargs)
  File "/usr/local/lib/python3.6/site-packages/distributed/deploy/spec.py", line 281, in __init__
    self.sync(self._start)
  File "/usr/local/lib/python3.6/site-packages/distributed/deploy/cluster.py", line 189, in sync
    return sync(self.loop, func, *args, **kwargs)
  File "/usr/local/lib/python3.6/site-packages/distributed/utils.py", line 351, in sync
    raise exc.with_traceback(tb)
  File "/usr/local/lib/python3.6/site-packages/distributed/utils.py", line 334, in f
    result[0] = yield future
  File "/usr/local/lib/python3.6/site-packages/tornado/gen.py", line 762, in run
    value = future.result()
  File "/usr/local/lib/python3.6/site-packages/dask_cloudprovider/aws/ecs.py", line 924, in _start
    await super()._start()
  File "/usr/local/lib/python3.6/site-packages/distributed/deploy/spec.py", line 309, in _start
    self.scheduler = await self.scheduler
  File "/usr/local/lib/python3.6/site-packages/dask_cloudprovider/aws/ecs.py", line 162, in _
    await self.start()
  File "/usr/local/lib/python3.6/site-packages/dask_cloudprovider/aws/ecs.py", line 303, in start
    await self._set_address_from_logs()
  File "/usr/local/lib/python3.6/site-packages/dask_cloudprovider/aws/ecs.py", line 210, in _set_address_from_logs
    raise RuntimeError("%s exited unexpectedly!" % type(self).__name__)
RuntimeError: Scheduler exited unexpectedly!
I can see the tasks themselves being launched in the temporary cluster that Dask creates, and I am able to pull these logs from them:
Copy code
2021-11-18 12:46:04 Traceback (most recent call last):
2021-11-18 12:46:04   File "/usr/local/bin/dask-scheduler", line 5, in <module>
2021-11-18 12:46:04     from distributed.cli.dask_scheduler import go
2021-11-18 12:46:04   File "/usr/local/lib/python3.6/site-packages/distributed/cli/dask_scheduler.py", line 122, in <module>
2021-11-18 12:46:04     @click.version_option()
2021-11-18 12:46:04   File "/usr/local/lib/python3.6/site-packages/click/decorators.py", line 247, in decorator
2021-11-18 12:46:04     _param_memo(f, OptionClass(param_decls, **option_attrs))
2021-11-18 12:46:04   File "/usr/local/lib/python3.6/site-packages/click/core.py", line 2465, in __init__
2021-11-18 12:46:04     super().__init__(param_decls, type=type, multiple=multiple, **attrs)
2021-11-18 12:46:04   File "/usr/local/lib/python3.6/site-packages/click/core.py", line 2101, in __init__
2021-11-18 12:46:04     ) from None
2021-11-18 12:46:04 ValueError: 'default' must be a list when 'multiple' is true.
Again, this seems related to package versions. My Dask configuration in the image is as follows:
Copy code
dask==2021.3.0
dask-cloudprovider==0.5.0
distributed==2021.3.0
Is this anything that you guys have seen before?
k
I have not seen this before. It looks like a Dask-specific issue.