# marvin-ai
k
@Marvin I'm trying to write a flow that extracts files from a zip and then launches a flow to process each file separately. I want each subflow to run on a separate ECS task, concurrently. Is that possible to achieve? Ideally I'd have three flows: one to extract, one to process, and a parent flow that runs extract, gets results and fans out to spawn processing flow for each extracted file.
m
thought for 212.1 seconds
Yes, this is a great fit for Prefect 3.x. You can fan out to separate ECS tasks and run them concurrently by triggering a deployment of your processing flow for each extracted file. The key is to use `run_deployment` to create separate flow runs (each picked up by your ECS work pool/worker) and to make those launches non-blocking so they can all start concurrently.

High-level pattern
- Create a deployment for your processing flow that targets an ECS work pool (one ECS task per flow run).
- Your parent flow:
  - runs the extract flow to produce a list of file references (ideally persisted in S3 or similar)
  - calls `run_deployment` once per file with `timeout=0` to launch all runs without waiting
  - optionally waits for all of them concurrently using `wait_for_flow_run` + `asyncio.gather`

Important infrastructure note
- Since each processing run happens in a separate ECS task, do not pass local file paths from the extractor. Persist extracted files to a shared/durable location (e.g., S3) and pass references (e.g., S3 keys) to the processing runs.

Example flows
```python
from prefect import flow, get_run_logger
from prefect.deployments import run_deployment
from prefect.flow_runs import wait_for_flow_run
import asyncio

@flow
def extract_files(zip_s3_key: str) -> list[str]:
    """
    Example: read the zip from S3, extract to S3, return a list of S3 keys for each file.
    Replace with your own logic.
    """
    logger = get_run_logger()
    # ... extract/put to S3 here ...
    file_keys = ["<s3://my-bucket/extracted/file1.csv>",
                 "<s3://my-bucket/extracted/file2.csv>",
                 "<s3://my-bucket/extracted/file3.csv>"]
    <http://logger.info|logger.info>(f"Extracted {len(file_keys)} files")
    return file_keys

@flow
def process_file(file_key: str):
    """
    Your processing flow logic (this is the one you'll deploy to ECS).
    """
    logger = get_run_logger()
    logger.info(f"Processing {file_key}")
    # ... process file_key ...

@flow
async def parent_orchestrator(zip_s3_key: str):
    # 1) Extract to get file references
    file_keys = extract_files(zip_s3_key)

    # 2) Fan out: launch the processing deployment for each file
    #    - timeout=0 => return immediately so launches can happen concurrently
    #    - as_subflow=True keeps a nice parent/child view in the UI
    #    - work_queue_name is optional if your deployment already targets the right pool/queue
    runs = []
    for i, key in enumerate(file_keys, start=1):
        fr = await run_deployment(  # awaited because this parent flow is async
            name="process-file/process-on-ecs",  # "<flow-name>/<deployment-name>"
            parameters={"file_key": key},
            flow_run_name=f"process-{i}",
            timeout=0,
            as_subflow=True,
            # optional routing / overrides:
            # work_queue_name="ecs-default",
            # job_variables={"cpu": 1024, "memory": 2048, "env": {"FILE_COUNT": str(len(file_keys))}},
        )
        runs.append(fr)

    # 3) Optionally wait for all child runs concurrently
    results = await asyncio.gather(
        *(wait_for_flow_run(fr.id, log_states=True) for fr in runs)
    )
    # You can return states or do post-aggregation here
    return [r.state for r in results]
```
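For the `# ... extract/put to S3 here ...` placeholder above, here is a minimal sketch of pulling the zip from S3 and writing each member back to S3. It assumes `boto3` is installed; the bucket name `my-bucket` and the `extracted/` prefix are placeholders to adapt:
```python
import io
import zipfile

import boto3


def extract_zip_to_s3(zip_s3_key: str, bucket: str = "my-bucket") -> list[str]:
    """Download a zip from S3, extract it, and upload each member back to S3.

    Returns the S3 keys of the extracted files. Bucket and prefix names are
    illustrative placeholders.
    """
    s3 = boto3.client("s3")

    # Read the whole archive into memory (use a temp file for very large archives)
    zip_bytes = s3.get_object(Bucket=bucket, Key=zip_s3_key)["Body"].read()

    extracted_keys: list[str] = []
    with zipfile.ZipFile(io.BytesIO(zip_bytes)) as zf:
        for member in zf.namelist():
            if member.endswith("/"):  # skip directory entries
                continue
            target_key = f"extracted/{member}"
            s3.put_object(Bucket=bucket, Key=target_key, Body=zf.read(member))
            extracted_keys.append(target_key)
    return extracted_keys
```
`extract_files` could call this helper and return its keys instead of the hard-coded list.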
What makes the ECS fan-out work
- You deploy `process_file` to an ECS work pool. Each `run_deployment` call creates a new flow run that the ECS worker turns into a separate ECS task. Concurrency is then governed by your ECS service quotas and any limits you set on the worker or pool (see the sketch below for capping pool concurrency).
- `timeout=0` on `run_deployment` prevents blocking, so you can launch many runs quickly.
- `wait_for_flow_run` allows you to monitor all runs concurrently with `asyncio.gather` if you want the parent to wait.
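If you want to cap how many child runs execute at once on the pool side, one option is a work pool concurrency limit (the pool name `my-ecs` is a placeholder for your ECS pool):
```bash
# At most 10 flow runs from this pool execute concurrently; extra runs queue up
prefect work-pool set-concurrency-limit "my-ecs" 10
```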
A few practical tips
- Use `flow_run_name` and/or `tags` to make child runs easy to identify.
- `as_subflow=True` (default) keeps the parent/child relationship visible in the UI; set it to `False` if you prefer independent runs.
- `job_variables` lets you override ECS task settings per run (e.g., CPU/memory/image/env). The exact keys depend on your ECS work pool's job template; open the pool in the UI to see what's available. See: Customize job variables.
- Ensure your extracted files are accessible to ECS tasks (e.g., S3/GCS). Avoid local temp files.

Helpful references
- run_deployment guide: Run deployments from code
- Python API for running/monitoring flow runs: prefect.deployments and prefect.flow_runs
- ECS setup and guidance: Prefect + AWS ECS guide

Questions to make sure we set you up right
- Are you already on Prefect 3.x and using an ECS work pool?
- Do you want the parent to wait for all processing runs to finish, or just fire-and-forget? (A fire-and-forget sketch follows below.)
- Will your extracted files live in S3 (or another shared store) so each ECS task can access them?

If you share a bit about your current pool/deployment setup, I can tailor the job_variables and deployment commands for you.
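And if you'd rather fire-and-forget than wait, here is a minimal sketch of the parent without the waiting step, reusing `extract_files` and the deployment name from the example above:
```python
from prefect import flow
from prefect.deployments import run_deployment

# extract_files is the extractor flow defined above; import it if it lives in another module


@flow
def parent_fire_and_forget(zip_s3_key: str):
    """Launch one processing run per file and return without waiting on them."""
    file_keys = extract_files(zip_s3_key)
    for i, key in enumerate(file_keys, start=1):
        run_deployment(
            name="process-file/process-on-ecs",
            parameters={"file_key": key},
            flow_run_name=f"process-{i}",
            timeout=0,  # return as soon as the child run is created
        )
```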
k
@Marvin Let's say I'm using your example code, how would I deploy these flows?
m
thought for 343.7 seconds
Great question. Here's a concise, working way to deploy the three flows from the example so your parent can fan out to ECS.

Assumptions
- Prefect 3.x
- You'll run the processing flow on ECS; the parent and extractor can run on a local "process" pool (or ECS too, if you prefer).

1) Create work pools
- ECS work pool (for your per-file processing runs)
- Process work pool (for your parent/extractor, unless you want them on ECS as well)
```bash
# Install the AWS integration if you haven't
pip install prefect-aws

# Create pools
prefect work-pool create my-ecs --type ecs
prefect work-pool create process --type process
```
- For ECS: open the my-ecs pool in the UI and configure the base job template (cluster, subnets, roles, image, etc.). Note that a pool created with `--type ecs` is a hybrid pool, so it needs an ECS worker running somewhere with `prefect-aws` installed; only `ecs:push` work pools launch tasks without a worker (a worker-start sketch follows the next command block). See: Prefect + AWS ECS guide
- For the process pool: start a worker on any machine that can import your code
```bash
prefect worker start -p process
```
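If you stick with the standard `ecs`-type pool created above (rather than a push pool), it also needs its own worker; a sketch, assuming `prefect-aws` is installed wherever the worker runs (a long-lived container or instance with AWS credentials):
```bash
# Long-running worker that polls the my-ecs pool and launches one ECS task per flow run
prefect worker start --pool my-ecs
```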
2) Put your flows in a module (e.g., flows.py)
Make sure your flow names match what you'll use with run_deployment. By default the flow name is the function name with underscores converted to dashes, e.g. `process_file` becomes `process-file`.

3) Deploy the flows
You can do this in Python (recommended for clarity) or with the CLI.

Option A: Pythonic deployment with Flow.deploy
Create a small script (deploy_flows.py):
```python
from flows import extract_files, process_file, parent_orchestrator

if __name__ == "__main__":
    # Parent + extractor to local 'process' pool
    extract_files.deploy(
        name="extract-local",
        work_pool_name="process",
    )

    parent_orchestrator.deploy(
        name="parent-orchestrator",
        work_pool_name="process",
    )

    # Processing flow to ECS pool (one ECS task per child run)
    process_file.deploy(
        name="process-on-ecs",
        work_pool_name="my-ecs",
        # Optional per-deployment ECS defaults you can override at runtime via job_variables:
        # job_variables={"cpu": 1024, "memory": 2048, "env": {"PREFECT_LOGGING_LEVEL": "INFO"}}
    )
```
Run:
```bash
python deploy_flows.py
```

Option B: CLI deployment
From your project root where flows.py lives:
```bash
prefect deploy ./flows.py:extract_files --name extract-local --pool process
prefect deploy ./flows.py:parent_orchestrator --name parent-orchestrator --pool process
prefect deploy ./flows.py:process_file --name process-on-ecs --pool my-ecs
```
Notes:
- The deployment name you pass (e.g., `process-on-ecs`) is what you'll combine with the flow name to call `run_deployment`, e.g., `process-file/process-on-ecs`.
- If you're deploying from a Git repo for ECS, you can also use `Flow.from_source(...).deploy(...)`; a sketch follows below. See: Run deployments from code
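For that Git-based option, a minimal sketch (the repository URL and entrypoint path are placeholders for your own repo layout):
```python
from prefect import flow

if __name__ == "__main__":
    # Pull the flow code from Git at run time instead of baking it into an image
    flow.from_source(
        source="https://github.com/your-org/your-repo",  # placeholder repo
        entrypoint="flows.py:process_file",
    ).deploy(
        name="process-on-ecs",
        work_pool_name="my-ecs",
    )
```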
. See: Run deployments from code 4) Wire the parent fanout to the correct deployment name In your parent flow, make sure the
run_deployment
name matches exactly:
```python
fr = await run_deployment(  # awaited because the parent flow is async
    name="process-file/process-on-ecs",
    parameters={"file_key": key},
    flow_run_name=f"process-{i}",
    timeout=0,
    as_subflow=True,
    # Optionally route differently or tweak ECS per run:
    # work_queue_name="default",
    # job_variables={"cpu": 2048, "memory": 4096}
)
```
5) Kick off a test
- Start your process worker:
```bash
prefect worker start -p process
```
- Trigger the parent (or schedule it in the UI):
```bash
prefect deployment run parent-orchestrator/parent-orchestrator -p zip_s3_key="s3://my-bucket/archive.zip" --watch
```
Helpful refs
- Run deployments from code: Run deployments from code
- Customize job variables (ECS CPU/memory/env/image overrides): Customize job variables
- ECS guide: Prefect + AWS ECS guide
If you share where you’re hosting the code (local vs Git) and your ECS cluster setup, I can tailor the deploy snippet (including base job template and job_variables) for your exact environment.