Juan Carlos Calvo Jackson
11/19/2024, 12:36 AM

import os
from aws_cdk import CfnOutput, RemovalPolicy, Stack
from aws_cdk import aws_ec2 as ec2
from aws_cdk import aws_ecr as ecr
from aws_cdk import aws_ecs as ecs
from aws_cdk import aws_iam as iam
from aws_cdk import aws_logs as logs
from constructs import Construct
class PrefectEcsWorkerStack(Stack):
    """CDK stack that runs a Prefect ECS worker as a Fargate service.

    Resources created:

    * an ECR repository (``prefect-worker-repo``) for images,
    * a VPC spanning up to 2 AZs with a single NAT gateway,
    * an ECS cluster (``prefect-worker-cluster``),
    * one IAM role used as both the task execution role and the task role,
      carrying the ECS / EC2 / CloudWatch Logs / PassRole permissions the
      worker uses,
    * a Fargate task definition and service running
      ``prefect worker start --pool my-ecs-pool --type ecs``.

    The Prefect API URL is read from the ``PREFECT_API_URL`` environment
    variable at synth time.
    """

    def __init__(self, scope: Construct, construct_id: str, **kwargs) -> None:
        """Create all resources of the worker stack.

        Args:
            scope: Parent construct (usually the CDK ``App``).
            construct_id: Logical ID of this stack.
            **kwargs: Forwarded unchanged to :class:`aws_cdk.Stack`.

        Raises:
            ValueError: If ``PREFECT_API_URL`` is unset or empty.
        """
        super().__init__(scope, construct_id, **kwargs)

        # os.getenv returns None when the variable is unset. Passing None into
        # the container environment only fails later inside CDK/jsii type
        # checking with an obscure error, so fail fast with a clear message.
        prefect_api_url = os.getenv("PREFECT_API_URL")
        if not prefect_api_url:
            raise ValueError(
                "PREFECT_API_URL must be set in the environment (to your Prefect "
                "API URL) before synthesizing PrefectEcsWorkerStack."
            )

        # ECR repository for images; its URI is exported as a stack output.
        repository = ecr.Repository(
            self,
            "PrefectWorkerRepo",
            repository_name="prefect-worker-repo",
            # DESTROY removes the repository with the stack; be careful with this in production.
            removal_policy=RemovalPolicy.DESTROY,
        )

        vpc = ec2.Vpc(
            self,
            "PrefectVpc",
            max_azs=2,  # use up to 2 Availability Zones
            nat_gateways=1,  # one NAT gateway gives the private subnets outbound access
        )

        # The cluster name is referenced by the Prefect deployment's `cluster`
        # job variable, so keep the two in sync.
        cluster = ecs.Cluster(
            self, "PrefectCluster", vpc=vpc, cluster_name="prefect-worker-cluster"
        )

        # A single role acts as both execution role (image pull, log shipping)
        # and task role (what the worker process itself may call).
        # NOTE(review): a fixed role_name must be unique in the account, and
        # "ecsTaskExecutionRole" is also the name the ECS console gives its
        # default execution role - deployment may fail if that role exists.
        execution_role = iam.Role(
            self,
            "PrefectTaskExecutionRole",
            assumed_by=iam.ServicePrincipal("ecs-tasks.amazonaws.com"),
            role_name="ecsTaskExecutionRole",
        )

        for managed_policy_name in (
            "service-role/AmazonECSTaskExecutionRolePolicy",
            "AmazonEC2ContainerRegistryReadOnly",
        ):
            execution_role.add_managed_policy(
                iam.ManagedPolicy.from_aws_managed_policy_name(managed_policy_name)
            )

        # Runtime permissions for the worker to register task definitions and
        # run / stop / describe ECS tasks for flow runs.
        worker_statements = [
            # PassRole is limited to this role only (presumably the worker
            # passes it to the flow-run tasks it launches - confirm against
            # the work pool's execution_role_arn / task_role_arn settings).
            iam.PolicyStatement(
                effect=iam.Effect.ALLOW,
                actions=["iam:PassRole"],
                resources=[execution_role.role_arn],
            ),
            iam.PolicyStatement(
                effect=iam.Effect.ALLOW,
                actions=[
                    "ecs:RegisterTaskDefinition",
                    "ecs:DescribeTaskDefinition",
                    "ecs:ListTaskDefinitions",
                    "ecs:DeregisterTaskDefinition",
                    "ecs:RunTask",
                    "ecs:StopTask",
                    "ecs:DescribeTasks",
                    "ecs:TagResource",
                    "ecs:UntagResource",
                ],
                resources=["*"],
            ),
            iam.PolicyStatement(
                effect=iam.Effect.ALLOW,
                actions=[
                    "ec2:DescribeVpcs",
                    "ec2:DescribeSubnets",
                    "ec2:DescribeSecurityGroups",
                    "ec2:DescribeNetworkInterfaces",
                ],
                resources=["*"],
            ),
            iam.PolicyStatement(
                effect=iam.Effect.ALLOW,
                actions=["logs:CreateLogStream", "logs:PutLogEvents"],
                resources=["*"],
            ),
        ]
        for statement in worker_statements:
            execution_role.add_to_policy(statement)

        # This task definition runs the WORKER only. Its image is the public
        # prefecthq/prefect image with no flow code in it, so it should not be
        # reused as the flow-run task definition (e.g. via a deployment's
        # `task_definition_arn`); flow runs need the image built from your code.
        task_definition = ecs.FargateTaskDefinition(
            self,
            "PrefectWorkerTask",
            memory_limit_mib=1024,
            cpu=512,
            execution_role=execution_role,
            task_role=execution_role,
        )

        log_group = logs.LogGroup(
            self,
            "PrefectWorkerLogs",
            log_group_name="/ecs/prefect-worker",
            retention=logs.RetentionDays.ONE_WEEK,
        )

        task_definition.add_container(
            "prefect-worker",
            image=ecs.ContainerImage.from_registry("prefecthq/prefect:3-latest"),
            memory_limit_mib=1024,
            cpu=512,
            logging=ecs.LogDriver.aws_logs(
                stream_prefix="prefect-worker",
                log_group=log_group,
            ),
            # The DEBUG settings below are very verbose. With them, Prefect's
            # internal concurrency logger emits DEBUG lines (including
            # `queue.Empty` tracebacks from its API log worker thread) that
            # appear to be routine polling timeouts rather than failures.
            # Lower these once debugging is finished.
            # For Prefect Cloud, the worker also needs PREFECT_API_KEY; inject
            # it through `secrets=` (e.g. ecs.Secret.from_secrets_manager)
            # rather than a plain-text value visible in the task definition.
            environment={
                "PREFECT_API_URL": prefect_api_url,
                "LOG_LEVEL": "DEBUG",
                "PREFECT_DEBUG_MODE": "true",
                "PREFECT_INTERNAL_LOGGING_LEVEL": "DEBUG",
                "PREFECT_LOGGING_LEVEL": "DEBUG",
            },
            # prefect-aws is installed at start-up, presumably because the base
            # image does not include the ECS worker type.
            command=[
                "/bin/sh",
                "-c",
                "pip install prefect-aws && prefect worker start --pool my-ecs-pool --type ecs",
            ],
        )

        service = ecs.FargateService(
            self,
            "PrefectWorkerService",
            cluster=cluster,
            task_definition=task_definition,
            desired_count=1,
            service_name="prefect-worker-service",
            # With a public IP, CDK's default subnet selection uses the public
            # subnets, giving the worker direct internet access to pull its
            # image and reach the Prefect API.
            assign_public_ip=True,
        )

        CfnOutput(self, "ClusterName", value=cluster.cluster_name)
        CfnOutput(self, "ServiceName", value=service.service_name)
        CfnOutput(self, "RepositoryUri", value=repository.repository_uri)
        CfnOutput(self, "ExecutionRoleArn", value=execution_role.role_arn)
This is my deployment configuration (`prefect.yaml`):
# Welcome to your prefect.yaml file! You can use this file for storing and managing
# configuration for deploying your flows. We recommend committing this file to source
# control along with your flow code.

# Generic metadata about this project
name: prefect-flows-execution
prefect-version: 3.0.2

# build section allows you to manage and build docker images
build:
- prefect_docker.deployments.steps.build_docker_image:
    id: build_image
    requires: prefect-docker>=0.3.1
    image_name: '{{ prefect.variables.ecr_repository_url }}'
    tag: '{{ prefect.variables.git_commit_sha }}'
    dockerfile: Dockerfile

# push section allows you to manage if and how this project is uploaded to remote locations
push:
- prefect_docker.deployments.steps.push_docker_image:
    requires: prefect-docker>=0.3.1
    image_name: '{{ build_image.image_name }}'
    tag: '{{ build_image.tag }}'

# pull section allows you to provide instructions for cloning this project in remote locations
# NOTE: this step runs inside the flow-run container, so the directory must
# exist in the image that container actually runs. Presumably the Dockerfile
# copies the flow code here - confirm it matches the Dockerfile's WORKDIR/COPY.
pull:
- prefect.deployments.steps.set_working_directory:
    directory: /opt/prefect/prefect_deployments

# the deployments section allows you to provide configuration for deploying flows
deployments:
- name: ecs-worker-deployment
  version: '{{ build_image.tag }}'
  tags:
  - ecs
  - fargate
  - dev
  description: Prefect worker deployment on ECS Fargate
  schedule:
  entrypoint: workflows/test_flow.py:my_flow
  parameters: {}
  work_pool:
    name: my-ecs-pool
    work_queue_name:
    job_variables:
      # The ECS worker registers a flow-run task definition from this image.
      # Do not also set `task_definition_arn`: when an ARN is supplied the
      # worker runs that existing task definition as-is and `image` is not
      # used. Pointing it at the worker's own task definition starts the flow
      # run in the plain prefecthq/prefect image (no flow code), so the pull
      # step above likely fails and the run crashes with exit code 1.
      image: '{{ build_image.image }}'
      cluster: prefect-worker-cluster
      # May be needed so flow-run tasks can pull the image from ECR; the CDK
      # stack exports a suitable role as the ExecutionRoleArn output:
      # execution_role_arn: '{{ prefect.variables.execution_role_arn }}'
  concurrency_limit:
  enforce_parameter_schema: true
  schedules: []
But this is what I get in the ECS worker logs:
18 November 2024 at 19:02 (UTC-5:00)
00:02:28.752 | DEBUG | APILogWorkerThread | prefect._internal.concurrency - Encountered exception in call get(<dropped>)
e81e247f81e64085b10ca5c732d6cafc
prefect-worker
18 November 2024 at 19:02 (UTC-5:00)
Traceback (most recent call last):
e81e247f81e64085b10ca5c732d6cafc
prefect-worker
18 November 2024 at 19:02 (UTC-5:00)
File "/usr/local/lib/python3.12/site-packages/prefect/_internal/concurrency/calls.py", line 346, in _run_sync
e81e247f81e64085b10ca5c732d6cafc
prefect-worker
18 November 2024 at 19:02 (UTC-5:00)
result = self.fn(*self.args, **self.kwargs)
e81e247f81e64085b10ca5c732d6cafc
prefect-worker
18 November 2024 at 19:02 (UTC-5:00)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
e81e247f81e64085b10ca5c732d6cafc
prefect-worker
18 November 2024 at 19:02 (UTC-5:00)
File "/usr/local/lib/python3.12/queue.py", line 179, in get
e81e247f81e64085b10ca5c732d6cafc
prefect-worker
18 November 2024 at 19:02 (UTC-5:00)
raise Empty
e81e247f81e64085b10ca5c732d6cafc
prefect-worker
18 November 2024 at 19:02 (UTC-5:00)
_queue.Empty
e81e247f81e64085b10ca5c732d6cafc
prefect-worker
18 November 2024 at 19:02 (UTC-5:00)
00:02:28.753 | DEBUG | APILogWorkerThread | prefect._internal.concurrency - Running call get(timeout=1.9999224969997158) in thread 'APILogWorkerThread'
And this is what the Prefect UI shows for the flow run:
Waiting for ECS task run to start...
07:31:21 PM
prefect.flow_runs.worker
ECS task status is PROVISIONING.
07:31:21 PM
prefect.flow_runs.worker
ECS task status is PENDING.
07:31:31 PM
prefect.flow_runs.worker
ECS task status is RUNNING.
07:31:51 PM
prefect.flow_runs.worker
Opening process...
07:31:55 PM
prefect.flow_runs.runner
Completed submission of flow run '031c7c66-b99a-4b22-aac8-9300787727c1'
07:31:57 PM
prefect.flow_runs.worker
Running 1 deployment pull step(s)
07:32:01 PM
prefect.flow_runs
Process for flow run 'organic-bulldog' exited with status code: 1
07:32:02 PM
prefect.flow_runs.runner
Reported flow run '031c7c66-b99a-4b22-aac8-9300787727c1' as crashed: Flow run process exited with non-zero status code 1.
07:32:03 PM
prefect.flow_runs.runner
Running 1 deployment pull step(s)
Juan Carlos Calvo Jackson
11/19/2024, 7:27 PM