<@ULVA73B9P> how can I set the number of thread pe...
# marvin-ai
m
@Marvin how can I set the number of threads per worker in a ProcessPoolTaskRunner?
m
thought for 267.8 seconds
Short answer: you can’t set “threads per worker” on a ProcessPoolTaskRunner because each worker is a separate process, not a thread. The only knob on ProcessPoolTaskRunner is `max_workers`, which controls how many processes are in the pool. If you want to configure it:
from prefect import flow, task
from prefect.task_runners import ProcessPoolTaskRunner

@task
def compute(x):
    return x * x

@flow(task_runner=ProcessPoolTaskRunner(max_workers=4))
def my_flow():
    futures = [compute.submit(i) for i in range(10)]
    return [f.result() for f in futures]
If you actually want thread-level parallelism (i.e., to set the number of threads), use `ThreadPoolTaskRunner` instead:
from prefect import flow
from prefect.task_runners import ThreadPoolTaskRunner

@flow(task_runner=ThreadPoolTaskRunner(max_workers=8))
def my_flow():
    ...
If you need multiple threads inside each process for some reason, you’d implement that within your task code (e.g., spin up a local ThreadPoolExecutor inside the task), but the task runner won’t manage “threads per process” for you.

Docs for reference:
- Task runners in Prefect 3
- Running work concurrently

Just to confirm: are you aiming to control the number of processes running tasks concurrently, or do you want multiple threads inside each task process?
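For the "local ThreadPoolExecutor inside the task" idea, here is a minimal sketch using only the standard library. The names `square`, `compute_batch`, and `max_threads` are hypothetical and only for illustration; in a real flow you would presumably put the `@task` decorator on `compute_batch`, which is left out here so the snippet runs without Prefect installed.

```python
from concurrent.futures import ThreadPoolExecutor


def square(x: int) -> int:
    return x * x


def compute_batch(values, max_threads=2):
    """Hypothetical task body that fans out over a local thread pool."""
    # The pool exists only for the duration of this call, so the thread
    # count is decided here rather than by the task runner.
    with ThreadPoolExecutor(max_workers=max_threads) as pool:
        # pool.map preserves the order of the inputs.
        return list(pool.map(square, values))


if __name__ == "__main__":
    print(compute_batch(range(5)))
```

With this shape, the task runner's `max_workers` would still control how many processes run, while `max_threads` bounds the threads inside each call.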
m
@Marvin it is possible to set this in a DaskTaskRunner. My idea is to limit it to one: numerical libraries sometimes spawn a lot of unwanted threads, and I want to limit that. If it's not possible with ProcessPoolTaskRunner, I will keep using DaskTaskRunner.
m
thought for 47.8 seconds
You’re right: ProcessPoolTaskRunner can’t set “threads per worker.” DaskTaskRunner is the right choice when you want to constrain threads, e.g., to 1. Example with a local Dask cluster and a single thread per worker:
from prefect import flow, task
from prefect_dask.task_runners import DaskTaskRunner

@task
def compute(x):
    return x * x

@flow(
    task_runner=DaskTaskRunner(
        cluster_class="distributed.LocalCluster",
        cluster_kwargs={
            "n_workers": 4,           # number of workers (processes)
            "threads_per_worker": 1,  # limit threads per worker
        },
    )
)
def my_flow():
    futures = [compute.submit(i) for i in range(10)]
    return [f.result() for f in futures]
If you ever need to stick with ProcessPoolTaskRunner but still want to limit unwanted threading from numeric libs (BLAS/MKL/OpenMP), set these environment variables before importing those libraries, as early as possible (e.g., in your flow entrypoint or the worker environment):
import os
os.environ["OMP_NUM_THREADS"] = "1"
os.environ["MKL_NUM_THREADS"] = "1"
os.environ["OPENBLAS_NUM_THREADS"] = "1"
os.environ["NUMEXPR_NUM_THREADS"] = "1"
os.environ["BLIS_NUM_THREADS"] = "1"
os.environ["VECLIB_MAXIMUM_THREADS"] = "1"
Child processes started afterwards inherit these variables, and the BLAS/OpenMP backends underneath the NumPy/SciPy/Pandas stack respect them.

References:
- prefect-dask (integration)
- Dask LocalCluster docs (see threads_per_worker)

If you’re connecting to an existing Dask cluster instead of creating one in-flow, set threads on the workers themselves (e.g., Dask worker `--nthreads 1`) and point the task runner at the scheduler with `address="tcp://..."`.

Want me to tailor this to your setup (local, Docker, Kubernetes, or an existing Dask cluster)?
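If you want to check that the thread caps really reach child processes, a small stdlib-only sketch like the one below can help. `cap_threads` and `read_in_child` are hypothetical helper names, and the child here is a plain Python subprocess rather than a Prefect task process, so treat it as an illustration of environment inheritance, not of the task runner's internals.

```python
import os
import subprocess
import sys

THREAD_VARS = ("OMP_NUM_THREADS", "MKL_NUM_THREADS", "OPENBLAS_NUM_THREADS")


def cap_threads(n: int = 1) -> None:
    """Hypothetical helper: set the thread caps in this process's environment."""
    for name in THREAD_VARS:
        os.environ[name] = str(n)


def read_in_child(name: str) -> str:
    """Start a fresh Python process and return the value it sees for `name`."""
    code = f"import os; print(os.environ.get({name!r}, 'unset'))"
    result = subprocess.run(
        [sys.executable, "-c", code],
        capture_output=True,
        text=True,
        check=True,
    )
    return result.stdout.strip()


if __name__ == "__main__":
    cap_threads(1)
    for name in THREAD_VARS:
        print(name, "=", read_in_child(name))
```

Only processes started after `cap_threads` runs see the new values, which is why the variables should be set as early as possible.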
m
@Marvin ok in the end, it seems to be a missing feature of ProcessPoolTaskRunner
m
thought for 15.1 seconds
You’re correct: in Prefect 3.x, ProcessPoolTaskRunner doesn’t support configuring “threads per worker.” It only exposes `max_workers` (number of processes). This is by design since it wraps Python’s `concurrent.futures.ProcessPoolExecutor`, where each worker is a separate process and not a threaded worker.

Workarounds:
- If the goal is to tame extra threads from BLAS/MKL/OpenMP, set env vars like `OMP_NUM_THREADS=1`, `MKL_NUM_THREADS=1`, etc., as early as possible so they apply to all subprocesses.
- If you want first-class control of threads per worker, keep using DaskTaskRunner and set `threads_per_worker=1`.

Refs:
- Task runners in Prefect 3
- Dask LocalCluster (threads_per_worker)

If you want, I can help you set the right env vars in your deployment/worker setup (local, Docker, or Kubernetes) so ProcessPoolTaskRunner runs single-threaded numerical libs.