Chris Leber
10/04/2021, 9:30 PM
file_list = ['./file_1.txt', './file_2.txt', './file_3.txt', './file_4.txt', './file_5.txt', './file_6.txt', './file_7.txt', ..., './file_100.txt']
@task
def test_task(input_file):
    """Run ``some_function`` on a single input file and return its result."""
    return some_function(input_file)
def ecs_cluster(n_workers=4, region_name="us-east-1"):
    """Start an ECS cluster using the same image as the flow run.

    Args:
        n_workers: Number of workers passed to ``ECSCluster``.
        region_name: AWS region passed to ``ECSCluster`` and exported as the
            default region for the current process when none is set.

    Returns:
        The ``ECSCluster`` instance.
    """
    import os

    # ECSCluster's stale-resource cleanup opens its ECS client with only the
    # service name, so the ``region_name`` given below does not reach it and
    # botocore raises NoRegionError when no default region is configured.
    # Export a default for botocore to resolve; ``setdefault`` leaves any
    # region that is already configured untouched.
    os.environ.setdefault("AWS_DEFAULT_REGION", region_name)
    return ECSCluster(
        n_workers=n_workers,
        image=prefect.context.image,
        region_name=region_name,
    )
# Define the flow: map ``test_task`` over every entry in ``file_list``.
with Flow(
    "test flow",
    storage=S3(bucket="storage"),
    run_config=ECSRun(task_definition=task_definition),
) as flow:
    test_task.map(input_file=file_list)

# Run the flow on Dask; the executor calls ``ecs_cluster(**cluster_kwargs)``
# to create the cluster when it starts.
flow.executor = DaskExecutor(
    cluster_class=ecs_cluster,
    cluster_kwargs={"n_workers": 10},
)
Kevin Kho
Chris Leber
10/04/2021, 10:53 PM
Chris Leber
10/04/2021, 10:54 PM
Unexpected error: NoRegionError('You must specify a region.')
Traceback (most recent call last):
File "/usr/local/lib/python3.8/site-packages/prefect/engine/runner.py", line 48, in inner
new_state = method(self, state, *args, **kwargs)
File "/usr/local/lib/python3.8/site-packages/prefect/engine/flow_runner.py", line 442, in get_flow_run_state
with self.check_for_cancellation(), executor.start():
File "/usr/local/lib/python3.8/contextlib.py", line 113, in __enter__
return next(self.gen)
File "/usr/local/lib/python3.8/site-packages/prefect/executors/dask.py", line 223, in start
with self.cluster_class(**self.cluster_kwargs) as cluster:
File "prefect/test_flows.py", line 276, in ecs_cluster
File "/usr/local/lib/python3.8/site-packages/dask_cloudprovider/aws/ecs.py", line 735, in __init__
super().__init__(**kwargs)
File "/usr/local/lib/python3.8/site-packages/distributed/deploy/spec.py", line 283, in __init__
self.sync(self._start)
File "/usr/local/lib/python3.8/site-packages/distributed/deploy/cluster.py", line 214, in sync
return sync(self.loop, func, *args, **kwargs)
File "/usr/local/lib/python3.8/site-packages/distributed/utils.py", line 326, in sync
raise exc.with_traceback(tb)
File "/usr/local/lib/python3.8/site-packages/distributed/utils.py", line 309, in f
result[0] = yield future
File "/usr/local/lib/python3.8/site-packages/tornado/gen.py", line 762, in run
value = future.result()
File "/usr/local/lib/python3.8/site-packages/dask_cloudprovider/aws/ecs.py", line 761, in _start
await _cleanup_stale_resources()
File "/usr/local/lib/python3.8/site-packages/dask_cloudprovider/aws/ecs.py", line 1396, in _cleanup_stale_resources
async with session.create_client("ecs") as ecs:
File "/usr/local/lib/python3.8/site-packages/aiobotocore/session.py", line 37, in __aenter__
self._client = await self._coro
File "/usr/local/lib/python3.8/site-packages/aiobotocore/session.py", line 121, in _create_client
client = await client_creator.create_client(
File "/usr/local/lib/python3.8/site-packages/aiobotocore/client.py", line 32, in create_client
client_args = self._get_client_args(
File "/usr/local/lib/python3.8/site-packages/aiobotocore/client.py", line 88, in _get_client_args
return args_creator.get_client_args(
File "/usr/local/lib/python3.8/site-packages/aiobotocore/args.py", line 18, in get_client_args
final_args = self.compute_client_args(
File "/usr/local/lib/python3.8/site-packages/botocore/args.py", line 148, in compute_client_args
endpoint_config = self._compute_endpoint_config(
File "/usr/local/lib/python3.8/site-packages/botocore/args.py", line 220, in _compute_endpoint_config
return self._resolve_endpoint(**resolve_endpoint_kwargs)
File "/usr/local/lib/python3.8/site-packages/botocore/args.py", line 302, in _resolve_endpoint
return endpoint_bridge.resolve(
File "/usr/local/lib/python3.8/site-packages/botocore/client.py", line 430, in resolve
resolved = self.endpoint_resolver.construct_endpoint(
File "/usr/local/lib/python3.8/site-packages/botocore/regions.py", line 133, in construct_endpoint
result = self._endpoint_for_partition(
File "/usr/local/lib/python3.8/site-packages/botocore/regions.py", line 148, in _endpoint_for_partition
raise NoRegionError()
botocore.exceptions.NoRegionError: You must specify a region.
Chris Leber
10/04/2021, 11:00 PM
Kevin Kho
`ECSRun`? Or you can try passing env variables to the ECSRun or Agent for authentication of boto3?
Kevin Kho
from dask_cloudprovider.aws import ECSCluster
# Build an ECSCluster from a cluster ARN; the <...> placeholders must be
# replaced with real values.
cluster = ECSCluster(
    cluster_arn="arn:aws:ecs:<region>:<acctid>:cluster/<gpuclustername>",
    worker_gpu=1,
)
and I guess the region gets sent this way?
Kevin Kho
Chris Leber
10/04/2021, 11:15 PM
Kevin Kho
Kevin Kho
Anna Geller
import boto3
# Set the region on boto3's default session.
# NOTE(review): the traceback in this thread fails inside aiobotocore/botocore
# client creation; confirm that this boto3-level default is picked up there.
boto3.setup_default_session(region_name="us-east-1")
Chris Leber
10/05/2021, 11:29 PM
Chris Leber
10/05/2021, 11:31 PM
Kevin Kho
`env` argument so you can do:
from dask_cloudprovider.aws import ECSCluster
# Pass the region to the cluster through an environment variable.
# NOTE(review): confirm the keyword name (`env`) and the variable name
# (`AWS_REGION`) against the dask_cloudprovider and botocore documentation.
cluster = ECSCluster(
    cluster_arn="arn:aws:ecs:<region>:<acctid>:cluster/<gpuclustername>",
    env={"AWS_REGION": "us-east-1"},  # comma was missing here (SyntaxError)
    worker_gpu=1,
)
So that the cluster gets the environment variable?