Syméon del Marmol
10/20/2025, 11:40 AM
Marvin
10/20/2025, 11:44 AM
[...] .delay() and await with .result_async() (your current pattern is correct). You can dynamically name each task with .with_options(name=...), as you’re doing.
- Example:
    from prefect import flow, task
    import asyncio

    @task
    async def do_work(x):
        # ... your unit of work ...
        return x * 2

    @flow
    async def parent(items):
        futs = [
            do_work.with_options(name=f"do-work-{i}").delay(x)
            for i, x in enumerate(items)
        ]
        results = await asyncio.gather(*(f.result_async() for f in futs))
        return results
- Control overall concurrency via your work pool/worker concurrency and, if useful, Prefect concurrency limits.
- If you want CPU-bound parallelism on the same machine where the flow runs:
- Configure the flow’s task runner to ProcessPoolTaskRunner. Then use .submit() to fan out work; it will run in multiple processes on that machine.
- Note: all task callables and inputs must be picklable for process pools.
- Example:
    from prefect import flow, task
    from prefect.task_runners import ProcessPoolTaskRunner

    @task
    def cpu_bound(x):
        # ... CPU-heavy work ...
        return x * x

    @flow(task_runner=ProcessPoolTaskRunner(max_workers=8))
    def parent(items):
        futs = [cpu_bound.submit(x) for x in items]
        return [f.result() for f in futs]
- If you need both distributed fan-out and local CPU parallelism:
- Put the CPU-heavy section in a subflow that uses ProcessPoolTaskRunner. Because .delay() is a task method rather than a flow method, call the subflow from a small wrapper task and fan that wrapper out with .delay() from your parent flow.
- Example:
    from prefect import flow, task
    from prefect.task_runners import ProcessPoolTaskRunner
    import asyncio

    @task
    def cpu_bound(x):
        return x * x

    @flow(task_runner=ProcessPoolTaskRunner(max_workers=8))
    def local_parallel(batch):
        # Fan out across local processes on whichever machine runs this subflow
        futs = [cpu_bound.submit(x) for x in batch]
        return [f.result() for f in futs]

    @task
    def run_batch(batch):
        # Wrapper task (illustrative name): lets the subflow be scheduled with .delay()
        return local_parallel(batch)

    @flow
    async def parent(batches):
        futs = [
            run_batch.with_options(name=f"batch-{i}").delay(batch)
            for i, batch in enumerate(batches)
        ]
        results = await asyncio.gather(*(f.result_async() for f in futs))
        # Flatten the per-batch result lists into one list
        return [item for batch in results for item in batch]
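On the picklability note above: one quick way to find out whether a callable or input will survive the trip to a process pool is to try pickling it yourself first. Here is a minimal sketch using only the standard library. Note that is_picklable is a hypothetical helper name for illustration, not part of Prefect.

```python
import math
import pickle
import threading


def is_picklable(obj) -> bool:
    """Return True if obj can be serialized with pickle (hypothetical helper, not a Prefect API)."""
    try:
        pickle.dumps(obj)
        return True
    except (pickle.PicklingError, TypeError, AttributeError):
        return False


# Importable functions and plain data can be pickled.
print(is_picklable(math.sqrt))
print(is_picklable([1, 2, 3]))

# Lambdas and OS-level handles such as locks cannot.
print(is_picklable(lambda x: x * x))
print(is_picklable(threading.Lock()))
```

If a check like this fails for one of your task inputs, the usual fix is to pass plain data (paths, IDs, numbers) and rebuild the unpicklable object inside the task.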
A few notes
- `.submit()`/`.map()` require a running flow context and always use the flow’s configured task runner.
- Background tasks submitted with .delay() are picked up and executed by task workers, independently of the flow’s task runner.
- If you truly need to parallelize inside a single task without changing the flow’s runner, you can use Python’s concurrent.futures.ProcessPoolExecutor directly inside that task, but you’ll lose Prefect-level visibility into those inner units of work.
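For the last note, here is a rough sketch of what parallelizing inside a single task with the standard library could look like. The names square and crunch, and the executor_cls parameter, are assumptions made for illustration; nothing here is Prefect API, and Prefect would only see the one task that calls crunch, not the individual items.

```python
from concurrent.futures import ProcessPoolExecutor


def square(x: int) -> int:
    # Defined at module level so it can be pickled and sent to worker processes.
    return x * x


def crunch(items, max_workers=4, executor_cls=ProcessPoolExecutor):
    """Apply square to every item using a pool of workers.

    executor_cls is a parameter (an assumption of this sketch) so the same
    code can be tried with ThreadPoolExecutor when processes are not wanted.
    """
    with executor_cls(max_workers=max_workers) as pool:
        # Executor.map returns results in the same order as the inputs.
        return list(pool.map(square, items))
```

You would call crunch(batch) from the body of your task. On platforms that start worker processes by spawning a fresh interpreter, keep the script's entry point under an `if __name__ == "__main__":` guard so the workers do not re-run it on import.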
Docs for reference:
- Task runners
- Run work concurrently
- Background tasks
If you can share whether your goal is “spread work across workers” vs “speed up CPU-bound work on the same box,” I can recommend the exact pattern and configuration.