Frank Oplinger
11/17/2021, 7:02 PM
def fargate_cluster(n_workers=4):
    """Start a fargate cluster using the same image as the flow run"""
    return FargateCluster(n_workers=n_workers, image=prefect.context.image)
class LeoFlow(PrefectFlow):
    def generate_flow(self):
        with Flow(name=self.name, storage=S3(bucket="raptormaps-prefect-flows")) as flow:
            ...
        flow.executor = DaskExecutor(
            cluster_class=fargate_cluster,
            cluster_kwargs={"n_workers": 4}
        )
        return flow
RUN pip install dask-cloudprovider[aws]
Unexpected error: AttributeError("module 'aiobotocore' has no attribute 'get_session'",)
Traceback (most recent call last):
  File "/usr/local/lib/python3.6/site-packages/prefect/engine/runner.py", line 48, in inner
    new_state = method(self, state, *args, **kwargs)
  File "/usr/local/lib/python3.6/site-packages/prefect/engine/flow_runner.py", line 442, in get_flow_run_state
    with self.check_for_cancellation(), executor.start():
  File "/usr/local/lib/python3.6/contextlib.py", line 81, in __enter__
    return next(self.gen)
  File "/usr/local/lib/python3.6/site-packages/prefect/executors/dask.py", line 238, in start
    with self.cluster_class(**self.cluster_kwargs) as cluster:
  File "/rprefect/leo_flow.py", line 58, in fargate_cluster
    return FargateCluster(n_workers=n_workers, image=prefect.context.image)
  File "/usr/local/lib/python3.6/site-packages/dask_cloudprovider/aws/ecs.py", line 1361, in __init__
    super().__init__(fargate_scheduler=True, fargate_workers=True, **kwargs)
  File "/usr/local/lib/python3.6/site-packages/dask_cloudprovider/aws/ecs.py", line 726, in __init__
    self.session = aiobotocore.get_session()
AttributeError: module 'aiobotocore' has no attribute 'get_session'
Kevin Kho
aiobotocore==1.4.0
Frank Oplinger
11/17/2021, 7:08 PM
Frank Oplinger
11/17/2021, 7:41 PM
Unexpected error: ClientError('An error occurred (ThrottlingException) when calling the DeregisterTaskDefinition operation (reached max retries: 4): Rate exceeded',)
Traceback (most recent call last):
  File "/usr/local/lib/python3.6/site-packages/prefect/engine/runner.py", line 48, in inner
    new_state = method(self, state, *args, **kwargs)
  File "/usr/local/lib/python3.6/site-packages/prefect/engine/flow_runner.py", line 442, in get_flow_run_state
    with self.check_for_cancellation(), executor.start():
  File "/usr/local/lib/python3.6/contextlib.py", line 81, in __enter__
    return next(self.gen)
  File "/usr/local/lib/python3.6/site-packages/prefect/executors/dask.py", line 238, in start
    with self.cluster_class(**self.cluster_kwargs) as cluster:
  File "/rprefect/leo_flow.py", line 58, in fargate_cluster
    return FargateCluster(n_workers=n_workers, image=prefect.context.image)
  File "/usr/local/lib/python3.6/site-packages/dask_cloudprovider/aws/ecs.py", line 1361, in __init__
    super().__init__(fargate_scheduler=True, fargate_workers=True, **kwargs)
  File "/usr/local/lib/python3.6/site-packages/dask_cloudprovider/aws/ecs.py", line 727, in __init__
    super().__init__(**kwargs)
  File "/usr/local/lib/python3.6/site-packages/distributed/deploy/spec.py", line 281, in __init__
    self.sync(self._start)
  File "/usr/local/lib/python3.6/site-packages/distributed/deploy/cluster.py", line 189, in sync
    return sync(self.loop, func, *args, **kwargs)
  File "/usr/local/lib/python3.6/site-packages/distributed/utils.py", line 351, in sync
    raise exc.with_traceback(tb)
  File "/usr/local/lib/python3.6/site-packages/distributed/utils.py", line 334, in f
    result[0] = yield future
  File "/usr/local/lib/python3.6/site-packages/tornado/gen.py", line 762, in run
    value = future.result()
  File "/usr/local/lib/python3.6/site-packages/dask_cloudprovider/aws/ecs.py", line 753, in _start
    await _cleanup_stale_resources()
  File "/usr/local/lib/python3.6/site-packages/dask_cloudprovider/aws/ecs.py", line 1414, in _cleanup_stale_resources
    taskDefinition=task_definition_arn
  File "/usr/local/lib/python3.6/site-packages/aiobotocore/client.py", line 155, in _make_api_call
    raise error_class(parsed_response, operation_name)
botocore.exceptions.ClientError: An error occurred (ThrottlingException) when calling the DeregisterTaskDefinition operation (reached max retries: 4): Rate exceeded
aiobotocore==1.3.3
boto3==1.17.55
botocore==1.20.106
Kevin Kho
"dask_cloudprovider[aws] >= 0.2.0"
Kevin Kho
Frank Oplinger
11/17/2021, 8:11 PM
Frank Oplinger
11/17/2021, 8:12 PM
Kevin Kho
Kevin Kho
Anna Geller
Frank Oplinger
11/17/2021, 8:16 PM
Frank Oplinger
11/17/2021, 8:55 PM
botocore.exceptions.ClientError: An error occurred (ThrottlingException) when calling the DeregisterTaskDefinition operation (reached max retries: 4): Rate exceeded
Anna Geller
skip_cleanup=True
def fargate_cluster(n_workers=4):
    """Start a fargate cluster using the same image as the flow run"""
    return FargateCluster(n_workers=n_workers, image=prefect.context.image, skip_cleanup=True)
Anna Geller
Frank Oplinger
11/17/2021, 9:09 PM
Frank Oplinger
11/17/2021, 10:17 PM
botocore.exceptions.ClientError: An error occurred (AccessDenied) when calling the CreateRole operation: User: arn:aws:iam::[user]:user/perfect_io is not authorized to perform: iam:CreateRole on resource: arn:aws:iam::[user]:role/dask-e4022a2e-6-execution-role
Kevin Kho
dask_cloudprovider
Anna Geller
{
  "Statement": [
    {
      "Action": [
        "ec2:AuthorizeSecurityGroupIngress",
        "ec2:CreateSecurityGroup",
        "ec2:CreateTags",
        "ec2:DescribeNetworkInterfaces",
        "ec2:DescribeSecurityGroups",
        "ec2:DescribeSubnets",
        "ec2:DescribeVpcs",
        "ec2:DeleteSecurityGroup",
        "ecs:CreateCluster",
        "ecs:DescribeTasks",
        "ecs:ListAccountSettings",
        "ecs:RegisterTaskDefinition",
        "ecs:RunTask",
        "ecs:StopTask",
        "ecs:ListClusters",
        "ecs:DescribeClusters",
        "ecs:DeleteCluster",
        "ecs:ListTaskDefinitions",
        "ecs:DescribeTaskDefinition",
        "ecs:DeregisterTaskDefinition",
        "iam:AttachRolePolicy",
        "iam:CreateRole",
        "iam:TagRole",
        "iam:PassRole",
        "iam:DeleteRole",
        "iam:ListRoles",
        "iam:ListRoleTags",
        "iam:ListAttachedRolePolicies",
        "iam:DetachRolePolicy",
        "logs:DescribeLogGroups",
        "logs:GetLogEvents",
        "logs:CreateLogGroup",
        "logs:PutRetentionPolicy"
      ],
      "Effect": "Allow",
      "Resource": [
        "*"
      ]
    }
  ],
  "Version": "2012-10-17"
}
Frank Oplinger
11/17/2021, 10:33 PM
Frank Oplinger
11/18/2021, 6:31 PM
Unexpected error: RuntimeError('Scheduler exited unexpectedly!',)
Traceback (most recent call last):
  File "/usr/local/lib/python3.6/site-packages/prefect/engine/runner.py", line 48, in inner
    new_state = method(self, state, *args, **kwargs)
  File "/usr/local/lib/python3.6/site-packages/prefect/engine/flow_runner.py", line 442, in get_flow_run_state
    with self.check_for_cancellation(), executor.start():
  File "/usr/local/lib/python3.6/contextlib.py", line 81, in __enter__
    return next(self.gen)
  File "/usr/local/lib/python3.6/site-packages/prefect/executors/dask.py", line 238, in start
    with self.cluster_class(**self.cluster_kwargs) as cluster:
  File "/rprefect/leo_flow.py", line 63, in fargate_cluster
    return FargateCluster(n_workers=n_workers, image=prefect.context.image, skip_cleanup=True)
  File "/usr/local/lib/python3.6/site-packages/dask_cloudprovider/aws/ecs.py", line 1361, in __init__
    super().__init__(fargate_scheduler=True, fargate_workers=True, **kwargs)
  File "/usr/local/lib/python3.6/site-packages/dask_cloudprovider/aws/ecs.py", line 727, in __init__
    super().__init__(**kwargs)
  File "/usr/local/lib/python3.6/site-packages/distributed/deploy/spec.py", line 281, in __init__
    self.sync(self._start)
  File "/usr/local/lib/python3.6/site-packages/distributed/deploy/cluster.py", line 189, in sync
    return sync(self.loop, func, *args, **kwargs)
  File "/usr/local/lib/python3.6/site-packages/distributed/utils.py", line 351, in sync
    raise exc.with_traceback(tb)
  File "/usr/local/lib/python3.6/site-packages/distributed/utils.py", line 334, in f
    result[0] = yield future
  File "/usr/local/lib/python3.6/site-packages/tornado/gen.py", line 762, in run
    value = future.result()
  File "/usr/local/lib/python3.6/site-packages/dask_cloudprovider/aws/ecs.py", line 924, in _start
    await super()._start()
  File "/usr/local/lib/python3.6/site-packages/distributed/deploy/spec.py", line 309, in _start
    self.scheduler = await self.scheduler
  File "/usr/local/lib/python3.6/site-packages/dask_cloudprovider/aws/ecs.py", line 162, in _
    await self.start()
  File "/usr/local/lib/python3.6/site-packages/dask_cloudprovider/aws/ecs.py", line 303, in start
    await self._set_address_from_logs()
  File "/usr/local/lib/python3.6/site-packages/dask_cloudprovider/aws/ecs.py", line 210, in _set_address_from_logs
    raise RuntimeError("%s exited unexpectedly!" % type(self).__name__)
RuntimeError: Scheduler exited unexpectedly!
2021-11-18 12:46:04 Traceback (most recent call last):
2021-11-18 12:46:04 File "/usr/local/bin/dask-scheduler", line 5, in <module>
2021-11-18 12:46:04 from distributed.cli.dask_scheduler import go
2021-11-18 12:46:04 File "/usr/local/lib/python3.6/site-packages/distributed/cli/dask_scheduler.py", line 122, in <module>
2021-11-18 12:46:04 @click.version_option()
2021-11-18 12:46:04 File "/usr/local/lib/python3.6/site-packages/click/decorators.py", line 247, in decorator
2021-11-18 12:46:04 _param_memo(f, OptionClass(param_decls, **option_attrs))
2021-11-18 12:46:04 File "/usr/local/lib/python3.6/site-packages/click/core.py", line 2465, in __init__
2021-11-18 12:46:04 super().__init__(param_decls, type=type, multiple=multiple, **attrs)
2021-11-18 12:46:04 File "/usr/local/lib/python3.6/site-packages/click/core.py", line 2101, in __init__
2021-11-18 12:46:04 ) from None
2021-11-18 12:46:04 ValueError: 'default' must be a list when 'multiple' is true.
dask==2021.3.0
dask-cloudprovider==0.5.0
distributed==2021.3.0
Kevin Kho
