Mehdi Lallouache
11/05/2025, 9:46 AM
Marvin
11/05/2025, 9:50 AM
max_workers, which controls how many processes are in the pool.
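If you want to configure it:
```python
from prefect import flow, task
from prefect.task_runners import ProcessPoolTaskRunner

@task
def compute(x):
    return x * x

# max_workers = number of worker processes in the pool
@flow(task_runner=ProcessPoolTaskRunner(max_workers=4))
def my_flow():
    futures = [compute.submit(i) for i in range(10)]
    return [f.result() for f in futures]

if __name__ == "__main__":
    # Guard needed where worker processes are spawned (e.g., macOS/Windows)
    print(my_flow())
```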
If you actually want thread-level parallelism (where max_workers sets the number of threads), use ThreadPoolTaskRunner instead:
```python
from prefect import flow
from prefect.task_runners import ThreadPoolTaskRunner

@flow(task_runner=ThreadPoolTaskRunner(max_workers=8))
def my_flow():
    ...
```
If you do need multiple threads inside each process, implement that in your task code (e.g., start a local ThreadPoolExecutor inside the task); the task runner won’t manage “threads per process” for you.
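A minimal sketch of that pattern using only the standard library. compute_chunk and its threads_per_task parameter are hypothetical names standing in for the body of one task; threads_per_task is not a Prefect setting. In a flow you would decorate the function with @task and submit several chunks, so ProcessPoolTaskRunner(max_workers=N) gives you up to N processes, each running up to threads_per_task threads.

```python
from concurrent.futures import ThreadPoolExecutor


def square(x: int) -> int:
    return x * x


def compute_chunk(items: list[int], threads_per_task: int = 2) -> list[int]:
    """Hypothetical task body that fans out over a local thread pool.

    The task runner only decides which process runs this function; the
    ThreadPoolExecutor below is created and sized by the task code itself.
    """
    with ThreadPoolExecutor(max_workers=threads_per_task) as pool:
        # pool.map returns results in input order
        return list(pool.map(square, items))


if __name__ == "__main__":
    print(compute_chunk([1, 2, 3, 4]))  # [1, 4, 9, 16]
```

Threads only pay off here when the per-item work is I/O-bound or releases the GIL (as many NumPy operations do); for pure-Python CPU work, more processes are the better lever.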
Docs for reference:
- Task runners in Prefect 3
- Running work concurrently
Just to confirm: are you aiming to control the number of processes running tasks concurrently, or do you want multiple threads inside each task process?
Mehdi Lallouache
11/05/2025, 9:56 AM
Marvin
11/05/2025, 9:57 AM
```python
from prefect import flow, task
from prefect_dask.task_runners import DaskTaskRunner

@task
def compute(x):
    return x * x

@flow(
    task_runner=DaskTaskRunner(
        cluster_class="distributed.LocalCluster",
        cluster_kwargs={
            "n_workers": 4,  # number of workers (processes)
            "threads_per_worker": 1,  # limit threads per worker
        },
    )
)
def my_flow():
    futures = [compute.submit(i) for i in range(10)]
    return [f.result() for f in futures]

if __name__ == "__main__":
    # LocalCluster starts worker processes, so guard the entrypoint
    print(my_flow())
```
If you need to stick with ProcessPoolTaskRunner but still want to limit the implicit threading that numeric libraries (BLAS/MKL/OpenMP) start on their own, set these environment variables before importing those libraries, as early as possible (e.g., at the top of your flow entrypoint or in the worker’s environment):
```python
import os

os.environ["OMP_NUM_THREADS"] = "1"
os.environ["MKL_NUM_THREADS"] = "1"
os.environ["OPENBLAS_NUM_THREADS"] = "1"
os.environ["NUMEXPR_NUM_THREADS"] = "1"
os.environ["BLIS_NUM_THREADS"] = "1"
os.environ["VECLIB_MAXIMUM_THREADS"] = "1"
```
Child processes started after this point inherit these variables, and the OpenMP/BLAS backends used by NumPy, SciPy, and pandas read them when they initialize, so setting them after those libraries are already imported typically has no effect in that process.
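To check the inheritance part, here is a small stdlib-only sketch. limit_native_threads and child_sees are hypothetical helper names, and a plain subprocess stands in for a task-runner worker process (an assumption; it only demonstrates environment inheritance, not how any particular library reads the value).

```python
import os
import subprocess
import sys

# Variables read by common native thread pools (OpenMP, MKL, OpenBLAS, numexpr, BLIS, Apple Accelerate)
THREAD_ENV_VARS = (
    "OMP_NUM_THREADS",
    "MKL_NUM_THREADS",
    "OPENBLAS_NUM_THREADS",
    "NUMEXPR_NUM_THREADS",
    "BLIS_NUM_THREADS",
    "VECLIB_MAXIMUM_THREADS",
)


def limit_native_threads(n: int = 1) -> None:
    """Hypothetical helper: call before importing numpy/scipy/pandas."""
    for name in THREAD_ENV_VARS:
        os.environ[name] = str(n)


def child_sees(name: str) -> str:
    """Start a fresh Python process and return the value it sees for `name`."""
    result = subprocess.run(
        [sys.executable, "-c", f"import os; print(os.environ.get({name!r}, ''))"],
        capture_output=True,
        text=True,
        check=True,
    )
    return result.stdout.strip()


if __name__ == "__main__":
    limit_native_threads(1)
    # Every variable should read "1" in the child process
    print({name: child_sees(name) for name in THREAD_ENV_VARS})
```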
References:
- prefect-dask (integration)
- Dask LocalCluster docs (see threads_per_worker)
If you’re connecting to an existing Dask cluster instead of creating one in-flow, configure the thread count on the workers themselves (e.g., dask worker --nthreads 1) and point the task runner at the scheduler with address="tcp://...".
Want me to tailor this to your setup (local, Docker, Kubernetes, or an existing Dask cluster)?
Mehdi Lallouache
11/05/2025, 9:58 AM
Marvin
11/05/2025, 9:58 AM
max_workers (number of processes). This is by design: it wraps Python’s concurrent.futures.ProcessPoolExecutor, where each worker is a separate process that runs one task at a time, so there is no threads-per-worker setting to expose.
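You can confirm this against the standard library itself: the sketch below lists the constructor parameters of concurrent.futures.ProcessPoolExecutor. The exact list depends on the Python version (max_tasks_per_child, for example, was added in 3.11), but none of the parameters sets a thread count.

```python
import inspect
from concurrent.futures import ProcessPoolExecutor

# Parameter names accepted by ProcessPoolExecutor(...) on this interpreter
params = list(inspect.signature(ProcessPoolExecutor).parameters)
print(params)

# Only the process count is configurable; nothing controls threads per worker
thread_params = [p for p in params if "thread" in p]
print(thread_params)  # []
```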
Workarounds:
- If the goal is to tame extra threads from BLAS/MKL/OpenMP, set env vars like OMP_NUM_THREADS=1, MKL_NUM_THREADS=1, etc., as early as possible so they apply to all subprocesses.
- If you want first-class control of threads per worker, keep using DaskTaskRunner and set threads_per_worker=1.
Refs:
- Task runners in Prefect 3
- Dask LocalCluster (threads_per_worker)
If you want, I can help you set the right env vars in your deployment/worker setup (local, Docker, or Kubernetes) so ProcessPoolTaskRunner runs single-threaded numerical libs.