# prefect-community
**Josh Lowe:**
Has anyone run into the following issue trying to use the Dask Cloud Provider environment?
```
Failed to load and execute Flow's environment: TypeError("object NoneType can't be used in 'await' expression")
```
I'm able to run the flow over a local cluster with two workers just fine; it's only when I register a flow using the Dask Cloud Provider environment that I have issues 🤔
**Prefect team member:**
Interesting! I don't have much experience with this environment, so before I go into debugging mode I'm going to gracefully tag @Joe Schmid to see if he has encountered this before 😄
**Joe Schmid:**
Hi @Josh Lowe, I haven't seen that specific issue before, but it sounds roughly like a configuration issue. One thing I recommend is to use Dask Cloud Provider directly (just from Python) to test it with the same parameters and make sure that's working. I've found it's easier to debug issues related to Dask Cloud Provider (and independent of Prefect) interactively, and then to use those same parameters with `DaskCloudProviderEnvironment`. If you can share a little more of your specific configuration and maybe the full stack trace of that error, I'd be happy to try to help debug more.
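For example, a minimal standalone test might look like the sketch below (not from the thread; the image and sizing values are placeholders, and the top-level import path matches the dask-cloudprovider version used here):

```python
# Minimal standalone test of Dask Cloud Provider, independent of Prefect.
# Placeholder values; swap in the same parameters you plan to pass to
# DaskCloudProviderEnvironment.
from dask_cloudprovider import FargateCluster
from distributed import Client

cluster = FargateCluster(
    image="prefecthq/prefect:latest",  # any image with dask + distributed installed
    n_workers=1,
)
client = Client(cluster)
print(client.submit(lambda x: x + 1, 41).result())  # expect 42 if the cluster is healthy
client.close()
cluster.close()
```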
**Josh Lowe:**
Cool cool, thanks Joe! I'll have a play around with it directly and try to figure things out. Will let you know if I run into any issues!
**Joe Schmid:**
@Josh Lowe sounds great, just follow the instructions here: https://cloudprovider.dask.org/en/latest/
**Josh Lowe:**
So I'm trying to run things manually, and I now have a slightly more informative stack trace:
```
File "dask_time.py", line 27, in <module>
    worker_mem=512
  File "/usr/local/lib/python3.7/site-packages/dask_cloudprovider/providers/aws/ecs.py", line 1108, in __init__
    super().__init__(fargate_scheduler=True, fargate_workers=True, **kwargs)
  File "/usr/local/lib/python3.7/site-packages/dask_cloudprovider/providers/aws/ecs.py", line 599, in __init__
    super().__init__(**kwargs)
  File "/usr/local/lib/python3.7/site-packages/distributed/deploy/spec.py", line 256, in __init__
    self.sync(self._start)
  File "/usr/local/lib/python3.7/site-packages/distributed/deploy/cluster.py", line 163, in sync
    return sync(self.loop, func, *args, **kwargs)
  File "/usr/local/lib/python3.7/site-packages/distributed/utils.py", line 339, in sync
    raise exc.with_traceback(tb)
  File "/usr/local/lib/python3.7/site-packages/distributed/utils.py", line 323, in f
    result[0] = yield future
  File "/home/ec2-user/.local/lib/python3.7/site-packages/tornado/gen.py", line 735, in run
    value = future.result()
  File "/usr/local/lib/python3.7/site-packages/dask_cloudprovider/providers/aws/ecs.py", line 623, in _start
    await _cleanup_stale_resources()
  File "/usr/local/lib/python3.7/site-packages/dask_cloudprovider/providers/aws/ecs.py", line 1127, in _cleanup_stale_resources
    async with session.create_client("ecs") as ecs:
  File "/home/ec2-user/.local/lib/python3.7/site-packages/aiobotocore/session.py", line 20, in __aenter__
    self._client = await self._coro
  File "/home/ec2-user/.local/lib/python3.7/site-packages/aiobotocore/session.py", line 96, in _create_client
    credentials = await self.get_credentials()
  File "/home/ec2-user/.local/lib/python3.7/site-packages/aiobotocore/session.py", line 121, in get_credentials
    'credential_provider').load_credentials())
  File "/home/ec2-user/.local/lib/python3.7/site-packages/aiobotocore/credentials.py", line 787, in load_credentials
    creds = await provider.load()
TypeError: object NoneType can't be used in 'await' expression
distributed.deploy.spec - WARNING - Cluster closed without starting up
Error in atexit._run_exitfuncs:
Traceback (most recent call last):
  File "/usr/local/lib/python3.7/site-packages/distributed/deploy/spec.py", line 611, in close_clusters
    cluster.close(timeout=10)
  File "/usr/local/lib/python3.7/site-packages/distributed/deploy/cluster.py", line 84, in close
    return self.sync(self._close, callback_timeout=timeout)
  File "/usr/local/lib/python3.7/site-packages/distributed/deploy/cluster.py", line 163, in sync
    return sync(self.loop, func, *args, **kwargs)
  File "/usr/local/lib/python3.7/site-packages/distributed/utils.py", line 339, in sync
    raise exc.with_traceback(tb)
  File "/usr/local/lib/python3.7/site-packages/distributed/utils.py", line 323, in f
    result[0] = yield future
  File "/home/ec2-user/.local/lib/python3.7/site-packages/tornado/gen.py", line 735, in run
    value = future.result()
  File "/usr/local/lib/python3.7/site-packages/distributed/deploy/spec.py", line 391, in _close
    await self.scheduler.close()
AttributeError: 'FargateCluster' object has no attribute 'scheduler'
```
So this is just creating a cluster with `FargateCluster()` and then running the flow with it, as per the instructions (I'm running this on an EC2 instance). My configuration for the cluster itself looks like:
```python
cluster = FargateCluster(
    image="prefecthq/prefect:latest",
    execution_role_arn="arn:aws:iam::692350155389:role/ecsTaskExecutionRole",
    task_role_arn="arn:aws:iam::692350155389:role/prefect-cloud-task-role-test",
    n_workers=1,
    scheduler_cpu=256,
    scheduler_mem=512,
    worker_cpu=256,
    worker_mem=512,
)
```
**Joe Schmid:**
@Josh Lowe ok, that is helpful. So obviously it's failing during cluster creation. How are you specifying AWS credentials, i.e. AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY? (Passing them explicitly, using an IAM instance role on the EC2 instance you're running on, etc.) I've also noticed that Dask Cloud Provider can be sensitive to the version of aiobotocore installed. Could you do a `pip list` in the environment where you're running `cluster = FargateCluster(...)` and paste the results here? You could also try `pip install -U aiobotocore` to upgrade to a newer version of that module.
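For what it's worth, a quick Python-side check (a sketch, assuming boto3 is installed alongside aiobotocore) can confirm both the module versions and which credential source is being picked up:

```python
# Sketch: print the versions and the credential source this environment resolves,
# before attempting FargateCluster(...).
import aiobotocore
import boto3
import dask_cloudprovider

print("aiobotocore:", aiobotocore.__version__)
print("dask_cloudprovider:", dask_cloudprovider.__version__)

creds = boto3.Session().get_credentials()
print("credential source:", None if creds is None else creds.method)  # e.g. 'iam-role', 'env'
```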
**Josh Lowe:**
Hey, sorry for the late reply. We ended up getting things running in the end; it turned out to be an issue with the version of `awscli` we had installed (we were running on an EC2 instance and using the attached role to provide credentials). Thanks for your help!
**Joe Schmid:**
@Josh Lowe Glad to hear it! If you have a chance, keep us posted on how it goes and whether you run into any trouble. I'm interested to hear how `DaskCloudProviderEnvironment` is working for other users.
**Josh Lowe:**
Sweet! We're actually running into a few more issues; I just got into the office, but I'll give the details in a bit!
So we updated the AWS CLI version on the EC2 instance that runs one of our agents, and the first `await` error seems to have been fixed. But now we are running into this:
```
Parameter validation failed: Missing required parameter in containerDefinitions[0].logConfiguration: "logDriver"
```
I imagine this is an issue when trying to create the worker task definitions? I also can't see anywhere in the documentation to specify this configuration.
**Prefect team member:**
@Josh Lowe I encountered this the other day! They must have updated something in the Fargate API, because I didn't set `logDriver` before. I ended up doing something like this:
```python
agent = FargateAgent(
    ...,
    containerDefinitions=[
        {
            "logConfiguration": {
                "logDriver": "awslogs",
                "options": {
                    "awslogs-region": "us-east-1",
                    "awslogs-group": "mygroup",
                    "awslogs-stream-prefix": "prefect",
                },
            },
        }
    ],
    ...
)
```
I’m not an expert on AWS configuration though 😅
**Josh Lowe:**
Aha! Yeah, we currently have something similar to that ^ in place for our Fargate environment, so just passing in those values and hoping for the best was going to be my next guess haha - thanks!!
Alright, we're back haha. So the ^ above fixed our issues with the task definition, but now we're back to:
```
TypeError("object NoneType can't be used in 'await' expression")
```
• aiobotocore has been updated (currently on `1.0.7`)
• AWS CLI v2 has been installed on the instance where the agent runs
• Appropriate permissions as outlined here (https://cloudprovider.dask.org/en/latest/index.html) have been attached to the instance where the agent runs

So it seems like the first task that is spun up on our existing Fargate cluster is having trouble spinning up another cluster/worker tasks?
**Joe Schmid:**
> So it seems like the first task that is spun up on our existing Fargate cluster is having trouble spinning up another cluster/worker tasks?
@Josh Lowe I agree with that. The sequence of events goes something like this:

1. The Prefect Fargate Agent is running constantly.
2. The Agent notices a Flow is ready to run, so it registers an ECS task definition (if one doesn't already exist) and runs an ECS task for the Flow run to execute in.
3. The Flow run creates a Prefect Environment for the Flow to run in. If using RemoteEnvironment(), it gets created in the ECS task from step 2, but if using `DaskCloudProviderEnvironment` then a lot more happens in the following steps.
4. Dask Cloud Provider creates an ECS task definition and runs a task for the Dask scheduler, waiting for the scheduler to start so that it can provide the scheduler address to the workers.
5. Dask Cloud Provider creates an ECS task definition and runs task(s) for the Dask workers.
6. Once all of the workers start, only then does the Prefect Flow run start to execute on that distributed Dask cluster.

There are a lot of moving parts and configuration required for all of that to happen successfully. Can you describe where you're seeing that `TypeError("object NoneType can't be used in 'await' expression")` error (Agent log, Flow run log, etc.) and provide the full stack trace and surrounding log? I'm not clear on when that is happening. Also, does your Flow run successfully if you use RemoteEnvironment?
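For context, wiring a Flow up to `DaskCloudProviderEnvironment` might look roughly like the sketch below (assumptions: Prefect 0.x import paths, and that the environment forwards the same `FargateCluster` kwargs shown earlier in the thread; the flow and project names are made up):

```python
# Sketch: registering a Flow with DaskCloudProviderEnvironment (Prefect 0.x).
# At flow-run time the environment creates a FargateCluster with these kwargs,
# which corresponds to steps 4-6 described above.
from dask_cloudprovider import FargateCluster
from prefect import Flow, task
from prefect.environments import DaskCloudProviderEnvironment

@task
def say_hello():
    print("hello from a Fargate Dask worker")

environment = DaskCloudProviderEnvironment(
    provider_class=FargateCluster,
    image="prefecthq/prefect:latest",
    n_workers=1,
    scheduler_cpu=256,
    scheduler_mem=512,
    worker_cpu=256,
    worker_mem=512,
)

with Flow("fargate-dask-example", environment=environment) as flow:
    say_hello()

flow.register(project_name="examples")  # hypothetical project name
```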
**Josh Lowe:**
Cool cool, so we're seeing that error as part of the flow run log; no other logs are available through the Prefect UI apart from:
```
Creating Dask cluster using <class 'dask_cloudprovider.providers.aws.ecs.FargateCluster'>
```
We also get the same stack trace as I sent above in CloudWatch. If we keep an eye on our existing Fargate cluster, the first ECS task spins up successfully, and once it shuts down the flow fails and we get that error. If we run things manually from the same EC2 instance that the agent runs on (as outlined in the docs, calling `FargateCluster()` directly), then everything works as expected. I haven't tried using RemoteEnvironment yet; I'll try that and let you know.
**Joe Schmid:**
@Josh Lowe thanks for the update -- that makes sense. I think running Flow(s) using RemoteEnvironment will be successful, which is great progress and will allow you to run many Flows on Fargate. For Flows that require extreme scalability with a truly distributed Dask cluster you might still want to work through getting `DaskCloudProviderEnvironment` working, but depending on your Flows maybe that's not as urgent. As a next step on `DaskCloudProviderEnvironment`, I'd recommend examining the configuration differences between your two environments -- the EC2 instance that works and the ECS task (that the Fargate Agent creates) that doesn't work. (It's great that you have a working environment to compare against!) Some specific things to check:
• AWS credentials (are they using the same IAM role or different?)
• Python module versions
• Environment variables
• AWS region, VPC, & subnets
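One way to compare them (a sketch, not from the thread; the task and flow names are illustrative) is a tiny diagnostic Flow that prints those details, run once directly on the EC2 instance and once through the Agent on Fargate:

```python
# Sketch: a diagnostic task to run in both environments and diff the output.
import os

import aiobotocore
import boto3
import dask_cloudprovider
from prefect import Flow, task

@task
def dump_environment():
    session = boto3.Session()
    creds = session.get_credentials()
    print("region:", session.region_name)
    print("credential source:", None if creds is None else creds.method)
    print("aiobotocore:", aiobotocore.__version__)
    print("dask_cloudprovider:", dask_cloudprovider.__version__)
    print("AWS_* env vars:", sorted(k for k in os.environ if k.startswith("AWS_")))

with Flow("env-diagnostics") as flow:
    dump_environment()

if __name__ == "__main__":
    flow.run()  # or register the Flow to run it through the Agent instead
```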