Thread
#prefect-community
    Michael Bell

    7 months ago
    Hi folks. I have a simple example flow that I've written to test a Dask executor configured to use a Fargate cluster, and I'm hitting an S3 permissions error that I have not faced with other flows. When I try to run the flow below, I get the error:
    Error uploading to S3: An error occurred (AccessDenied) when calling the PutObject operation: Access Denied
    If I run the flow without the DaskExecutor, it works just fine. I have also previously used dask_cloudprovider.aws.FargateCluster directly within a single Prefect task to create a cluster, send a dask array calculation to it, and tear it down, and that worked fine as well. Any thoughts as to where the permissions error might be coming from? The execution role I specify in the cluster_kwargs has s3:* permissions.
    import os
    import sys
    import time
    from typing import List
    sys.path.append('.')
    import prefect
    from prefect import task, Flow
    from prefect.storage.s3 import S3
    from prefect.run_configs.ecs import ECSRun
    from prefect.executors.dask import DaskExecutor
    from dask_cloudprovider.aws import FargateCluster
    from config.build import ECR_IMAGE
    
    @task(log_stdout=True)
    def prefect_task(n: int) -> int:
        print(f'Running task w/ input {n}')
        time.sleep(2)
        return n * 10
    
    @task(log_stdout=True)
    def reduce_task(ns: List[int]) -> int:
        print(f'Running reduction task')
        return sum(ns)
    
    schedule = None 
    account = os.environ.get("AWSENV")
    
    with Flow("dask_poc", storage=S3(bucket=f'prefect-{account}-us-east-1')) as flow:
        results = prefect_task.map([1,2,3,4,5])
        reduced_result = reduce_task(results)
    
    
    flow.run_config = ECSRun(
        image=ECR_IMAGE,
        execution_role_arn=...,
        task_role_arn=...,
        run_task_kwargs={
            "cluster": "PrefectCluster",
        },
        labels=[account]
    )
    
    
    flow.executor = DaskExecutor(
        cluster_class="dask_cloudprovider.aws.FargateCluster",
        cluster_kwargs={
            "image": ECR_IMAGE, 
            "n_workers": 5,
            "cluster_arn": ...,
            "execution_role_arn": ...,
            "vpc": ...,
            "subnets": [..., ],
            "security_groups": [..., ],
        },
    )
    Anna Geller

    7 months ago
    The S3 permissions would need to be attached to the role you pass as task_role_arn. The execution role is mainly used to pull the image, create the log group, and write logs.
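    Roughly like this; just a sketch, the ARNs below are placeholders, and it assumes the S3 read/write policy for your storage bucket is already attached to that task role:
    # Sketch only: placeholder ARNs; "prefect-task-role" is a hypothetical role
    # that carries the S3 read/write policy for the flow's storage bucket.
    S3_TASK_ROLE_ARN = "arn:aws:iam::123456789012:role/prefect-task-role"
    EXECUTION_ROLE_ARN = "arn:aws:iam::123456789012:role/prefect-execution-role"
    
    flow.run_config = ECSRun(
        image=ECR_IMAGE,
        execution_role_arn=EXECUTION_ROLE_ARN,  # pull image, create log group, write logs
        task_role_arn=S3_TASK_ROLE_ARN,         # S3 access for flow storage lives here
        run_task_kwargs={"cluster": "PrefectCluster"},
        labels=[account],
    )
    
    flow.executor = DaskExecutor(
        cluster_class="dask_cloudprovider.aws.FargateCluster",
        cluster_kwargs={
            "image": ECR_IMAGE,
            "n_workers": 5,
            "execution_role_arn": EXECUTION_ROLE_ARN,
            # if the Dask workers themselves read/write S3, FargateCluster should
            # also accept a task_role_arn for them (worth double-checking the
            # dask_cloudprovider docs)
            "task_role_arn": S3_TASK_ROLE_ARN,
        },
    )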
    Michael Bell

    7 months ago
    Ahhh, OK. Gotcha. Let me try that. 🤞
    Seems to have worked! Thanks!
    Another question. This flow works well when I run it once, but I wanted to test whether I could run two DaskExecutor-enabled flows at the same time, so I ran the flow twice, back to back, by clicking "quick run" in the UI. When I do this, one of the flows runs properly, but the other one fails with the error:
    botocore.errorfactory.InvalidParameterException: An error occurred (InvalidParameterException) when calling the RunTask operation: TaskDefinition is inactive
    Any idea what's going on here?
    Anna Geller

    7 months ago
    Not 100% sure, but it looks like the first flow run registers a task definition, the second one tries to reuse it, and in the meantime the first run finishes and deregisters that task definition. You can explicitly set a task definition ARN on your ECSRun to reuse the same task definition and potentially avoid this issue, e.g. along the lines below.
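    A sketch only; the ARN is a placeholder for a task definition you register once yourself, outside the flow, whose container image is your ECR_IMAGE:
    flow.run_config = ECSRun(
        # Placeholder ARN: a task definition registered once ahead of time, so
        # concurrent flow runs don't register/deregister their own.
        task_definition_arn="arn:aws:ecs:us-east-1:123456789012:task-definition/dask-poc-flow:1",
        task_role_arn=...,       # same S3-capable task role as above
        execution_role_arn=...,
        run_task_kwargs={"cluster": "PrefectCluster"},
        labels=[account],
    )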