Hi everyone, please help with the ECS Worker guide...
# ask-community
j
Hi everyone, please help with the ECS worker guide — it doesn't seem to work for a self-hosted version. These are the resources I'm creating:
Copy code
import os

from aws_cdk import CfnOutput, RemovalPolicy, Stack
from aws_cdk import aws_ec2 as ec2
from aws_cdk import aws_ecr as ecr
from aws_cdk import aws_ecs as ecs
from aws_cdk import aws_iam as iam
from aws_cdk import aws_logs as logs
from constructs import Construct


class PrefectEcsWorkerStack(Stack):
    """CDK stack that runs a self-hosted Prefect ECS worker on Fargate.

    Resources created:
      * an ECR repository intended for flow-run images,
      * a VPC across two AZs with one NAT gateway,
      * the ECS cluster ``prefect-worker-cluster``,
      * one IAM role used as BOTH the task execution role and the task role,
      * a Fargate service running
        ``prefect worker start --pool my-ecs-pool --type ecs``.

    The worker's ``PREFECT_API_URL`` is read from the environment of the
    process running ``cdk synth``/``cdk deploy``. Synthesis raises
    ``ValueError`` if it is unset or empty.
    """

    def __init__(self, scope: Construct, construct_id: str, **kwargs) -> None:
        super().__init__(scope, construct_id, **kwargs)

        # Fail at synth time with a clear message. Otherwise None would be handed
        # to the container environment (strings only), or the worker would ship
        # without a way to reach the self-hosted Prefect API.
        prefect_api_url = os.getenv("PREFECT_API_URL")
        if not prefect_api_url:
            raise ValueError(
                "PREFECT_API_URL must be set when synthesizing PrefectEcsWorkerStack "
                "(e.g. http://<prefect-server-host>:4200/api)"
            )

        # ECR repository for flow-run images (the worker itself uses the public
        # prefecthq/prefect image).
        # NOTE(review): RemovalPolicy.DESTROY cannot delete a repository that
        # still contains images unless it is emptied first; be careful in production.
        repository = ecr.Repository(
            self,
            "PrefectWorkerRepo",
            repository_name="prefect-worker-repo",
            removal_policy=RemovalPolicy.DESTROY,
        )

        # VPC across 2 Availability Zones, with a NAT gateway for private subnets.
        vpc = ec2.Vpc(
            self,
            "PrefectVpc",
            max_azs=2,
            nat_gateways=1,
        )

        # The cluster name must match `cluster:` in the deployment's job_variables.
        cluster = ecs.Cluster(self, "PrefectCluster", vpc=vpc, cluster_name="prefect-worker-cluster")

        worker_role = self._create_worker_role()

        # The same role is both execution role (image pull, log shipping) and
        # task role (the worker's own AWS API calls).
        task_definition = ecs.FargateTaskDefinition(
            self,
            "PrefectWorkerTask",
            memory_limit_mib=1024,
            cpu=512,
            execution_role=worker_role,
            task_role=worker_role,
        )
        self._add_worker_container(task_definition, prefect_api_url)

        service = ecs.FargateService(
            self,
            "PrefectWorkerService",
            cluster=cluster,
            task_definition=task_definition,
            desired_count=1,
            service_name="prefect-worker-service",
            # NOTE(review): the VPC has a NAT gateway, so a public IP is not
            # strictly required for image pulls; kept to preserve current networking.
            assign_public_ip=True,
        )

        CfnOutput(self, "ClusterName", value=cluster.cluster_name)
        CfnOutput(self, "ServiceName", value=service.service_name)
        CfnOutput(self, "RepositoryUri", value=repository.repository_uri)
        CfnOutput(self, "ExecutionRoleArn", value=worker_role.role_arn)

    def _create_worker_role(self) -> iam.Role:
        """Create the IAM role used as both task execution role and task role."""
        role = iam.Role(
            self,
            "PrefectTaskExecutionRole",
            assumed_by=iam.ServicePrincipal("ecs-tasks.amazonaws.com"),
            # NOTE(review): a fixed physical name collides with any existing
            # role called "ecsTaskExecutionRole" in the account (the ECS console
            # often creates one). If deploy fails with "already exists", drop
            # role_name and let CDK generate a unique name.
            role_name="ecsTaskExecutionRole",
        )

        # Standard ECS task execution permissions plus read access to ECR.
        for managed_policy_name in (
            "service-role/AmazonECSTaskExecutionRolePolicy",
            "AmazonEC2ContainerRegistryReadOnly",
        ):
            role.add_managed_policy(iam.ManagedPolicy.from_aws_managed_policy_name(managed_policy_name))

        # The worker may only pass this very role to the tasks it launches.
        role.add_to_policy(
            iam.PolicyStatement(
                effect=iam.Effect.ALLOW,
                actions=["iam:PassRole"],
                resources=[role.role_arn],
            )
        )

        # ECS API calls the worker makes to register task definitions and run/track flow-run tasks.
        role.add_to_policy(
            iam.PolicyStatement(
                effect=iam.Effect.ALLOW,
                actions=[
                    "ecs:RegisterTaskDefinition",
                    "ecs:DescribeTaskDefinition",
                    "ecs:ListTaskDefinitions",
                    "ecs:DeregisterTaskDefinition",
                    "ecs:RunTask",
                    "ecs:StopTask",
                    "ecs:DescribeTasks",
                    "ecs:TagResource",
                    "ecs:UntagResource",
                ],
                resources=["*"],
            )
        )

        # Network discovery for flow-run task placement.
        role.add_to_policy(
            iam.PolicyStatement(
                effect=iam.Effect.ALLOW,
                actions=[
                    "ec2:DescribeVpcs",
                    "ec2:DescribeSubnets",
                    "ec2:DescribeSecurityGroups",
                    "ec2:DescribeNetworkInterfaces",
                ],
                resources=["*"],
            )
        )

        # CloudWatch Logs. CreateLogGroup/GetLogEvents are presumably needed when
        # the work pool lets flow-run tasks create their log group or streams their
        # output back to the worker — confirm against the work pool settings.
        role.add_to_policy(
            iam.PolicyStatement(
                effect=iam.Effect.ALLOW,
                actions=[
                    "logs:CreateLogGroup",
                    "logs:CreateLogStream",
                    "logs:PutLogEvents",
                    "logs:GetLogEvents",
                ],
                resources=["*"],
            )
        )
        return role

    def _add_worker_container(self, task_definition: ecs.FargateTaskDefinition, prefect_api_url: str) -> None:
        """Add the Prefect worker container, pointed at ``prefect_api_url``, to the task definition."""
        task_definition.add_container(
            "prefect-worker",
            image=ecs.ContainerImage.from_registry("prefecthq/prefect:3-latest"),
            memory_limit_mib=1024,
            cpu=512,
            logging=ecs.LogDriver.aws_logs(
                stream_prefix="prefect-worker",
                log_group=logs.LogGroup(
                    self,
                    "PrefectWorkerLogs",
                    log_group_name="/ecs/prefect-worker",
                    retention=logs.RetentionDays.ONE_WEEK,
                ),
            ),
            environment={
                "PREFECT_API_URL": prefect_api_url,
                # Verbose settings for troubleshooting. The DEBUG "_queue.Empty"
                # tracebacks from APILogWorkerThread come from this level of logging.
                "LOG_LEVEL": "DEBUG",
                "PREFECT_DEBUG_MODE": "true",
                "PREFECT_INTERNAL_LOGGING_LEVEL": "DEBUG",
                "PREFECT_LOGGING_LEVEL": "DEBUG",
                # A self-hosted Prefect server needs no PREFECT_API_KEY.
            },
            # prefect-aws (which provides the "ecs" worker type) is installed at
            # every container start.
            command=["/bin/sh", "-c", "pip install prefect-aws && prefect worker start --pool my-ecs-pool --type ecs"],
        )
This is my deployment (`prefect.yaml`):
Copy code
# Welcome to your prefect.yaml file! You can use this file for storing and managing
# configuration for deploying your flows. We recommend committing this file to source
# control along with your flow code.

# Generic metadata about this project
name: prefect-flows-execution
prefect-version: 3.0.2

# build section allows you to manage and build docker images
build:
- prefect_docker.deployments.steps.build_docker_image:
    id: build_image
    requires: prefect-docker>=0.3.1
    image_name: '{{ prefect.variables.ecr_repository_url }}'
    tag: '{{ prefect.variables.git_commit_sha }}'
    dockerfile: Dockerfile

# push section allows you to manage if and how this project is uploaded to remote locations
push:
- prefect_docker.deployments.steps.push_docker_image:
    requires: prefect-docker>=0.3.1
    image_name: '{{ build_image.image_name }}'
    tag: '{{ build_image.tag }}'

# pull section allows you to provide instructions for cloning this project in remote locations
# NOTE: this directory must exist INSIDE the image built above (i.e. the Dockerfile
# must COPY the flow code there). If the flow-run container is started from an image
# that lacks it, this pull step fails and the process exits with status code 1.
pull:
- prefect.deployments.steps.set_working_directory:
    directory: /opt/prefect/prefect_deployments

# the deployments section allows you to provide configuration for deploying flows
deployments:
- name: ecs-worker-deployment
  version: '{{ build_image.tag }}'
  tags:
  - ecs
  - fargate
  - dev
  description: Prefect worker deployment on ECS Fargate
  schedule:
  entrypoint: workflows/test_flow.py:my_flow
  parameters: {}
  work_pool:
    name: my-ecs-pool
    work_queue_name:
    job_variables:
      # Flow runs use the image built and pushed above.
      # Do NOT also set `task_definition_arn`. When it is set, the ECS worker
      # runs that existing task definition and ignores task-definition settings
      # such as `image`. Pointing it at the worker's task definition starts flow
      # runs from prefecthq/prefect, which has no flow code.
      image: '{{ build_image.image }}'
      cluster: prefect-worker-cluster
  concurrency_limit:
  enforce_parameter_schema: true
  schedules: []
But this is what I get in the ECS task logs:
Copy code
18 November 2024 at 19:02 (UTC-5:00)
00:02:28.752 | DEBUG | APILogWorkerThread | prefect._internal.concurrency - Encountered exception in call get(<dropped>)
e81e247f81e64085b10ca5c732d6cafc
prefect-worker
18 November 2024 at 19:02 (UTC-5:00)
Traceback (most recent call last):
e81e247f81e64085b10ca5c732d6cafc
prefect-worker
18 November 2024 at 19:02 (UTC-5:00)
File "/usr/local/lib/python3.12/site-packages/prefect/_internal/concurrency/calls.py", line 346, in _run_sync
e81e247f81e64085b10ca5c732d6cafc
prefect-worker
18 November 2024 at 19:02 (UTC-5:00)
result = self.fn(*self.args, **self.kwargs)
e81e247f81e64085b10ca5c732d6cafc
prefect-worker
18 November 2024 at 19:02 (UTC-5:00)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
e81e247f81e64085b10ca5c732d6cafc
prefect-worker
18 November 2024 at 19:02 (UTC-5:00)
File "/usr/local/lib/python3.12/queue.py", line 179, in get
e81e247f81e64085b10ca5c732d6cafc
prefect-worker
18 November 2024 at 19:02 (UTC-5:00)
raise Empty
e81e247f81e64085b10ca5c732d6cafc
prefect-worker
18 November 2024 at 19:02 (UTC-5:00)
_queue.Empty
e81e247f81e64085b10ca5c732d6cafc
prefect-worker
18 November 2024 at 19:02 (UTC-5:00)
00:02:28.753 | DEBUG | APILogWorkerThread | prefect._internal.concurrency - Running call get(timeout=1.9999224969997158) in thread 'APILogWorkerThread'
And this is what the Prefect UI shows:
Copy code
Waiting for ECS task run to start...
07:31:21 PM
prefect.flow_runs.worker
ECS task status is PROVISIONING.
07:31:21 PM
prefect.flow_runs.worker
ECS task status is PENDING.
07:31:31 PM
prefect.flow_runs.worker
ECS task status is RUNNING.
07:31:51 PM
prefect.flow_runs.worker
Opening process...
07:31:55 PM
prefect.flow_runs.runner
Completed submission of flow run '031c7c66-b99a-4b22-aac8-9300787727c1'
07:31:57 PM
prefect.flow_runs.worker
Running 1 deployment pull step(s)
07:32:01 PM
prefect.flow_runs
Process for flow run 'organic-bulldog' exited with status code: 1
07:32:02 PM
prefect.flow_runs.runner
Reported flow run '031c7c66-b99a-4b22-aac8-9300787727c1' as crashed: Flow run process exited with non-zero status code 1.
07:32:03 PM
prefect.flow_runs.runner
Running 1 deployment pull step(s)
@Marvin