Syméon del Marmol
10/16/2025, 7:16 AM
@flow
async def my_flow(doc_ids: list[str]):
    with ProcessPoolTaskRunner(max_workers=8) as runner:
        futures = [runner.submit(my_task, {"doc_id": doc_id}) for doc_id in doc_ids]
        wait(futures)
I also tried setting the ProcessPoolTaskRunner directly in the flow configuration like this, but it gives exactly the same result:
@flow(task_runner=ProcessPoolTaskRunner(max_workers=8))
async def my_flow(doc_ids: list[str]):
    futures = [my_task.submit(doc_id) for doc_id in doc_ids]
    wait(futures)
Marvin
10/16/2025, 7:21 AM
1) The runner shuts down before your tasks run: when the with ProcessPoolTaskRunner(...) as runner: block exits, the runner (and its process pool) is shut down. You submit the tasks, then immediately exit the context, so nothing ever runs.
Fix: keep the wait inside the context (or collect results before leaving).
Example (sync flow):
from prefect import flow, task
from prefect.task_runners import ProcessPoolTaskRunner

@task
def my_task(doc_id: str) -> str:
    return f"processed {doc_id}"

@flow
def my_flow(doc_ids: list[str]):
    with ProcessPoolTaskRunner(max_workers=8) as runner:
        futures = [runner.submit(my_task, {"doc_id": doc_id}) for doc_id in doc_ids]
        # Block until all are done
        for f in futures:
            f.result()
2) In async flows, don’t call the blocking wait()
prefect.futures.wait is synchronous and will block your event loop in an async def flow, which can prevent work from being scheduled/executed.
Fix: use the flow-level task runner and await each future’s result_async() (e.g., with asyncio.gather).
Example (async flow, recommended):
import asyncio
from prefect import flow, task
from prefect.task_runners import ProcessPoolTaskRunner

@task
def my_task(doc_id: str) -> str:
    return f"processed {doc_id}"

@flow(task_runner=ProcessPoolTaskRunner(max_workers=8))
async def my_flow(doc_ids: list[str]):
    futures = [my_task.submit(doc_id) for doc_id in doc_ids]
    results = await asyncio.gather(*(f.result_async() for f in futures))
    return results
Other important gotchas for ProcessPoolTaskRunner
- Tasks must be picklable: define @task functions at module top-level (not nested inside flows/other functions); avoid capturing non-picklable objects in closures.
- Arguments must be picklable: simple types like str are fine; avoid passing open connections, loggers, etc. (a quick pickle check is sketched after this list).
- On Windows/macOS (spawn), if you run this from a script, use the if __name__ == "__main__": guard. Prefect workers/flows launched by Prefect are fine, but ad‑hoc scripts need the guard.
- Keep the runner alive until results are gathered (as shown above).
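To make the two picklability rules concrete, here is a minimal, Prefect-free check; my_task_fn, the lambda, and the lock below are illustrative stand-ins rather than code from this thread:
import pickle
import threading

def my_task_fn(doc_id: str) -> str:
    # Module top-level function: pickled by reference, so a child process can import it
    return f"processed {doc_id}"

for candidate in [my_task_fn, "doc-123", lambda x: x, threading.Lock()]:
    try:
        pickle.dumps(candidate)
        print(f"picklable:     {candidate!r}")
    except Exception as exc:
        print(f"NOT picklable: {candidate!r} ({exc})")
Anything that fails this check cannot cross the process boundary, whether it is the task itself or one of its arguments.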
If you still see “processes starting but tasks never start,” can you share:
- Your OS and where the flow is running (local process, Prefect worker, Docker, etc.)
- How my_task is defined (is it @task at module top-level? sync or async?)
- Any errors in DEBUG logs (set PREFECT_LOGGING_LEVEL=DEBUG)
With those details, I can help pinpoint further.
Syméon del Marmol
10/16/2025, 7:38 AM
@flow
def my_flow(doc_ids: list[str]):
    with ProcessPoolTaskRunner(max_workers=8) as runner:
        futures = [runner.submit(my_task, {"doc_id": doc_id}) for doc_id in doc_ids]
        for future in futures:
            future.result()
But I still encounter the same problem.
In the logs, I see that sub-processes are started:
Submitting task Process document to process pool executor...
I see the startup logs of the sub-processes, but then nothing more happens. Is the entry point not correct? What command does Prefect use to spawn the sub-process and make it run the task?
The same code works perfectly with the ThreadPoolTaskRunner.
I'm running this from a Docker container.
Marvin
10/16/2025, 7:41 AM
Calling future.result() inside the with block is correct. Given that threads work but processes don't, this almost always boils down to Python's multiprocessing/pickling rules.
Answers to your questions first:
- There isn’t a shell command. Prefect’s ProcessPoolTaskRunner uses Python’s concurrent.futures.ProcessPoolExecutor, which in turn uses multiprocessing to spawn new Python interpreter processes. On macOS/Windows the start method is “spawn”; on most Linux it’s “fork”. The child process re-imports your module and unpickles the task function and its arguments, then Prefect executes the task within that process.
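For intuition, here is a minimal, Prefect-free sketch of that same mechanism using concurrent.futures directly (process_document and the doc IDs are illustrative, not Prefect internals):
from concurrent.futures import ProcessPoolExecutor

def process_document(doc_id: str) -> str:
    # Must be defined at module top-level so a worker process can re-import it by name
    return f"processed {doc_id}"

if __name__ == "__main__":  # required when the start method is "spawn"
    with ProcessPoolExecutor(max_workers=8) as executor:
        futures = [executor.submit(process_document, doc_id) for doc_id in ["a", "b", "c"]]
        print([f.result() for f in futures])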
Why processes start but tasks never do
- If the task function or any argument is not picklable/importable from a module, the child process can’t load it. With “spawn”, the child will just sit there waiting for work it can deserialize. Threads don’t require pickling, which is why the same code can “work perfectly” with ThreadPoolTaskRunner.
Checklist to resolve
1) Ensure the task is defined at module top-level
- Not inside the flow or any function/class. This is critical.
- Quick check:
print(my_task.__module__, my_task.__qualname__)
# __qualname__ should NOT contain "<locals>"
2) Make sure the task and its arguments are picklable
- Avoid lambdas, local/inner functions, or bound methods.
- Don’t capture non-picklable objects in globals/closures (DB clients, loggers, sessions, open files, etc.). Create those inside the task function body if needed.
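A sketch of that pattern, assuming a hypothetical, non-picklable DatabaseClient (all names here are illustrative only):
from prefect import task

class DatabaseClient:
    # Hypothetical stand-in for a real, non-picklable client, for illustration only
    def __init__(self, dsn: str):
        self.dsn = dsn

    def fetch(self, doc_id: str) -> str:
        return f"row for {doc_id} via {self.dsn}"

@task
def process_document(doc_id: str, dsn: str) -> str:
    # Pass only picklable arguments (plain strings) across the process boundary,
    # then build the client inside the task body, in the worker process
    client = DatabaseClient(dsn)
    return client.fetch(doc_id)
The worker process receives only the strings and constructs its own client, so nothing non-picklable ever has to cross the process boundary.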
3) If you run from a script, add the main guard
- Required on Windows/macOS; often recommended everywhere when using multiprocessing.
if __name__ == "__main__":
    my_flow(["a", "b", "c"])
4) Try an ultra-minimal repro task
- This isolates pickling/entrypoint issues.
from prefect import flow, task
from prefect.task_runners import ProcessPoolTaskRunner

@task
def my_task(doc_id: str) -> str:
    return f"processed {doc_id}"

@flow(task_runner=ProcessPoolTaskRunner(max_workers=8))
def my_flow(doc_ids: list[str]):
    futures = [my_task.submit(doc_id) for doc_id in doc_ids]
    return [f.result() for f in futures]

if __name__ == "__main__":
    print(my_flow(["1", "2", "3"]))
- If this works, the issue is with your original my_task definition or arguments. If it still hangs, we need to look at your environment.
5) Confirm your environment
- OS and Python version
- How the flow is launched (local script, Prefect worker, Docker, running inside a web server like Uvicorn/Gunicorn, Jupyter, etc.)
- Prefect version (3.x minor/patch)
6) Turn on debug logs
- Set PREFECT_LOGGING_LEVEL=DEBUG and re-run; child-process deserialization errors may surface in logs when the parent tries to retrieve results.
A note on “spawn” vs “fork”
- On macOS/Windows, “spawn” is used, which strictly requires the task to be importable by the child. On Linux, “fork” often “just works” even with less-than-ideal definitions, but can still bite if you capture non-picklable state. You can explicitly set a start method at program start if needed:
import multiprocessing as mp
mp.set_start_method("spawn", force=True) # or "fork" on Linux, if safe for your app
Use with care; “fork” can be unsafe in some environments.
Docs for reference:
- Prefect 3 Task Runners
- Python ProcessPoolExecutor
- Multiprocessing start methods
If you can share:
- The exact @task definition for my_task
- Where it’s defined (module vs inside the flow)
- How you’re launching the flow (script vs service)
I can pinpoint the exact cause. My top suspicion is that my_task isn’t top-level or captures a non-picklable object.