Thread
#prefect-community
    Marc Lipoff

    5 months ago
    I'm trying to get a DaskExecutor working on ECS/Fargate and am having issues. The flow hangs on `Creating a new Dask cluster with None.fargate_cluster...`. I'm running prefect==0.15.6. My code is below. It just hangs, and I have no clue what it's "trying" to do.
    import os
    import sys
    
    import prefect
    from prefect import Flow
    from prefect.executors import DaskExecutor
    from prefect.run_configs import ECSRun
    from dask_cloudprovider.aws import FargateCluster
    
    # flow_name, sched, get_configs and the inc/dec/add/list_sum tasks are defined elsewhere in this module.
    
    
    def fargate_cluster(n_workers=4):
        """Start a Fargate cluster using the same image as the flow run."""
        return FargateCluster(
            n_workers=n_workers,
            image=prefect.context.image,
            cluster_arn="arn:aws:ecs:us-west-2:524279393077:cluster/staging-prefect-cluster",
        )
    
    
    with Flow(
        flow_name,
        **get_configs(
            file_path=os.path.abspath(sys.argv[0]),
            flow_name=flow_name,
            schedule=sched,
            executor=DaskExecutor(
                cluster_class=fargate_cluster,
                cluster_kwargs={"n_workers": 4},
            ),
            run_config=ECSRun(
                env={
                    "PREFECT__LOGGING__LEVEL": "DEBUG",
                    "PREFECT__LOGGING__EXTRA_LOGGERS": "['boto3', 'paramiko', 'awswrangler', 'pandas']",
                },
                cpu="4 vcpu",
                memory="30 GB",
                labels=["v2"],
            ),
        )
    ) as flow:
        incs = inc.map(x=range(100))
        decs = dec.map(x=range(100))
        adds = add.map(x=incs, y=decs)
        total = list_sum(adds)
    Kevin Kho

    5 months ago
    What is `prefect.context.image`? How does it get supplied?
    Kevin Kho

    5 months ago
    Wow, that’s news to me.
    Marc Lipoff

    5 months ago
    🤷 Let me see what's actually there.
    Kevin Kho

    5 months ago
    Well, you can use `prefecthq/prefect` for a quick test, since that image is public.
    Marc Lipoff

    5 months ago
    Ya, the value is `xxx.dkr.ecr.us-west-2.amazonaws.com/staging-prefect-example-dask-cluster:2022-04-27t13-58-56-475868-00-00`
    Kevin Kho

    5 months ago
    So I don’t think we’ll get any insight unless we get logs from Fargate, which we need to configure. `FargateCluster` is a subclass of `ECSCluster`, which takes a `cloudwatch_logs_group` parameter here. I think we need to try supplying a log group so we get some error tracking in CloudWatch.
    Marc Lipoff

    5 months ago
    I altered the `fargate_cluster` function to this:
    from dask_cloudprovider.aws import FargateCluster
    
    
    def fargate_cluster(n_workers=4):
        """Start a Fargate cluster using the public Prefect image instead of the flow run's image."""
        return FargateCluster(
            n_workers=n_workers,
            image="prefecthq/prefect",  # was: prefect.context.image
            cluster_arn="arn:aws:ecs:us-west-2:xxx:cluster/staging-prefect-cluster",
            cloudwatch_logs_group="/fargate/service/staging-prefect-agent",
        )
    That log group already exists (it happens to be the one the agent writes to), and no logs for the flow show up.
    Kevin Kho

    5 months ago
    Can you try passing it through `cluster_kwargs` instead? No idea if it’ll make a difference.
    Marc Lipoff

    5 months ago
    It didn't like your idea:
    State Message:
    Unexpected error: TypeError("fargate_cluster() got an unexpected keyword argument 'cloudwatch_logs_group'")
    Kevin Kho

    5 months ago
    Ah, your function needs to accept it. But never mind, that just kind of indicates it ends up in the same place.
    I think if I had to diagnose this, I’d try spinning up the Fargate cluster directly and seeing if it works:
    from dask_cloudprovider.aws import FargateCluster
    cluster = FargateCluster()
    and then add your kwargs? This is rough.
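As the TypeError above suggests, Prefect 0.15's DaskExecutor calls `cluster_class(**cluster_kwargs)`, so a factory function only receives the keys its own signature accepts. A minimal sketch of a factory that forwards any extra `cluster_kwargs` (such as `cloudwatch_logs_group`) straight to `FargateCluster`; the `**kwargs` pass-through and the default image are illustrative choices, not something Prefect requires:

```python
from typing import Any


def fargate_cluster(n_workers: int = 4, **kwargs: Any):
    """Cluster factory for DaskExecutor(cluster_class=fargate_cluster).

    Every key in DaskExecutor's cluster_kwargs arrives here as a keyword
    argument. Collecting the extras in **kwargs and forwarding them means
    options like cloudwatch_logs_group no longer raise a TypeError.
    """
    # Imported lazily so this module can be loaded where dask-cloudprovider is absent.
    from dask_cloudprovider.aws import FargateCluster

    # Fall back to the public image unless the caller passes one (illustrative default).
    kwargs.setdefault("image", "prefecthq/prefect")
    return FargateCluster(n_workers=n_workers, **kwargs)
```

With that signature, `DaskExecutor(cluster_class=fargate_cluster, cluster_kwargs={"n_workers": 4, "cloudwatch_logs_group": "/fargate/service/staging-prefect-agent"})` passes the log group through instead of failing.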
    Marc Lipoff

    5 months ago
    So essentially, replace `def fargate_cluster()` with
    def fargate_cluster():
        return FargateCluster()
    Is that right?
    Kevin Kho

    5 months ago
    No, I mean try to start it completely outside a Prefect flow and see if you get an error.
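A minimal standalone reproduction along those lines, assuming dask-cloudprovider is installed and AWS credentials are configured locally. The helper names `enable_debug_logging` and `start_fargate_cluster` are hypothetical; the logger names are the ones botocore/aiobotocore, dask-cloudprovider and distributed log under. Raising them to DEBUG shows each AWS API request made during startup, so a silent hang becomes a visible stream of calls:

```python
import logging
from typing import Any

# Logger names for the libraries involved in starting the cluster.
DEBUG_LOGGERS = ("botocore", "aiobotocore", "dask_cloudprovider", "distributed")


def enable_debug_logging() -> None:
    """Send DEBUG records from the AWS/Dask libraries to stderr."""
    # Attach a stderr handler to the root logger at INFO (a no-op if logging
    # is already configured), so unrelated libraries stay quiet.
    logging.basicConfig(
        level=logging.INFO,
        format="%(asctime)s %(levelname)s %(name)s: %(message)s",
    )
    # Raise only the named loggers to DEBUG; their records still reach the
    # root handler because propagation ignores ancestor logger levels.
    for name in DEBUG_LOGGERS:
        logging.getLogger(name).setLevel(logging.DEBUG)


def start_fargate_cluster(**cluster_kwargs: Any):
    """Create a FargateCluster outside of any Prefect flow to isolate the hang."""
    # Imported lazily so this file can be imported without dask-cloudprovider.
    from dask_cloudprovider.aws import FargateCluster

    return FargateCluster(**cluster_kwargs)
```

From a local shell, call `enable_debug_logging()` first and then something like `start_fargate_cluster(n_workers=1, image="prefecthq/prefect")`; the arguments are just the thread's public test image, not a recommended configuration.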
    Marc Lipoff

    5 months ago
    Alright, so running Python from my local command line:
    Python 3.9.7 (default, Feb  4 2022, 10:07:17)
    [Clang 12.0.0 (clang-1200.0.32.28)] on darwin
    Type "help", "copyright", "credits" or "license" for more information.
    >>> from dask_cloudprovider.aws import FargateCluster
    >>> cluster = FargateCluster()
    and it just hangs... so what is it doing?
    Kevin Kho

    5 months ago
    I don’t know. Looking here… well, that guy had no workers, but I think you had some.
    This has a bunch of suggestions to check.
    Marc Lipoff

    5 months ago
    👍 I'm increasing the logging level too.
    Kevin Kho

    5 months ago
    It’s crazy that we don’t even get an error, though. The Dask Discourse might also be more helpful than me.
    Marc Lipoff

    5 months ago
    I see what's going on. For some reason, it wants to describe every task definition we have registered. Here is just one request:
    DEBUG:botocore.endpoint:Making request for OperationModel(name=DescribeTaskDefinition) with params: {'url_path': '/', 'query_string': '', 'method': 'POST', 'headers': {'X-Amz-Target': 'AmazonEC2ContainerServiceV20141113.DescribeTaskDefinition', 'Content-Type': 'application/x-amz-json-1.1', 'User-Agent': 'Botocore/1.24.21 Python/3.9.7 Darwin/19.6.0'}, 'body': b'{"taskDefinition": "arn:aws:ecs:us-west-2:xxx:task-definition/dev-webserver:202", "include": ["TAGS"]}', 'url': 'https://ecs.us-west-2.amazonaws.com/', 'context': {'client_region': 'us-west-2', 'client_config': <aiobotocore.config.AioConfig object at 0x112993d60>, 'has_streaming_input': False, 'auth_type': None}}
    And, well, we have tons of task definitions, because we keep every revision around.
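To gauge how long that scan will take, a rough sketch that counts registered task definitions with boto3's `list_task_definitions` paginator. It assumes, based on the log above, that startup issues roughly one `DescribeTaskDefinition` call per definition; `count_task_definitions` is a hypothetical helper, and it counts only ACTIVE revisions unless another status is passed:

```python
from typing import Any


def count_task_definitions(ecs_client: Any, status: str = "ACTIVE") -> int:
    """Count the task definition ARNs visible to an ECS client.

    ecs_client is expected to be a boto3 ECS client, for example
    boto3.client("ecs", region_name="us-west-2").
    """
    paginator = ecs_client.get_paginator("list_task_definitions")
    total = 0
    # Each page carries a batch of ARNs under "taskDefinitionArns".
    for page in paginator.paginate(status=status):
        total += len(page.get("taskDefinitionArns", []))
    return total
```

Usage: `count_task_definitions(boto3.client("ecs", region_name="us-west-2"))`. The client is passed in rather than created inside so the function can be exercised with a stub.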
    Kevin Kho

    5 months ago
    Ahh. Maybe open an issue on their repo to ask whether it can be avoided?
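One thing worth checking before filing the issue: to my knowledge, dask-cloudprovider's `ECSCluster` (and therefore `FargateCluster`) accepts a `skip_cleanup` flag that skips its startup scan for stale dask-cloudprovider resources, which is the likely source of those `DescribeTaskDefinition` calls. A sketch under that assumption; verify the flag against the `ECSCluster` docstring of your installed version. `fargate_cluster_kwargs` and `make_executor` are hypothetical helpers, and the ARN and log group are the placeholders from this thread:

```python
from typing import Any, Dict


def fargate_cluster_kwargs(log_group: str, n_workers: int = 4) -> Dict[str, Any]:
    """Keyword arguments for FargateCluster: the thread's settings plus skip_cleanup."""
    return {
        "n_workers": n_workers,
        "image": "prefecthq/prefect",
        "cluster_arn": "arn:aws:ecs:us-west-2:xxx:cluster/staging-prefect-cluster",
        "cloudwatch_logs_group": log_group,
        # Assumption: the installed dask-cloudprovider supports skip_cleanup and it
        # disables the stale-resource scan that describes every task definition.
        "skip_cleanup": True,
    }


def make_executor(log_group: str):
    """Build a Prefect 0.15 DaskExecutor that creates the cluster from an import path."""
    # Imported lazily so the kwargs helper above works without Prefect installed.
    from prefect.executors import DaskExecutor

    return DaskExecutor(
        cluster_class="dask_cloudprovider.aws.FargateCluster",
        cluster_kwargs=fargate_cluster_kwargs(log_group),
    )
```

The trade-off is presumably that stale resources left behind by earlier dask-cloudprovider runs are no longer removed automatically at startup.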
    Marc Lipoff

    5 months ago