Josh Lowe
06/11/2020, 2:12 AM
Failed to load and execute Flow's environment: TypeError("object NoneType can't be used in 'await' expression")
I'm able to run the flow over a local cluster with two workers just fine; it's only when I register a flow using the Dask Cloud Provider environment that I have issues 🤔

josh
06/11/2020, 1:16 PM

Joe Schmid
06/11/2020, 1:21 PM
DaskCloudProviderEnvironment. If you can share a little more of your specific configuration and maybe the full stack trace of that error, I'd be happy to try to help debug more.

Josh Lowe
06/11/2020, 8:30 PM

Joe Schmid
06/11/2020, 9:00 PM

Josh Lowe
06/11/2020, 10:22 PM
  File "dask_time.py", line 27, in <module>
    worker_mem=512
  File "/usr/local/lib/python3.7/site-packages/dask_cloudprovider/providers/aws/ecs.py", line 1108, in __init__
    super().__init__(fargate_scheduler=True, fargate_workers=True, **kwargs)
  File "/usr/local/lib/python3.7/site-packages/dask_cloudprovider/providers/aws/ecs.py", line 599, in __init__
    super().__init__(**kwargs)
  File "/usr/local/lib/python3.7/site-packages/distributed/deploy/spec.py", line 256, in __init__
    self.sync(self._start)
  File "/usr/local/lib/python3.7/site-packages/distributed/deploy/cluster.py", line 163, in sync
    return sync(self.loop, func, *args, **kwargs)
  File "/usr/local/lib/python3.7/site-packages/distributed/utils.py", line 339, in sync
    raise exc.with_traceback(tb)
  File "/usr/local/lib/python3.7/site-packages/distributed/utils.py", line 323, in f
    result[0] = yield future
  File "/home/ec2-user/.local/lib/python3.7/site-packages/tornado/gen.py", line 735, in run
    value = future.result()
  File "/usr/local/lib/python3.7/site-packages/dask_cloudprovider/providers/aws/ecs.py", line 623, in _start
    await _cleanup_stale_resources()
  File "/usr/local/lib/python3.7/site-packages/dask_cloudprovider/providers/aws/ecs.py", line 1127, in _cleanup_stale_resources
    async with session.create_client("ecs") as ecs:
  File "/home/ec2-user/.local/lib/python3.7/site-packages/aiobotocore/session.py", line 20, in __aenter__
    self._client = await self._coro
  File "/home/ec2-user/.local/lib/python3.7/site-packages/aiobotocore/session.py", line 96, in _create_client
    credentials = await self.get_credentials()
  File "/home/ec2-user/.local/lib/python3.7/site-packages/aiobotocore/session.py", line 121, in get_credentials
    'credential_provider').load_credentials())
  File "/home/ec2-user/.local/lib/python3.7/site-packages/aiobotocore/credentials.py", line 787, in load_credentials
    creds = await provider.load()
TypeError: object NoneType can't be used in 'await' expression
distributed.deploy.spec - WARNING - Cluster closed without starting up
Error in atexit._run_exitfuncs:
Traceback (most recent call last):
  File "/usr/local/lib/python3.7/site-packages/distributed/deploy/spec.py", line 611, in close_clusters
    cluster.close(timeout=10)
  File "/usr/local/lib/python3.7/site-packages/distributed/deploy/cluster.py", line 84, in close
    return self.sync(self._close, callback_timeout=timeout)
  File "/usr/local/lib/python3.7/site-packages/distributed/deploy/cluster.py", line 163, in sync
    return sync(self.loop, func, *args, **kwargs)
  File "/usr/local/lib/python3.7/site-packages/distributed/utils.py", line 339, in sync
    raise exc.with_traceback(tb)
  File "/usr/local/lib/python3.7/site-packages/distributed/utils.py", line 323, in f
    result[0] = yield future
  File "/home/ec2-user/.local/lib/python3.7/site-packages/tornado/gen.py", line 735, in run
    value = future.result()
  File "/usr/local/lib/python3.7/site-packages/distributed/deploy/spec.py", line 391, in _close
    await self.scheduler.close()
AttributeError: 'FargateCluster' object has no attribute 'scheduler'
So this is just creating a cluster with FargateCluster(), and then running the flow with it as per the instructions. (I'm running this on an EC2 instance.) My configuration for the cluster itself looks like: (edited)
[10:08 AM]
cluster = FargateCluster(
    image="prefecthq/prefect:latest",
    execution_role_arn="arn:aws:iam::692350155389:role/ecsTaskExecutionRole",
    task_role_arn="arn:aws:iam::692350155389:role/prefect-cloud-task-role-test",
    n_workers=1,
    scheduler_cpu=256,
    scheduler_mem=512,
    worker_cpu=256,
    worker_mem=512,
)
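A side note on the TypeError in the trace above: Python raises exactly that message when code awaits a value that is None instead of a coroutine. The last frame of the trace is aiobotocore doing creds = await provider.load(), so a credential provider whose load() returns plain None (which a stale or mismatched awscli/botocore install can leave in the chain) produces it. A minimal, self-contained reproduction with no AWS involved (StaleProvider is made up for illustration):

```python
import asyncio

# Stand-in for a credential provider whose load() is synchronous and
# returns None -- the kind of object a mismatched install can leave
# in aiobotocore's credential chain.
class StaleProvider:
    def load(self):
        return None

async def load_credentials(provider):
    # aiobotocore does roughly: creds = await provider.load()
    # Awaiting the plain None return value raises the TypeError above.
    return await provider.load()

try:
    asyncio.run(load_credentials(StaleProvider()))
except TypeError as exc:
    print(exc)  # object NoneType can't be used in 'await' expression
```

This is why upgrading/aligning aiobotocore and awscli fixes it: a correct provider's load() returns an awaitable rather than None.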
Joe Schmid
06/12/2020, 3:21 PM
Can you run pip list in the environment where you're running cluster = FargateCluster(...) and paste the results here? You could also try pip install -U aiobotocore to upgrade to a newer version of that module.

Josh Lowe
06/14/2020, 8:40 PM
The issue turned out to be the awscli we had installed (we were running on an EC2 instance, and using the attached role to provide credentials). Thanks for your help!

Joe Schmid
06/15/2020, 2:18 PM
DaskCloudProviderEnvironment is working for other users.

Josh Lowe
06/15/2020, 8:41 PM
The await error seems to have been fixed. But now we are running into this:
Parameter validation failed: Missing required parameter in containerDefinitions[0].logConfiguration: "logDriver"
Which I imagine is an issue when trying to create the worker task definitions? I also can't seem to see anywhere to specify this configuration in the documentation.

josh
06/15/2020, 8:52 PM
agent = FargateAgent(
    ...,
    containerDefinitions=[
        {
            "logConfiguration": {
                "logDriver": "awslogs",
                "options": {
                    "awslogs-region": "us-east-1",
                    "awslogs-group": "mygroup",
                    "awslogs-stream-prefix": "prefect",
                },
            },
        }
    ],
    ...
)
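The ECS validation error is literal: any containerDefinitions entry that carries a logConfiguration must also include the logDriver key. A throwaway checker along those lines (hypothetical helper, not part of Prefect or dask-cloudprovider) can catch it before the task definition is registered:

```python
def missing_log_drivers(container_definitions):
    """Return indices of containerDefinitions entries whose logConfiguration
    is present but lacks the required "logDriver" key -- the exact condition
    ECS parameter validation rejects."""
    return [
        i
        for i, cd in enumerate(container_definitions)
        if "logConfiguration" in cd and "logDriver" not in cd["logConfiguration"]
    ]

# A definition shaped like the agent snippet above passes; one with only
# "options" under logConfiguration is flagged:
ok = [{"logConfiguration": {"logDriver": "awslogs", "options": {}}}]
bad = [{"logConfiguration": {"options": {"awslogs-region": "us-east-1"}}}]
print(missing_log_drivers(ok))   # []
print(missing_log_drivers(bad))  # [0]
```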
I’m not an expert on AWS configuration though 😅

Josh Lowe
06/15/2020, 8:54 PM
We're still seeing TypeError("object NoneType can't be used in 'await' expression") even though:
• aiobotocore has been updated (currently on 1.0.7)
• AWS CLI V2 has been installed on the instance where the agent runs
• Appropriate permissions as outlined here (https://cloudprovider.dask.org/en/latest/index.html) have been attached to the instance where the agent runs
So it seems like the first task that is spun up on our existing Fargate Cluster is having trouble spinning up another cluster/worker tasks?

Joe Schmid
06/16/2020, 3:39 PM
"So it seems like the first task that is spun up on our existing Fargate Cluster is having trouble spinning up another cluster/worker tasks?"
@Josh Lowe I agree with that. The sequence of events goes something like this:
1. The Prefect Fargate Agent is running constantly.
2. The Agent notices a Flow is ready to run, so it registers an ECS task definition (if one doesn't already exist) and runs an ECS task for the Flow run to execute in.
3. The Flow run creates a Prefect Environment for the Flow to run in. If using RemoteEnvironment(), it gets created in the ECS task from step 2, but if using DaskCloudProviderEnvironment then a lot more happens in the following steps.
4. Dask Cloud Provider creates an ECS task definition and runs a task for the Dask scheduler, waiting for the scheduler to start so that it can provide the scheduler address to the workers.
5. Dask Cloud Provider creates an ECS task definition and runs task(s) for Dask workers.
6. Once all of the workers start, only then does the Prefect Flow run start to execute on that distributed Dask cluster.
There are a lot of moving parts and configuration required for all of that to happen successfully.
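The ordering in steps 4-6 is the part Dask Cloud Provider enforces: workers can't start until the scheduler's address is known, and the flow run waits on the workers. A toy model of that sequencing (plain asyncio stand-ins, no AWS calls; all names are made up for illustration):

```python
import asyncio

async def start_scheduler():
    # Stand-in for step 4: run the scheduler ECS task, poll until RUNNING,
    # then hand back its address for the workers.
    await asyncio.sleep(0)
    return "tcp://scheduler.example:8786"  # hypothetical address

async def start_worker(scheduler_addr, i):
    # Stand-in for step 5: each worker task is pointed at the scheduler.
    await asyncio.sleep(0)
    return f"worker-{i} connected to {scheduler_addr}"

async def run_flow(n_workers=2):
    addr = await start_scheduler()                       # step 4 completes first
    await asyncio.gather(
        *(start_worker(addr, i) for i in range(n_workers))
    )                                                    # step 5
    return f"flow run executing on {n_workers} workers"  # step 6

print(asyncio.run(run_flow()))  # flow run executing on 2 workers
```

A failure at any stage (e.g. the credential error above, hit while creating the scheduler task) means the flow run never starts, which matches the "Cluster closed without starting up" warning in the trace.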
Can you describe where you're seeing that TypeError("object NoneType can't be used in 'await' expression") error (Agent log, Flow run log, etc.) and provide the full stack trace and surrounding log? I'm not clear on when that is happening. Also, does your Flow run successfully if you use RemoteEnvironment?

Josh Lowe
06/16/2020, 8:36 PM
Creating Dask cluster using <class 'dask_cloudprovider.providers.aws.ecs.FargateCluster'>
We also get the same stack trace as I sent above in CloudWatch.
If we keep an eye on our existing Fargate Cluster, the first ECS task spins up successfully, and once it shuts down, the flow fails and we get that error.
If we run things manually from the same EC2 instance that the agent runs on (as outlined in the docs, calling FargateCluster() directly), then everything works as expected.
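That split (works when run by hand on the EC2 instance, fails inside the Fargate task) is consistent with a credentials problem: on EC2 the instance profile supplies credentials, while inside a Fargate task they come from the task role via the container credentials endpoint. A rough, simplified sketch of the order the default botocore chain consults sources (illustrative only; the real resolver has more steps):

```python
import os

def likely_credential_source(env=os.environ):
    """Simplified guess at which AWS credential source the default botocore
    chain would hit first in a given environment -- not the real resolver."""
    if env.get("AWS_ACCESS_KEY_ID") and env.get("AWS_SECRET_ACCESS_KEY"):
        return "environment variables"
    if env.get("AWS_CONTAINER_CREDENTIALS_RELATIVE_URI"):
        # ECS injects this variable into tasks that have a task role attached.
        return "ECS task role (container credentials endpoint)"
    return "shared config file or EC2 instance profile (or nothing)"

# Inside a Fargate task with a task role, ECS sets the container URI:
print(likely_credential_source({"AWS_CONTAINER_CREDENTIALS_RELATIVE_URI": "/v2/x"}))
# -> ECS task role (container credentials endpoint)
```

If the task's container lands in the final branch with nothing available, the chain can fail exactly where the trace above shows, even though the same code works on the instance with its attached role.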
I haven't tried using RemoteEnvironment yet; I'll try that and let you know.

Joe Schmid
06/17/2020, 2:52 PM