# ask-marvin
s
@Marvin I'm using the ProcessPoolTaskRunner to start tasks in my flow. I see in the logs that new processes are starting as expected, but they never start the submitted tasks themselves. Here is my code; what am I doing wrong?
Copy code
@flow
async def my_flow(doc_ids: list[str]):
    with ProcessPoolTaskRunner(max_workers=8) as runner:
        futures = [runner.submit(my_task, {"doc_id": doc_id}) for doc_id in doc_ids]
    wait(futures)
I also tried setting the ProcessPoolTaskRunner directly in the flow configuration like this, but the result is exactly the same:
Copy code
@flow(task_runner=ProcessPoolTaskRunner(max_workers=8))
async def my_flow(doc_ids: list[str]):
    futures = [my_task.submit(doc_id) for doc_id in doc_ids]
    wait(futures)
m
thought for 297.7 seconds
A couple of things are tripping you up here.
1) In your first snippet you're closing the runner before you wait
When the with ProcessPoolTaskRunner(...) as runner: block exits, the runner (and its process pool) is shut down. You submit the tasks, then immediately exit the context, so nothing ever runs. Fix: keep the wait inside the context (or collect results before leaving). Example (sync flow):
Copy code
from prefect import flow, task
from prefect.task_runners import ProcessPoolTaskRunner

@task
def my_task(doc_id: str) -> str:
    return f"processed {doc_id}"

@flow
def my_flow(doc_ids: list[str]):
    with ProcessPoolTaskRunner(max_workers=8) as runner:
        futures = [runner.submit(my_task, {"doc_id": doc_id}) for doc_id in doc_ids]
        # Block until all are done
        for f in futures:
            f.result()
2) In async flows, don't call the blocking wait()
prefect.futures.wait is synchronous and will block your event loop in an async def flow, which can prevent work from being scheduled/executed. Fix: use the flow-level task runner and await each future's result_async() (e.g., with asyncio.gather). Example (async flow, recommended):
Copy code
import asyncio
from prefect import flow, task
from prefect.task_runners import ProcessPoolTaskRunner

@task
def my_task(doc_id: str) -> str:
    return f"processed {doc_id}"

@flow(task_runner=ProcessPoolTaskRunner(max_workers=8))
async def my_flow(doc_ids: list[str]):
    futures = [my_task.submit(doc_id) for doc_id in doc_ids]
    results = await asyncio.gather(*(f.result_async() for f in futures))
    return results
Other important gotchas for ProcessPoolTaskRunner
- Tasks must be picklable: define @task functions at module top-level (not nested inside flows/other functions); avoid capturing non-picklable objects in closures (see the sketch at the end of this message).
- Arguments must be picklable: simple types like str are fine; avoid passing open connections, loggers, etc.
- On Windows/macOS (spawn), if you run this from a script, use the if __name__ == "__main__": guard. Prefect workers/flows launched by Prefect are fine, but ad-hoc scripts need the guard.
- Keep the runner alive until results are gathered (as shown above).
If you still see "processes starting but tasks never start," can you share:
- Your OS and where the flow is running (local process, Prefect worker, Docker, etc.)
- How my_task is defined (is it @task at module top-level? sync or async?)
- Any errors in DEBUG logs (set PREFECT_LOGGING_LEVEL=DEBUG)
With those details, I can help pinpoint further.
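To make the first gotcha concrete, here is a minimal, hedged sketch of top-level vs. nested task definitions (the names are illustrative, and whether the nested variant fails outright depends on how the runner serializes tasks; it is simply the pattern to avoid):
Copy code
from prefect import flow, task
from prefect.task_runners import ProcessPoolTaskRunner

@task
def good_task(doc_id: str) -> str:
    # Top-level definition: a child process can import it by name.
    return f"processed {doc_id}"

@flow(task_runner=ProcessPoolTaskRunner(max_workers=2))
def good_flow(doc_ids: list[str]) -> list[str]:
    futures = [good_task.submit(doc_id) for doc_id in doc_ids]
    return [f.result() for f in futures]

@flow(task_runner=ProcessPoolTaskRunner(max_workers=2))
def risky_flow(doc_ids: list[str]) -> list[str]:
    # Pattern to avoid: the task lives in risky_flow.<locals>, so a child
    # process cannot import it by name.
    @task
    def nested_task(doc_id: str) -> str:
        return f"processed {doc_id}"

    futures = [nested_task.submit(doc_id) for doc_id in doc_ids]
    return [f.result() for f in futures]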
s
@Marvin I changed the flow like this:
Copy code
@flow
def my_flow(doc_ids: list[str]):
    with ProcessPoolTaskRunner(max_workers=8) as runner:
        futures = [runner.submit(my_task, {"doc_id": doc_id}) for doc_id in doc_ids]
        for future in futures:
            future.result()
But I still encounter the same problem. In the logs, I see that sub-processes are started:
Copy code
Submitting task Process document to process pool executor...
I see the starting logs of the sub-processes, but then nothing more happens. Is the entry point not correct? What command does Prefect use to spawn the sub-process and make it run the task? The same code works perfectly with the ThreadPoolTaskRunner. I'm running this from a Docker container.
m
thought for 203.6 seconds
Thanks for the update. Keeping future.result() inside the with block is correct. Given that threads work but processes don't, this almost always boils down to Python's multiprocessing/pickling rules.
Answers to your questions first:
- There isn't a shell command. Prefect's ProcessPoolTaskRunner uses Python's concurrent.futures.ProcessPoolExecutor, which in turn uses multiprocessing to spawn new Python interpreter processes. On macOS/Windows the start method is "spawn"; on most Linux it's "fork". The child process re-imports your module and unpickles the task function and its arguments, then Prefect executes the task within that process.
Why processes start but tasks never do
- If the task function or any argument is not picklable/importable from a module, the child process can't load it. With "spawn", the child will just sit there waiting for work it can deserialize. Threads don't require pickling, which is why the same code can "work perfectly" with ThreadPoolTaskRunner.
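To see that this is standard-library behaviour rather than anything Prefect-specific, here is a minimal sketch using concurrent.futures.ProcessPoolExecutor directly (function names are illustrative):
Copy code
from concurrent.futures import ProcessPoolExecutor

def top_level(x: int) -> int:
    # Importable by the child process, so it pickles by reference and runs fine.
    return x * 2

def main() -> None:
    def nested(x: int) -> int:
        # Defined inside main(), so the child process cannot import or unpickle it.
        return x * 2

    with ProcessPoolExecutor(max_workers=2) as pool:
        print(pool.submit(top_level, 21).result())  # 42
        try:
            pool.submit(nested, 21).result()
        except Exception as exc:
            # Typically surfaces as a pickling error; the worker never receives the job.
            print(f"nested function failed as expected: {exc!r}")

if __name__ == "__main__":
    main()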
Checklist to resolve
1) Ensure the task is defined at module top-level
- Not inside the flow or any function/class. This is critical.
- Quick check:
Copy code
print(my_task.__module__, my_task.__qualname__)
# __qualname__ should NOT contain "<locals>"
2) Make sure the task and its arguments are picklable
- Avoid lambdas, local/inner functions, or bound methods.
- Don't capture non-picklable objects in globals/closures (DB clients, loggers, sessions, open files, etc.). Create those inside the task function body if needed (see the sketch at the end of this message).
3) If you run from a script, add the main guard
- Required on Windows/macOS; often recommended everywhere when using multiprocessing.
Copy code
if __name__ == "__main__":
    my_flow(["a", "b", "c"])
4) Try an ultra-minimal repro task
- This isolates pickling/entrypoint issues.
Copy code
from prefect import flow, task
from prefect.task_runners import ProcessPoolTaskRunner

@task
def my_task(doc_id: str) -> str:
    return f"processed {doc_id}"

@flow(task_runner=ProcessPoolTaskRunner(max_workers=8))
def my_flow(doc_ids: list[str]):
    futures = [my_task.submit(doc_id) for doc_id in doc_ids]
    return [f.result() for f in futures]

if __name__ == "__main__":
    print(my_flow(["1", "2", "3"]))
- If this works, the issue is with your original my_task definition or arguments. If it still hangs, we need to look at your environment.
5) Confirm your environment
- OS and Python version
- How the flow is launched (local script, Prefect worker, Docker, running inside a web server like Uvicorn/Gunicorn, Jupyter, etc.)
- Prefect version (3.x minor/patch)
6) Turn on debug logs
- Set PREFECT_LOGGING_LEVEL=DEBUG and re-run; child-process deserialization errors may surface in logs when the parent tries to retrieve results.
A note on "spawn" vs "fork"
- On macOS/Windows, "spawn" is used, which strictly requires the task to be importable by the child. On Linux, "fork" often "just works" even with less-than-ideal definitions, but can still bite if you capture non-picklable state. You can explicitly set a start method at program start if needed:
Copy code
import multiprocessing as mp
mp.set_start_method("spawn", force=True)  # or "fork" on Linux, if safe for your app
Use with care; "fork" can be unsafe in some environments.
Docs for reference:
- Prefect 3 Task Runners
- Python ProcessPoolExecutor
- Multiprocessing start methods
If you can share:
- The exact @task definition for my_task
- Where it's defined (module vs inside the flow)
- How you're launching the flow (script vs service)
I can pinpoint the exact cause. My top suspicion is that my_task isn't top-level or captures a non-picklable object.
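If it turns out to be the "captures a non-picklable object" case, the usual fix looks like this hedged sketch (sqlite3 stands in for whatever resource you actually use; the names are illustrative):
Copy code
import sqlite3

from prefect import task

@task
def process_document_bad(doc_id: str, conn: sqlite3.Connection) -> str:
    # Risky with a process pool: the open connection would have to be pickled
    # to reach the worker process, and connection objects generally can't be.
    ...

@task
def process_document(doc_id: str) -> str:
    # Safer: pass only simple, picklable values (here a str) and open the
    # non-picklable resource inside the task body, i.e. in the child process.
    conn = sqlite3.connect("docs.db")
    try:
        # ... query or update using conn ...
        return f"processed {doc_id}"
    finally:
        conn.close()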