# ask-marvin
r
@Marvin I need to configure an ECS+EC2 setup wherein a parent flow triggers deployments for N other flows. Each of the N flows should run in its own job container; once all jobs have completed, the parent flow should run other tasks in its own flow. How do I go about this?
m
Error: No module named 'prefect.client.orchestration.flow_runs'
thought for 727.3 seconds
r
@Marvin give me an example of one flow in one deployment calling N other deployments in a task and waiting for all of them to complete before proceeding to run other tasks
m
thought for 180.1 seconds
Here’s a compact Prefect 3.x pattern: a single “orchestrator” deployment triggers N other deployments in parallel from a task, waits for them all to finish, then proceeds.
```python
from typing import Optional

from prefect import flow, task, get_run_logger
from prefect.deployments import run_deployment
from prefect.futures import wait
from prefect.task_runners import ThreadPoolTaskRunner
from prefect.states import raise_state_exception

@task
def trigger_child(deployment_name: str, params: Optional[dict] = None) -> str:
    """
    Start a deployment and block until it completes.
    Returns the child flow run ID or raises if it failed.
    """
    logger = get_run_logger()
    fr = run_deployment(
        name=deployment_name,               # e.g. "child-flow/prod"
        parameters=params or {},
        flow_run_name=f"child-{deployment_name.replace('/', '-')}",
        # as_subflow=True by default so lineage shows in the UI
        timeout=60 * 60,                    # optional: 1h max wait
        poll_interval=5,
    )
    logger.info(f"{deployment_name} ended with state: {fr.state.type.name}")
    # If the timeout elapsed, the child is still running: treat that as an error
    if not fr.state.is_final():
        raise TimeoutError(f"{deployment_name} did not finish within the timeout")
    # Raise if the child failed/crashed/cancelled
    raise_state_exception(fr.state)
    return str(fr.id)

@task
def do_more_work(child_run_ids: list[str]) -> None:
    logger = get_run_logger()
    logger.info(f"All children completed: {child_run_ids}. Proceeding...")

@flow(name="orchestrator", task_runner=ThreadPoolTaskRunner(max_workers=8))
def orchestrator(deployments: list[str], child_params: Optional[dict[str, dict]] = None):
    """
    deployments: list of deployment names like ["child-a/prod", "child-b/prod", ...]
    child_params: optional per-deployment parameters keyed by deployment name
    """
    params = child_params or {}
    # Kick off children in parallel; each task blocks its thread until its child finishes
    futures = [trigger_child.submit(d, params.get(d)) for d in deployments]

    # Wait for all children (their tasks) to complete
    wait(futures)
    # Collect IDs in submission order; .result() re-raises if any child task failed
    child_run_ids = [f.result() for f in futures]

    # Continue with downstream tasks
    do_more_work(child_run_ids)

if __name__ == "__main__":
    orchestrator([
        "child-a/prod",
        "child-b/prod",
        "child-c/prod",
    ])
```
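Notes:
- `run_deployment` waits for the child run to finish (or until `timeout` elapses, in which case the returned run is not yet in a final state) and returns its `FlowRun`; we call `raise_state_exception` to fail fast if any child fails.
- Parallelism comes from submitting the `trigger_child` task N times and using a thread pool via `ThreadPoolTaskRunner(max_workers=...)`. Each submitted task holds one thread while its child runs, so `max_workers` also caps how many children run at once.
- By default `as_subflow=True`, so you’ll see nice parent/child lineage in the UI. Set `as_subflow=False` if you prefer not to.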
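The fan-out/fan-in shape of `orchestrator` does not depend on Prefect itself. The stdlib-only sketch below mimics it so the two properties the notes rely on can be checked locally: children run concurrently up to `max_workers`, and collecting results in submission order re-raises a child's failure. `fake_run_deployment` is a hypothetical stand-in for `run_deployment` that sleeps instead of starting a real child run; the names starting with "bad" simulate a failed child.

```python
import time
from concurrent.futures import ThreadPoolExecutor, wait


def fake_run_deployment(name: str, duration: float = 0.05) -> str:
    """Hypothetical stand-in for run_deployment: block, then report a final state."""
    time.sleep(duration)
    return "FAILED" if name.startswith("bad") else "COMPLETED"


def trigger_child(name: str) -> str:
    """Mirror of the trigger_child task: raise unless the child completed."""
    state = fake_run_deployment(name)
    if state != "COMPLETED":
        raise RuntimeError(f"{name} finished in state {state}")
    return name


def orchestrate(deployments: list[str], max_workers: int = 8) -> list[str]:
    """Fan out one blocking call per child, fan in once all of them are done."""
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        futures = [pool.submit(trigger_child, d) for d in deployments]
        wait(futures)  # block until every child has finished, success or not
        # Results in submission order; re-raises the earliest-submitted failure
        return [f.result() for f in futures]


start = time.perf_counter()
print(orchestrate([f"child-{i}" for i in range(8)]))
# 8 children of 0.05 s each on 8 threads take roughly 0.05 s, not 0.4 s
print(f"elapsed: {time.perf_counter() - start:.2f}s")
```

Lowering `max_workers` below the number of children makes the extra children queue behind the busy threads, which is the same trade-off `ThreadPoolTaskRunner(max_workers=...)` makes in the Prefect version.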
Short answer: use an ECS work pool for the N child flows (each flow run = one ECS task/container), trigger them from a parent flow with `run_deployment()`, wait for them to finish, then continue the parent’s work. The only real choices are where to run the parent and how to configure ECS for EC2 capacity. Here’s a concrete way to do it in Prefect 3.x.

1) Create and configure an ECS work pool (EC2 capacity)
- Create the pool:
```bash
prefect work-pool create my-ecs --type ecs
```
- Pull the default base job template, edit, then apply:
```bash
prefect work-pool get-default-base-job-template --type ecs --file ecs-base.json
# edit ecs-base.json
prefect work-pool update my-ecs --base-job-template ecs-base.json
```
In ecs-base.json, set at minimum:
- `cluster` (ARN or name)
- `container_name`
- `image` (or keep it templated and set it per deployment)
- `network_configuration` (subnets, securityGroups, assignPublicIp as needed)
- `execution_role_arn` and `task_role_arn`
- EC2 capacity: the ECS worker defaults to the Fargate launch type, so point `task_run_request.capacityProviderStrategy` (under `job_configuration`) at your EC2 capacity provider(s), keeping the other keys already present in `task_run_request`, e.g.:
```json
"task_run_request": {
  "capacityProviderStrategy": [
    { "capacityProvider": "my-ec2-provider", "weight": 1 }
  ],
  "cluster": "arn:aws:ecs:us-east-1:123456789012:cluster/my-cluster",
  "taskDefinition": "{{ task_definition_arn }}"
}
```
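- Optional: set a work-pool concurrency limit equal to the maximum number of child runs you want running at once.
- Start a worker that will submit ECS tasks (the ECS worker type ships in the `prefect-aws` package, so install it wherever the worker runs):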
```bash
prefect worker start -p my-ecs
```
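Instead of hand-editing `ecs-base.json`, the variable defaults listed in step 1 can be patched with a small script. A minimal sketch, assuming the exported file follows the usual Prefect base-job-template layout (a `job_configuration` section plus a JSON-Schema `variables.properties` map) and that the variable names you pass, such as `cluster` or `capacity_provider_strategy`, actually exist in your exported file. `set_variable_defaults` and `patch_template_file` are hypothetical helpers, not Prefect APIs.

```python
import copy
import json


def set_variable_defaults(template: dict, defaults: dict) -> dict:
    """Return a copy of a base job template with new defaults for existing variables."""
    patched = copy.deepcopy(template)
    props = patched["variables"]["properties"]
    for name, value in defaults.items():
        if name not in props:
            # Refuse to invent variables: job_configuration would never reference them
            raise KeyError(f"{name!r} is not a variable in this template")
        props[name]["default"] = value
    return patched


def patch_template_file(path: str, defaults: dict) -> None:
    """Hypothetical helper: rewrite an exported template file in place."""
    with open(path) as fh:
        template = json.load(fh)
    with open(path, "w") as fh:
        json.dump(set_variable_defaults(template, defaults), fh, indent=2)
```

For example, `patch_template_file("ecs-base.json", {"cluster": "my-cluster"})` followed by `prefect work-pool update my-ecs --base-job-template ecs-base.json` applies the change. Failing loudly on unknown names catches typos that would otherwise be silently ignored.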
2) Deploy your child flow(s) to that ECS pool

In Prefect 3.x, build deployments like this:
```python
from prefect import Flow

Flow.from_source(
    "https://github.com/your-org/your-repo",
    "flows/child.py:child_flow",
).deploy(
    name="child-on-ecs",
    work_pool_name="my-ecs",
    image="123456789012.dkr.ecr.us-east-1.amazonaws.com/prefect-image:latest",
    # The image already exists in ECR and the code is pulled from Git at run time,
    # so skip building and pushing a new image
    build=False,
    push=False,
)
```
Repeat for each child flow if they’re distinct, or reuse the same deployment with different parameters.

3) Decide where the parent runs
- Best practice: run the parent on a cheap process worker or in a different pool so it doesn’t tie up an ECS task while it’s waiting.
- If you must run the parent on ECS too, make sure the ECS work pool concurrency covers N children plus the parent. A waiting parent holds its slot the whole time, so with too low a limit the children run in extra waves, and if parent runs ever occupy every slot, no child can start and the parents wait forever (a deadlock).

Example: create a simple process pool for the parent:
```bash
prefect work-pool create parent-process --type process
prefect worker start -p parent-process
```
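The slot arithmetic behind step 3 fits in a few lines. A back-of-the-envelope sketch, assuming all children take about the same time and that every parent run placed in the ECS pool holds one concurrency slot for its whole lifetime; `child_waves` is a hypothetical helper for illustration only.

```python
import math
from typing import Optional


def child_waves(n_children: int, pool_limit: int, parents_in_pool: int = 0) -> Optional[int]:
    """Number of sequential 'waves' needed to run n_children under a pool limit.

    Parents that share the pool keep their slots while they wait, so only
    pool_limit - parents_in_pool slots are left for children. Returns None
    when no slot is left for children: the parents would wait forever.
    """
    child_slots = pool_limit - parents_in_pool
    if child_slots <= 0:
        return None
    return math.ceil(n_children / child_slots)


# Parent on a separate process pool: 10 children fit in one wave with a limit of 10
print(child_waves(10, pool_limit=10))                     # 1
# Parent in the same pool with the same limit: one child has to wait for a second wave
print(child_waves(10, pool_limit=10, parents_in_pool=1))  # 2
# A limit of N + 1 restores full parallelism
print(child_waves(10, pool_limit=11, parents_in_pool=1))  # 1
```

Running the parent on the separate process pool sidesteps the question entirely, which is why it is the recommended layout.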
4) Parent orchestration flow
- Use `run_deployment()` to launch the N child runs. It links them as subflows by default (`as_subflow=True`), and you can pass `tags` and `job_variables` to control the ECS job.
- `run_deployment()` blocks until the child finishes unless you pass `timeout=0`, so start every child with `timeout=0`, wait for all of them together, then continue. `wait_for_flow_run` is async, so the parent below is an async flow.

```python
import asyncio

from prefect import flow, get_run_logger
from prefect.deployments import run_deployment
from prefect.flow_runs import wait_for_flow_run


@flow
async def parent_flow(n: int = 5):
    log = get_run_logger()
    # EC2-only capacity provider override for child jobs (skip if you bake it into the pool).
    # job_variables keys must name variables declared in the pool's base job template;
    # capacity_provider_strategy is the prefect-aws ECS variable (check your exported template).
    job_vars = {
        "capacity_provider_strategy": [
            {"capacityProvider": "my-ec2-provider", "weight": 1}
        ]
    }
    child_runs = []
    for i in range(n):
        fr = await run_deployment(
            name="child-flow/child-on-ecs",  # "<flow_name>/<deployment_name>"
            parameters={"index": i},
            tags=["triggered-by-parent"],    # tags are a list of strings
            job_variables=job_vars,          # optional if set in the pool template
            flow_run_name=f"child-{i}",
            timeout=0,                       # return as soon as the run is created
            # as_subflow=True by default: links the child to the parent in the UI
        )
        log.info(f"Started child {i}: {fr.id}")
        child_runs.append(fr)

    # Wait for all children concurrently; polling the API does not consume ECS capacity
    finished = await asyncio.gather(
        *(wait_for_flow_run(fr.id, timeout=7200, log_states=True) for fr in child_runs)
    )
    not_completed = [fr.name for fr in finished if not fr.state.is_completed()]
    if not_completed:
        raise RuntimeError(f"Child runs did not complete: {not_completed}")

    # Continue with whatever comes after
    log.info("All children completed; running post-processing...")
    # ... your parent tasks here ...
```

Notes, gotchas, and knobs
- run_deployment behavior:
  - It creates a separate flow run (picked up by the ECS worker) and returns a `FlowRun` object. With the default `timeout=None` it blocks until that run reaches a final state; with `timeout=0` it returns immediately and you wait with `wait_for_flow_run`.
  - `as_subflow=True` keeps the parent/child linkage in the UI. Set it to `False` if you want the runs detached.
- Passing tags and job_variables:
  - `tags` attaches Prefect tags (a list of strings) to the child run. Whether they also appear as ECS task tags depends on what your base job template maps into `task_run_request.tags`.
  - `job_variables` overrides, per run, any variable declared in the base job template’s `variables` section, e.g. `image`, `cpu`, `memory`, `capacity_provider_strategy`.
- Concurrency:
  - Set the ECS pool concurrency ≥ the number of simultaneous children.
  - If the parent also runs on ECS, set concurrency ≥ N + 1 or use a separate pool for the parent to avoid starvation.
- Images:
  - Either set a fixed image in the pool’s base job template or pass one per deployment. For EC2 tasks, your image must be in a registry the cluster can reach (typically ECR).
- Logging:
  - Consider enabling `configure_cloudwatch_logs` and `cloudwatch_logs_options` in the base job template so you can tail ECS logs easily.

If you want, share:
- Your ECS cluster name/ARN and EC2 capacity provider name
- Whether you prefer the parent to run on ECS or on a lightweight process worker
- The max number of children you plan to run concurrently

I’ll tailor the base job template snippet and deployment config to your setup. Thrilling, I know.
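How per-run `job_variables` interact with the pool’s base job template can be modelled in a few lines. This is a simplified sketch, not Prefect’s implementation: it assumes `{{ name }}` placeholders that fill a whole string value, takes defaults from `variables.properties`, and lets per-run `job_variables` win. The template values are hypothetical; `resolve_job_config` is an illustrative helper.

```python
import re

# Matches a string that is exactly one placeholder, e.g. "{{ cluster }}"
PLACEHOLDER = re.compile(r"^\{\{\s*(\w+)\s*\}\}$")


def resolve_job_config(template: dict, job_variables: dict) -> dict:
    """Render job_configuration using variable defaults overridden by job_variables."""
    defaults = {
        name: schema.get("default")
        for name, schema in template["variables"]["properties"].items()
    }
    # Only declared variables can be overridden; other keys have no placeholder to fill
    values = {**defaults, **{k: v for k, v in job_variables.items() if k in defaults}}

    def render(node):
        if isinstance(node, dict):
            return {key: render(value) for key, value in node.items()}
        if isinstance(node, list):
            return [render(item) for item in node]
        if isinstance(node, str):
            match = PLACEHOLDER.match(node)
            if match:
                # Whole-value placeholders keep the variable's type (list, dict, ...)
                return values.get(match.group(1))
        return node

    return render(template["job_configuration"])


template = {
    "job_configuration": {
        "task_run_request": {
            "cluster": "{{ cluster }}",
            "capacityProviderStrategy": "{{ capacity_provider_strategy }}",
        }
    },
    "variables": {
        "properties": {
            "cluster": {"default": "my-cluster"},
            "capacity_provider_strategy": {"default": []},
        }
    },
}
config = resolve_job_config(
    template,
    {"capacity_provider_strategy": [{"capacityProvider": "my-ec2-provider", "weight": 1}]},
)
print(config["task_run_request"])
```

In this model, an override only lands if its key names a declared variable; a key like `task_run_request` has nothing to fill and leaves the rendered request unchanged.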
r
I will go with a process worker running on an EC2 machine for the parent and ECS+EC2 workers for the children. In this setup, I will initially have one ECS worker polling the work pool. But for faster runs, I may need to scale the worker service to multiple workers. In that scenario, each worker needs to run on a different EC2 machine, doesn't it? @Marvin
m
thought for 102.0 seconds
Short answer: no. Prefect “workers” are control-plane processes that submit and watch ECS tasks; they don’t run your jobs. You can run multiple ECS workers on the same EC2 instance, on different instances, or even as an ECS service with multiple replicas. The actual child flows run as ECS tasks spread across whatever EC2 capacity your cluster has.

Practical guidance
- Start with one ECS worker and set your pool concurrency to the number of simultaneous child runs you want:
```bash
prefect work-pool set-concurrency-limit my-ecs 10
prefect worker start -p my-ecs
```
- If you need more pickup throughput or HA, add more worker replicas. They can run:
  - On the same EC2 box (multiple processes/containers), or
  - On different EC2 instances for resilience (recommended).
- Ensure your ECS cluster’s EC2 capacity provider/ASG can scale to match the concurrency you set. Workers don’t provide compute; EC2 capacity does.

Parent on a process worker
- Your parent flow on a process worker (on a small EC2) is perfect. It will trigger N child deployments; each child run becomes its own ECS task/container. The parent just waits and then continues, consuming no ECS capacity while waiting.

Rule of thumb
- Increase pool concurrency first. If a single worker keeps up, you don’t need more workers.
- Add more workers for faster pickup and fault tolerance, not because each needs its own machine. Running them on separate instances/AZs is just good redundancy.
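Checking that EC2 capacity matches pool concurrency is simple arithmetic. A rough sketch, assuming every child task reserves the same CPU units (1024 units = 1 vCPU) and MiB of memory, and that you plug in the CPU and memory each container instance actually registers with ECS (a bit less than the instance’s nominal size, since the OS and ECS agent keep some back). `instances_needed` is a hypothetical helper; it ignores placement constraints and daemon tasks.

```python
import math


def instances_needed(concurrency: int, task_cpu: int, task_mem_mib: int,
                     instance_cpu: int, instance_mem_mib: int) -> int:
    """EC2 container instances required to run `concurrency` identical tasks at once."""
    # A task needs both its CPU and its memory reservation, so the tighter resource wins
    per_instance = min(instance_cpu // task_cpu, instance_mem_mib // task_mem_mib)
    if per_instance == 0:
        raise ValueError("a single task does not fit on this instance size")
    return math.ceil(concurrency / per_instance)


# Pool concurrency 10, tasks of 0.5 vCPU / 1 GiB on instances registering 2 vCPU / 7.5 GiB
print(instances_needed(10, task_cpu=512, task_mem_mib=1024,
                       instance_cpu=2048, instance_mem_mib=7680))  # 3
```

Set the capacity provider’s Auto Scaling group maximum at or above that number; otherwise the extra child tasks wait for capacity or fail to place, depending on how the cluster is configured. Adding Prefect workers does not change this number.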
r
```
They can run:
  - On the same EC2 box
```
But having an additional worker on the same machine means the jobs will be created on the same machine, right? @Marvin
m
thought for 29.4 seconds
No, running multiple ECS workers on the same EC2 box does not make your child jobs run on that box.
- ECS workers just submit and monitor tasks. They call RunTask on your ECS cluster; the ECS scheduler decides which EC2 container instance runs each task based on capacity/provider/placement. The worker’s host is irrelevant to where the task runs.
- Your parent Process worker on a single EC2 machine will only run the parent flow there. Child flows will run as ECS tasks on whatever EC2 instances your cluster provides.

When would jobs run “on the same machine”?
- With Process or Docker workers: yes, those execute the workload locally on the worker’s host.
- With ECS workers: no; the workload runs on ECS container instances in the cluster.

If you want better throughput/HA:
- Increase the work pool concurrency (so more child tasks can run at once).
- Scale your ECS worker replicas (same box or different boxes; different is better for resilience).
- Ensure your ECS EC2 capacity (ASG/capacity provider) can scale to match that concurrency.

If you truly need to pin tasks to specific instances, that’s an ECS concern: use placement constraints/strategies or dedicated capacity providers, not Prefect worker placement.
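Pinning is configured in the ECS request itself. ECS `RunTask` accepts `placementConstraints` (for example `memberOf` with a cluster query expression) and `placementStrategy` (`random`, `spread`, or `binpack`) for EC2-backed tasks; Fargate does not support them. The sketch below builds those keys for the `task_run_request` section of the ECS base job template. It assumes your prefect-aws ECS worker forwards `task_run_request` keys to `RunTask` unchanged (worth confirming for your version); `with_placement` is a hypothetical helper and `t3.large` an example instance type.

```python
import json


def with_placement(task_run_request: dict, instance_type: str,
                   binpack_field: str = "memory") -> dict:
    """Return a copy of task_run_request with ECS placement rules added."""
    if binpack_field not in ("cpu", "memory"):
        raise ValueError("binpack works on 'cpu' or 'memory'")
    request = dict(task_run_request)
    # Cluster query language: only container instances of this EC2 type qualify
    request["placementConstraints"] = [{
        "type": "memberOf",
        "expression": f"attribute:ecs.instance-type == {instance_type}",
    }]
    # Among qualifying instances, pack tasks onto as few as possible
    request["placementStrategy"] = [{"type": "binpack", "field": binpack_field}]
    return request


print(json.dumps(with_placement({"cluster": "my-cluster"}, "t3.large"), indent=2))
```

The printed keys can be pasted into `job_configuration.task_run_request` in `ecs-base.json` next to the existing ones. A dedicated capacity provider per workload is the other way to get the same isolation.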