David Martin Calalang
08/19/2025, 5:48 PM
Marvin
08/19/2025, 5:48 PM
Marvin
08/19/2025, 5:52 PM
```python
from prefect import flow, task
from prefect.task_runners import ConcurrentTaskRunner


@task
def do_work(i):
    # your work here
    return i * 2


# Up to 16 tasks run at once inside this one flow run
@flow(task_runner=ConcurrentTaskRunner(max_workers=16))
def my_flow():
    # Fan out: submit() returns a future immediately
    futures = [do_work.submit(i) for i in range(100)]
    # Fan in: block until every future has finished
    results = [f.result() for f in futures]
    return results
```
- This runs tasks concurrently (in threads) inside the single ECS task that’s executing the flow run.
- Good for I/O-bound work. For CPU-heavy work, threads contend for Python's GIL, so consider distributing (see next).
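The thread-based behavior described above can be seen without Prefect at all. The sketch below is a plain-Python analogue using the standard library's `ThreadPoolExecutor`, not Prefect's implementation; `do_io_work` is a hypothetical placeholder, and `time.sleep` stands in for network or disk waits, which release the GIL.

```python
import time
from concurrent.futures import ThreadPoolExecutor


def do_io_work(i: int) -> int:
    # Hypothetical I/O-bound task: sleeping releases the GIL, like waiting on a socket
    time.sleep(0.1)
    return i * 2


def run_concurrently(n: int, max_workers: int) -> tuple[list[int], float]:
    """Fan out n calls across max_workers threads, then fan in the results."""
    start = time.perf_counter()
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        futures = [pool.submit(do_io_work, i) for i in range(n)]  # fan out
        results = [f.result() for f in futures]  # fan in, in submission order
    return results, time.perf_counter() - start


if __name__ == "__main__":
    results, elapsed = run_concurrently(n=32, max_workers=16)
    # 32 sleeps of 0.1s on 16 threads finish in about two waves (~0.2s), not ~3.2s
    print(results[:4], f"{elapsed:.2f}s")
```

Replace the sleep with pure-Python arithmetic and extra threads would gain little, which is why the CPU-heavy case points at distributing instead.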
3) Distributed execution for one “parent” flow (many machines)
- Fan out into many child flow runs (each a separate ECS task) from inside your flow by calling run_deployment repeatedly for a “child” deployment. That gives you real horizontal scale across ECS tasks.
- Alternatively, use a distributed task runner (Dask or Ray, via the prefect-dask or prefect-ray integrations) if you already have a Dask/Ray cluster available. This keeps it one flow run but distributes the work to the cluster.
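Before fanning out to child runs, it helps to decide how much work each child gets. The helper below is a hypothetical plain-Python sketch (the name `chunk_parameters` and the `items` parameter are illustrative, not from Prefect): it splits a work list into near-equal parameter payloads, one per child run.

```python
from typing import Any


def chunk_parameters(items: list[Any], n_children: int) -> list[dict[str, Any]]:
    """Split items into at most n_children contiguous, near-equal payloads.

    Hypothetical helper: the "items" key must match a parameter of your child flow.
    """
    if n_children < 1:
        raise ValueError("n_children must be at least 1")
    # Never create more children than there are items (but always at least one)
    n_children = max(1, min(n_children, len(items)))
    size, extra = divmod(len(items), n_children)
    payloads, start = [], 0
    for k in range(n_children):
        # The first `extra` children each take one additional item
        end = start + size + (1 if k < extra else 0)
        payloads.append({"items": items[start:end]})
        start = end
    return payloads


if __name__ == "__main__":
    print(chunk_parameters(list(range(10)), 3))
    # [{'items': [0, 1, 2, 3]}, {'items': [4, 5, 6]}, {'items': [7, 8, 9]}]
```

Each payload could then be passed as `parameters=` to `run_deployment` (from `prefect.deployments`), once per child, for example `run_deployment(name="child-flow/child-deployment", parameters=payload, timeout=0)`. The deployment name here is a placeholder, and `timeout=0` returns the flow run metadata immediately instead of waiting for the child to finish.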
Notes and sanity checks
- If you only run one flow at a time, you’ll only get one ECS task at a time. Parallel flow runs require multiple triggers (or a parent flow that fans out).
- Bound your blast radius with pool concurrency limits if needed; then increase limits as you scale out.
- Ensure your AWS side (cluster capacity, task definitions, service quotas) can handle the parallelism you ask for.
If you can share which flavor you want (parallel flow runs, parallel tasks within one run, or true distributed execution), I’ll give you the exact wiring.
Related reading (high level but useful):
- Work Pools and Workers
- Scaling parallel work (conceptual; in modern Prefect you’ll use .submit instead of 1.x-style mapping)
- Prefect 3 GA overview
David Martin Calalang
08/19/2025, 7:15 PM
Marvin
08/19/2025, 7:16 PM
Marvin
08/19/2025, 7:16 PM
```python
from time import sleep
from random import randint

from prefect import flow, task, get_run_logger
from prefect.task_runners import ConcurrentTaskRunner


@task
def fetch(i: int) -> int:
    logger = get_run_logger()
    d = randint(1, 3)
    logger.info(f"fetch {i}: sleeping {d}s")
    sleep(d)
    logger.info(f"fetch {i}: done")
    return i


@task
def combine(values: list[int]) -> int:
    return sum(values)


@flow(name="parallel-ui-demo", task_runner=ConcurrentTaskRunner(max_workers=8))
def parallel_flow(n: int = 6) -> int:
    # Fan-out: these run concurrently
    futures = [fetch.submit(i) for i in range(n)]

    # IMPORTANT for a clear graph: pass futures directly to the downstream task
    # (don’t call .result() here, or the UI loses the explicit dependency edges)
    total = combine.submit(futures)

    # If you want a Python value in the driver at the end:
    return total.result()


if __name__ == "__main__":
    parallel_flow()
```
What you’ll see in the UI:
- A “parallel-ui-demo” flow run with N sibling fetch tasks at the same level, running at the same time.
- One downstream combine task that depends on all of them.
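Why passing futures matters: the downstream task receives the upstream futures themselves, so the dependency is explicit, and the waiting happens inside the downstream task rather than in the flow body. Prefect tracks such future inputs as upstream dependencies, which is what the graph draws. The toy runner below mimics the idea in plain Python; `ToyRunner` and its `edges` list are hypothetical and are not how Prefect is implemented.

```python
import time
from concurrent.futures import Future, ThreadPoolExecutor
from typing import Any, Callable


def collect_futures(value: Any) -> list[Future]:
    """Find every Future in an argument, including inside lists."""
    if isinstance(value, Future):
        return [value]
    if isinstance(value, list):
        return [f for v in value for f in collect_futures(v)]
    return []


def resolve(value: Any) -> Any:
    """Replace Futures (including inside lists) with their results, waiting if needed."""
    if isinstance(value, Future):
        return value.result()
    if isinstance(value, list):
        return [resolve(v) for v in value]
    return value


class ToyRunner:
    """Hypothetical stand-in for a task runner; not Prefect's implementation."""

    def __init__(self, max_workers: int = 8) -> None:
        self.pool = ThreadPoolExecutor(max_workers=max_workers)
        self.labels: dict[int, str] = {}  # id(future) -> task label
        self.edges: list[tuple[str, str]] = []  # (upstream, downstream) pairs

    def submit(self, label: str, fn: Callable[..., Any], *args: Any) -> Future:
        # Every future passed in becomes an explicit upstream edge
        for upstream in collect_futures(list(args)):
            self.edges.append((self.labels[id(upstream)], label))
        # Resolution happens in the worker thread, so submit() returns immediately
        future = self.pool.submit(lambda: fn(*resolve(list(args))))
        self.labels[id(future)] = label
        return future


def fetch(i: int) -> int:
    time.sleep(0.05)
    return i


def combine(values: list[int]) -> int:
    return sum(values)


if __name__ == "__main__":
    runner = ToyRunner()
    futures = [runner.submit(f"fetch-{i}", fetch, i) for i in range(6)]
    total = runner.submit("combine", combine, futures)  # pass futures, not results
    print(total.result(), len(runner.edges))  # 15 6
```

Calling `.result()` on each fetch in the flow body instead would hand `combine` plain integers: the sum is the same, but no edges are recorded.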
Notes:
- ConcurrentTaskRunner(max_workers=8) controls intra-flow parallelism. Bump it up (within reason) to increase concurrency.
- This works the same on ECS: each flow run is one ECS task; the parallelism happens inside that task.
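To see that `max_workers` is a cap on in-flight tasks rather than a target, the sketch below counts how many stand-in `fetch` calls overlap under a plain `ThreadPoolExecutor`. It is an analogue, not Prefect's runner, and `PeakCounter` and `peak_concurrency` are hypothetical names.

```python
import threading
import time
from concurrent.futures import ThreadPoolExecutor


class PeakCounter:
    """Thread-safe tracker of how many tasks are running at once."""

    def __init__(self) -> None:
        self._lock = threading.Lock()
        self.current = 0
        self.peak = 0

    def __enter__(self) -> None:
        with self._lock:
            self.current += 1
            self.peak = max(self.peak, self.current)

    def __exit__(self, *exc) -> None:
        with self._lock:
            self.current -= 1


def peak_concurrency(n_tasks: int, max_workers: int, seconds: float = 0.05) -> int:
    counter = PeakCounter()

    def fetch(i: int) -> int:
        with counter:  # counted as in flight while it "waits on I/O"
            time.sleep(seconds)
        return i

    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        list(pool.map(fetch, range(n_tasks)))
    return counter.peak


if __name__ == "__main__":
    for workers in (2, 8):
        print(workers, peak_concurrency(6, workers))
```

Raising the cap only helps while there are enough submitted tasks (and, on ECS, enough CPU and memory in the task) to fill it.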
Light reading (if you insist):
- Scaling parallel work
- Work pools and workers
David Martin Calalang
08/19/2025, 7:31 PM
Marvin
08/19/2025, 7:31 PM
Marvin
08/19/2025, 7:32 PM
- pip install prefect-dask "dask[distributed]"
- Or if you insist on being tidy: uv run --with 'prefect-dask' --with 'dask[distributed]' python your_script.py
Example with a local Dask cluster (parallel on the ECS task/container that’s running the flow):
```python
from time import sleep
from random import randint

from prefect import flow, task, get_run_logger
from prefect_dask import DaskTaskRunner
from dask.distributed import LocalCluster


@task
def fetch(i: int) -> int:
    logger = get_run_logger()
    d = randint(1, 3)
    logger.info(f"fetch {i}: sleeping {d}s")
    sleep(d)
    logger.info(f"fetch {i}: done")
    return i


@task
def combine(values: list[int]) -> int:
    return sum(values)


@flow(
    name="dask-parallel-ui-demo",
    task_runner=DaskTaskRunner(
        cluster_class=LocalCluster,
        cluster_kwargs={"n_workers": 4, "threads_per_worker": 1},  # adjust as needed
    ),
)
def parallel_flow(n: int = 6) -> int:
    # Fan-out: these run concurrently on Dask workers
    futures = [fetch.submit(i) for i in range(n)]

    # Keep the UI graph parallel by passing futures, not resolved results
    total = combine.submit(futures)
    return total.result()


if __name__ == "__main__":
    parallel_flow()
```
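The `cluster_kwargs` above say "adjust as needed". One possible rule of thumb, stated here as an assumption rather than a Prefect or Dask default: for CPU-bound tasks, one single-threaded worker process per core so tasks don't share a GIL; for I/O-bound tasks, fewer processes with more threads each. `local_cluster_kwargs` is a hypothetical helper. Inside a container, `os.cpu_count()` can report the host's cores rather than the vCPUs the ECS task was given, so the sketch lets you pass `cores` explicitly.

```python
import os
from typing import Optional


def local_cluster_kwargs(
    cpu_bound: bool, cores: Optional[int] = None, io_threads: int = 8
) -> dict[str, int]:
    """Hypothetical sizing rule for LocalCluster's n_workers / threads_per_worker."""
    # Prefer an explicit core count (e.g. the ECS task's vCPUs) over os.cpu_count()
    cores = max(1, cores or os.cpu_count() or 1)
    if cpu_bound:
        # One single-threaded process per core: tasks don't contend for one GIL
        return {"n_workers": cores, "threads_per_worker": 1}
    # I/O-bound: tasks mostly wait, so a few processes with many threads suffice
    return {"n_workers": min(2, cores), "threads_per_worker": io_threads}


if __name__ == "__main__":
    print(local_cluster_kwargs(cpu_bound=True, cores=4))
    # {'n_workers': 4, 'threads_per_worker': 1}
```

It would plug in as `DaskTaskRunner(cluster_class=LocalCluster, cluster_kwargs=local_cluster_kwargs(cpu_bound=True))`.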
What you’ll see in the UI:
- N sibling fetch tasks running at the same time (parallel branches)
- One downstream combine task depending on all of them
Want true distributed across multiple machines? Point the runner to an existing Dask scheduler:
```python
# fetch and combine are the tasks defined in the example above
@flow(task_runner=DaskTaskRunner(address="tcp://dask-scheduler:8786"))
def parallel_flow(n: int = 6) -> int:
    futures = [fetch.submit(i) for i in range(n)]
    return combine.submit(futures).result()
```
Notes:
- On ECS, the LocalCluster runs inside the single ECS task for that flow run. For multi-machine scale, use an external Dask cluster (ECS, EKS, EC2, etc.) and pass its scheduler address.
- Keep the futures wired into downstream tasks to preserve the parallel graph in the UI.
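To switch between the two setups in these notes without editing the flow, one option is to choose the `DaskTaskRunner` arguments from the environment. In this sketch `DASK_SCHEDULER_ADDRESS` is a hypothetical variable name you would set on the ECS task definition, and the local branch relies on `DaskTaskRunner` starting a temporary `LocalCluster` when no `address` is given; the helper itself is plain Python.

```python
import os
from typing import Any, Mapping, Optional


def dask_runner_kwargs(env: Optional[Mapping[str, str]] = None) -> dict[str, Any]:
    """Pick DaskTaskRunner kwargs: an external scheduler if configured, else local."""
    env = os.environ if env is None else env
    # Hypothetical variable name; set it only where an external Dask cluster exists
    address = env.get("DASK_SCHEDULER_ADDRESS", "").strip()
    if address:
        # Multi-machine: connect to an existing scheduler (ECS, EKS, EC2, ...)
        return {"address": address}
    # Single ECS task: no address, so a temporary local cluster is started instead
    return {"cluster_kwargs": {"n_workers": 4, "threads_per_worker": 1}}


if __name__ == "__main__":
    print(dask_runner_kwargs({"DASK_SCHEDULER_ADDRESS": "tcp://dask-scheduler:8786"}))
    # {'address': 'tcp://dask-scheduler:8786'}
```

The flow would then use `@flow(task_runner=DaskTaskRunner(**dask_runner_kwargs()))`. The decorator is evaluated when the module is imported, so the variable has to be set before the flow code loads.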
Docs, if you like reading more than watching things run:
- prefect-dask integration
- Dask distributed
David Martin Calalang
08/19/2025, 8:07 PM
Marvin
08/19/2025, 8:07 PM
Marvin
08/19/2025, 8:08 PM
Marvin
08/19/2025, 8:08 PM
Marvin
08/19/2025, 8:14 PM
Marvin
08/19/2025, 8:15 PM
Marvin
08/19/2025, 8:15 PM
Marvin
08/19/2025, 8:15 PM