# ask-marvin
p
@Marvin I have an ECS Worker correctly configured, and now I'm trying to run a flow on its work pool via a deployment created with the following code:
```python
from prefect import flow, get_run_logger


@flow(log_prints=True)
def my_flow(name: str = "world"):
    logger = get_run_logger()
    logger.info("Test")


if __name__ == "__main__":
    my_flow.deploy(
        name="example3",
        image="prefecthq/prefect:2-latest",
        work_pool_name="test-pool",
        job_variables={
            "command": "ls",
        },
        build=False,
        push=False,
    )
```
After the deployment, I can see it on my self-hosted Prefect server, then I click "Quick run" to run a flow. The ECS task starts correctly, executes the command "ls", and exits with code 0 (no errors), but the flow stays stuck in the Pending state. How can I fix it?
m
It looks like you might be facing an issue with ECS worker configurations and Prefect flow runs getting stuck in a pending state. Here are some key findings:

### Key Findings
- ECS Worker Guide: Ensure you've correctly deployed your flow to Prefect Cloud or your self-managed server instance, specifying the ECS work pool when prompted. See the Prefect AWS ECS Guide.
- Concurrency and Resource Limits: The issue might be related to concurrency limits or resource allocations for your ECS tasks. The work pools documentation suggests configuring priority and concurrency limits properly. Example resources:
  - Prefect 1: 512 CPU, 256 Memory
  - Prefect 2: 2048 CPU, 16384 Memory, with additional configurations such as prefetch seconds and query intervals
- GitHub Issues: There have been similar issues where increasing resource allocations and adjusting agent configurations helped. Make sure your agent is configured with sufficient resources, as seen in this issue.
- `httpx.LocalProtocolError`: This might occur if Prefect agents fail to retrieve jobs properly, as mentioned here.

### Suggested Solutions
1. Resource Allocation:
   - Ensure your ECS tasks have sufficient CPU and memory allocated. You might need to increase these limits based on your flow requirements.
   - Example configuration:
```yaml
job_variables:
  command: "ls"
  cpu: "2048"
  memory: "16384"
```
   - Adjust `PREFECT_AGENT_PREFETCH_SECONDS` and `PREFECT_AGENT_QUERY_INTERVAL` if necessary.
2. Concurrency Limits:
   - Check and adjust concurrency limits on your work pool or deployment. High-priority tasks might be waiting for available slots.
   - For issues related to concurrency limits and priority, refer to the work pools documentation.
3. Logs and Further Debugging:
   - Monitor the ECS and Prefect logs for any specific errors or warnings. This can give more clues about what might be going wrong.

By ensuring you have these configurations in place, you should be able to resolve issues with your ECS tasks getting stuck in a pending state. If the issue persists, consider raising it with more detailed logs on the Prefect GitHub issues page.
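For example, to inspect your pool and set a concurrency limit from the CLI (a sketch; the limit of 5 is illustrative):

```bash
# show the pool's current configuration, including any concurrency limit
prefect work-pool inspect test-pool
# set an explicit concurrency limit on the pool
prefect work-pool set-concurrency-limit test-pool 5
```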
p
@Marvin my ECS Worker is running with Docker on an EC2 instance, and executes the command `prefect worker start --pool test-pool`
m
It appears that you have correctly set up your ECS worker and are using the command `prefect worker start --pool test-pool` to start it. Here are some possible issues and solutions based on available documentation:

### Key Points from Documentation
1. Worker Prefetch and Polling Intervals:
   - Prefetch Seconds: Prefect workers prefetch work a few seconds before the job is scheduled to run. It's possible the infrastructure startup time exceeds this value.
   - Polling Interval: Workers poll for new work every 15 seconds by default. This can be adjusted using environment variables.
2. Environment Variables:
   - Prefetch: Ensure `PREFECT_WORKER_PREFETCH_SECONDS` is sufficiently high.
   - Polling Interval: Adjust `PREFECT_WORKER_QUERY_SECONDS` to ensure timely polling.
3. Worker and Work Pools Install Policy: Validate your worker's install policy with the appropriate settings.

### Suggested Solutions
1. Adjust Prefetch and Polling Intervals: Increase `PREFECT_WORKER_PREFETCH_SECONDS` and/or adjust `PREFECT_WORKER_QUERY_SECONDS` based on your ECS task startup time.
```bash
export PREFECT_WORKER_PREFETCH_SECONDS=60
export PREFECT_WORKER_QUERY_SECONDS=10
```
Then restart your worker:
```bash
prefect worker start --pool test-pool
```
2. Check Resource Allocation:
   - Ensure that ECS tasks have enough CPU and memory resources allocated.
   - Verify that your EC2 instance has sufficient resources for running Docker containers.
3. Logs and Monitoring:
   - Check the logs for your Docker container running on EC2.
   - Set up monitoring to check EC2 instance performance and network latency, ensuring there's no resource bottleneck.

### References
1. ECS Worker Guide: Prefect AWS ECS Guide
2. Work Pools Documentation: Work Pools

Implement these changes and let me know if your flow still gets stuck in the pending state.
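You can also persist these settings in your active Prefect profile instead of exporting them in each shell (a sketch using the same values as above):

```bash
prefect config set PREFECT_WORKER_PREFETCH_SECONDS=60
prefect config set PREFECT_WORKER_QUERY_SECONDS=10
# confirm the settings took effect
prefect config view
```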
k
it looks like you're creating a deployment that is looking in a docker image for your flow code, but you're not creating a docker image with the flow code in it
if you want to keep using the default prefect image, you'll need to use `flow.from_source().deploy()`, where you're pointing to a git repo or other remote code storage in `from_source()`. otherwise you'll need to build and push a docker image that contains the flow code when calling `flow.deploy()`
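roughly like this (a sketch; the repo URL and entrypoint are placeholders for wherever your flow code actually lives):

```python
from prefect import flow

if __name__ == "__main__":
    flow.from_source(
        # placeholders: point these at your own repo and flow entrypoint
        source="https://github.com/your-org/your-repo.git",
        entrypoint="flows/my_flow.py:my_flow",
    ).deploy(
        name="example3",
        work_pool_name="test-pool",
        # code is pulled from the source above, so the stock image works
        image="prefecthq/prefect:2-latest",
        build=False,
        push=False,
    )
```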
p
hi @Kevin Grismore thank you for your answer, it seems to run correctly on ECS, but the flow is stuck in the Pending state
(attached screenshot: Screen Shot 2024-06-26 at 12.10.51.png)
is there a way to set the flow code to a local path?
k
it's stuck in pending because the flow run never starts
that's because you've set the command to `ls`, which overrides the default command of `prefect flow-run execute`
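if you drop `command` from `job_variables` (or drop `job_variables` entirely), the default command runs and the flow run can start. a sketch of the same deploy call without the override:

```python
# same deploy call as before, minus the "command" override, so the
# container runs the default `prefect flow-run execute`
my_flow.deploy(
    name="example3",
    image="prefecthq/prefect:2-latest",
    work_pool_name="test-pool",
    build=False,
    push=False,
)
```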
p
I understand, thank you @Kevin Grismore. it’s a test, but I’d like to run my own command in the ECS task… what would be the correct way to achieve something similar?
k
I'm not exactly sure what you're looking to accomplish. do you want to run your prefect flow's python code? or something else?
p
I used to use ECSTask but I’m trying to use ECSWorker instead. I’d like to run custom ETL processes, for example, running dbt models in a custom image which I already created and saved in ECR. I run several commands using the same custom image, so I want to override the command variable in each flow
k
is the command a parameter to your flow, or a command to start the docker image with?
p
it’s equivalent to the command passed in the task definition, so it’s a command to run inside the docker image, after the docker-entrypoint
k
gotcha, but you still want to run your prefect flow code right?
so like `some command && prefect flow-run execute`?
or were you using `ECSTask` inside a prefect flow?
p
we were using ECSTask inside a prefect flow
k
cc @Nate
yeah, so the ECS worker isn't a replacement for running arbitrary ECS tasks, it's a way to run prefect flows on ECS that's similar to the ECSTask infrastructure block you'd place on a deployment
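if what you need is to launch an arbitrary container from inside a flow, one option is calling ECS directly with boto3 (just a sketch; the region, cluster, task definition, network values, and container name are all placeholders):

```python
import boto3
from prefect import flow, get_run_logger


@flow
def run_custom_ecs_task():
    logger = get_run_logger()
    ecs = boto3.client("ecs", region_name="us-east-1")  # placeholder region
    response = ecs.run_task(
        cluster="my-cluster",                 # placeholder cluster
        taskDefinition="my-task-definition",  # placeholder task definition
        launchType="FARGATE",
        networkConfiguration={
            "awsvpcConfiguration": {
                "subnets": ["subnet-123"],     # placeholder subnet
                "securityGroups": ["sg-123"],  # placeholder security group
            }
        },
        overrides={
            "containerOverrides": [
                # per-run command override, similar to ECSTask's `command`
                {"name": "my-container", "command": ["dbt", "run"]}
            ]
        },
    )
    logger.info(response["tasks"][0]["taskArn"])
```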
p
I understand, we are worried because ECSTask will be deprecated in September, so we are trying to adapt our current prefect flows. here is an example of a flow using ECSTask:
```python
from prefect import flow, task, variables, get_run_logger
from prefect.deployments import Deployment
from prefect.infrastructure import Process
from prefect_shell import shell_run_command
from prefect.server.schemas.schedules import CronSchedule
from prefect_aws import AwsCredentials, ECSTask
from prefect_aws.secrets_manager import read_secret
from handler.mysql_handler import DBHandler
 

import argparse
import base64
import boto3
import json
import os

from datetime import date, datetime, timedelta

parser = argparse.ArgumentParser()
parser.add_argument('--deploy', action='store_true', default=False)

def get_last_tag(client, repository):
    # Return a tag of the most recently pushed image in the ECR repository
    jmespath_expression = 'sort_by(imageDetails, &to_string(imagePushedAt))[-1].imageTags'
    paginator = client.get_paginator('describe_images')
    iterator = paginator.paginate(repositoryName=repository)
    filter_iterator = iterator.search(jmespath_expression)
    result = list(filter_iterator)[0]
    return result

def run_ecs_task(image_tag, family, command, stream_prefix, env):
    # Placeholder infrastructure values (redacted)
    vpc_id = "vpc-id"
    security_groups = "security-groups"
    subnet = "subnet"
    log_group = "log-group"
    cluster = "cluster"
    task_role_arn = "task-role"
    family_base = "family_base"

    aws_credentials = AwsCredentials.load("prefect-credentials")

    ecs_task = ECSTask(
        family=family_base + "-" + family,
        command=command,
        image=image_tag,
        aws_credentials=aws_credentials,
        vpc_id=vpc_id,
        cluster=cluster,
        execution_role_arn="role",
        task_role_arn=task_role_arn,
        task_start_timeout_seconds=300,
        configure_cloudwatch_logs=True,
        auto_deregister_task_definition=False,
        cloudwatch_logs_options={
            "awslogs-group": log_group,
            "awslogs-region": "region",
            "awslogs-stream-prefix": stream_prefix,
        },
        env=env,
        task_customizations=[
            {
                "op": "add",
                "path": "/networkConfiguration/awsvpcConfiguration/subnets",
                "value": [subnet],
            },
            {
                "op": "add",
                "path": "/networkConfiguration/awsvpcConfiguration/securityGroups",
                "value": [security_groups],
            },
            {
                "op": "add",
                "path": "/tags",
                "value": [{'some-tag': 'tag-value'}],
            },
        ],
    )

    result = ecs_task.run()

    return result

    
@task
def run_flujo_caja_task(image_tag: str, ha_reporting_secret: str, table_name):
    logger = get_run_logger()
    # Meltano
    path = "flujo_caja"
    family = "flujo-caja"
    stream_prefix = "ha-flujo-caja"
    env = {
        "ENVIRONMENT": "prod",
        "SECRET": ha_reporting_secret,
        "FLUJO_CAJA_TABLE_NAME": table_name,
    }

    command = ['php', '%s/index.php' % path]

    logger.info("HA Reporting: starting flujo_caja load...")
    result = run_ecs_task(image_tag, family, command, stream_prefix, env)

    if not result:
        raise Exception("HA Reporting: flujo_caja load failed. Check the logs in Kibana.")
    else:
        logger.info("HA Reporting: flujo_caja load finished successfully.")
         
@flow(name = "Run pipeline Flujo caja in ECS")
def run_flujo_caja():
    logger = get_run_logger()
    
    repository_name = "some-repo"
    aws_account = "account"
    aws_region =  "region"
    ha_reporting_secret = "secrets"
    
    ecr_client = boto3.client('ecr', aws_region)
    tag = get_last_tag(client = ecr_client, repository = repository_name)
    repository = aws_account+".dkr.ecr.us-east-1.amazonaws.com/"+repository_name
    image_tag = repository+":"+tag
    
    logger.info("Image: %s" % image_tag)
    
    flujo_caja_table_name = "table-name"
    run_flujo_caja_task(image_tag = image_tag, ha_reporting_secret = ha_reporting_secret ,table_name = flujo_caja_table_name)


deployment = Deployment.build_from_flow(
    flow=run_flujo_caja,
    name="flujo-caja",
    version=1.0,
    tags=["prod"],
    schedule=CronSchedule(
        cron="0 8 * * *",
        timezone="America/Santiago",
    ),
    path="/var/www/ha-prefect/flows",
)

if __name__ == "__main__":
    args = parser.parse_args()
    if args.deploy:
        # apply the deployment only when --deploy is passed
        deployment.apply()
```
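For comparison, a rough sketch of how these ECSTask settings might map onto `job_variables` for a worker-based deployment (assuming a custom image that already contains the flow code; field names follow the ECS work pool's base job template, and the values are the same placeholders used above):

```python
run_flujo_caja.deploy(
    name="flujo-caja",
    work_pool_name="test-pool",
    # placeholder image reference, as above
    image="account.dkr.ecr.us-east-1.amazonaws.com/some-repo:latest",
    build=False,
    push=False,
    job_variables={
        # no "command" override here, so the default
        # `prefect flow-run execute` still starts the flow run
        "family": "family_base-flujo-caja",
        "cluster": "cluster",
        "vpc_id": "vpc-id",
        "execution_role_arn": "role",
        "task_role_arn": "task-role",
        "configure_cloudwatch_logs": True,
        "cloudwatch_logs_options": {
            "awslogs-group": "log-group",
            "awslogs-region": "region",
            "awslogs-stream-prefix": "ha-flujo-caja",
        },
    },
)
```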