Ravish Mallya
08/12/2025, 9:19 AM
Marvin
08/12/2025, 9:19 AM
Marvin
08/12/2025, 9:22 AM
Ravish Mallya
08/12/2025, 9:26 AM
Marvin
08/12/2025, 9:26 AM
Marvin
08/12/2025, 9:29 AM
```
from prefect import flow, task, get_run_logger
from prefect.deployments import run_deployment
from prefect.futures import wait
from prefect.task_runners import ThreadPoolTaskRunner
from prefect.states import raise_state_exception


@task
def trigger_child(deployment_name: str, params: dict | None = None) -> str:
    """
    Start a deployment and block until it completes.
    Returns the child flow run ID or raises if it failed.
    """
    logger = get_run_logger()
    fr = run_deployment(
        name=deployment_name,  # e.g. "child-flow/prod"
        parameters=params or {},
        flow_run_name=f"child-{deployment_name.replace('/', '-')}",
        # as_subflow=True by default so lineage shows in the UI
        timeout=60 * 60,  # optional: 1h max wait
        poll_interval=5,
    )
    logger.info(f"{deployment_name} finished with state: {fr.state.type.name}")
    # Raise if the child failed/crashed/cancelled
    raise_state_exception(fr.state)
    return str(fr.id)


@task
def do_more_work(child_run_ids: list[str]) -> None:
    logger = get_run_logger()
    logger.info(f"All children completed: {child_run_ids}. Proceeding...")


@flow(name="orchestrator", task_runner=ThreadPoolTaskRunner(max_workers=8))
def orchestrator(deployments: list[str], child_params: dict[str, dict] | None = None):
    """
    deployments: list of deployment names like ["child-a/prod", "child-b/prod", ...]
    child_params: optional per-deployment parameters keyed by deployment name
    """
    params = child_params or {}
    # Kick off children in parallel; each task blocks until its child finishes
    futures = [trigger_child.submit(d, params.get(d)) for d in deployments]
    # Wait for all children (their tasks) to complete
    done = wait(futures).done
    child_run_ids = [f.result() for f in done]  # will raise if any task raised above
    # Continue with downstream tasks
    do_more_work(child_run_ids)


if __name__ == "__main__":
    orchestrator([
        "child-a/prod",
        "child-b/prod",
        "child-c/prod",
    ])
```
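For reference, a hypothetical call that passes per-deployment parameters through child_params (deployment names and parameter values below are placeholders):
```
orchestrator(
    deployments=["child-a/prod", "child-b/prod"],
    child_params={
        "child-a/prod": {"run_date": "2025-08-12"},  # placeholder parameter
        "child-b/prod": {"region": "us-east-1"},     # placeholder parameter
    },
)
```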
Notes:
- run_deployment waits for the child run to finish and returns its final `FlowRun`; we call raise_state_exception to fail fast if any child fails.
- Parallelism comes from submitting the trigger_child task N times and using a thread pool via ThreadPoolTaskRunner(max_workers=...).
- By default as_subflow=True, so you’ll see nice parent/child lineage in the UI. Set as_subflow=False if you prefer not to.
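If you would rather collect failures than fail fast, the wait/result section of the orchestrator could instead look like the sketch below (same futures API as above; it relies on trigger_child raising on child failure, as it does here):
```
# Sketch: gather results, tolerating individual child failures.
failures: list[Exception] = []
child_run_ids: list[str] = []
for f in futures:
    try:
        child_run_ids.append(f.result())  # re-raises whatever trigger_child raised
    except Exception as exc:
        failures.append(exc)

if failures:
    # Surface all failures at once instead of stopping on the first one
    raise RuntimeError(f"{len(failures)} child run(s) failed: {failures}")
do_more_work(child_run_ids)
```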
Marvin
08/12/2025, 9:35 AM
1) Create an ECS work pool
- Create the pool:
prefect work-pool create my-ecs --type ecs
- Pull the default base job template, edit, then apply:
prefect work-pool get-default-base-job-template --type ecs --file ecs-base.json
# edit ecs-base.json
prefect work-pool update my-ecs --base-job-template ecs-base.json
In ecs-base.json, set at minimum:
- cluster (ARN or name)
- container_name
- image (or keep templated and set per-deployment)
- network_configuration (subnets, securityGroups, assignPublicIp as needed)
- execution_role_arn and task_role_arn
- Use EC2 capacity by setting task_run_request.capacityProviderStrategy to your EC2 capacity provider(s), e.g.:
"task_run_request": {
"capacityProviderStrategy": [
{ "capacityProvider": "my-ec2-provider", "weight": 1 }
],
"cluster": "arn:aws:ecs:us-east-1:123456789012:cluster/my-cluster",
"taskDefinition": "{{ task_definition_arn }}"
}
- Optional: set a work-pool concurrency limit to the max number of concurrent child runs you want.
- Start a worker that will submit ECS tasks:
prefect worker start -p my-ecs
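For example, to cap the pool at 10 concurrent child runs:
prefect work-pool set-concurrency-limit my-ecs 10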
2) Deploy your child flow(s) to that ECS pool
In Prefect 3.x, build deployments like this:
```
from prefect import Flow

Flow.from_source(
    source="https://github.com/your-org/your-repo",
    entrypoint="flows/child.py:child_flow",
).deploy(
    name="child-on-ecs",
    work_pool_name="my-ecs",
    image="123456789012.dkr.ecr.us-east-1.amazonaws.com/prefect-image:latest",
)
```
Repeat for each child flow if they’re distinct, or reuse the same deployment with different parameters.
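If a particular child needs different ECS settings, one hedged option is to pin job variables at deploy time; the override below simply mirrors the capacityProviderStrategy block from the pool template, and it assumes the Prefect 3.x deploy(..., job_variables=...) parameter:
```
Flow.from_source(
    source="https://github.com/your-org/your-repo",
    entrypoint="flows/child.py:child_flow",
).deploy(
    name="child-on-ecs-ec2-only",  # hypothetical second deployment name
    work_pool_name="my-ecs",
    image="123456789012.dkr.ecr.us-east-1.amazonaws.com/prefect-image:latest",
    # Per-deployment override of the pool's base job template
    job_variables={
        "task_run_request": {
            "capacityProviderStrategy": [
                {"capacityProvider": "my-ec2-provider", "weight": 1}
            ]
        }
    },
)
```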
3) Decide where the parent runs
- Best practice: run the parent on a cheap/process worker or a different pool so it doesn’t tie up an ECS task while it’s waiting.
- If you must run parent on ECS too, ensure the ECS work pool concurrency can handle N children plus the parent (to avoid deadlocks).
Example: create a simple process pool for the parent
prefect work-pool create parent-process --type process
prefect worker start -p parent-process
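A hypothetical deployment of the parent onto that pool (repo URL and entrypoint are placeholders, mirroring step 2; a process pool needs no image):
```
from prefect import Flow

Flow.from_source(
    source="https://github.com/your-org/your-repo",
    entrypoint="flows/parent.py:parent_flow",  # assumed entrypoint for the parent
).deploy(
    name="parent-orchestrator",
    work_pool_name="parent-process",
)
```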
4) Parent orchestration flow
- Use run_deployment() to launch N child runs. It links them as subflows by default (as_subflow=True), and you can pass tags and job_variables to control the ECS job.
- Then wait for them all to finish, and continue.
```
from prefect import flow, get_run_logger
from prefect.deployments import run_deployment
from prefect.flow_runs import wait_for_flow_run


@flow
def parent_flow(n: int = 5):
    log = get_run_logger()
    # EC2-only capacity provider override for child jobs (if you don’t bake it into the pool)
    job_vars = {
        "task_run_request": {
            "capacityProviderStrategy": [
                {"capacityProvider": "my-ec2-provider", "weight": 1}
            ]
        }
    }
    child_runs = []
    for i in range(n):
        fr = run_deployment(
            name="child-flow/child-on-ecs",  # "<flow_name>/<deployment_name>"
            parameters={"index": i},
            tags=["triggered-by:parent"],
            job_variables=job_vars,  # optional if set in the pool template
            flow_run_name=f"child-{i}",
            timeout=0,  # return immediately; we wait for all children below
            # as_subflow=True by default: links the child to the parent in the UI
        )
        log.info(f"Started child {i}: {fr.id}")
        child_runs.append(fr)
    # Wait for all children to finish; this does not consume ECS capacity itself
    for fr in child_runs:
        wait_for_flow_run(fr.id, timeout=7200, log_states=True)
    # Continue with whatever comes after
```
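If the parent should fail when any child ends in a bad state, a small variation of the wait loop (a sketch; it relies on wait_for_flow_run returning the finished FlowRun and on the standard state helpers) could be:
```
    failed = []
    for fr in child_runs:
        finished = wait_for_flow_run(fr.id, timeout=7200, log_states=True)
        if not finished.state.is_completed():
            failed.append((finished.name, finished.state.type.name))
    if failed:
        # Raising here marks the parent run as failed as well
        raise RuntimeError(f"Child runs did not complete: {failed}")
```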
Marvin
08/12/2025, 9:35 AM
Ravish Mallya
08/12/2025, 9:53 AM
Marvin
08/12/2025, 9:54 AM
Marvin
08/12/2025, 9:55 AM
prefect work-pool set-concurrency-limit my-ecs 10
prefect worker start -p my-ecs
- If you need more pickup throughput or HA, add more worker replicas. They can run:
- On the same EC2 box (multiple processes/containers), or
- On different EC2 instances for resilience (recommended); see the example command after this list.
- Ensure your ECS cluster’s EC2 capacity provider/ASG can scale to match the concurrency you set. Workers don’t provide compute; EC2 capacity does.
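For instance, a second worker attached to the same pool from another instance (the --name flag is optional and the value is a placeholder):
prefect worker start -p my-ecs --name ecs-worker-2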
Parent on a process worker
- Your parent flow on a process worker (on a small EC2) is perfect. It will trigger N child deployments; each child run becomes its own ECS task/container. The parent just waits and then continues—no ECS capacity consumed while waiting.
Rule of thumb
- Increase pool concurrency first. If a single worker keeps up, you don’t need more workers.
- Add more workers for faster pickup and fault tolerance, not because each needs its own machine. Running them on separate instances/AZs is just good redundancy.
Ravish Mallya
08/12/2025, 10:15 AM
> They can run:
> - On the same EC2 box
But having an additional worker on the same machine means the jobs will be created on that same machine, right? @Marvin
Marvin
08/12/2025, 10:15 AM
Marvin
08/12/2025, 10:16 AM