Dev Dabke
08/16/2024, 5:24 PMQTai
11/10/2024, 4:07 PMDev Dabke
11/11/2024, 4:26 AMDev Dabke
11/11/2024, 4:27 AMQTai
11/11/2024, 4:35 AMDev Dabke
11/15/2024, 12:29 AMQTai
11/15/2024, 7:23 AMDev Dabke
12/24/2024, 4:54 PMyaml
Dev Dabke
12/24/2024, 4:57 PM
"""ECS task definitions for CPU and GPU machines."""
from .aws_ecs_machine import AwsEcsMachine
# Memory ceiling used by get_gpu_task_dict; presumably MiB (the unit ECS uses
# for task/container memory) and tied to the GPU instance type -- TODO confirm.
MAX_MEMORY = 61440
# Subtracted from MAX_MEMORY before the value is requested for the GPU task;
# presumably headroom left for the host / ECS agent -- TODO confirm.
MEMORY_OVERHEAD = 1024
# Name of the IAM role interpolated into each task definition's
# "executionRoleArn".
ROLE_NAME = "PrefectEcsTaskExecutionRole"
def get_cpu_task_dict(
    flow_name: str,
    commit: str,
    machine: AwsEcsMachine,
    is_main_line: bool,
    *,
    account_id: str = "xxx",
) -> dict[str, object]:
    """
    Get the ECS task definition for CPU machines.

    The returned dict declares a single container named "prefect", targets
    Fargate with "awsvpc" networking, and logs through the "awslogs" driver
    to the "prefect" log group in us-east-2.

    Args:
        flow_name: the name of the flow; used in the task family and the log
            stream prefix.
        commit: the commit hash to use for the image tag; also recorded as a
            tag and in the log stream prefix.
        machine: the machine configuration to use. Its image, cpu_value,
            memory_value and storage attributes are read.
        is_main_line: whether the flow is the main line or not; selects the
            "main" or "feat" marker in names and is recorded as a tag.
        account_id: the AWS account number placed in the execution role ARN.
            NOTE: the default "xxx" is a placeholder and must be replaced
            with a real account number before the definition is registered.

    Returns:
        A dict of keyword arguments describing the task definition.
    """
    main_display = "main" if is_main_line else "feat"
    family = f"prefect-cpu-{main_display}__{flow_name}"
    log_prefix = f"{family}__{commit}"

    container: dict[str, object] = {
        "name": "prefect",
        "image": f"{machine.image.to_ecr()}:{commit}",
        # 0 = no container-level CPU reservation; the task-level "cpu"
        # below is what sizes the task.
        "cpu": 0,
        "portMappings": [],
        "essential": True,
        "environment": [],
        "mountPoints": [],
        "volumesFrom": [],
        "logConfiguration": {
            "logDriver": "awslogs",
            "options": {
                "awslogs-group": "prefect",
                "awslogs-create-group": "true",
                "awslogs-region": "us-east-2",
                "awslogs-stream-prefix": log_prefix,
            },
        },
        "systemControls": [],
    }

    task_dict: dict[str, object] = {
        "family": family,
        "containerDefinitions": [container],
        "executionRoleArn": f"arn:aws:iam::{account_id}:role/{ROLE_NAME}",
        "networkMode": "awsvpc",
        "requiresCompatibilities": ["FARGATE"],
        # Task-level cpu and memory are passed as strings.
        "cpu": str(machine.cpu_value),
        "memory": str(machine.memory_value),
        "ephemeralStorage": {"sizeInGiB": machine.storage},
        "tags": [
            {"key": "commit", "value": commit},
            {"key": "is_main_line", "value": str(is_main_line)},
        ],
    }
    return task_dict
def get_gpu_task_dict(
    flow_name: str,
    commit: str,
    machine: AwsEcsMachine,
    is_main_line: bool,
    *,
    account_id: str = "xxx",
) -> dict[str, object]:
    """
    Get the ECS task definition for GPU machines.

    The returned dict declares a single container named "prefect" that
    requests one GPU, uses host IPC mode, and logs through the "awslogs"
    driver to the "prefect" log group in us-east-2. CPU and memory are fixed
    values rather than being read from ``machine``.

    Args:
        flow_name: the name of the flow; used in the task family and the log
            stream prefix.
        commit: the commit hash to use for the image tag; also recorded as a
            tag and in the log stream prefix.
        machine: the machine configuration. Only its image is read here.
        is_main_line: whether the flow is the main line or not; selects the
            "main" or "feat" marker in names and is recorded as a tag.
        account_id: the AWS account number placed in the execution role ARN.
            NOTE: the default "xxx" is a placeholder and must be replaced
            with a real account number before the definition is registered.

    Returns:
        A dict of keyword arguments describing the task definition.
    """
    main_display = "main" if is_main_line else "feat"
    family = f"prefect-gpu-{main_display}__{flow_name}"
    log_prefix = f"{family}__{commit}"

    # Hardcoded because of the instance type.
    cpu_units = 8192
    # Computed once so the container-level and task-level values cannot drift.
    usable_memory = MAX_MEMORY - MEMORY_OVERHEAD

    container: dict[str, object] = {
        "name": "prefect",
        "image": f"{machine.image.to_ecr()}:{commit}",
        "cpu": cpu_units,
        "memory": usable_memory,
        "portMappings": [],
        "essential": True,
        "environment": [],
        "mountPoints": [],
        "volumesFrom": [],
        "logConfiguration": {
            "logDriver": "awslogs",
            "options": {
                "awslogs-group": "prefect",
                "awslogs-create-group": "true",
                "awslogs-region": "us-east-2",
                "awslogs-stream-prefix": log_prefix,
            },
        },
        "systemControls": [],
        "resourceRequirements": [{"value": "1", "type": "GPU"}],
    }

    task_dict: dict[str, object] = {
        "family": family,
        "containerDefinitions": [container],
        "executionRoleArn": f"arn:aws:iam::{account_id}:role/{ROLE_NAME}",
        # Task-level cpu and memory are passed as strings, while the
        # container-level values above are integers.
        "cpu": str(cpu_units),
        "memory": str(usable_memory),
        "ipcMode": "host",
        "tags": [
            {"key": "commit", "value": commit},
            {"key": "is_main_line", "value": str(is_main_line)},
        ],
    }
    return task_dict
Dev Dabke
12/24/2024, 4:58 PM
task_definition_arn = boto3.client("ecs").register_task_definition(**task_dict)["taskDefinition"]["taskDefinitionArn"]
job_variables = {"task_definition_arn": task_definition_arn}
flow.deploy(
...,
job_variables=job_variables,
...
)