# ask-marvin
@Marvin Can I use a ProcessPoolTaskRunner within a task, to run sub-tasks ?
thought for 240.7 seconds
Short answer: No. You can't start or use a `ProcessPoolTaskRunner` inside a task. Task runners are configured at the flow (or subflow) level and are started when the flow begins. Any calls to `.submit()`/`.map()` from inside tasks will still use the parent flow's task runner; spinning up a new runner inside a task isn't supported.

What to use instead

- If you want to fan out across multiple task workers (distributed):
  - Keep using background tasks with `.delay()` and await with `.result_async()` (your current pattern is correct). You can dynamically name each task with `.with_options(name=...)`, as you're doing.
  - Example:
```python
from prefect import flow, task
import asyncio

@task
async def do_work(x):
    # ... your unit of work ...
    return x * 2

@flow
async def parent(items):
    futs = [
        do_work.with_options(name=f"do-work-{i}").delay(x)
        for i, x in enumerate(items)
    ]
    results = await asyncio.gather(*(f.result_async() for f in futs))
    return results
```
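  - Note that `.delay()` only schedules the run; a task worker must be serving the task for it to execute. A minimal sketch of such a worker, assuming the tasks live in an importable module (`myapp.tasks` is a hypothetical path):

```python
# worker.py: run as its own process with `python worker.py`
from prefect.task_worker import serve

from myapp.tasks import do_work  # hypothetical module holding the @task

if __name__ == "__main__":
    # serve() blocks and executes incoming .delay() runs for these tasks
    serve(do_work)
```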
  - Control overall concurrency via your work pool/worker concurrency and, if useful, Prefect concurrency limits (see the concurrency-limit sketch after this list).
- If you want CPU-bound parallelism on the same machine where the flow runs:
  - Configure the flow's task runner to `ProcessPoolTaskRunner`. Then use `.submit()` to fan out work; it will run in multiple processes on that machine.
  - Note: all task callables and inputs must be picklable for process pools.
  - Example:
```python
from prefect import flow, task
from prefect.task_runners import ProcessPoolTaskRunner

@task
def cpu_bound(x):
    # ... CPU-heavy work ...
    return x * x

@flow(task_runner=ProcessPoolTaskRunner(max_workers=8))
def parent(items):
    futs = [cpu_bound.submit(x) for x in items]
    return [f.result() for f in futs]
```
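  - The same fan-out can also be written with `.map()`, which submits one run per element; a short sketch reusing `cpu_bound` and the imports from the example above:

```python
@flow(task_runner=ProcessPoolTaskRunner(max_workers=8))
def parent_mapped(items):
    # .map() submits one cpu_bound run per item and returns futures
    return [f.result() for f in cpu_bound.map(items)]
```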
- If you need both distributed fan-out and local CPU parallelism:
  - Put the CPU-heavy section in a subflow that uses `ProcessPoolTaskRunner`, and have the parent flow fan it out with `.delay()` via a thin wrapper task (`.delay()` is a task method, not a flow method).
  - Example:
```python
from prefect import flow, task
from prefect.task_runners import ProcessPoolTaskRunner
import asyncio

@task
def cpu_bound(x):
    return x * x

@flow(task_runner=ProcessPoolTaskRunner(max_workers=8))
def local_parallel(batch):
    futs = [cpu_bound.submit(x) for x in batch]
    return [f.result() for f in futs]

# .delay() is available on tasks, not flows, so wrap the subflow call
# in a thin task that the parent can fan out as a background task
@task
def run_batch(batch):
    return local_parallel(batch)

@flow
async def parent(batches):
    futs = [
        run_batch.with_options(name=f"batch-{i}").delay(batch)
        for i, batch in enumerate(batches)
    ]
    results = await asyncio.gather(*(f.result_async() for f in futs))
    return [item for batch in results for item in batch]
```
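As promised above, a sketch of capping fan-out with a Prefect global concurrency limit. It assumes a limit named `db-heavy` has already been created (e.g. `prefect gcl create db-heavy --limit 4`); the name and limit are placeholders:

```python
from prefect import flow, task
from prefect.concurrency.sync import concurrency

@task
def guarded_work(x):
    # holds one slot of the "db-heavy" limit while the work runs
    with concurrency("db-heavy", occupy=1):
        return x * 2

@flow
def parent(items):
    futs = [guarded_work.submit(x) for x in items]
    return [f.result() for f in futs]
```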
A few notes

- `.submit()`/`.map()` require a running flow context and always use the flow's configured task runner.
- Background tasks submitted with `.delay()` are scheduled for execution by task workers and are independent of the flow's task runner.
- If you truly need to parallelize inside a single task without changing the flow's runner, you can use Python's `concurrent.futures.ProcessPoolExecutor` directly inside that task, but you'll lose Prefect-level visibility into those inner units of work (see the sketch at the end of this message).

Docs for reference:

- Task runners
- Run work concurrently
- Background tasks

If you can share whether your goal is "spread work across workers" vs. "speed up CPU-bound work on the same box," I can recommend the exact pattern and configuration.
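For completeness, the sketch mentioned in the notes: plain `concurrent.futures` inside a single task, with `heavy` as a hypothetical CPU-bound helper (kept module-level so it pickles):

```python
from concurrent.futures import ProcessPoolExecutor

from prefect import task

def heavy(x):
    # hypothetical CPU-bound helper; module-level so process pools can pickle it
    return x * x

@task
def crunch(items):
    # Prefect records one task run; the inner processes are invisible to it
    with ProcessPoolExecutor(max_workers=4) as pool:
        return list(pool.map(heavy, items))
```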