Frank Oplinger
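11/17/2021, 7:02 PM
import prefect
from prefect import Flow
from prefect.executors import DaskExecutor
from prefect.storage import S3
from dask_cloudprovider.aws import FargateCluster

def fargate_cluster(n_workers=4):
    """Start a fargate cluster using the same image as the flow run"""
    return FargateCluster(n_workers=n_workers, image=prefect.context.image)

class LeoFlow(PrefectFlow):  # PrefectFlow is the author's own base class (not shown)
    def generate_flow(self):
        with Flow(name=self.name, storage=S3(bucket="raptormaps-prefect-flows")) as flow:
            ...
        flow.executor = DaskExecutor(
            cluster_class=fargate_cluster,
            cluster_kwargs={"n_workers": 4}
        )
        return flow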
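Before wiring the factory into DaskExecutor, it can help to check that the same cluster starts outside Prefect, since DaskExecutor essentially calls the cluster class with `cluster_kwargs` and enters it as a context manager. A minimal sketch, assuming AWS credentials and a default region are already configured and that the image passed in is the flow's image; `run_smoke_test` and `main` are hypothetical names, and the heavy imports are deferred so the helper can be exercised on its own.

```python
from typing import Any


def run_smoke_test(client: Any) -> int:
    """Submit one trivial task and block on its result; raises if the cluster is broken."""
    future = client.submit(lambda x: x + 1, 41)
    return future.result()


def main(image: str, n_workers: int = 1) -> None:
    """Start a FargateCluster directly (no Prefect) and run the smoke test against it."""
    # Deferred imports: only needed when actually talking to AWS.
    from dask.distributed import Client
    from dask_cloudprovider.aws import FargateCluster

    with FargateCluster(n_workers=n_workers, image=image) as cluster:
        with Client(cluster) as client:
            print("smoke test result:", run_smoke_test(client))
```

Running `main("your-flow-image:tag")` (a placeholder image name) from an environment with the same package versions as the flow image would show whether a failure comes from Prefect or from the cluster setup itself.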
In the Dockerfile for the image that I’m specifying in the ECSRun, I have included the following line to install dask-cloudprovider:
RUN pip install dask-cloudprovider[aws]
However, when I execute the flow, I am hitting the following error:
Unexpected error: AttributeError("module 'aiobotocore' has no attribute 'get_session'",)
Traceback (most recent call last):
File "/usr/local/lib/python3.6/site-packages/prefect/engine/runner.py", line 48, in inner
new_state = method(self, state, *args, **kwargs)
File "/usr/local/lib/python3.6/site-packages/prefect/engine/flow_runner.py", line 442, in get_flow_run_state
with self.check_for_cancellation(), executor.start():
File "/usr/local/lib/python3.6/contextlib.py", line 81, in __enter__
return next(self.gen)
File "/usr/local/lib/python3.6/site-packages/prefect/executors/dask.py", line 238, in start
with self.cluster_class(**self.cluster_kwargs) as cluster:
File "/rprefect/leo_flow.py", line 58, in fargate_cluster
return FargateCluster(n_workers=n_workers, image=prefect.context.image)
File "/usr/local/lib/python3.6/site-packages/dask_cloudprovider/aws/ecs.py", line 1361, in __init__
super().__init__(fargate_scheduler=True, fargate_workers=True, **kwargs)
File "/usr/local/lib/python3.6/site-packages/dask_cloudprovider/aws/ecs.py", line 726, in __init__
self.session = aiobotocore.get_session()
AttributeError: module 'aiobotocore' has no attribute 'get_session'
Is there a specific version of dask_cloudprovider that Prefect requires?
Kevin Kho
aiobotocore==1.4.0 caused a few issues for Dask when it was released, so it was advised to pin to 1.3.3. What version are you on?
Frank Oplinger
11/17/2021, 7:08 PM
Frank Oplinger
11/17/2021, 7:41 PM
Unexpected error: ClientError('An error occurred (ThrottlingException) when calling the DeregisterTaskDefinition operation (reached max retries: 4): Rate exceeded',)
Traceback (most recent call last):
File "/usr/local/lib/python3.6/site-packages/prefect/engine/runner.py", line 48, in inner
new_state = method(self, state, *args, **kwargs)
File "/usr/local/lib/python3.6/site-packages/prefect/engine/flow_runner.py", line 442, in get_flow_run_state
with self.check_for_cancellation(), executor.start():
File "/usr/local/lib/python3.6/contextlib.py", line 81, in __enter__
return next(self.gen)
File "/usr/local/lib/python3.6/site-packages/prefect/executors/dask.py", line 238, in start
with self.cluster_class(**self.cluster_kwargs) as cluster:
File "/rprefect/leo_flow.py", line 58, in fargate_cluster
return FargateCluster(n_workers=n_workers, image=prefect.context.image)
File "/usr/local/lib/python3.6/site-packages/dask_cloudprovider/aws/ecs.py", line 1361, in __init__
super().__init__(fargate_scheduler=True, fargate_workers=True, **kwargs)
File "/usr/local/lib/python3.6/site-packages/dask_cloudprovider/aws/ecs.py", line 727, in __init__
super().__init__(**kwargs)
File "/usr/local/lib/python3.6/site-packages/distributed/deploy/spec.py", line 281, in __init__
self.sync(self._start)
File "/usr/local/lib/python3.6/site-packages/distributed/deploy/cluster.py", line 189, in sync
return sync(self.loop, func, *args, **kwargs)
File "/usr/local/lib/python3.6/site-packages/distributed/utils.py", line 351, in sync
raise exc.with_traceback(tb)
File "/usr/local/lib/python3.6/site-packages/distributed/utils.py", line 334, in f
result[0] = yield future
File "/usr/local/lib/python3.6/site-packages/tornado/gen.py", line 762, in run
value = future.result()
File "/usr/local/lib/python3.6/site-packages/dask_cloudprovider/aws/ecs.py", line 753, in _start
await _cleanup_stale_resources()
File "/usr/local/lib/python3.6/site-packages/dask_cloudprovider/aws/ecs.py", line 1414, in _cleanup_stale_resources
taskDefinition=task_definition_arn
File "/usr/local/lib/python3.6/site-packages/aiobotocore/client.py", line 155, in _make_api_call
raise error_class(parsed_response, operation_name)
botocore.exceptions.ClientError: An error occurred (ThrottlingException) when calling the DeregisterTaskDefinition operation (reached max retries: 4): Rate exceeded
Some quick googling suggests this is related to my boto3 version. My current requirements are:
aiobotocore==1.3.3
boto3==1.17.55
botocore==1.20.106
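The "reached max retries: 4" in that error is botocore's own retry loop giving up. botocore also reads `AWS_RETRY_MODE` and `AWS_MAX_ATTEMPTS` from the environment, which is one way to give it more headroom when the clients are created inside a library such as dask-cloudprovider rather than in your own code. A minimal sketch, assuming the variables are set in the flow-run process before the cluster (and therefore its clients) is created, for example at the top of the cluster factory or via the `env` argument of the ECSRun run config; the values and the `apply_retry_env` name are illustrative assumptions, and whether aiobotocore 1.3.3 honors every retry mode has not been verified here.

```python
import os
from typing import Mapping, MutableMapping, Optional

# Illustrative values only; tune for your account's ECS API rate limits.
RETRY_ENV = {"AWS_RETRY_MODE": "standard", "AWS_MAX_ATTEMPTS": "10"}


def apply_retry_env(env: Optional[MutableMapping[str, str]] = None,
                    settings: Mapping[str, str] = RETRY_ENV) -> MutableMapping[str, str]:
    """Set botocore's retry variables unless they are already defined.

    Defaults to os.environ; it has to run before any boto3/aiobotocore
    client is created for the settings to take effect.
    """
    target = os.environ if env is None else env
    for key, value in settings.items():
        # setdefault keeps any value the container was already started with.
        target.setdefault(key, value)
    return target
```

Calling `apply_retry_env()` as the first line of `fargate_cluster`, before `FargateCluster(...)`, would be one place to hook this in.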
Kevin Kho
"dask_cloudprovider[aws] >= 0.2.0". Unrelated, but I think support for Python 3.6 will be dropped in a few months. I think the general rule here is that Prefect just spins up a cluster with your versions, so as long as you can spin up a cluster without Prefect and it’s stable, it should work with Prefect.
Kevin Kho
Frank Oplinger
11/17/2021, 8:11 PM
Frank Oplinger
11/17/2021, 8:12 PM
Kevin Kho
Kevin Kho
Anna Geller
Frank Oplinger
11/17/2021, 8:16 PM
Frank Oplinger
11/17/2021, 8:55 PM
botocore.exceptions.ClientError: An error occurred (ThrottlingException) when calling the DeregisterTaskDefinition operation (reached max retries: 4): Rate exceeded
I’m setting those env vars in the same way described in the linked docs. Do you guys have any other suggestions for me to try?
Anna Geller
Could you try adding skip_cleanup=True to your FargateCluster? Just to eliminate this deregistering of the task definition:
def fargate_cluster(n_workers=4):
    """Start a fargate cluster using the same image as the flow run"""
    return FargateCluster(n_workers=n_workers, image=prefect.context.image, skip_cleanup=True)
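Since the tracebacks in this thread show DaskExecutor calling `cluster_class(**cluster_kwargs)`, another option is to let the factory forward extra keyword arguments, so flags such as `skip_cleanup` can be toggled from `cluster_kwargs` without editing the function each time. A sketch under that assumption; `make_cluster_factory` and `get_image` are hypothetical names, and `get_image` stands in for reading `prefect.context.image` when the cluster is created.

```python
from typing import Any, Callable


def make_cluster_factory(cluster_cls: Callable[..., Any],
                         get_image: Callable[[], str]) -> Callable[..., Any]:
    """Return a cluster_class callable that forwards any extra cluster_kwargs.

    DaskExecutor calls cluster_class(**cluster_kwargs), so options such as
    skip_cleanup can then be set from cluster_kwargs instead of being hard-coded.
    """
    def factory(n_workers: int = 4, **kwargs: Any) -> Any:
        # Resolve the image when the cluster is created, i.e. inside the flow run.
        return cluster_cls(n_workers=n_workers, image=get_image(), **kwargs)

    return factory
```

With Prefect 1.x this could be wired up as `DaskExecutor(cluster_class=make_cluster_factory(FargateCluster, lambda: prefect.context.image), cluster_kwargs={"n_workers": 4, "skip_cleanup": True})`.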
Anna Geller
Frank Oplinger
11/17/2021, 9:09 PM
Frank Oplinger
11/17/2021, 10:17 PM
botocore.exceptions.ClientError: An error occurred (AccessDenied) when calling the CreateRole operation: User: arn:aws:iam::[user]:user/perfect_io is not authorized to perform: iam:CreateRole on resource: arn:aws:iam::[user]:role/dask-e4022a2e-6-execution-role
Is it typical for the agent to require the administrator role in AWS? I feel like that’s a sign that something still isn’t right.
Kevin Kho
dask_cloudprovider uses it to create roles and pass them to the workers so they have access to other AWS resources, but I’m not 100% sure. You can check the link, though, and verify that it is needed.
Anna Geller
{
"Statement": [
{
"Action": [
"ec2:AuthorizeSecurityGroupIngress",
"ec2:CreateSecurityGroup",
"ec2:CreateTags",
"ec2:DescribeNetworkInterfaces",
"ec2:DescribeSecurityGroups",
"ec2:DescribeSubnets",
"ec2:DescribeVpcs",
"ec2:DeleteSecurityGroup",
"ecs:CreateCluster",
"ecs:DescribeTasks",
"ecs:ListAccountSettings",
"ecs:RegisterTaskDefinition",
"ecs:RunTask",
"ecs:StopTask",
"ecs:ListClusters",
"ecs:DescribeClusters",
"ecs:DeleteCluster",
"ecs:ListTaskDefinitions",
"ecs:DescribeTaskDefinition",
"ecs:DeregisterTaskDefinition",
"iam:AttachRolePolicy",
"iam:CreateRole",
"iam:TagRole",
"iam:PassRole",
"iam:DeleteRole",
"iam:ListRoles",
"iam:ListRoleTags",
"iam:ListAttachedRolePolicies",
"iam:DetachRolePolicy",
"logs:DescribeLogGroups",
"logs:GetLogEvents",
"logs:CreateLogGroup",
"logs:PutRetentionPolicy"
],
"Effect": "Allow",
"Resource": [
"*"
]
}
],
"Version": "2012-10-17"
}
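To see which of these actions the agent's IAM user is actually missing (the CreateRole error above points to at least `iam:CreateRole`), one could diff this list against the user's current policy. A rough sketch: it only looks at `Allow` statements and `Action` wildcards, and ignores `Deny`, `NotAction`, resources, conditions and permission boundaries, so it is a first pass rather than a substitute for the IAM policy simulator; the function names are hypothetical.

```python
import fnmatch
import json
from typing import Iterable, Set


def allowed_action_patterns(policy_json: str) -> Set[str]:
    """Collect lower-cased Action patterns from the Allow statements of a policy.

    Deliberately simplified: Deny, NotAction, Resource and Condition are ignored.
    """
    policy = json.loads(policy_json)
    statements = policy.get("Statement", [])
    if isinstance(statements, dict):  # a lone statement may be written as an object
        statements = [statements]
    patterns: Set[str] = set()
    for statement in statements:
        if statement.get("Effect") != "Allow":
            continue
        actions = statement.get("Action", [])
        if isinstance(actions, str):
            actions = [actions]
        patterns.update(action.lower() for action in actions)
    return patterns


def missing_actions(required: Iterable[str], granted: Set[str]) -> Set[str]:
    """Return the required actions that no granted pattern matches.

    IAM action names are case-insensitive and allow * and ? wildcards,
    so both sides are lower-cased and compared with fnmatchcase.
    """
    return {
        action
        for action in required
        if not any(fnmatch.fnmatchcase(action.lower(), pattern) for pattern in granted)
    }
```

For example, pass the agent user's policy document to `allowed_action_patterns` and the action list above as `required`.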
Frank Oplinger
11/17/2021, 10:33 PM
Frank Oplinger
11/18/2021, 6:31 PM
Unexpected error: RuntimeError('Scheduler exited unexpectedly!',)
Traceback (most recent call last):
File "/usr/local/lib/python3.6/site-packages/prefect/engine/runner.py", line 48, in inner
new_state = method(self, state, *args, **kwargs)
File "/usr/local/lib/python3.6/site-packages/prefect/engine/flow_runner.py", line 442, in get_flow_run_state
with self.check_for_cancellation(), executor.start():
File "/usr/local/lib/python3.6/contextlib.py", line 81, in __enter__
return next(self.gen)
File "/usr/local/lib/python3.6/site-packages/prefect/executors/dask.py", line 238, in start
with self.cluster_class(**self.cluster_kwargs) as cluster:
File "/rprefect/leo_flow.py", line 63, in fargate_cluster
return FargateCluster(n_workers=n_workers, image=prefect.context.image, skip_cleanup=True)
File "/usr/local/lib/python3.6/site-packages/dask_cloudprovider/aws/ecs.py", line 1361, in __init__
super().__init__(fargate_scheduler=True, fargate_workers=True, **kwargs)
File "/usr/local/lib/python3.6/site-packages/dask_cloudprovider/aws/ecs.py", line 727, in __init__
super().__init__(**kwargs)
File "/usr/local/lib/python3.6/site-packages/distributed/deploy/spec.py", line 281, in __init__
self.sync(self._start)
File "/usr/local/lib/python3.6/site-packages/distributed/deploy/cluster.py", line 189, in sync
return sync(self.loop, func, *args, **kwargs)
File "/usr/local/lib/python3.6/site-packages/distributed/utils.py", line 351, in sync
raise exc.with_traceback(tb)
File "/usr/local/lib/python3.6/site-packages/distributed/utils.py", line 334, in f
result[0] = yield future
File "/usr/local/lib/python3.6/site-packages/tornado/gen.py", line 762, in run
value = future.result()
File "/usr/local/lib/python3.6/site-packages/dask_cloudprovider/aws/ecs.py", line 924, in _start
await super()._start()
File "/usr/local/lib/python3.6/site-packages/distributed/deploy/spec.py", line 309, in _start
self.scheduler = await self.scheduler
File "/usr/local/lib/python3.6/site-packages/dask_cloudprovider/aws/ecs.py", line 162, in _
await self.start()
File "/usr/local/lib/python3.6/site-packages/dask_cloudprovider/aws/ecs.py", line 303, in start
await self._set_address_from_logs()
File "/usr/local/lib/python3.6/site-packages/dask_cloudprovider/aws/ecs.py", line 210, in _set_address_from_logs
raise RuntimeError("%s exited unexpectedly!" % type(self).__name__)
RuntimeError: Scheduler exited unexpectedly!
I can see the tasks themselves being launched in the temporary cluster that Dask creates, and I can get these logs from them:
2021-11-18 12:46:04 Traceback (most recent call last):
2021-11-18 12:46:04 File "/usr/local/bin/dask-scheduler", line 5, in <module>
2021-11-18 12:46:04 from distributed.cli.dask_scheduler import go
2021-11-18 12:46:04 File "/usr/local/lib/python3.6/site-packages/distributed/cli/dask_scheduler.py", line 122, in <module>
2021-11-18 12:46:04 @click.version_option()
2021-11-18 12:46:04 File "/usr/local/lib/python3.6/site-packages/click/decorators.py", line 247, in decorator
2021-11-18 12:46:04 _param_memo(f, OptionClass(param_decls, **option_attrs))
2021-11-18 12:46:04 File "/usr/local/lib/python3.6/site-packages/click/core.py", line 2465, in __init__
2021-11-18 12:46:04 super().__init__(param_decls, type=type, multiple=multiple, **attrs)
2021-11-18 12:46:04 File "/usr/local/lib/python3.6/site-packages/click/core.py", line 2101, in __init__
2021-11-18 12:46:04 ) from None
2021-11-18 12:46:04 ValueError: 'default' must be a list when 'multiple' is true.
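The flow-side RuntimeError only says that the scheduler exited; the actual cause is at the end of the scheduler task's own log stream. A small sketch for pulling that line out of exported log text like the one above; the regular expressions assume this particular "timestamp, then message" layout and are hypothetical helpers, not part of any AWS or Dask tooling.

```python
import re
from typing import Iterable, Optional

# A "YYYY-MM-DD HH:MM:SS" prefix, optionally followed by whitespace,
# as in the exported scheduler log above.
_TIMESTAMP = re.compile(r"^\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2}\s*")
# Lines that start like "ValueError: ..." or "botocore.exceptions.ClientError: ...".
_EXCEPTION = re.compile(r"^[A-Za-z_][\w.]*(?:Error|Exception)\b")


def strip_timestamp(line: str) -> str:
    """Remove a leading timestamp, if present."""
    return _TIMESTAMP.sub("", line, count=1)


def final_exception(lines: Iterable[str]) -> Optional[str]:
    """Return the last exception line in a log stream, without its timestamp."""
    last = None
    for line in lines:
        message = strip_timestamp(line.rstrip("\n"))
        if _EXCEPTION.match(message):
            last = message
    return last
```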
Again, this seems to be a version issue. My Dask configuration in the image is as follows:
dask==2021.3.0
dask-cloudprovider==0.5.0
distributed==2021.3.0
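To confirm that these are the versions actually installed in the image the scheduler runs, and to see related packages that are not pinned, one option is to compare installed versions against the intended pins at container start. A minimal sketch, assuming Python 3.8+ for `importlib.metadata` (the image above runs Python 3.6, where the `importlib_metadata` backport offers the same functions); the pins are the three listed above, click is reported only because it appears in the scheduler traceback, and the function names are hypothetical.

```python
from importlib.metadata import PackageNotFoundError, version
from typing import Dict, Iterable, Optional, Tuple

# Pins taken from the image configuration above.
PINS = {"dask": "2021.3.0", "dask-cloudprovider": "0.5.0", "distributed": "2021.3.0"}


def installed_versions(packages: Iterable[str]) -> Dict[str, Optional[str]]:
    """Return the installed version of each package, or None if it is not installed."""
    found: Dict[str, Optional[str]] = {}
    for name in packages:
        try:
            found[name] = version(name)
        except PackageNotFoundError:
            found[name] = None
    return found


def pin_mismatches(installed: Dict[str, Optional[str]],
                   pins: Dict[str, str]) -> Dict[str, Tuple[str, Optional[str]]]:
    """Map each package whose installed version differs from its pin to (pinned, installed)."""
    return {
        name: (pinned, installed.get(name))
        for name, pinned in pins.items()
        if installed.get(name) != pinned
    }


if __name__ == "__main__":
    # click is not pinned, but it appears in the scheduler traceback, so report it too.
    current = installed_versions([*PINS, "click"])
    print("installed:", current)
    print("mismatches:", pin_mismatches(current, PINS))
```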
Is this anything that you guys have seen before?
Kevin Kho