<@ULVA73B9P> does my terraform code to start ecs b...
# ask-marvin
s
@Marvin does my terraform code to start ecs based worker pool and deployment look correct:
Copy code
provider "prefect" {
  api_key      = var.prefect_api_key
  account_id   = var.prefect_account_id
  workspace_id = var.prefect_workspace_id
}


module "prefect_ecs_worker" {
  source  = "PrefectHQ/ecs-worker/prefect"
  version = "~> 0.0.3"

  # Basic configuration
  name                  = var.project_name
  vpc_id                = var.vpc
  worker_subnets        = var.private_subnet_ids
  prefect_api_key       = var.prefect_api_key
  prefect_account_id    = var.prefect_account_id
  prefect_workspace_id  = var.prefect_workspace_id
  worker_work_pool_name = var.work_pool_name


  # Worker configuration
  worker_desired_count = var.worker_count
  worker_cpu           = var.worker_cpu
  worker_memory        = var.worker_memory

  # Optional: Additional packages to install in worker
  worker_extra_pip_packages = "prefect-aws s3fs"

  # Optional: CloudWatch log retention
  worker_log_retention_in_days = 7

  # Optional: Secrets Manager recovery window
  secrets_manager_recovery_in_days = 30
}

resource "prefect_work_pool" "ecs_pool" {
  name        = var.work_pool_name
  type        = "ecs"
  paused      = false
  description = "ECS work pool for running Prefect flows on Fargate"

  base_job_template = jsonencode({
    job_configuration = {
      cluster = module.prefect_ecs_worker.ecs_cluster_name
      
      # Default resources for flow runs
      cpu    = var.flow_cpu
      memory = var.flow_memory
      
      # Use Fargate
      launchType = "FARGATE"
      
      # IAM roles created by the module
      executionRoleArn = module.prefect_ecs_worker.ecs_worker_task_execution_role_arn
      taskRoleArn      = module.prefect_ecs_worker.ecs_worker_task_role_arn
      
      # Network configuration
      networkConfiguration = {
        awsvpcConfiguration = {
          subnets        = var.private_subnet_ids
          securityGroups = [module.prefect_ecs_worker.ecs_worker_security_group_id]
          assignPublicIp = "DISABLED"
        }
      }
      
      # Container overrides for flow runs
    #   containerOverrides = [{
    #     name = "prefect-job"
    #     environment = [
    #       {
    #         name  = "PREFECT_API_URL"
    #         value = "<https://api.prefect.cloud/api/accounts/${var.prefect_account_id}/workspaces/${var.prefect_workspace_id}>"
    #       }
    #     ]
    #   }]
    }
    
    # Variables that can be overridden per deployment
    variables = {
      type = "object"
      properties = {
        cluster = {
          title   = "ECS Cluster"
          type    = "string"
          default = module.prefect_ecs_worker.ecs_cluster_name
        }
        cpu = {
          title   = "CPU"
          type    = "integer"
          default = var.flow_cpu
        }
        memory = {
          title   = "Memory"
          type    = "integer"
          default = var.flow_memory
        }
        image = {
          title       = "Image"
          description = "Docker image for the flow run"
          type        = "string"
          default     = "prefecthq/prefect:2-python3.11"
        }
        executionRoleArn = {
          title   = "Execution Role ARN"
          type    = "string"
          default = module.prefect_ecs_worker.ecs_worker_task_execution_role_arn
        }
        taskRoleArn = {
          title   = "Task Role ARN"
          type    = "string"
          default = module.prefect_ecs_worker.ecs_worker_task_role_arn
        }
      }
    }
  })

  # Ensure module resources are created first
  depends_on = [module.prefect_ecs_worker]
}

resource "prefect_deployment" "dist_alerts" {
  name         = "customer-analytics"
  work_pool_id = prefect_work_pool.ecs_pool.id
  
  flow_name  = "Customer Analytics"
  entrypoint = "/app/flows/dist_flow:flow"
  version    = var.deployment_version
  
  schedules = [{
    schedule = jsonencode({
      cron = "0 3 * * *"  # 3 AM daily
      timezone = "UTC"
    })
    active = false
  }]
  
  job_variables = jsonencode({
    cpu              = 2048
    memory           = 4096
    image            = var.pipeline_image  # Same image!
    cluster          = module.prefect_ecs_worker.ecs_cluster_name
    executionRoleArn = module.prefect_ecs_worker.ecs_worker_task_execution_role_arn
    taskRoleArn      = module.prefect_ecs_worker.ecs_worker_task_role_arn
    env = {
      PYTHONPATH = "/app"
      API_KEY = var.api_key
    }
  })
}
m
thought for 329.1 seconds
You’re very close, but a few key fixes are needed for Prefect 3.x ECS pools: What to fix - Base job template shape: The ECS worker expects the Prefect 3.x default template structure. Keys like cpu, memory, launchType, executionRoleArn, taskRoleArn, networkConfiguration should not be top-level. They belong under task_definition and task_run_request. Also, Prefect’s template variables are snake_case (e.g., launch_type, execution_role_arn, task_role_arn, network_configuration). - Include container_name (or be sure you’re okay with the default). The default template references container_name in multiple places. - Use a Prefect 3 image: switch from prefecthq/prefect:2-python3.11 to prefecthq/prefect:3-python3.11. - Deployment job_variables: Use snake_case for the role variables and include launch_type if you want to be explicit. Your env mapping is fine (the worker converts it to the ECS format). - Optional but recommended: Start from the default ECS base job template and customize it. You can fetch it with:
Copy code
prefect work-pool get-default-base-job-template --type ecs
This shows the exact structure the worker expects. Authoritative template reference - Default ECS base job template (CLI):
Copy code
prefect work-pool get-default-base-job-template --type ecs
- Docs: Manage work pools and ECS guide Minimal corrected snippets 1) Work pool with corrected base_job_template (trimmed to essentials and aligned with the default) ``` resource "prefect_work_pool" "ecs_pool" { name = var.work_pool_name type = "ecs" paused = false description = "ECS work pool for running Prefect flows on Fargate" base_job_template = jsonencode({ job_configuration = { # Task definition fields (role, resources, image, container name) task_definition = { containerDefinitions = [{ image = "{{ image }}" name = "{{ container_name }}" }] cpu = "{{ cpu }}" memory = "{{ memory }}" family = "{{ family }}" executionRoleArn = "{{ execution_role_arn }}" } # Task run request fields (cluster, launch type, overrides, task role) task_run_request = { launchType = "{{ launch_type }}" cluster = "{{ cluster }}" overrides = { containerOverrides = [{ name = "{{ container_name }}" command = "{{ command }}" environment = "{{ env }}" cpu = "{{ cpu }}" memory = "{{ memory }}" }] cpu = "{{ cpu }}" memory = "{{ memory }}" taskRoleArn = "{{ task_role_arn }}" } tags = "{{ labels }}" taskDefinition = "{{ task_definition_arn }}" capacityProviderStrategy = "{{ capacity_provider_strategy }}" } # Networking and other options network_configuration = "{{ network_configuration }}" container_name = "{{ container_name }}" } variables = { type = "object" description = "Variables for templating an ECS job." properties = { image = { title = "Image" description = "Docker image for the flow run" type = "string" default = "prefecthq/prefect:3-python3.11" } container_name = { title = "Container Name" type = "string" default = "prefect" } cpu = { title = "CPU" type = "integer" default = var.flow_cpu } memory = { title = "Memory" type = "integer" default = var.flow_memory } cluster = { title = "ECS Cluster" type = "string"
default = module.prefect_ecs_worker.ecs_cluster_name } launch_type = { title = "Launch Type" type = "string" default = "FARGATE" } execution_role_arn = { title = "Execution Role ARN" type = "string" default = module.prefect_ecs_worker.ecs_worker_task_execution_role_arn } task_role_arn = { title = "Task Role ARN" type = "string" default = module.prefect_ecs_worker.ecs_worker_task_role_arn } family = { title = "Task Definition Family" type = "string" default = "${var.project_name}-prefect" } network_configuration = { title = "Network Configuration" type = "object" default = { awsvpcConfiguration = { subnets = var.private_subnet_ids securityGroups = [module.prefect_ecs_worker.ecs_worker_security_group_id] assignPublicIp = "DISABLED" } } } env = { title = "Environment variables" type = "object" default = {} } } additionalProperties = true } }) depends_on = [module.prefect_ecs_worker] }
Copy code
2) Deployment with corrected job_variables
resource "prefect_deployment" "dist_alerts" { name = "customer-analytics" work_pool_id = prefect_work_pool.ecs_pool.id flow_name = "Customer Analytics" entrypoint = "/app/flows/dist_flow:flow" version = var.deployment_version schedules = [{ schedule = jsonencode({ cron = "0 3 * * *" timezone = "UTC" }) active = false }] job_variables = jsonencode({ image = var.pipeline_image cpu = 2048 memory = 4096 cluster = module.prefect_ecs_worker.ecs_cluster_name launch_type = "FARGATE" execution_role_arn = module.prefect_ecs_worker.ecs_worker_task_execution_role_arn task_role_arn = module.prefect_ecs_worker.ecs_worker_task_role_arn network_configuration = { awsvpcConfiguration = { subnets = var.private_subnet_ids securityGroups = [module.prefect_ecs_worker.ecs_worker_security_group_id] assignPublicIp = "DISABLED" } } env = { PYTHONPATH = "/app" API_KEY = var.api_key } }) } ``` Notes on your existing config - The module block looks fine; providing worker_work_pool_name is expected so your ECS service can poll the pool you create with prefect_work_pool. - Your commented containerOverrides env block isn’t needed if you use the template’s env variable; the worker will convert the dict to the ECS “environment” list. - If you need logs in CloudWatch from flow runs, consider setting variables like configure_cloudwatch_logs and cloudwatch_logs_options in the template. Would you like me to generate the full base_job_template JSON tailored to your exact variables and module outputs?
s
yes @Marvin
m
thought for 92.4 seconds
Awesome — here’s a complete ECS base_job_template you can drop into your
prefect_work_pool
resource. It follows the Prefect 3.x default ECS template structure, uses snake_case variable names, and sets sensible defaults from your Terraform variables and module outputs. Paste this into `base_job_template = jsonencode({ ... })`: ``` { "job_configuration": { "command": "{{ command }}", "env": "{{ env }}", "labels": "{{ labels }}", "name": "{{ name }}", "aws_credentials": "{{ aws_credentials }}", "task_definition": { "containerDefinitions": [ { "image": "{{ image }}", "name": "{{ container_name }}" } ], "cpu": "{{ cpu }}", "family": "{{ family }}", "memory": "{{ memory }}", "executionRoleArn": "{{ execution_role_arn }}" }, "task_run_request": { "launchType": "{{ launch_type }}", "cluster": "{{ cluster }}", "overrides": { "containerOverrides": [ { "name": "{{ container_name }}", "command": "{{ command }}", "environment": "{{ env }}", "cpu": "{{ cpu }}", "memory": "{{ memory }}" } ], "cpu": "{{ cpu }}", "memory": "{{ memory }}", "taskRoleArn": "{{ task_role_arn }}" }, "tags": "{{ labels }}", "taskDefinition": "{{ task_definition_arn }}", "capacityProviderStrategy": "{{ capacity_provider_strategy }}" }, "configure_cloudwatch_logs": "{{ configure_cloudwatch_logs }}", "cloudwatch_logs_options": "{{ cloudwatch_logs_options }}", "cloudwatch_logs_prefix": "{{ cloudwatch_logs_prefix }}", "network_configuration": "{{ network_configuration }}", "stream_output": "{{ stream_output }}", "task_start_timeout_seconds": "{{ task_start_timeout_seconds }}", "task_watch_poll_interval": "{{ task_watch_poll_interval }}", "auto_deregister_task_definition": "{{ auto_deregister_task_definition }}", "vpc_id": "{{ vpc_id }}", "container_name": "{{ container_name }}", "cluster": "{{ cluster }}", "match_latest_revision_in_family": "{{ match_latest_revision_in_family }}" }, "variables": { "description": "Variables for templating an ECS job.", "type": "object", "properties": { "image": { "title": "Image", "description": "Docker image for the flow run", "type": "string", "default": "prefecthq/prefect:3-python3.11" }, "container_name": { "title": "Container Name", "type": "string", "default": "prefect" }, "cpu": { "title": "CPU", "type": "integer", "default": ${var.flow_cpu} }, "memory": { "title": "Memory", "type": "integer", "default": ${var.flow_memory} }, "cluster": { "title": "ECS Cluster", "type": "string", "default": "${module.prefect_ecs_worker.ecs_cluster_name}" }, "family": { "title": "Task Definition Family", "type": "string", "default": "${var.project_name}-prefect" }, "launch_type": { "title": "Launch Type", "type": "string", "default": "FARGATE" }, "execution_role_arn": { "title": "Execution Role ARN", "type": "string", "default": "${module.prefect_ecs_worker.ecs_worker_task_execution_role_arn}" }, "task_role_arn": { "title": "Task Role ARN", "type": "string", "default": "${module.prefect_ecs_worker.ecs_worker_task_role_arn}" }, "network_configuration": { "title": "Network Configuration", "type": "object", "default": { "awsvpcConfiguration": { "subnets": ${jsonencode(var.private_subnet_ids)}, "securityGroups": [ "${module.prefect_ecs_worker.ecs_worker_security_group_id}" ], "assignPublicIp": "DISABLED"
} } }, "vpc_id": { "title": "VPC ID", "type": "string", "default": "${var.vpc}" }, "env": { "title": "Environment variables", "type": "object", "default": {} }, "labels": { "title": "Labels", "type": "array", "items": { "type": "string" }, "default": [] }, "command": { "title": "Command", "type": "array", "items": { "type": "string" }, "default": [] }, "task_definition_arn": { "title": "Existing Task Definition ARN (optional)", "type": [ "string", "null" ], "default": null }, "capacity_provider_strategy": { "title": "Capacity Provider Strategy", "type": "array", "items": { "type": "object" }, "default": [] }, "configure_cloudwatch_logs": { "title": "Configure CloudWatch Logs", "type": "boolean", "default": false }, "cloudwatch_logs_prefix": { "title": "CloudWatch Logs Prefix", "type": "string", "default": "prefect" }, "cloudwatch_logs_options": { "title": "CloudWatch Logs Options", "type": "object", "default": {} }, "stream_output": { "title": "Stream output to Prefect UI", "type": "boolean", "default": true }, "task_start_timeout_seconds": { "title": "Task start timeout (s)", "type": "integer", "default": 600 }, "task_watch_poll_interval": { "title": "Task watch poll interval (s)", "type": "number", "default": 5 }, "auto_deregister_task_definition": { "title": "Auto deregister task definition", "type": "boolean", "default": false }, "match_latest_revision_in_family": { "title": "Match latest revision in family", "type": "boolean", "default": false } }, "additionalProperties": true } }
Copy code
How to use it in Terraform
resource "prefect_work_pool" "ecs_pool" { name = var.work_pool_name type = "ecs" paused = false description = "ECS work pool for running Prefect flows on Fargate" base_job_template = jsonencode(<PASTE_JSON_OBJECT_ABOVE>) depends_on = [module.prefect_ecs_worker] } ``` This aligns with the Prefect 3.x default ECS template and cleanly maps per-deployment
job_variables
like
image
,
cpu
,
memory
,
env
,
execution_role_arn
,
task_role_arn
, and
network_configuration
. Reference links - Default ECS base job template (CLI): run
prefect work-pool get-default-base-job-template --type ecs
- Docs: Manage work pools and ECS guide If you want, I can also emit a ready-to-paste
prefect_deployment
snippet with
job_variables
that matches this template (using your
var.pipeline_image
, cluster, roles, and env).
s
@Marvin This is what i have now, but a local worker starts in default region and looks for a cluster named default instead of what’s defined in the template:
Copy code
provider "prefect" {
  api_key      = var.prefect_api_key
  account_id   = var.prefect_account_id
  workspace_id = var.prefect_workspace_id
}


module "prefect_ecs_worker" {
  source  = "PrefectHQ/ecs-worker/prefect"
  version = "~> 0.0.3"

  # Basic configuration
  name                  = var.project_name
  vpc_id                = var.vpc
  worker_subnets        = var.subnet_ids
  prefect_api_key       = var.prefect_api_key
  prefect_account_id    = var.prefect_account_id
  prefect_workspace_id  = var.prefect_workspace_id
  worker_work_pool_name = var.work_pool_name


  # Worker configuration
  worker_desired_count = var.worker_count
  worker_cpu           = var.worker_cpu
  worker_memory        = var.worker_memory

  # Optional: Additional packages to install in worker
#   worker_extra_pip_packages = "prefect-aws s3fs"

  # Optional: CloudWatch log retention
  worker_log_retention_in_days = 7

  # Optional: Secrets Manager recovery window
  secrets_manager_recovery_in_days = 30
}

resource "prefect_block" "aws_credentials" {
  name     = "aws-credentials-us-east-1"
  type_slug    = "aws-credentials"
  
  data = jsonencode({
    region_name = "us-east-1"
    # If you need to specify credentials explicitly (otherwise uses default AWS credentials)
    # aws_access_key_id = var.aws_access_key_id
    # aws_secret_access_key = var.aws_secret_access_key
  })
}

resource "prefect_work_pool" "ecs_pool" {
  name = "GNW-Pipeline-ECS-Pool"
  type = "ecs"

  base_job_template = jsonencode({
    job_configuration = {
      command = "{{ command }}"
      env     = "{{ env }}"
      vpc_id = var.vpc
      
      task_definition = {
        executionRoleArn = module.prefect_ecs_worker.prefect_worker_execution_role_arn
        containerDefinitions = [{
          image = "{{ image }}"
          name  = "prefect"
        }]
        cpu    = "{{ cpu }}"
        memory = "{{ memory }}"
        networkMode = "awsvpc"
        requiresCompatibilities = ["FARGATE"]
      }

      launch_type = "FARGATE"
      vpc_configuration = {
        subnets = var.subnet_ids
        assign_public_ip = "ENABLED"  # or "DISABLED" if using private subnets with NAT
        security_groups = [module.prefect_ecs_worker.prefect_worker_security_group]
      }
      cluster = module.prefect_ecs_worker.prefect_worker_cluster_name
      aws_credentials = {
          "$ref" = {
          block_document_id = prefect_block.aws_credentials.id
          }
      }
      
      task_run_request = {
        overrides = {
          containerOverrides = [{
            environment = "{{ env }}"
            cpu         = "{{ cpu }}"
            memory      = "{{ memory }}"
          }]
        }
      }
    }
    
    variables = {
      properties = {
        image = {
          type    = "string"
        }
        cpu = {
          type    = "integer"
          default = var.flow_cpu
        }
        memory = {
          type    = "integer"
          default = var.flow_memory
        }
        env = {
          type   = "object"
          additionalProperties = { type = "string" }
        }
        command = {
          type = "string"
        }
      }
    }
  })
}

resource "prefect_flow" "dist_alerts_update" {
  name = "Dist Alerts"
}

resource "prefect_deployment" "dist_alerts" {
  name         = "DIST-data-update"
  work_pool_name = prefect_work_pool.ecs_pool.name
  flow_id = prefect_flow.dist_alerts_update.id
  
  entrypoint = "/app/pipelines/dist_flow.py:main"
#   version    = var.deployment_version
  
  job_variables = jsonencode({
    cpu    = 2048
    memory = 4096
    image  = "****" # replace with var.pipeline_image
    env = {
    #   PYTHONPATH = "/app"
      API_KEY = "****"
    }
  })
}
m
thought for 107.7 seconds
Thanks for the details — the behavior you’re seeing (worker using region “default” and cluster “default”) happens when the ECS worker can’t find the expected keys in the work pool’s base job template and falls back to its internal defaults. Two issues in your template are causing that: - Wrong key names/locations: ECS expects cluster, launchType, overrides, etc. under job_configuration.task_run_request (and networking as network_configuration), not as top-level keys. It also expects snake_case variables like execution_role_arn, task_role_arn, launch_type, network_configuration. - aws_credentials is not templated; it should be set to "{{ aws_credentials }}" in job_configuration and provided via variables (where you can default it to your block reference). Otherwise, region resolution falls back to ambient/default. Below is a corrected work pool base_job_template that: - Follows the Prefect 3.x default ECS schema - Sets your cluster, networking, and roles in the right places - Passes your aws-credentials block so region is honored - Uses a container_name and proper containerOverrides entries - Uses Prefect 3 image tag Drop this into base_job_template = jsonencode({ ... }): ``` { job_configuration = { command = "{{ command }}" env = "{{ env }}" labels = "{{ labels }}" name = "{{ name }}" aws_credentials = "{{ aws_credentials }}" task_definition = { containerDefinitions = [{ image = "{{ image }}" name = "{{ container_name }}" }] cpu = "{{ cpu }}" memory = "{{ memory }}" family = "{{ family }}" executionRoleArn = "{{ execution_role_arn }}" } task_run_request = { launchType = "{{ launch_type }}" cluster = "{{ cluster }}" overrides = { containerOverrides = [{ name = "{{ container_name }}" command = "{{ command }}" environment = "{{ env }}" cpu = "{{ cpu }}" memory = "{{ memory }}" }] cpu = "{{ cpu }}" memory = "{{ memory }}" taskRoleArn = "{{ task_role_arn }}" } tags = "{{ labels }}" taskDefinition = "{{ task_definition_arn }}" capacityProviderStrategy = "{{ capacity_provider_strategy }}" } network_configuration = "{{ network_configuration }}" container_name = "{{ container_name }}" task_start_timeout_seconds = "{{ task_start_timeout_seconds }}" task_watch_poll_interval = "{{ task_watch_poll_interval }}" configure_cloudwatch_logs = "{{ configure_cloudwatch_logs }}" cloudwatch_logs_prefix = "{{ cloudwatch_logs_prefix }}" cloudwatch_logs_options = "{{ cloudwatch_logs_options }}" stream_output = "{{ stream_output }}" auto_deregister_task_definition = "{{ auto_deregister_task_definition }}" vpc_id = "{{ vpc_id }}" match_latest_revision_in_family = "{{ match_latest_revision_in_family }}" cluster = "{{ cluster }}" } variables = { description = "Variables for templating an ECS job." type = "object" properties = { image = { title = "Image" description = "Docker image for the flow run" type = "string" default = "prefecthq/prefect:3-python3.11" } container_name = { title = "Container Name" type = "string" default = "prefect" } cpu = { title = "CPU" type = "integer" default = var.flow_cpu } memory = { title = "Memory" type = "integer" default = var.flow_memory } cluster = { title = "ECS Cluster" type = "string" default = module.prefect_ecs_worker.prefect_worker_cluster_name } family = {
title = "Task Definition Family" type = "string" default = "${var.project_name}-prefect" } launch_type = { title = "Launch Type" type = "string" default = "FARGATE" } execution_role_arn = { title = "Execution Role ARN" type = "string" default = module.prefect_ecs_worker.prefect_worker_execution_role_arn } task_role_arn = { title = "Task Role ARN" type = "string" default = module.prefect_ecs_worker.prefect_worker_task_role_arn } aws_credentials = { title = "AWS Credentials Block" type = "object" default = { "$ref" = { block_document_id = prefect_block.aws_credentials.id } } } network_configuration = { title = "Network Configuration" type = "object" default = { awsvpcConfiguration = { subnets = var.subnet_ids securityGroups = [ module.prefect_ecs_worker.prefect_worker_security_group ] assignPublicIp = "ENABLED" } } } vpc_id = { title = "VPC ID" type = "string" default = var.vpc } env = { title = "Environment variables" type = "object" default = {} } labels = { title = "Labels" type = "array" items = { type = "string" } default = [] } command = { title = "Command" type = "array" items = { type = "string" } default = [] } task_definition_arn = { title = "Existing Task Definition ARN (optional)" type = [ "string", "null" ] default = null } capacity_provider_strategy = { title = "Capacity Provider Strategy" type = "array" items = { type = "object" } default = [] } configure_cloudwatch_logs = { title = "Configure CloudWatch Logs" type = "boolean" default = false } cloudwatch_logs_prefix = { title = "CloudWatch Logs Prefix" type = "string" default = "prefect" } cloudwatch_logs_options = { title = "CloudWatch Logs Options" type = "object" default = {} } stream_output = { title = "Stream output to Prefect UI" type = "boolean" default = true } task_start_timeout_seconds = { title = "Task start timeout (s)" type = "integer" default = 600 } task_watch_poll_interval = { title = "Task watch poll interval (s)" type = "number" default = 5 } auto_deregister_task_definition = { title = "Auto deregister task definition" type = "boolean" default = false } match_latest_revision_in_family = { title = "Match latest revision in family" type = "boolean" default = false } } additionalProperties = true } }
Copy code
And update your deployment job_variables to match the template (you can rely on defaults, but being explicit helps while testing):
resource "prefect_deployment" "dist_alerts" { name = "DIST-data-update" work_pool_name = prefect_work_pool.ecs_pool.name flow_id = prefect_flow.dist_alerts_update.id entrypoint = "/app/pipelines/dist_flow.py:main" job_variables = jsonencode({ image = "**" # var.pipeline_image cpu = 2048 memory = 4096 cluster = module.prefect_ecs_worker.prefect_worker_cluster_name launch_type = "FARGATE" execution_role_arn = module.prefect_ecs_worker.prefect_worker_execution_role_arn task_role_arn = module.prefect_ecs_worker.prefect_worker_task_role_arn network_configuration = {
awsvpcConfiguration = { subnets = var.subnet_ids securityGroups = [ module.prefect_ecs_worker.prefect_worker_security_group ] assignPublicIp = "ENABLED" } } env = { API_KEY = "**" } }) }
Copy code
Double-check these gotchas
- Container name: containerOverrides require a name. The template sets container_name to "prefect" and references it everywhere.
- Command type: must be an array of strings, not a single string.
- Region: ensure the aws_credentials variable is present and set to your block reference (region_name = "us-east-1" in your block), and not hardcoded directly in job_configuration.
- Module outputs: I used module.prefect_ecs_worker.prefect_worker_* based on your snippet. If your module actually outputs ecs_worker_* names, swap them accordingly.

Helpful commands while debugging
- Inspect the pool to see the effective template Prefect sees:
prefect work-pool inspect "GNW-Pipeline-ECS-Pool"
Copy code
- Show the default ECS base template for comparison:
prefect work-pool get-default-base-job-template --type ecs ``` Docs - ECS guide: Prefect AWS ECS guide - Managing work pools and templates: Manage work pools If you apply these changes and still see “default/default” being used, let me know what
prefect work-pool inspect
shows for this pool; that’ll tell us exactly which values the worker is reading.
s
@Marvin can you share the json template only with the correct syntax
m
thought for 87.8 seconds
``` { "job_configuration": { "command": "{{ command }}", "env": "{{ env }}", "labels": "{{ labels }}", "name": "{{ name }}", "aws_credentials": "{{ aws_credentials }}", "task_definition": { "containerDefinitions": [ { "image": "{{ image }}", "name": "{{ container_name }}" } ], "cpu": "{{ cpu }}", "family": "{{ family }}", "memory": "{{ memory }}", "executionRoleArn": "{{ execution_role_arn }}" }, "task_run_request": { "launchType": "{{ launch_type }}", "cluster": "{{ cluster }}", "overrides": { "containerOverrides": [ { "name": "{{ container_name }}", "command": "{{ command }}", "environment": "{{ env }}", "cpu": "{{ cpu }}", "memory": "{{ memory }}" } ], "cpu": "{{ cpu }}", "memory": "{{ memory }}", "taskRoleArn": "{{ task_role_arn }}" }, "tags": "{{ labels }}", "taskDefinition": "{{ task_definition_arn }}", "capacityProviderStrategy": "{{ capacity_provider_strategy }}" }, "configure_cloudwatch_logs": "{{ configure_cloudwatch_logs }}", "cloudwatch_logs_options": "{{ cloudwatch_logs_options }}", "cloudwatch_logs_prefix": "{{ cloudwatch_logs_prefix }}", "network_configuration": "{{ network_configuration }}", "stream_output": "{{ stream_output }}", "task_start_timeout_seconds": "{{ task_start_timeout_seconds }}", "task_watch_poll_interval": "{{ task_watch_poll_interval }}", "auto_deregister_task_definition": "{{ auto_deregister_task_definition }}", "vpc_id": "{{ vpc_id }}", "container_name": "{{ container_name }}", "cluster": "{{ cluster }}", "match_latest_revision_in_family": "{{ match_latest_revision_in_family }}" }, "variables": { "description": "Variables for templating an ECS job.", "type": "object", "properties": { "image": { "title": "Image", "type": "string", "default": "prefecthq/prefect:3-python3.11" }, "container_name": { "title": "Container Name", "type": "string", "default": "prefect" }, "cpu": { "title": "CPU", "type": "integer", "default": 1024 }, "memory": { "title": "Memory", "type": "integer", "default": 2048 }, "family": { "title": "Task Definition Family", "type": "string", "default": "prefect" }, "cluster": { "title": "ECS Cluster", "type": "string" }, "launch_type": { "title": "Launch Type", "type": "string", "default": "FARGATE" }, "execution_role_arn": { "title": "Execution Role ARN", "type": "string" }, "task_role_arn": { "title": "Task Role ARN", "type": "string" }, "aws_credentials": { "title": "AWS Credentials Block", "type": ["object", "null"], "default": null }, "network_configuration": { "title": "Network Configuration", "type": "object", "default": { "awsvpcConfiguration": { "subnets": [], "securityGroups": [], "assignPublicIp": "DISABLED" } } }, "vpc_id": { "title": "VPC ID", "type": "string", "default": "" }, "env": { "title": "Environment variables", "type": "object", "default": {} }, "labels": { "title": "Labels", "type": "array", "items": { "type": "string" }, "default": [] }, "command": { "title": "Command", "type": "array", "items": { "type": "string" }, "default": [] }, "task_definition_arn": { "title": "Existing Task Definition ARN (optional)",
"type": ["string", "null"], "default": null }, "capacity_provider_strategy": { "title": "Capacity Provider Strategy", "type": "array", "items": { "type": "object" }, "default": [] }, "configure_cloudwatch_logs": { "title": "Configure CloudWatch Logs", "type": "boolean", "default": false }, "cloudwatch_logs_prefix": { "title": "CloudWatch Logs Prefix", "type": "string", "default": "prefect" }, "cloudwatch_logs_options": { "title": "CloudWatch Logs Options", "type": "object", "default": {} }, "stream_output": { "title": "Stream output to Prefect UI", "type": "boolean", "default": true }, "task_start_timeout_seconds": { "title": "Task start timeout (s)", "type": "integer", "default": 600 }, "task_watch_poll_interval": { "title": "Task watch poll interval (s)", "type": "number", "default": 5 }, "auto_deregister_task_definition": { "title": "Auto deregister task definition", "type": "boolean", "default": false }, "match_latest_revision_in_family": { "title": "Match latest revision in family", "type": "boolean", "default": false } }, "additionalProperties": true } } ```
s
@Marvin flow run errored with:
Copy code
Unexpected exception encountered when trying to load flow
Traceback (most recent call last):
  File "/app/.venv/lib/python3.13/site-packages/prefect/engine.py", line 112, in <module>
    flow: "Flow[..., Any]" = load_flow(flow_run)
                             ~~~~~~~~~^^^^^^^^^^
  File "/app/.venv/lib/python3.13/site-packages/prefect/flow_engine.py", line 140, in load_flow
    flow = run_coro_as_sync(
        load_flow_from_flow_run(flow_run, use_placeholder_flow=False)
    )
  File "/app/.venv/lib/python3.13/site-packages/prefect/utilities/asyncutils.py", line 207, in run_coro_as_sync
    return call.result()
           ~~~~~~~~~~~^^
  File "/app/.venv/lib/python3.13/site-packages/prefect/_internal/concurrency/calls.py", line 365, in result
    return self.future.result(timeout=timeout)
           ~~~~~~~~~~~~~~~~~~^^^^^^^^^^^^^^^^^
  File "/app/.venv/lib/python3.13/site-packages/prefect/_internal/concurrency/calls.py", line 192, in result
    return self.__get_result()
           ~~~~~~~~~~~~~~~~~^^
  File "/home/appuser/.local/share/uv/python/cpython-3.13.5-linux-x86_64-gnu/lib/python3.13/concurrent/futures/_base.py", line 401, in __get_result
    raise self._exception
  File "/app/.venv/lib/python3.13/site-packages/prefect/_internal/concurrency/calls.py", line 441, in _run_async
    result = await coro
             ^^^^^^^^^^
  File "/app/.venv/lib/python3.13/site-packages/prefect/utilities/asyncutils.py", line 188, in coroutine_wrapper
    return await task
           ^^^^^^^^^^
  File "/app/.venv/lib/python3.13/site-packages/prefect/client/utilities.py", line 69, in wrapper
    return await func(client, *args, **kwargs)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/app/.venv/lib/python3.13/site-packages/prefect/flows.py", line 2647, in load_flow_from_flow_run
    await storage_block.get_directory(from_path=from_path, local_path=".")
  File "/app/.venv/lib/python3.13/site-packages/prefect/utilities/asyncutils.py", line 341, in ctx_call
    result = await async_fn(*args, **kwargs)
             ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/app/.venv/lib/python3.13/site-packages/prefect/filesystems.py", line 162, in get_directory
    copytree(from_path, local_path, dirs_exist_ok=True, ignore=ignore_func)
    ~~~~~~~~^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/appuser/.local/share/uv/python/cpython-3.13.5-linux-x86_64-gnu/lib/python3.13/shutil.py", line 591, in copytree
    with os.scandir(src) as itr:
         ~~~~~~~~~~^^^^^
FileNotFoundError: [Errno 2] No such file or directory: '/app/None'
the work pool: resource “prefect_work_pool” “ecs_pool” { name = “GNW-Pipeline-ECS-Pool” type = “ecs” base_job_template = jsonencode({ job_configuration = { # command = “{{ command }}” env = “{{ env }}” vpc_id = var.vpc task_definition = { executionRoleArn = module.prefect_ecs_worker.prefect_worker_execution_role_arn containerDefinitions = [{ image = “{{ image }}” name = “prefect” }] cpu = “{{ cpu }}” memory = “{{ memory }}” networkMode = “awsvpc” requiresCompatibilities = [“FARGATE”] } vpc_configuration = { subnets = var.subnet_ids assign_public_ip = “ENABLED” # or “DISABLED” if using private subnets with NAT security_group_ids = [module.prefect_ecs_worker.prefect_worker_security_group] } aws_credentials = { “$ref” = { block_document_id = prefect_block.aws_credentials.id } } task_run_request = { cluster = module.prefect_ecs_worker.prefect_worker_cluster_name launchType = “FARGATE” overrides = { containerOverrides = [{ environment = “{{ env }}” cpu = “{{ cpu }}” memory = “{{ memory }}” }] } } } variables = { properties = { image = { type = “string” } cpu = { type = “integer” default = var.flow_cpu } memory = { type = “integer” default = var.flow_memory } env = { type = “object” additionalProperties = { type = “string” } } command = { type = “array” items = { type = “string” } } } } }) }
m
Your message was too long, here's your message at the allowed limit: flow run errored with
Copy code
Unexpected exception encountered when trying to load flow Traceback (most recent call last) File /app/ venv/lib/python3 13/site-packages/prefect/engine py line 112 in <module> flow Flow[ Any] = load_flow(flow_run) ~~~~~~~~~^^^^^^^^^^ File /app/ venv/lib/python3 13/site-packages/prefect/flow_engine py line 140 in load_flow flow = run_coro_as_sync( load_flow_from_flow_run(flow_run use_placeholder_flow=False) ) File /app/ venv/lib/python3 13/site-packages/prefect/utilities/asyncutils py line 207 in run_coro_as_sync return call result() ~~~~~~~~~~~^^ File /app/ venv/lib/python3 13/site-packages/prefect/_internal/concurrency/calls py line 365 in result return self future result(timeout=timeout) ~~~~~~~~~~~~~~~~~~^^^^^^^^^^^^^^^^^ File /app/ venv/lib/python3 13/site-packages/prefect/_internal/concurrency/calls py line 192 in result return self __get_result() ~~~~~~~~~~~~~~~~~^^ File /home/appuser/ local/share/uv/python/cpython-3 13 5-linux-x86_64-gnu/lib/python3 13/concurrent/futures/_base py line 401 in __get_result raise self _exception File /app/ venv/lib/python3 13/site-packages/prefect/_internal/concurrency/calls py line 441 in _run_async result = await coro ^^^^^^^^^^ File /app/ venv/lib/python3 13/site-packages/prefect/utilities/asyncutils py line 188 in coroutine_wrapper return await task ^^^^^^^^^^ File /app/ venv/lib/python3 13/site-packages/prefect/client/utilities py line 69 in wrapper return await func(client *args *kwargs) ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ File /app/ venv/lib/python3 13/site-packages/prefect/flows py line 2647 in load_flow_from_flow_run await storage_block get_directory(from_path=from_path local_path= ) File /app/ venv/lib/python3 13/site-packages/prefect/utilities/asyncutils py line 341 in ctx_call result = await async_fn(*args *kwargs) ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ File /app/ venv/lib/python3 13/site-packages/prefect/filesystems py line 162 in get_directory copytree(from_path local_path dirs_exist_ok=True ignore=ignore_func) ~~~~~~~~^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ File /home/appuser/ local/share/uv/python/cpython-3 13 5-linux-x86_64-gnu/lib/python3 13/shutil py line 591 in copytree with os scandir(src) as itr ~~~~~~~~~~^^^^^ FileNotFoundError [Errno 2] No such file or directory '/app/None'
The deployment resource “prefect_deployment” “dist_alerts” { name = “DIST-data-update” work_pool_name = prefect_work_pool ecs_pool name flow_id = prefect_flow dist_alerts_update id # entrypoint = “/app/pipelines/dist_flow py main” # version = var deployment_version job_variables = jsonencode({ cpu = 2048 memory = 4096 image = “<http //084375562450 dkr ecr us-east-1 amazonaws com/analytics-api d2947b8|084375562450 dkr ecr us-east-1 amazonaws com/analytics-api d2947b8>” # replace with var pipeline_image env = { # PYTHONPATH = “/app” API_KEY = “**” } command = [“python” “-m” “pipelines dist_flow”] }) } the work pool resource “prefect_work_pool” “ecs_pool” { name = “GNW-Pipeline-ECS-Pool” type = “ecs” base_job_template = jsonencode({ job_configuration = { # command = “{{ command }}” env = “{{ env }}” vpc_id = var vpc task_definition = { executionRoleArn = module prefect_ecs_worker prefect_worker_execution_role_arn containerDefinitions = [{ image = “{{ image }}” name = “prefect” }] cpu = “{{ cpu }}” memory = “{{ memory }}” networkMode = “awsvpc” requiresCompatibilities = [“FARGATE”] } vpc_configuration = { subnets = var subnet_ids assign_public_ip = “ENABLED” # or “DISABLED” if using private subnets with NAT security_group_ids = [module prefect_ecs_worker prefect_worker_security_group] } aws_credentials = { “$ref” = { block_document_id = prefect_block aws_credentials id } } task_run_request = { cluster = module prefect_ecs_worker prefect_worker_cluster_name launchType = “FARGATE” overrides = { containerOverrides = [{ environment = “{{ env }}” cpu = “{{ cpu }}” memory = “{{ memory }}”
}] } } } variables = { properties = { image = { type = “string” } cpu = { type = “integer” default = var flow_cpu } memory = { type = “integer” default = var flow_memory } env = { type = “object” additionalProperties = { type = “string” } } command = { type = “array” items = { type = “string” } } }
thought for 255.0 seconds
Thanks for sharing the traceback and your pool template. The error points to Prefect trying to pull code from a storage location that resolves to “/app/None”, which means the deployment has a storage configuration with a basepath of “/app” and a missing/null subpath. When the engine loads the flow, it calls storage_block.get_directory(from_path=from_path), and here from_path is None → becomes “/app/None”. Why this happens - If you’re doing image-based runs (code baked into your Docker image), the deployment should NOT have a storage block or pull steps. Otherwise, the engine will try to fetch code from storage, leading to paths like “/app/None”. - This can occur if your workspace has a default storage configured (it can get applied to new deployments) or if the deployment was created with storage set implicitly. What to do Pick one of these paths: A) Image-only (recommended for ECS) - Ensure your image contains the flow file at the entrypoint you specified: “/app/pipelines/dist_flow.py:main”. - Remove storage from the deployment and clear any default storage in your workspace for this deployment. Steps to verify/fix: 1) Inspect the deployment to confirm storage/pull steps:
Copy code
prefect deployment inspect "Dist Alerts/DIST-data-update"
If you see a “Storage” section or “Pull” steps, remove them. You can remove in the UI, or patch via API, or re-apply Terraform ensuring the deployment has no storage fields. If your workspace has a default storage set, clear it (or override it for this deployment). 2) Re-run after confirming no storage, and that your image truly contains the code at /app/pipelines/dist_flow.py. B) Use storage intentionally - If you want Prefect to fetch code at runtime, configure a proper storage block (e.g., S3 or a remote filesystem) and set its path to the directory that contains your flow code. Then your entrypoint should reference files relative to that storage location. - Example: S3 storage with base path “s3://my-bucket/my-project” and the flow at “flows/dist_flow.py:main”. Also fix your ECS base job template Your current template has a few schema issues. Use the JSON template I shared earlier (it matches the Prefect 3.x ECS default structure). Key points: - Put cluster and launchType under job_configuration.task_run_request. - Use network_configuration (not vpc_configuration/security_group_ids). - Set aws_credentials to “{{ aws_credentials }}” and provide a default via your aws-credentials block. - Include a container_name and reference it in containerOverrides. Quick checklist - Deployment: - No “Storage” configured if you’re using image-only. - entrypoint matches a real path in the image: “/app/pipelines/dist_flow.py:main”. - Work pool base job template: - Uses the corrected JSON structure with snake_case variables. - cluster is under task_run_request and set via variables/defaults. - network_configuration is provided and valid. - aws_credentials is templated and points to your region (us-east-1). - Job variables (deployment): - image includes your code. - cpu/memory set. - cluster, launch_type, execution_role_arn, task_role_arn, network_configuration provided (or defaults work). - env as a map. Helpful commands - Inspect deployment and pool:
Copy code
prefect deployment inspect "Dist Alerts/DIST-data-update"
prefect work-pool inspect "GNW-Pipeline-ECS-Pool"
References - Default ECS base template (CLI):
Copy code
prefect work-pool get-default-base-job-template --type ecs
- Docs: Manage work pools and ECS guide If you paste the output of
prefect deployment inspect
here, I can point to the exact storage/pull configuration to remove.