Michael Bell

01/28/2022, 10:02 PM
Hi folks. I have a simple example flow that I've written to test a Dask executor configured to use a Fargate cluster, and I'm hitting an S3 permissions error that I have not seen with other flows. I'm trying to run the flow below and am getting the error:
Error uploading to S3: An error occurred (AccessDenied) when calling the PutObject operation: Access Denied
If I run the flow without the DaskExecutor, it works just fine. I have previously tested using FargateCluster directly within a task to create a cluster, send a dask array calculation to the cluster, and tear it down, all within one Prefect task, and that worked fine as well. Any thoughts as to where the permissions error might be coming from? The execution role I specify in the cluster_kwargs has s3:* permissions.
import os
import sys
from typing import List
import prefect
from prefect import task, Flow
from prefect.storage import S3
from prefect.run_configs.ecs import ECSRun
from prefect.executors.dask import DaskExecutor
from dask_cloudprovider.aws import FargateCluster
from import ECR_IMAGE

@task
def prefect_task(n: int) -> int:
    print(f'Running task w/ input {n}')
    return n * 10

@task
def reduce_task(ns: List[int]) -> int:
    print('Running reduction task')
    return sum(ns)

schedule = None 
account = os.environ.get("AWSENV")

with Flow("dask_poc", storage=S3(bucket=f'prefect-{account}-us-east-1')) as flow:
    results = prefect_task.map([1, 2, 3, 4, 5])
    reduced_result = reduce_task(results)

flow.run_config = ECSRun(
    run_task_kwargs={"cluster": "PrefectCluster"},
)

flow.executor = DaskExecutor(
    cluster_class=FargateCluster,
    cluster_kwargs={
        "image": ECR_IMAGE,
        "n_workers": 5,
        "cluster_arn": ...,
        "execution_role_arn": ...,
        "vpc": ...,
        "subnets": [..., ],
        "security_groups": [..., ],
    },
)

Anna Geller

01/28/2022, 10:32 PM
The permissions to S3 would need to be included in the task role (task_role_arn). The execution role is mainly used to pull the image, create the log group, and write logs.
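A minimal sketch of that distinction, assuming the S3 permissions belong on the ECS task role that FargateCluster accepts as task_role_arn. The helper name fargate_cluster_kwargs and the angle-bracket placeholder values are hypothetical; the real ARNs and image are not given in this thread.

```python
from typing import Any, Dict


def fargate_cluster_kwargs(
    image: str,
    execution_role_arn: str,
    task_role_arn: str,
    n_workers: int = 5,
) -> Dict[str, Any]:
    """Build keyword arguments for dask_cloudprovider's FargateCluster.

    execution_role_arn: used by ECS itself (pull the image, write logs).
    task_role_arn: assumed by the running containers, so S3 access
    needed by the flow's tasks would go on this role.
    """
    return {
        "image": image,
        "n_workers": n_workers,
        "execution_role_arn": execution_role_arn,
        "task_role_arn": task_role_arn,
    }


# Placeholder values only; substitute the real image URI and role ARNs.
cluster_kwargs = fargate_cluster_kwargs(
    image="<ecr-image-uri>",
    execution_role_arn="<execution-role-arn>",
    task_role_arn="<task-role-arn-with-s3-access>",
)
```

The resulting dict could then be passed as DaskExecutor(cluster_class=FargateCluster, cluster_kwargs=cluster_kwargs), alongside the networking settings (cluster_arn, vpc, subnets, security_groups) from the original flow.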

Michael Bell

01/28/2022, 10:33 PM
Ahhh, OK. Gotcha. Let me try that. 🤞
Seems to have worked! Thanks!
Another question. This flow works well when I run it once. I wanted to test whether I could run two DaskExecutor-enabled flows at the same time, so I ran the flow twice, back to back, by clicking "quick run" in the UI. When I do this, one of the flow runs completes properly, but the other fails with the error:
botocore.errorfactory.InvalidParameterException: An error occurred (InvalidParameterException) when calling the RunTask operation: TaskDefinition is inactive
Any idea what's going on here?

Anna Geller

01/29/2022, 11:29 AM
Not 100% sure, but it looks like the first flow run registers a task definition, the next one tries to reuse it, and in the meantime the first run finishes and deregisters that task definition. You can explicitly set a task definition ARN on your ECSRun to reuse the same pre-registered task definition and potentially avoid this issue.
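A rough sketch of what that could look like, assuming a task definition has already been registered in ECS outside of Prefect. The ARN below is a placeholder, and whether this fully avoids the race has not been confirmed in this thread.

```python
from typing import Any, Dict

# Placeholder: the ARN of a task definition registered ahead of time.
TASK_DEFINITION_ARN = "<pre-registered-task-definition-arn>"

# Keyword arguments for prefect.run_configs.ECSRun (Prefect 1.x).
# Pointing at an existing task definition means each flow run does not
# need its own registered (and later deregistered) definition.
ecs_run_kwargs: Dict[str, Any] = {
    "task_definition_arn": TASK_DEFINITION_ARN,
    "run_task_kwargs": {"cluster": "PrefectCluster"},
}

# In the flow file this would be applied as:
#   flow.run_config = ECSRun(**ecs_run_kwargs)
```

Since the task definition is managed outside the flow in this setup, changes to the image or roles would have to be made by registering a new revision and updating the ARN.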
