Hi! <https://docs.prefect.io/api/latest/run_config...
# ask-community
l
Hi! https://docs.prefect.io/api/latest/run_configs.html#ecsrun Is there any way we can configure
ECSRun
to take the most recent (or the only revision) of a
task_definition_arn
? Otherwise we’d have to update and deploy the flow every time we update a task definition which is not ideal
a
@Lana Dann afaik, if you don’t set a specific revision, ECS task will use the latest. In general, if you don’t provide a specific task definition ARN, Prefect registers a new task definition (or a new revision) on each flow run. But you can explicitly define a task definition either by: #1 Referencing a YAML file stored in S3
Copy code
flow.run_config = ECSRun(
    task_definition="<s3://bucket/flow_task_definition.yaml>",
    image="<http://XXXX.dkr.ecr.us-east-1.amazonaws.com/image_name:latest|XXXX.dkr.ecr.us-east-1.amazonaws.com/image_name:latest>",
    cpu="2 vcpu",
)
#2 Setting the task definition as kwargs on the ECSRun:
Copy code
import prefect
from prefect.storage import S3
from prefect.run_configs import ECSRun
from prefect import task, Flow
from prefect.client.secrets import Secret


FLOW_NAME = "ecs_demo_ecr"
ACCOUNT_ID = Secret("AWS_ACCOUNT_ID").get()
STORAGE = S3(
    bucket="your_bucket_name",
    key=f"flows/{FLOW_NAME}.py",
    stored_as_script=True,
    local_script_path=f"{FLOW_NAME}.py",
)
RUN_CONFIG = ECSRun(
    labels=["prod"],
    task_definition=dict(
        family=FLOW_NAME,
        requiresCompatibilities=["FARGATE"],
        networkMode="awsvpc",
        cpu=1024,
        memory=2048,
        taskRoleArn=f"arn:aws:iam::{ACCOUNT_ID}:role/prefectTaskRole",
        executionRoleArn=f"arn:aws:iam::{ACCOUNT_ID}:role/prefectECSAgentTaskExecutionRole",
        containerDefinitions=[
            dict(
                name="flow",
                image=f"{ACCOUNT_ID}.<http://dkr.ecr.us-east-1.amazonaws.com/your_image_name:latest|dkr.ecr.us-east-1.amazonaws.com/your_image_name:latest>",
            )
        ],
    ),
    run_task_kwargs=dict(cluster="prefectEcsCluster"),
)


@task
def say_hi():
    logger = prefect.context.get("logger")
    <http://logger.info|logger.info>("Hi from Prefect %s from flow %s", prefect.__version__, FLOW_NAME)


with Flow(FLOW_NAME, storage=STORAGE, run_config=RUN_CONFIG,) as flow:
    say_hi()
But to answer your question really specifically, I believe that: #1 This creates a flow from revision 1:
Copy code
flow.run_config = ECSRun(
    labels=["prod"],
    task_definition_arn="prefectFlow:1",
    run_task_kwargs=dict(cluster="prefectEcsCluster"),
)
#2 This creates a flow from the latest revision of the task definition “prefectFlow”:
Copy code
flow.run_config = ECSRun(
    labels=["prod"],
    task_definition_arn="prefectFlow",
    run_task_kwargs=dict(cluster="prefectEcsCluster"),
)
But I haven’t tested it on a cluster yet.
l
i'll try it out, thanks!
👍 1
unfortunately removing the revision tag doesn’t default to the latest task definition
a
I see, thanks for confirming that. In that case, perhaps providing a full path (
Copy code
task_definition="<s3://bucket/flow_task_definition.yaml>",
) or defining task definition as a dictionary will work for you?