    Josh Lowe

    2 years ago
    Has anyone run into the following issue trying to use the Dask Cloud Provider environment?
    Failed to load and execute Flow's environment: TypeError("object NoneType can't be used in 'await' expression")
    I'm able to run the flow over a local cluster with two workers just fine, it's only when I register a flow using the Dask Cloud Provider environment that I have issues 🤔
    josh

    2 years ago
    Interesting! I don’t have much experience with this environment so before I go into debugging mode I’m going to gracefully @Joe Schmid to see if he has encountered this before 😄
    Joe Schmid

    2 years ago
    Hi @Josh Lowe, I haven't seen that specific issue before, but it sounds roughly like a configuration problem. One thing I recommend is to use Dask Cloud Provider directly (just from Python) with the same parameters and make sure that works. I've found it's easier to debug issues related to Dask Cloud Provider (and independent of Prefect) interactively, and then reuse those same parameters with DaskCloudProviderEnvironment. If you can share a little more of your specific configuration and maybe the full stack trace of that error, I'd be happy to help debug further.
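    For reference, a minimal direct test looks roughly like the sketch below. (Hedged: the image and sizes are illustrative, and the import path matches the dask-cloudprovider releases from around this thread; newer releases expose FargateCluster from dask_cloudprovider.aws.)
    # Rough sketch: exercise Dask Cloud Provider directly, outside of Prefect,
    # with the same parameters you plan to hand to DaskCloudProviderEnvironment.
    from dask_cloudprovider import FargateCluster
    from distributed import Client

    cluster = FargateCluster(
        image="prefecthq/prefect:latest",  # illustrative image
        n_workers=1,
        scheduler_cpu=256,
        scheduler_mem=512,
        worker_cpu=256,
        worker_mem=512,
    )
    client = Client(cluster)

    # If cluster creation succeeds, a trivial computation should run on Fargate.
    print(client.submit(lambda x: x + 1, 41).result())

    client.close()
    cluster.close()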
    Josh Lowe

    2 years ago
    Cool cool, thanks Joe! I'll have a play around with it directly and try to figure things out. Will let you know if I run into any issues!
    Joe Schmid

    2 years ago
    @Josh Lowe sounds great, just follow instructions here: https://cloudprovider.dask.org/en/latest/
    Josh Lowe

    2 years ago
    So I'm trying to run things manually, and I now have a slightly more informative stack trace:
    File "dask_time.py", line 27, in <module>
        worker_mem=512
      File "/usr/local/lib/python3.7/site-packages/dask_cloudprovider/providers/aws/ecs.py", line 1108, in __init__
        super().__init__(fargate_scheduler=True, fargate_workers=True, **kwargs)
      File "/usr/local/lib/python3.7/site-packages/dask_cloudprovider/providers/aws/ecs.py", line 599, in __init__
        super().__init__(**kwargs)
      File "/usr/local/lib/python3.7/site-packages/distributed/deploy/spec.py", line 256, in __init__
        self.sync(self._start)
      File "/usr/local/lib/python3.7/site-packages/distributed/deploy/cluster.py", line 163, in sync
        return sync(self.loop, func, *args, **kwargs)
      File "/usr/local/lib/python3.7/site-packages/distributed/utils.py", line 339, in sync
        raise exc.with_traceback(tb)
      File "/usr/local/lib/python3.7/site-packages/distributed/utils.py", line 323, in f
        result[0] = yield future
      File "/home/ec2-user/.local/lib/python3.7/site-packages/tornado/gen.py", line 735, in run
        value = future.result()
      File "/usr/local/lib/python3.7/site-packages/dask_cloudprovider/providers/aws/ecs.py", line 623, in _start
        await _cleanup_stale_resources()
      File "/usr/local/lib/python3.7/site-packages/dask_cloudprovider/providers/aws/ecs.py", line 1127, in _cleanup_stale_resources
        async with session.create_client("ecs") as ecs:
      File "/home/ec2-user/.local/lib/python3.7/site-packages/aiobotocore/session.py", line 20, in __aenter__
        self._client = await self._coro
      File "/home/ec2-user/.local/lib/python3.7/site-packages/aiobotocore/session.py", line 96, in _create_client
        credentials = await self.get_credentials()
      File "/home/ec2-user/.local/lib/python3.7/site-packages/aiobotocore/session.py", line 121, in get_credentials
        'credential_provider').load_credentials())
      File "/home/ec2-user/.local/lib/python3.7/site-packages/aiobotocore/credentials.py", line 787, in load_credentials
        creds = await provider.load()
    TypeError: object NoneType can't be used in 'await' expression
    distributed.deploy.spec - WARNING - Cluster closed without starting up
    Error in atexit._run_exitfuncs:
    Traceback (most recent call last):
      File "/usr/local/lib/python3.7/site-packages/distributed/deploy/spec.py", line 611, in close_clusters
        cluster.close(timeout=10)
      File "/usr/local/lib/python3.7/site-packages/distributed/deploy/cluster.py", line 84, in close
        return self.sync(self._close, callback_timeout=timeout)
      File "/usr/local/lib/python3.7/site-packages/distributed/deploy/cluster.py", line 163, in sync
        return sync(self.loop, func, *args, **kwargs)
      File "/usr/local/lib/python3.7/site-packages/distributed/utils.py", line 339, in sync
        raise exc.with_traceback(tb)
      File "/usr/local/lib/python3.7/site-packages/distributed/utils.py", line 323, in f
        result[0] = yield future
      File "/home/ec2-user/.local/lib/python3.7/site-packages/tornado/gen.py", line 735, in run
        value = future.result()
      File "/usr/local/lib/python3.7/site-packages/distributed/deploy/spec.py", line 391, in _close
        await self.scheduler.close()
    AttributeError: 'FargateCluster' object has no attribute 'scheduler'
    So this is just creating a cluster with FargateCluster() and then running the flow with it as per the instructions (I'm running this on an EC2 instance). My configuration for the cluster itself looks like:
    cluster = FargateCluster(
        image="prefecthq/prefect:latest",
        execution_role_arn="arn:aws:iam::692350155389:role/ecsTaskExecutionRole",
        task_role_arn="arn:aws:iam::692350155389:role/prefect-cloud-task-role-test",
        n_workers=1,
        scheduler_cpu=256,
        scheduler_mem=512,
        worker_cpu=256,
        worker_mem=512,
    )
    Joe Schmid

    2 years ago
    @Josh Lowe ok, that is helpful. So obviously it's failing during cluster creation. How are you specifying AWS credentials, i.e. AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY? (Passing them explicitly, using an IAM instance role on the EC2 instance you're running on, etc.) I've also noticed that Dask Cloud Provider can be sensitive to the version of aiobotocore installed. Could you do a pip list in the environment where you're running cluster = FargateCluster(...) and paste the results here? You could also try pip install -U aiobotocore to upgrade to a newer version of that module.
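    For what it's worth, the call that blew up in the earlier traceback was aiobotocore resolving credentials, so a quick sanity check along these lines (a rough sketch, using only the aiobotocore that's already installed) will show whether the credential chain resolves at all on that instance:
    # Rough sketch: check that aiobotocore can resolve credentials on this machine.
    # session.get_credentials() is the call that raised the TypeError above.
    import asyncio
    from aiobotocore.session import get_session

    async def check_credentials():
        session = get_session()
        creds = await session.get_credentials()
        print("credentials resolved via:", getattr(creds, "method", None))

    asyncio.run(check_credentials())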
    Josh Lowe

    2 years ago
    Hey sorry for the late reply - so we ended up getting things running in the end; it turned out to be an issue with the version of awscli we had installed (we were running on an EC2 instance and using the attached role to provide credentials). Thanks for your help!
    Joe Schmid

    2 years ago
    @Josh Lowe Glad to hear it! If you have a chance, keep us posted on how it goes and whether you run into any trouble. I'm interested to hear how DaskCloudProviderEnvironment is working for other users.
    Josh Lowe

    2 years ago
    Sweet! We're actually running into a few more issues - I just got into the office but I'll detail them in a bit!
    So we updated the AWS CLI version on the EC2 instance that runs one of our agents, and the first await error seems to have been fixed. But now we are running into this:
    Parameter validation failed: Missing required parameter in containerDefinitions[0].logConfiguration: "logDriver"
    which I imagine is an issue when trying to create the worker task definitions? I also can't seem to find anywhere in the documentation to specify this configuration.
    josh

    2 years ago
    @Josh Lowe I encountered this the other day! They must have updated something in the Fargate API because I did not set logDriver prior. I ended up doing something like this:
    agent = FargateAgent(
        ...,
        containerDefinitions=[
            {
                "logConfiguration": {
                    "logDriver": "awslogs",
                    "options": {
                        "awslogs-region": "us-east-1",
                        "awslogs-group": "mygroup",
                        "awslogs-stream-prefix": "prefect",
                    },
                },
            }
        ],
        ...
    )
    I’m not an expert on AWS configuration though 😅
    Josh Lowe

    2 years ago
    Aha! Yeah, we currently have something similar to ^ in place for our Fargate environment, so passing in those values and hoping for the best was going to be my next guess haha - thanks!!
    Alright, we're back haha - so the above fixed our issues with the task definition, but now we're back to
    TypeError("object NoneType can't be used in 'await' expression")
    • aiobotocore has been updated (currently on 1.0.7)
    • AWS CLI v2 has been installed on the instance where the agent runs
    • Appropriate permissions as outlined here (https://cloudprovider.dask.org/en/latest/index.html) have been attached to the instance where the agent runs
    So it seems like the first task that is spun up on our existing Fargate cluster is having trouble spinning up another cluster / the worker tasks?
    Joe Schmid

    2 years ago
    So it seems like the first task that is spun up on our existing Fargate Cluster is having trouble spinning up another cluster/worker tasks?
    @Josh Lowe I agree with that. The sequence of events goes something like this:
    1. The Prefect Fargate Agent is running constantly.
    2. The Agent notices a Flow is ready to run, so it registers an ECS task definition (if one doesn't already exist) and runs an ECS task for the Flow run to execute in.
    3. The Flow run creates a Prefect Environment for the Flow to run in. If using RemoteEnvironment(), it runs in the ECS task from step 2, but if using DaskCloudProviderEnvironment then a lot more happens in the following steps.
    4. Dask Cloud Provider creates an ECS task definition and runs a task for the Dask scheduler, waiting for the scheduler to start so that it can provide the scheduler address to the workers.
    5. Dask Cloud Provider creates an ECS task definition and runs task(s) for the Dask workers.
    6. Once all of the workers start, only then does the Prefect Flow run start to execute on that distributed Dask cluster.
    There are a lot of moving parts and configuration required for all of that to happen successfully. Can you describe where you're seeing that TypeError("object NoneType can't be used in 'await' expression") error (Agent log, Flow run log, etc.) and provide the full stack trace and surrounding log? I'm not clear on when that is happening. Also, does your Flow run successfully if you use RemoteEnvironment?
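    For context, wiring a Flow to that sequence looks roughly like the sketch below. (Hedged: this uses the Prefect 0.x API discussed in this thread, and the parameter values are illustrative; anything beyond provider_class is handed through to FargateCluster.)
    # Rough sketch (Prefect 0.x): attach DaskCloudProviderEnvironment to a Flow so
    # that steps 3-6 above happen at flow-run time. Parameter values are illustrative.
    from dask_cloudprovider import FargateCluster
    from prefect import Flow, task
    from prefect.environments import DaskCloudProviderEnvironment

    @task
    def say_hello():
        print("hello from a Dask worker on Fargate")

    environment = DaskCloudProviderEnvironment(
        provider_class=FargateCluster,
        # remaining kwargs are passed straight to FargateCluster(...)
        image="prefecthq/prefect:latest",
        n_workers=1,
    )

    with Flow("fargate-dask-example", environment=environment) as flow:
        say_hello()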
    Josh Lowe

    2 years ago
    Cool cool, so we're seeing that error as part of the flow log (no other logs are available through the Prefect UI apart from
    Creating Dask cluster using <class 'dask_cloudprovider.providers.aws.ecs.FargateCluster'>
    ). We also get the same stack trace as I sent above in CloudWatch. If we keep an eye on our existing Fargate cluster, the first ECS task spins up successfully, and once it shuts down the flow fails and we get that error. If we run things manually from the same EC2 instance that the agent runs on (as outlined in the docs, calling FargateCluster() directly), then everything works as expected. I haven't tried using RemoteEnvironment yet - I'll try that and let you know.
    Joe Schmid

    2 years ago
    @Josh Lowe thanks for the update -- that makes sense. I think running flow(s) using RemoteEnvironment will be successful, which is great progress and will allow you to run many Flows on Fargate. For Flows that require extreme scalability with a truly distributed Dask cluster you might still want to work through getting DaskCloudProviderEnvironment working, but depending on your Flows maybe that's not as urgent. As a next step on DaskCloudProviderEnvironment, I'd recommend examining the configuration differences between your two environments -- the EC2 instance that works and the ECS task (that the Fargate Agent creates) that doesn't. (It's great that you have a working environment to compare against!) Some specific things to check:
    • AWS credentials (are they using the same IAM role or different ones?)
    • Python module versions
    • Environment variables
    • AWS region, VPC, & subnets
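    One low-effort way to compare most of those is to run the same small script in both environments and diff the output. A rough sketch (it assumes only botocore and setuptools, which are normally present alongside aiobotocore; the package list is illustrative):
    # Rough sketch: print the configuration that matters for Dask Cloud Provider.
    # Run it on the EC2 instance and inside the flow-run ECS task, then diff.
    import os
    import sys

    import botocore.session
    import pkg_resources  # provided by setuptools

    print("python:", sys.version)
    for pkg in ("dask", "distributed", "dask-cloudprovider", "aiobotocore", "botocore", "prefect"):
        try:
            print(pkg, pkg_resources.get_distribution(pkg).version)
        except pkg_resources.DistributionNotFound:
            print(pkg, "not installed")

    session = botocore.session.get_session()
    creds = session.get_credentials()
    print("region:", session.get_config_variable("region"))
    print("credential method:", getattr(creds, "method", None))
    print("AWS_* env vars set:", sorted(k for k in os.environ if k.startswith("AWS_")))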