Paula Lasalas
06/26/2024, 3:50 PM
```python
from prefect import flow, get_run_logger
from prefect.deployments.runner import DeploymentImage

@flow(log_prints=True)
def my_flow(name: str = "world"):
    logger = get_run_logger()
    logger.info("Test")

if __name__ == "__main__":
    my_flow.deploy(
        name="example3",
        image="prefecthq/prefect:2-latest",
        work_pool_name="test-pool",
        job_variables={
            "command": "ls",
        },
        build=False,
        push=False,
    )
```
After the deployment, I can see it on my self-hosted Prefect server. Then I click "Quick run" to start a flow run. The ECS task starts correctly, executes the command `ls`, and exits with code 0 (no errors), but the flow run stays stuck in the Pending state. How can I fix it?
Marvin
06/26/2024, 3:51 PM
```yaml
job_variables:
  "command": "ls"
  "cpu": "2048"
  "memory": "16384"
```
- Adjust `PREFECT_AGENT_PREFETCH_SECONDS` and `PREFECT_AGENT_QUERY_INTERVAL` if necessary.
2. Concurrency Limits:
- Check and adjust concurrency limits on your work pool or deployment. High-priority tasks might be waiting for available slots.
- For issues related to concurrency limits and priority, refer to the work pools documentation.
3. Logs and Further Debugging:
- Monitor the ECS and Prefect logs for any specific errors or warnings. This can give more clues on what might be going wrong.
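For the debugging step, it can help to confirm which state the Prefect server actually records for the run. A minimal sketch, assuming Prefect 2.x, a client whose `PREFECT_API_URL` points at the self-hosted server, and a flow run ID copied from the UI; `describe_flow_run` is a hypothetical helper name:

```python
import asyncio
from uuid import UUID


async def describe_flow_run(flow_run_id: str) -> str:
    """Return "<run name>: <state type> (<state name>)" for one flow run.

    Assumes Prefect 2.x is installed and PREFECT_API_URL points at the
    self-hosted server. The import is local so this module loads without Prefect.
    """
    from prefect import get_client

    async with get_client() as client:
        flow_run = await client.read_flow_run(UUID(flow_run_id))

    state = flow_run.state
    if state is None:
        return f"{flow_run.name}: no state"
    # A run that stays PENDING was submitted to infrastructure, but the
    # Prefect engine inside the container never reported it as RUNNING.
    return f"{flow_run.name}: {state.type.value} ({state.name})"


# Usage (hypothetical run ID):
# print(asyncio.run(describe_flow_run("00000000-0000-0000-0000-000000000000")))
```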
By ensuring you have these configurations in place, you should be able to resolve issues with your ECS tasks getting stuck in a Pending state. If the issue persists, consider raising it with more detailed logs on the Prefect GitHub issues page.
Paula Lasalas
06/26/2024, 3:54 PM
`prefect worker start --pool test-pool`
Marvin
06/26/2024, 3:54 PM
`prefect worker start --pool test-pool` to start it. Here are some possible issues and solutions based on the available documentation:
### Key Points from Documentation
1. Worker Prefetch and Polling Intervals:
- Prefetch Seconds: Prefect workers pick up scheduled runs a few seconds before their scheduled start time. The infrastructure startup time may exceed this window.
- Polling Interval: Workers poll for new work on a fixed interval (10 seconds by default in Prefect 2). This can be adjusted with an environment variable.
2. Environment Variables:
- Prefetch: Ensure `PREFECT_WORKER_PREFETCH_SECONDS` is sufficiently high.
- Polling Interval: Adjust `PREFECT_WORKER_QUERY_SECONDS` to ensure timely polling.
3. Worker and Work Pools Install Policy:
- Validate your worker's install policy with the appropriate settings.
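The prefetch and polling interplay described above can be reasoned about with a simplified model. This is a sketch under stated assumptions, not Prefect's scheduling code: it assumes the worker polls every `query_seconds` for runs scheduled within the next `prefetch_seconds`, submits a run as soon as it sees it, and that the infrastructure needs `startup_seconds` to boot. `worst_case_late_start` is a hypothetical helper.

```python
def worst_case_late_start(
    startup_seconds: float, prefetch_seconds: float, query_seconds: float
) -> float:
    """Worst-case seconds between a run's scheduled time and its start.

    Simplified model (an assumption, not Prefect's implementation):
    - a run scheduled at time T becomes visible to the worker at T - prefetch;
    - if a poll happens just before that, the next poll is query_seconds later;
    - after submission, the infrastructure takes startup_seconds to boot.
    """
    first_seen = -prefetch_seconds + query_seconds  # relative to T
    starts_at = first_seen + startup_seconds
    # A run whose infrastructure is ready before its scheduled time is not late.
    return max(0.0, starts_at)


# With 10 s prefetch and polling, a 60 s ECS startup can start ~60 s late;
# raising the prefetch to 60 s shrinks that to ~10 s.
print(worst_case_late_start(60, 10, 10))  # 60
print(worst_case_late_start(60, 60, 10))  # 10
```

Under this model, the two settings only change how late a run starts.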
### Suggested Solutions
1. Adjust Prefetch and Polling Intervals:
- Increase the `PREFECT_WORKER_PREFETCH_SECONDS` and/or adjust the `PREFECT_WORKER_QUERY_SECONDS` based on your ECS task startup time.
```bash
export PREFECT_WORKER_PREFETCH_SECONDS=60
export PREFECT_WORKER_QUERY_SECONDS=10
```
Then restart your worker:
```bash
prefect worker start --pool test-pool
```
2. Check Resource Allocation:
- Ensure that ECS tasks have enough CPU and memory resources allocated.
- Verify that your EC2 instance has sufficient resources for running Docker containers.
3. Logs and Monitoring:
- Check the logs for your Docker container running on EC2.
- Set up monitoring tools to check on EC2 instance performance and network latency, ensuring there's no resource bottleneck.
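For the resource-allocation check: if the work pool launches tasks on Fargate (an assumption; the thread does not say which launch type is used), ECS only accepts specific CPU/memory pairings. A minimal sketch of a validator covering the CPU sizes from 256 to 4096 units; `is_valid_fargate_size` is a hypothetical helper and larger sizes are deliberately omitted.

```python
# Fargate task sizes for CPU 256-4096 (CPU units -> allowed memory in MiB).
# Assumption: Fargate launch type; larger CPU sizes are intentionally omitted.
FARGATE_MEMORY_OPTIONS = {
    256: [512, 1024, 2048],
    512: list(range(1024, 4096 + 1, 1024)),
    1024: list(range(2048, 8192 + 1, 1024)),
    2048: list(range(4096, 16384 + 1, 1024)),
    4096: list(range(8192, 30720 + 1, 1024)),
}


def is_valid_fargate_size(cpu: int, memory: int) -> bool:
    """Return True if (cpu, memory) is an accepted Fargate pairing in the table."""
    return memory in FARGATE_MEMORY_OPTIONS.get(cpu, [])


# The cpu "2048" / memory "16384" job variables from the YAML snippet pass:
print(is_valid_fargate_size(2048, 16384))  # True
print(is_valid_fargate_size(2048, 2048))   # False
```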
### References
1. ECS Worker Guide: Prefect AWS ECS Guide
2. Work Pools Documentation: Work Pools
Implement these changes and let me know if your flow still gets stuck in the Pending state.
Kevin Grismore
06/26/2024, 3:56 PM
Kevin Grismore
06/26/2024, 3:57 PM
`flow.from_source().deploy()`, where you're pointing to a git repo or other remote code storage in `from_source()`. Otherwise, you'll need to build and push a Docker image that contains the flow code when calling `flow.deploy()`.
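A minimal sketch of the `from_source()` option, assuming Prefect 2.x. The repository URL and entrypoint are hypothetical placeholders; the deployment name, work pool, and image come from the original snippet. The Prefect import sits inside the function so the module loads without Prefect installed.

```python
def deploy_from_git(
    source: str = "https://github.com/example-org/example-repo.git",  # hypothetical
    entrypoint: str = "flows/my_flow.py:my_flow",  # hypothetical "<file>:<flow function>"
) -> None:
    """Register a deployment whose flow code is pulled from git at run time."""
    from prefect import flow  # Prefect 2.x

    # from_source() loads the flow from remote storage, so the flow code
    # does not have to be baked into the Docker image.
    remote_flow = flow.from_source(source=source, entrypoint=entrypoint)
    remote_flow.deploy(
        name="example3",
        work_pool_name="test-pool",
        image="prefecthq/prefect:2-latest",
        build=False,
        push=False,
        # No "command" job variable: the worker's default command runs the flow.
    )


# Usage: call deploy_from_git() once, from a machine that can reach the Prefect API.
```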
Paula Lasalas
06/26/2024, 4:11 PM
Paula Lasalas
06/26/2024, 4:14 PM
Paula Lasalas
06/26/2024, 4:26 PM
Kevin Grismore
06/26/2024, 4:28 PM
Kevin Grismore
06/26/2024, 4:29 PM
`ls`, which overrides the default command, `prefect flow-run execute`.
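Kevin's point can be sketched as a small helper that builds the `job_variables` passed to `deploy()`. Leaving `command` unset keeps the default `prefect flow-run execute`; if a setup step must run first, it is chained in front of the default command, in the `some command && prefect flow-run execute` form Kevin suggests. Wrapping the chain in `/bin/sh -c` is an assumption: it covers the case where the worker splits the command string into arguments instead of running it through a shell. `ecs_job_variables` is a hypothetical helper name.

```python
import shlex
from typing import Optional

DEFAULT_COMMAND = "prefect flow-run execute"  # default command, per the thread


def ecs_job_variables(pre_command: Optional[str] = None) -> dict:
    """Build job_variables for my_flow.deploy(...) on an ECS work pool."""
    if pre_command is None:
        # No "command" key: the worker's default starts the flow run,
        # so the run can move from Pending to Running.
        return {}
    # Assumption: the image provides /bin/sh. Running the chain through a
    # shell makes "&&" work even if the command string is split into argv.
    chained = f"{pre_command} && {DEFAULT_COMMAND}"
    return {"command": f"/bin/sh -c {shlex.quote(chained)}"}


# Usage: my_flow.deploy(..., job_variables=ecs_job_variables("ls"), ...)
print(ecs_job_variables())      # {}
print(ecs_job_variables("ls"))  # {'command': "/bin/sh -c 'ls && prefect flow-run execute'"}
```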
Paula Lasalas
06/26/2024, 4:31 PM
Kevin Grismore
06/26/2024, 4:33 PM
Paula Lasalas
06/26/2024, 4:37 PM
Kevin Grismore
06/26/2024, 4:39 PM
Paula Lasalas
06/26/2024, 4:41 PM
Kevin Grismore
06/26/2024, 4:41 PM
Kevin Grismore
06/26/2024, 4:42 PM
`some command && prefect flow-run execute`?
Kevin Grismore
06/26/2024, 4:42 PM
`ECSTask` inside a Prefect flow?
Paula Lasalas
06/26/2024, 4:43 PM
Kevin Grismore
06/26/2024, 4:43 PM
Kevin Grismore
06/26/2024, 4:44 PM
Paula Lasalas
06/26/2024, 4:51 PM
```python
from prefect import flow, task, variables, get_run_logger
from prefect.deployments import Deployment
from prefect.infrastructure import Process
from prefect_shell import shell_run_command
from prefect.server.schemas.schedules import CronSchedule
from prefect_aws import AwsCredentials, ECSTask
from prefect_aws.secrets_manager import read_secret
from handler.mysql_handler import DBHandler
import argparse
import base64
import boto3
import json
import os
from datetime import date, datetime, timedelta

parser = argparse.ArgumentParser()
parser.add_argument("--deploy", action="store_true", default=False)


def get_last_tag(client, repository):
    # Tags of the most recently pushed image. search() evaluates the expression
    # page by page, and [0] keeps the first tag from the first page.
    jmespath_expression = "sort_by(imageDetails, &to_string(imagePushedAt))[-1].imageTags"
    paginator = client.get_paginator("describe_images")
    iterator = paginator.paginate(repositoryName=repository)
    filter_iterator = iterator.search(jmespath_expression)
    return list(filter_iterator)[0]


def run_ecs_task(image_tag, family, command, stream_prefix, env):
    vpc_id = "vpc-id"
    security_groups = "security-groups"
    subnet = "subnet"
    log_group = "log-group"
    cluster = "cluster"
    task_role_arn = "task-role"
    family_base = "family_base"
    aws_credentials = AwsCredentials.load("prefect-credentials")
    ecs_task = ECSTask(
        family=family_base + "-" + family,
        command=command,
        image=image_tag,
        aws_credentials=aws_credentials,
        vpc_id=vpc_id,
        cluster=cluster,
        execution_role_arn="role",
        task_role_arn=task_role_arn,
        task_start_timeout_seconds=300,
        configure_cloudwatch_logs=True,
        auto_deregister_task_definition=False,
        cloudwatch_logs_options={
            "awslogs-group": log_group,
            "awslogs-region": "region",
            "awslogs-stream-prefix": stream_prefix,
        },
        env=env,
        # JSON patches applied to the ECS RunTask request
        task_customizations=[
            {
                "op": "add",
                "path": "/networkConfiguration/awsvpcConfiguration/subnets",
                "value": [subnet],
            },
            {
                "op": "add",
                "path": "/networkConfiguration/awsvpcConfiguration/securityGroups",
                "value": [security_groups],
            },
            {
                "op": "add",
                "path": "/tags",
                # ECS tags are a list of {"key": ..., "value": ...} objects
                "value": [{"key": "some-tag", "value": "tag-value"}],
            },
        ],
    )
    # Returns a result object carrying the container's exit status
    return ecs_task.run()


@task
def run_flujo_caja_task(image_tag: str, ha_reporting_secret: str, table_name):
    logger = get_run_logger()
    # Meltano
    path = "flujo_caja"
    family = "flujo-caja"
    stream_prefix = "ha-flujo-caja"
    env = {
        "ENVIRONMENT": "prod",
        "SECRET": ha_reporting_secret,
        "FLUJO_CAJA_TABLE_NAME": table_name,
    }
    command = ["php", f"{path}/index.php"]
    logger.info("HA Reporting: Starting flujo caja load ...")
    result = run_ecs_task(image_tag, family, command, stream_prefix, env)
    # The result is falsy when the container exits with a non-zero status code
    if not result:
        raise Exception("HA Reporting: Flujo caja load failed. Check the logs in Kibana.")
    logger.info("HA Reporting: Flujo caja load finished successfully.")


@flow(name="Run pipeline Flujo caja in ECS")
def run_flujo_caja():
    logger = get_run_logger()
    repository_name = "some-repo"
    aws_account = "account"
    aws_region = "region"
    ha_reporting_secret = "secrets"
    ecr_client = boto3.client("ecr", aws_region)
    tag = get_last_tag(client=ecr_client, repository=repository_name)
    repository = f"{aws_account}.dkr.ecr.{aws_region}.amazonaws.com/{repository_name}"
    image_tag = f"{repository}:{tag}"
    logger.info("Image: %s", image_tag)
    flujo_caja_table_name = "table-name"
    run_flujo_caja_task(
        image_tag=image_tag,
        ha_reporting_secret=ha_reporting_secret,
        table_name=flujo_caja_table_name,
    )


if __name__ == "__main__":
    args = parser.parse_args()
    if args.deploy:
        # Build and apply the deployment only when --deploy is passed, so that
        # importing this module (e.g. when a flow run loads it) does not re-register it.
        deployment = Deployment.build_from_flow(
            flow=run_flujo_caja,
            name="flujo-caja",
            version="1.0",
            tags=["prod"],
            schedule=CronSchedule(cron="0 8 * * *", timezone="America/Santiago"),
            path="/var/www/ha-prefect/flows",
        )
        deployment.apply()
```