Kerem Kaynak
10/28/2025, 12:31 PM

Marvin
10/28/2025, 12:34 PM
Use run_deployment to create separate flow runs (each picked up by your ECS work pool/worker), and make those launches non-blocking so they can all start concurrently.
High-level pattern
- Create a deployment for your processing flow that targets an ECS work pool (one ECS task per flow run).
- Your parent flow:
- runs the extract flow to produce a list of file references (ideally persisted in S3 or similar)
- calls run_deployment once per file with timeout=0 to launch all runs without waiting
- optionally waits for all of them concurrently using wait_for_flow_run + asyncio.gather
Important infrastructure note
- Since each processing run happens in a separate ECS task, do not pass local file paths from the extractor. Persist extracted files to a shared/durable location (e.g., S3) and pass references (e.g., S3 keys) to the processing runs; a sketch of this follows below.
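As a rough illustration, here is a sketch of that extract-to-S3 step using boto3 (the bucket, prefix, and helper name are placeholders, not part of the original answer; adapt it to your own layout):

```python
import io
import zipfile

import boto3


def extract_zip_to_s3(bucket: str, zip_key: str, prefix: str = "extracted/") -> list[str]:
    """Download a zip from S3, extract each member back to S3, and return the new keys."""
    s3 = boto3.client("s3")
    zip_bytes = s3.get_object(Bucket=bucket, Key=zip_key)["Body"].read()

    keys: list[str] = []
    with zipfile.ZipFile(io.BytesIO(zip_bytes)) as zf:
        for member in zf.namelist():
            if member.endswith("/"):
                continue  # skip directory entries
            key = f"{prefix}{member}"
            s3.put_object(Bucket=bucket, Key=key, Body=zf.read(member))
            keys.append(key)
    return keys
```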
Example flows
```python
from prefect import flow, get_run_logger
from prefect.deployments import run_deployment
from prefect.flow_runs import wait_for_flow_run
import asyncio


@flow
def extract_files(zip_s3_key: str) -> list[str]:
    """
    Example: read the zip from S3, extract to S3, return a list of S3 keys for each file.
    Replace with your own logic.
    """
    logger = get_run_logger()
    # ... extract/put to S3 here ...
    file_keys = [
        "s3://my-bucket/extracted/file1.csv",
        "s3://my-bucket/extracted/file2.csv",
        "s3://my-bucket/extracted/file3.csv",
    ]
    logger.info(f"Extracted {len(file_keys)} files")
    return file_keys


@flow
def process_file(file_key: str):
    """
    Your processing flow logic (this is the one you'll deploy to ECS).
    """
    logger = get_run_logger()
    logger.info(f"Processing {file_key}")
    # ... process file_key ...


@flow
async def parent_orchestrator(zip_s3_key: str):
    # 1) Extract to get file references
    file_keys = extract_files(zip_s3_key)

    # 2) Fan out: launch the processing deployment for each file
    #    - timeout=0 => return immediately so launches can happen concurrently
    #    - as_subflow=True keeps a nice parent/child view in the UI
    #    - work_queue_name is optional if your deployment already targets the right pool/queue
    runs = []
    for i, key in enumerate(file_keys, start=1):
        fr = await run_deployment(
            name="process-file/process-on-ecs",  # "<flow-name>/<deployment-name>"
            parameters={"file_key": key},
            flow_run_name=f"process-{i}",
            timeout=0,
            as_subflow=True,
            # optional routing / overrides:
            # work_queue_name="ecs-default",
            # job_variables={"cpu": 1024, "memory": 2048, "env": {"FILE_COUNT": str(len(file_keys))}},
        )
        runs.append(fr)

    # 3) Optionally wait for all child runs concurrently
    results = await asyncio.gather(
        *(wait_for_flow_run(fr.id, log_states=True) for fr in runs)
    )

    # You can return states or do post-aggregation here
    return [r.state for r in results]
```
What makes the ECS fan-out work
- You deploy process_file to an ECS work pool. Each run_deployment call creates a new flow run that the ECS worker turns into a separate ECS task. Concurrency is then governed by your ECS service quotas and any limits you set on the worker or pool (see the command sketch after this list).
- timeout=0 on run_deployment prevents blocking, so you can launch many runs quickly.
- wait_for_flow_run allows you to monitor all runs concurrently with asyncio.gather if you want the parent to wait.
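If you want to cap how many ECS tasks run at once, one option is a work pool concurrency limit. A minimal sketch, assuming the pool is named my-ecs as above:

```bash
# Allow at most 5 concurrent flow runs from this pool; extra runs wait in the queue
prefect work-pool set-concurrency-limit my-ecs 5

# Remove the cap later if you no longer need it
prefect work-pool clear-concurrency-limit my-ecs
```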
A few practical tips
- Use flow_run_name and/or tags to make child runs easy to identify.
- as_subflow=True (default) keeps the parent/child relationship visible in the UI; set to False if you prefer independent runs.
- job_variables lets you override ECS task settings per run (e.g., CPU/memory/image/env). The exact keys depend on your ECS work pool's job template; open the pool in the UI to see what's available (see the sketch after this list). See: Customize job variables.
- Ensure your extracted files are accessible to ECS tasks (e.g., S3/GCS). Avoid local temp files.
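For example, a hedged sketch of a single launch with per-run tags and sizing (the tag names, file key, and CPU/memory values are placeholders; the job_variables keys must match whatever your ECS work pool's job template exposes):

```python
from prefect.deployments import run_deployment

# Launch one child run with custom tags and a larger ECS task size.
flow_run = run_deployment(
    name="process-file/process-on-ecs",
    parameters={"file_key": "extracted/big-file.csv"},
    flow_run_name="process-big-file",
    tags=["fanout", "large"],                     # easier to filter in the UI
    job_variables={"cpu": 2048, "memory": 4096},  # per-run ECS overrides
    timeout=0,                                    # fire-and-forget: don't wait for completion
)
print(flow_run.id)
```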
Helpful references
- run_deployment guide: Run deployments from code
- Python API for running/monitoring flow runs: prefect.deployments and prefect.flow_runs
- ECS setup and guidance: Prefect + AWS ECS guide
Questions to make sure we set you up right
- Are you already on Prefect 3.x and using an ECS work pool?
- Do you want the parent to wait for all processing runs to finish, or just fire-and-forget?
- Will your extracted files live in S3 (or another shared store) so each ECS task can access them?
If you share a bit about your current pool/deployment setup, I can tailor the job_variables and deployment commands for you.

Kerem Kaynak
10/28/2025, 12:44 PM

Marvin
10/28/2025, 12:50 PM
1) Create the work pools
```bash
# Install the AWS integration if you haven't
pip install prefect-aws

# Create the pools
prefect work-pool create my-ecs --type ecs
prefect work-pool create process --type process
```
- For ECS: open the my-ecs pool in the UI and configure the base job template (cluster, subnets, roles, image, etc.). Note that a standard ECS work pool (type ecs) still needs an ECS worker running somewhere that can poll for work, commonly as a small long-running ECS service; only push-style pools submit runs without a worker. See: Prefect + AWS ECS guide
- For the process pool: start a worker on any machine that can import your code:
```bash
prefect worker start -p process
```
2) Put your flows in a module (e.g., flows.py)
Make sure your flow names match what you'll use with run_deployment. By default, Prefect derives the flow name from the function name with underscores replaced by dashes, so process_file becomes process-file; setting names explicitly (see the sketch below) removes the guesswork.
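A minimal sketch of that, with illustrative names pinned explicitly via the flow decorator:

```python
# flows.py (illustrative) -- explicit names avoid relying on the default
# underscore-to-dash conversion when calling run_deployment later.
from prefect import flow


@flow(name="process-file")
def process_file(file_key: str):
    ...


@flow(name="parent-orchestrator")
async def parent_orchestrator(zip_s3_key: str):
    ...
```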
3) Deploy the flows
You can do this in Python (recommended for clarity) or with the CLI.
Option A: Pythonic deployment with Flow.deploy
Create a small script (deploy_flows.py):
```python
from flows import extract_files, process_file, parent_orchestrator

if __name__ == "__main__":
    # Parent + extractor to local 'process' pool
    extract_files.deploy(
        name="extract-local",
        work_pool_name="process",
    )
    parent_orchestrator.deploy(
        name="parent-orchestrator",
        work_pool_name="process",
    )

    # Processing flow to ECS pool (one ECS task per child run)
    process_file.deploy(
        name="process-on-ecs",
        work_pool_name="my-ecs",
        # Optional per-deployment ECS defaults you can override at runtime via job_variables:
        # job_variables={"cpu": 1024, "memory": 2048, "env": {"PREFECT_LOGGING_LEVEL": "INFO"}},
    )
```
Run:
```bash
python deploy_flows.py
```
Option B: CLI deployment
From your project root where flows.py lives:
```bash
prefect deploy ./flows.py:extract_files --name extract-local --pool process
prefect deploy ./flows.py:parent_orchestrator --name parent-orchestrator --pool process
prefect deploy ./flows.py:process_file --name process-on-ecs --pool my-ecs
```
Notes:
- The deployment name you pass (e.g., process-on-ecs) is what you'll combine with the flow name to call run_deployment, e.g., process-file/process-on-ecs.
- If you're deploying from a Git repo for ECS, you can also use Flow.from_source(...).deploy(...); a sketch follows this list. See: Run deployments from code
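A hedged sketch of that from_source pattern (the repository URL and entrypoint are placeholders for your own project):

```python
from prefect import flow

if __name__ == "__main__":
    # Pull the flow code from Git at run time so the ECS task doesn't need
    # the code baked into a custom image.
    flow.from_source(
        source="https://github.com/your-org/your-repo",  # placeholder repository
        entrypoint="flows.py:process_file",              # path/to/file.py:function_name
    ).deploy(
        name="process-on-ecs",
        work_pool_name="my-ecs",
    )
```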
4) Wire the parent fanout to the correct deployment name
In your parent flow, make sure the run_deployment name matches exactly:
```python
fr = await run_deployment(
    name="process-file/process-on-ecs",
    parameters={"file_key": key},
    flow_run_name=f"process-{i}",
    timeout=0,
    as_subflow=True,
    # Optionally route differently or tweak ECS per run:
    # work_queue_name="default",
    # job_variables={"cpu": 2048, "memory": 4096},
)
```
5) Kick off a test
- Start your process worker:
```bash
prefect worker start -p process
```
- Trigger the parent (or schedule it in the UI):
```bash
prefect deployment run parent-orchestrator/parent-orchestrator -p zip_s3_key="s3://my-bucket/archive.zip" --watch
```
Helpful refs
- Run deployments from code: Run deployments from code
- Customize job variables (ECS CPU/memory/env/image overrides): Customize job variables
- ECS guide: Prefect + AWS ECS guideMarvin