I'm trying to get a DaskExecutor on ECS/Fargate to...
# prefect-community
m
I'm trying to get a DaskExecutor on ECS/Fargate to work and am having issues. The flow hangs on
Creating a new Dask cluster with None.fargate_cluster...
. I'm running on prefect==0.15.6. My code is attached. The trouble is that it just hangs and I have no clue what it's "trying" to do.
def fargate_cluster(n_workers=4):
    """Start a fargate cluster using the same image as the flow run"""
    return FargateCluster(
        n_workers=n_workers,
        image=prefect.context.image,
        cluster_arn="arn:aws:ecs:us-west-2:524279393077:cluster/staging-prefect-cluster",
    )


with Flow(
    flow_name,
    **get_configs(
        file_path=os.path.abspath(sys.argv[0]), 
        flow_name=flow_name, 
        schedule=sched, 
        executor = DaskExecutor(
                cluster_class=fargate_cluster,
                cluster_kwargs={"n_workers": 4}
            ),
        run_config=ECSRun(
            env={
                "PREFECT__LOGGING__LEVEL": "DEBUG",
                "PREFECT__LOGGING__EXTRA_LOGGERS": "['boto3', 'paramiko', 'awswrangler', 'pandas']",
            },
            cpu="4 vcpu",
            memory="30 GB",
            labels=['v2']
        ),
    )
) as flow:
    incs = inc.map(x=range(100))
    decs = dec.map(x=range(100))
    adds = add.map(x=incs, y=decs)
    total = list_sum(adds)
k
What is
prefect.context.image
? How does it get supplied?
k
Wow that’s news to me
m
🤷 Let me see what's actually there.
k
Well you can do
prefecthq/prefect
for a quick test since that is public
m
Ya, the value is
xxx.dkr.ecr.us-west-2.amazonaws.com/staging-prefect-example-dask-cluster:2022-04-27t13-58-56-475868-00-00
k
So I don’t think we have any insight unless we get logs from Fargate which we need to configure. So
FargateCluster
is a subclass of
ECSCluster
which takes a `cloudwatch_logs_group` parameter. I think we should try supplying a log group so we can get some error tracking in CloudWatch.
m
I altered the
fargate_cluster
function to this:
def fargate_cluster(n_workers=4):
    """Start a fargate cluster using the same image as the flow run"""
    return FargateCluster(
        n_workers=n_workers,
        image="prefecthq/prefect",  # prefect.context.image
        cluster_arn="arn:aws:ecs:us-west-2:xxx:cluster/staging-prefect-cluster",
        cloudwatch_logs_group="/fargate/service/staging-prefect-agent",
    )
The specified log group already exists (it happens to be the one the agent writes to), and no logs for the flow show up.
k
Can you try passing it through
cluster_kwargs
instead? No idea if it’ll make a difference
m
It didn't like your idea:
State Message:
Unexpected error: TypeError("fargate_cluster() got an unexpected keyword argument 'cloudwatch_logs_group'")
k
Ah, your function needs to accept it. But never mind, that just kind of indicates it goes to the same place.
I think if I had to diagnose, I’d try just spinning up the Fargate cluster directly and seeing if it works?
from dask_cloudprovider.aws import FargateCluster
cluster = FargateCluster()
and then use your kwargs? this is rough
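On the TypeError above: a rough sketch of a factory that accepts extra keyword arguments, so that keys passed through cluster_kwargs get forwarded instead of rejected. The helper name `fargate_cluster_kwargs` is made up for illustration, and the values are just the ones already posted in this thread. Nothing here addresses the hang itself.

```python
def fargate_cluster_kwargs(n_workers=4, **overrides):
    """Build keyword arguments for FargateCluster; callers may override any of them."""
    kwargs = {
        "n_workers": n_workers,
        # Public image suggested earlier in the thread for a quick test.
        "image": "prefecthq/prefect",
        "cluster_arn": "arn:aws:ecs:us-west-2:xxx:cluster/staging-prefect-cluster",
        "cloudwatch_logs_group": "/fargate/service/staging-prefect-agent",
    }
    # Anything passed explicitly (for example via cluster_kwargs) wins over the defaults.
    kwargs.update(overrides)
    return kwargs


def fargate_cluster(n_workers=4, **overrides):
    """Cluster factory that tolerates extra keys passed through cluster_kwargs."""
    # Imported lazily so the helper above works without dask_cloudprovider installed.
    from dask_cloudprovider.aws import FargateCluster

    return FargateCluster(**fargate_cluster_kwargs(n_workers, **overrides))
```

With this shape, `cluster_kwargs={"n_workers": 4, "cloudwatch_logs_group": "..."}` would reach `FargateCluster` rather than raising inside the factory.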
m
So essentially replace
def fargate_cluster()
with
def fargate_cluster():
    return FargateCluster()
is that right?
k
No I mean, try to start it completely outside a Prefect flow and see if you get an error
m
Alright... so running python from my local command line:
Python 3.9.7 (default, Feb  4 2022, 10:07:17)
[Clang 12.0.0 (clang-1200.0.32.28)] on darwin
Type "help", "copyright", "credits" or "license" for more information.
>>> from dask_cloudprovider.aws import FargateCluster
>>> cluster = FargateCluster()
and it just hangs... so what is it doing?
k
I don’t know. Looking here
...well, that guy had no workers, but I think you had some.
This has a bunch of suggestions to check
m
👍 I'm increasing the logging level too
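For anyone following along, a minimal sketch of one way to raise the logging level before creating the cluster, using only the standard `logging` module. The function name is hypothetical, and the logger names `aiobotocore` and `dask_cloudprovider` are assumptions based on the package names; `botocore` is the logger that carries the AWS request traces.

```python
import logging


def enable_aws_debug_logging(level=logging.DEBUG):
    """Print verbose output from the AWS client libraries to stderr."""
    # Attach a stderr handler to the root logger (a no-op if one already exists).
    logging.basicConfig()
    # Lower the threshold only for the libraries of interest, not for everything.
    for name in ("botocore", "aiobotocore", "dask_cloudprovider"):
        logging.getLogger(name).setLevel(level)


enable_aws_debug_logging()
```

Run this before `FargateCluster()` in the same interpreter session. With the default `basicConfig` format, lines come out as `LEVEL:logger.name:message`.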
k
It’s crazy we don’t even get an error though. The Dask Discourse might be of better help than me also
m
I see what's going on. For some reason, it wants to describe every task definition we have registered. Here is just one request:
DEBUG:botocore.endpoint:Making request for OperationModel(name=DescribeTaskDefinition) with params: {'url_path': '/', 'query_string': '', 'method': 'POST', 'headers': {'X-Amz-Target': 'AmazonEC2ContainerServiceV20141113.DescribeTaskDefinition', 'Content-Type': 'application/x-amz-json-1.1', 'User-Agent': 'Botocore/1.24.21 Python/3.9.7 Darwin/19.6.0'}, 'body': b'{"taskDefinition": "arn:aws:ecs:us-west-2:xxx:task-definition/dev-webserver:202", "include": ["TAGS"]}', 'url': 'https://ecs.us-west-2.amazonaws.com/', 'context': {'client_region': 'us-west-2', 'client_config': <aiobotocore.config.AioConfig object at 0x112993d60>, 'has_streaming_input': False, 'auth_type': None}}
And well, we have tons of task definitions, because we keep all the versions in our ECR repo.
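To get a feel for how many DescribeTaskDefinition calls that could mean, here is a small sketch that counts registered task definition revisions per family using the ECS `list_task_definitions` paginator. The function name is made up for illustration, and the assumption that one describe call is made per listed revision comes only from the debug output above, not from reading the library.

```python
from collections import Counter


def count_task_definitions(ecs_client, status="ACTIVE"):
    """Count task definition revisions per family for the given status."""
    counts = Counter()
    paginator = ecs_client.get_paginator("list_task_definitions")
    for page in paginator.paginate(status=status):
        for arn in page["taskDefinitionArns"]:
            # ARNs end in "task-definition/<family>:<revision>".
            family = arn.rsplit("/", 1)[-1].rsplit(":", 1)[0]
            counts[family] += 1
    return counts
```

Calling it with `boto3.client("ecs", region_name="us-west-2")` and summing the values gives the total number of revisions the account currently lists.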
k
Ahh. I think you can open an issue on their repo to ask if it can be avoided?
m