Frank Oplinger

almost 4 years ago
Hello, I am currently trying to use a DaskExecutor in an ECSRun to parallelize a flow. I’m following the documentation to create a temporary cluster with a specified worker image. My flow currently looks something like this:
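```python
import prefect
from prefect import Flow
from prefect.executors import DaskExecutor
from prefect.storage import S3
from dask_cloudprovider.aws import FargateCluster


def fargate_cluster(n_workers=4):
    """Start a fargate cluster using the same image as the flow run"""
    return FargateCluster(n_workers=n_workers, image=prefect.context.image)


class LeoFlow(PrefectFlow):  # PrefectFlow: our own project base class (not shown)

    def generate_flow(self):
        with Flow(name=self.name, storage=S3(bucket="raptormaps-prefect-flows")) as flow:
            ...
        # Start a temporary Fargate cluster for each flow run
        flow.executor = DaskExecutor(
            cluster_class=fargate_cluster,
            cluster_kwargs={"n_workers": 4}
        )
        return flow
```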
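The Prefect frame in the traceback quoted below (`with self.cluster_class(**self.cluster_kwargs) as cluster:` in `prefect/executors/dask.py`) shows how the factory gets used: the executor calls `cluster_class` with `cluster_kwargs` and enters the result as a context manager. A minimal pure-Python sketch of just that one line, using a hypothetical `FakeCluster` in place of `FargateCluster` and a hypothetical `start_like_dask_executor` helper so it runs without AWS; it does not reproduce the rest of Prefect's `start()` logic.

```python
from typing import Any, Callable, Dict


class FakeCluster:
    """Hypothetical stand-in for FargateCluster; just records its arguments."""

    def __init__(self, n_workers: int = 4, image: str = "") -> None:
        self.n_workers = n_workers
        self.image = image
        self.closed = False

    def __enter__(self) -> "FakeCluster":
        return self

    def __exit__(self, *exc: Any) -> None:
        # A real cluster would shut down its Fargate tasks here
        self.closed = True


def fake_fargate_cluster(n_workers: int = 4) -> FakeCluster:
    """Same shape as fargate_cluster above; the image name is a placeholder."""
    return FakeCluster(n_workers=n_workers, image="example-image:latest")


def start_like_dask_executor(cluster_class: Callable[..., Any],
                             cluster_kwargs: Dict[str, Any]) -> FakeCluster:
    # Hypothetical helper mirroring the traceback line:
    #   with self.cluster_class(**self.cluster_kwargs) as cluster:
    with cluster_class(**cluster_kwargs) as cluster:
        seen = cluster  # the real executor would connect a Dask client here
    return seen


cluster = start_like_dask_executor(fake_fargate_cluster, {"n_workers": 4})
print(cluster.n_workers, cluster.closed)
```

Because the factory runs inside the executor's `start()`, any exception raised while constructing the cluster surfaces before a single task runs, which matches where the error below appears.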
In the Dockerfile for the image that I’m specifying in the ECSRun, I have included the following line to install dask-cloudprovider:
```dockerfile
RUN pip install dask-cloudprovider[aws]
```
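Since that `RUN` line doesn't pin any versions, pip installs whatever compatible releases are newest at build time. One way to see what actually ended up in the image is to print the installed versions of `dask-cloudprovider` and the AWS libraries it pulls in. A minimal sketch, assuming Python 3.8+ for `importlib.metadata` (the traceback shows Python 3.6, where the `importlib_metadata` backport from PyPI would be needed instead); the `PACKAGES` list and the `installed_versions` helper are hypothetical choices for illustration.

```python
try:
    from importlib.metadata import PackageNotFoundError, version  # Python 3.8+
except ImportError:  # older interpreters: pip install importlib_metadata
    from importlib_metadata import PackageNotFoundError, version

# Distributions assumed to be relevant to the aiobotocore error
PACKAGES = ["dask-cloudprovider", "aiobotocore", "botocore", "prefect"]


def installed_versions(names):
    """Map each distribution name to its installed version, or None if missing."""
    found = {}
    for name in names:
        try:
            found[name] = version(name)
        except PackageNotFoundError:
            found[name] = None
    return found


if __name__ == "__main__":
    for name, ver in installed_versions(PACKAGES).items():
        print(f"{name}: {ver or 'not installed'}")
```

Running this inside the flow's image (for example saved as a hypothetical `versions.py` and run with `python versions.py`) shows which aiobotocore release the unpinned install resolved to alongside dask-cloudprovider.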
However, when I execute the flow, I am hitting the following error:
```
Unexpected error: AttributeError("module 'aiobotocore' has no attribute 'get_session'",)
Traceback (most recent call last):
  File "/usr/local/lib/python3.6/site-packages/prefect/engine/runner.py", line 48, in inner
    new_state = method(self, state, *args, **kwargs)
  File "/usr/local/lib/python3.6/site-packages/prefect/engine/flow_runner.py", line 442, in get_flow_run_state
    with self.check_for_cancellation(), executor.start():
  File "/usr/local/lib/python3.6/contextlib.py", line 81, in __enter__
    return next(self.gen)
  File "/usr/local/lib/python3.6/site-packages/prefect/executors/dask.py", line 238, in start
    with self.cluster_class(**self.cluster_kwargs) as cluster:
  File "/rprefect/leo_flow.py", line 58, in fargate_cluster
    return FargateCluster(n_workers=n_workers, image=prefect.context.image)
  File "/usr/local/lib/python3.6/site-packages/dask_cloudprovider/aws/ecs.py", line 1361, in __init__
    super().__init__(fargate_scheduler=True, fargate_workers=True, **kwargs)
  File "/usr/local/lib/python3.6/site-packages/dask_cloudprovider/aws/ecs.py", line 726, in __init__
    self.session = aiobotocore.get_session()
AttributeError: module 'aiobotocore' has no attribute 'get_session'
```
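The failing call is `aiobotocore.get_session()` inside `dask_cloudprovider/aws/ecs.py`. aiobotocore defines `get_session` in its `aiobotocore.session` module, and the traceback shows the installed aiobotocore does not expose it at the package top level, which suggests the installed aiobotocore and dask-cloudprovider releases are out of step. A small sketch, using a hypothetical helper `where_is_get_session`, that reports which location the installed package offers:

```python
import importlib
from typing import Optional


def where_is_get_session(package: str = "aiobotocore") -> Optional[str]:
    """Report where `get_session` can be found, or None if it is unavailable."""
    try:
        top = importlib.import_module(package)
    except ImportError:
        return None  # package is not installed at all
    if hasattr(top, "get_session"):
        # The location this dask_cloudprovider release calls
        return f"{package}.get_session"
    try:
        sub = importlib.import_module(f"{package}.session")
    except ImportError:
        return None
    if hasattr(sub, "get_session"):
        return f"{package}.session.get_session"
    return None


if __name__ == "__main__":
    print(where_is_get_session())
```

If this prints `aiobotocore.session.get_session` inside the image, the installed dask-cloudprovider expects an older aiobotocore layout; pinning both packages in the Dockerfile to releases that agree is one way to line them up.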
Is there a specific version of dask_cloudprovider that Prefect requires?