# prefect-community
John Ramirez
hey everyone - I’m trying to use the new `DaskCloudProviderEnvironment` and I keep getting this error
```
botocore.exceptions.NoRegionError: You must specify a region.
distributed.deploy.spec - WARNING - Cluster closed without starting up
```
any ideas?
Laura Lorenz
Hi @John Ramirez! From the looks of it, you could pass a region name as a `region_name` kwarg to the `DaskCloudProviderEnvironment` (which should get passed all the way down to where I think this is probably blowing up for you: https://github.com/dask/dask-cloudprovider/blob/master/dask_cloudprovider/providers/aws/ecs.py#L652). Alternatively, if you use another AWS configuration tool, like `aws configure` from the `awscli`, or write up your own boto config, my guess is that boto should pull the default region from that for you instead.
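Something like this is what I have in mind (a rough, untested sketch - assuming extra kwargs like `region_name` get forwarded from the environment down to the cluster class; the region string is just an example):
```python
# Rough sketch (untested): pass region_name through the environment,
# assuming it is forwarded down to the underlying cluster class.
from dask_cloudprovider import FargateCluster
from prefect.environments import DaskCloudProviderEnvironment

environment = DaskCloudProviderEnvironment(
    provider_class=FargateCluster,
    region_name="us-east-1",  # example region - use your own
)
```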
John Ramirez
I’m already passing the `region_name` as an argument
Joe Schmid
Hi @John Ramirez, are you able to use Dask Cloud Provider directly? (i.e. does the example code in this section work for you: https://docs.prefect.io/orchestration/execution/dask_cloud_provider_environment.html#requirements)
For us, we never pass AWS creds/region directly as kwargs; we either use IAM roles or pick up config from env vars, e.g. in CI/CD.
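For example (a sketch - assuming you export these before the cluster is created; `AWS_DEFAULT_REGION` is one of the standard env vars boto reads):
```python
# Sketch: let boto pick up region/credentials from standard env vars
# instead of passing them as kwargs.
import os

os.environ["AWS_DEFAULT_REGION"] = "us-east-1"  # example region
# AWS_ACCESS_KEY_ID / AWS_SECRET_ACCESS_KEY would normally come from
# CI/CD secrets or an IAM role rather than being set in code.
```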
John Ramirez
So the issue was the default region in the AWS config file. I did not realize it was not populated.
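In case anyone else hits this, here’s a quick stdlib check I could have run (a sketch - assumes the default profile lives in `~/.aws/config`):
```python
# Sketch: check whether a default region is set in ~/.aws/config
import configparser
import os

cfg = configparser.ConfigParser()
cfg.read(os.path.expanduser("~/.aws/config"))
print(cfg.get("default", "region", fallback="<no default region set>"))
```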
Joe Schmid
Gotcha, good catch. Is it working now?
(Also, which Prefect agent type are you using? With the Fargate Agent, you might also need to make sure any relevant environment variables are passed along to the Fargate task for the Flow run. It can get complicated b/c there can be 4 Fargate task definitions: Prefect Fargate Agent, Prefect Flow run, Dask Scheduler, Dask Worker)
John Ramirez
Production is on a k8s agent, but right now I’m running my flow locally. I’m able to get the cluster up, but I get this error now:
```
  File "/Library/Frameworks/Python.framework/Versions/3.7/lib/python3.7/site-packages/distributed/deploy/cluster.py", line 73, in _close
    await self._watch_worker_status_comm.close()
AttributeError: 'FargateCluster' object has no attribute '_watch_worker_status_comm'
```
This is using the example from the Prefect docs.
Laura Lorenz
Hmm, looks like that is somewhere in the `dask.distributed` code. I’ll poke around, but any chance you are trying to shut down particularly quickly after starting? It looks like that attribute itself is `await`ed during cluster start.
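If that’s it, something like this might avoid the race (a sketch - `Client(cluster)` blocks until the scheduler is actually reachable):
```python
# Sketch: connect a Client before any shutdown so the cluster has
# fully started before close() is ever called.
from dask_cloudprovider import FargateCluster
from distributed import Client

cluster = FargateCluster(image="daskdev/dask")
client = Client(cluster)  # blocks until the scheduler is up
print(client.scheduler_info())
client.close()
cluster.close()
```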
Joe Schmid
@John Ramirez Guessing a bit, but maybe an older version of the `distributed` package? If it’s easy, post the code you’re using to create the cluster (removing any credentials or sensitive info) and at what step in the process you see that error. I’m interested in helping to debug this!
Seems like you’re making progress - mainly want to get a Dask cluster up and running successfully using Dask Cloud Provider directly and then use the same kwargs/config with `DaskCloudProviderEnvironment`.
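A quick way to check what’s actually installed in the environment you’re running from (a sketch - assumes all three packages expose `__version__`, which I believe they do):
```python
# Sketch: print the versions that matter for this error.
import dask
import dask_cloudprovider
import distributed

print("dask:", dask.__version__)
print("distributed:", distributed.__version__)
print("dask-cloudprovider:", dask_cloudprovider.__version__)
```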
John Ramirez
Here is the code
```python
from dask_cloudprovider import FargateCluster

cluster = FargateCluster(
    image="daskdev/dask",
    # account ID redacted from the role ARNs
    task_role_arn="arn:aws:iam:::role/ecsTaskExecutionRole",
    execution_role_arn="arn:aws:iam:::role/ecsTaskExecutionRole",
    # n_workers=1,
    # scheduler_cpu=256,
    # scheduler_mem=512,
    # worker_cpu=256,
    # worker_mem=512,
    # scheduler_timeout="15 minutes",
)
```
Joe Schmid
@John Ramirez Thanks for the code. Could you do `pip list` and paste the output here to look at Python module versions? (Once you get farther along you’ll want to switch the image to a Prefect one like `prefecthq/prefect:latest`, but the dask one should be fine for testing. If you’d prefer, we could debug this via DMs and then update the community channel with the results.)
John Ramirez
```
aiobotocore==1.0.4
aiohttp==3.6.2
aioitertools==0.7.0
async-timeout==3.0.1
attrs==19.3.0
botocore==1.15.32
certifi==2020.4.5.1
chardet==3.0.4
click==7.1.2
cloudpickle==1.3.0
croniter==0.3.31
dask==2.15.0
dask-cloudprovider==0.2.0
distributed==2.15.2
docker==4.2.0
docutils==0.15.2
fsspec==0.7.3
HeapDict==1.0.1
idna==2.9
jmespath==0.9.5
locket==0.2.0
lz4==3.0.2
marshmallow==3.5.1
marshmallow-oneofschema==2.0.1
msgpack==1.0.0
multidict==4.7.5
mypy-extensions==0.4.3
numpy==1.18.2
pandas==1.0.3
pandas-market-calendars==1.2
partd==1.1.0
pendulum==2.0.5
prefect==0.10.7
psutil==5.7.0
pyarrow==0.17.0
python-box==4.2.3
python-dateutil==2.8.1
python-slugify==4.0.0
pytz==2019.3
pytzdata==2019.3
PyYAML==5.3.1
requests==2.23.0
ruamel.yaml==0.16.10
ruamel.yaml.clib==0.2.0
s3fs==0.4.2
six==1.14.0
sortedcontainers==2.1.0
tabulate==0.8.7
tblib==1.6.0
text-unidecode==1.3
toml==0.10.0
toolz==0.10.0
tornado==6.0.4
typing-extensions==3.7.4.2
urllib3==1.25.9
websocket-client==0.57.0
wrapt==1.12.1
yamlordereddictloader==0.4.0
yarl==1.4.2
zict==2.0.0
```
Joe Schmid
@John Ramirez That looks good. I was able to create a fresh virtualenv and successfully create a Dask cluster using Dask Cloud Provider. Sent you details in DM.
John Ramirez
I see - I’m going to try to rebuild the venv and see if that fixes the problem
👍 1
I am still getting the same error
```
RuntimeError: Scheduler failed to start
distributed.deploy.spec - WARNING - Cluster closed without starting up
Error in atexit._run_exitfuncs:
```
Did you copy the role permissions from the dask-cloudprovider docs?
Joe Schmid
@John Ramirez We're using custom roles. I know @Laura Lorenz (she/her) mentioned that she ended up needing to use an IAM user with admin permissions to get cluster creation to work, i.e. those particular permissions didn't seem to be sufficient. If you're able to access the ECS web console, take a look at tasks in the dask-xyz cluster and see if you can find error information.
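If the web console is a pain, a boto3 sketch like this can surface the stopped-task reasons (assumes the Dask cluster ARN contains a `dask-` prefix, which matches what we’ve seen; adjust the filter to your actual cluster):
```python
# Sketch: print why tasks in the Dask ECS cluster stopped.
import boto3

ecs = boto3.client("ecs")
for cluster_arn in ecs.list_clusters()["clusterArns"]:
    if "dask-" not in cluster_arn:
        continue  # assumption: the Dask cluster ARN contains "dask-"
    stopped = ecs.list_tasks(cluster=cluster_arn, desiredStatus="STOPPED")["taskArns"]
    if stopped:
        for task in ecs.describe_tasks(cluster=cluster_arn, tasks=stopped)["tasks"]:
            print(task["taskArn"], "->", task.get("stoppedReason"))
```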
Luke Orland
Sorry to resurrect this old thread. Just curious, were you all able to get Dask Cloud Provider Environment working with Fargate tasks running in a VPC?
Joe Schmid
Hi @Luke Orland, I imagine that question is for @John Ramirez but I can chime in, too. We've been running flows that use DaskCloudProviderEnvironment successfully for many months now. Happy to answer any questions about our setup or config. (We do run this infrastructure in a dedicated VPC and in private subnets.)
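For the VPC piece specifically, the networking kwargs we pass look roughly like this (a sketch - all IDs are placeholders, and I believe `vpc`/`security_groups` are accepted by FargateCluster/ECSCluster, though exact kwargs may vary by dask-cloudprovider version):
```python
# Sketch: pin the cluster to a specific VPC and security group.
# All IDs below are placeholders.
from dask_cloudprovider import FargateCluster

cluster = FargateCluster(
    image="daskdev/dask",
    vpc="vpc-0123456789abcdef0",
    security_groups=["sg-0123456789abcdef0"],
)
```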
Luke Orland
Thanks, @Joe Schmid! I’ll formulate some questions and get back to you.
👍 1
Joe Schmid
I also should have clarified: DaskCloudProviderEnvironment runs ECS tasks for the Dask scheduler and workers using either Fargate or EC2 launch types. (Our utility/DE flows tend to use Fargate and our data science flows tend to use EC2 launch type since they may need GPUs or RAM > 30GB, which Fargate doesn't support.)
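Roughly, the two setups look like this (a sketch - assumes `ECSCluster` pointed at an existing EC2-backed ECS cluster via `cluster_arn`, with the Fargate flags turned off; the ARN is a placeholder):
```python
# Sketch: same provider, two launch types.
from dask_cloudprovider import ECSCluster, FargateCluster

# Fargate launch type (our utility/DE flows):
fargate_cluster = FargateCluster(image="daskdev/dask")

# EC2 launch type (GPUs or >30GB RAM), using an existing ECS cluster:
ec2_cluster = ECSCluster(
    image="daskdev/dask",
    cluster_arn="arn:aws:ecs:us-east-1:123456789012:cluster/my-ec2-cluster",  # placeholder
    fargate_scheduler=False,
    fargate_workers=False,
)
```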