Syméon del Marmol
10/09/2025, 9:06 AM

Marvin
10/09/2025, 9:19 AM
So the delay between task.delay() and actual task start is higher than we'd expect.
Given your setup (Prefect 3.4.20, self-hosted, parent flow fanning out ~20 subflows that each submit background tasks, default ThreadPoolTaskRunner), the most common culprits are:
- Thread pool saturation in a single Python process (lots of subflows, each with tasks, competing for the same CPU/GIL)
- CPU-bound work in a thread pool (threads won’t help much under the GIL)
- Too-small/default max_workers on the task runner
- General orchestration overhead from very large bursts of submissions
A few quick questions to pinpoint it:
1) Roughly how many background tasks are you submitting in total across those 20 subflows?
2) Are the tasks CPU-bound or I/O-bound?
3) Do you see the delay only at high fan-out, or even with a handful of tasks?
Recommended next steps
1) Explicitly size your task runner
- If your tasks are I/O-bound, give the thread pool more workers. If CPU-bound, use a process pool.
    from prefect import flow, task
    from prefect.task_runners import ThreadPoolTaskRunner, ProcessPoolTaskRunner

    @task
    def do_work(x):
        # first line in the task to measure the actual start time
        import time, logging
        logging.getLogger(__name__).info(f"task-start {x} at {time.time()}")
        ...

    # I/O-bound: more threads
    @flow(task_runner=ThreadPoolTaskRunner(max_workers=64))
    def parent():
        # submit many tasks
        futures = [do_work.with_options(name=f"job-{i}").delay(i) for i in range(500)]
        results = [f.result() for f in futures]
        return results

    # OR CPU-bound: processes
    @flow(task_runner=ProcessPoolTaskRunner(max_workers=16))
    def parent_cpu():
        futures = [do_work.with_options(name=f"job-{i}").delay(i) for i in range(500)]
        results = [f.result() for f in futures]
        return results
If you’re using async and awaiting with asyncio.gather, the pattern is fine; just ensure the runner choice matches the workload:
    import asyncio

    @flow(task_runner=ThreadPoolTaskRunner(max_workers=64))
    async def parent_async():
        futures = [do_work.with_options(name=f"job-{i}").delay(i) for i in range(500)]
        await asyncio.gather(*(f.result_async() for f in futures))
2) Consider fanning out to Task Workers (distributed execution)
If you want the subflows to offload work to separate processes or machines (and avoid a single process’ GIL/thread contention), start Task Workers that serve your tasks, then call .delay() from your flows — those runs will be executed by workers instead of local threads.
Serve your tasks on a worker node:
    from prefect import task
    from prefect.task_worker import serve

    @task
    def do_work(x): ...

    if __name__ == "__main__":
        # limit is the worker's task concurrency
        serve(do_work, limit=20)
Submit from your flow as usual:
    @flow
    def parent():
        futures = [do_work.with_options(name=f"job-{i}").delay(i) for i in range(500)]
        return [f.result() for f in futures]
- TaskWorker API: prefect.task_worker on GitHub
- Thread/Process task runners: prefect.task_runners on GitHub
3) Instrument to verify where time is spent
- Add a log line at the top of each task to capture when it really starts (as in the snippet above).
- Compare that to the timestamp where you call .delay() to measure queuing time (a sketch follows below).
- Enable debug logs to observe scheduling behavior (PREFECT_LOGGING_LEVEL=DEBUG).
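A minimal sketch of that measurement, with the submission timestamp passed into the task as an extra parameter (timed_work and submitted_at are illustrative names, not from your code):

    import time
    from prefect import flow, task

    @task
    def timed_work(x, submitted_at: float):
        # Queue time: the gap between .delay() and the task actually starting
        waited = time.time() - submitted_at
        print(f"task {x}: waited {waited:.2f}s before starting")

    @flow
    def parent():
        futures = [timed_work.delay(i, submitted_at=time.time()) for i in range(20)]
        return [f.result() for f in futures]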
4) Keep an eye on burst size
Even with larger thread/process pools, submitting thousands of tasks at once can introduce overhead. If you’re firing a very large burst from 20 subflows simultaneously, try batching submissions or reducing per-subflow concurrency slightly to smooth scheduling.
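If you go the batching route, a rough sketch (reusing do_work from above; batch size and pause are arbitrary knobs to tune):

    import time

    def submit_in_batches(items, batch_size=50, pause_s=0.5):
        # Chunk the submissions instead of firing them all at once,
        # giving the scheduler time to absorb each burst
        futures = []
        for start in range(0, len(items), batch_size):
            for x in items[start:start + batch_size]:
                futures.append(do_work.delay(x))
            time.sleep(pause_s)
        return futures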
5) Upgrade to the latest 3.4.x
You’re on 3.4.20 — upgrading to the latest patch release is recommended for a variety of fixes and small perf improvements.

Syméon del Marmol
10/09/2025, 4:25 PM

Marvin
10/09/2025, 4:29 PM
- When you call a @flow function like a normal function (e.g., result = my_flow()), the flow runs in the same process and main thread as your code. Prefect does not spin up a separate process/thread just for the flow. Any parallelism comes from how tasks are executed inside the flow.
- Tasks inside the flow are run by the flow’s task runner. By default, Prefect uses a thread-based runner (ThreadPoolTaskRunner), so task concurrency is via threads unless you change it.
- Yes — you can specify a task runner on the flow decorator. Examples:
    from prefect import flow
    from prefect.task_runners import ThreadPoolTaskRunner, ProcessPoolTaskRunner

    @flow  # default: thread-based concurrency
    def flow_default():
        ...

    @flow(task_runner=ThreadPoolTaskRunner(max_workers=20))
    def flow_threads():
        ...

    @flow(task_runner=ProcessPoolTaskRunner(max_workers=8))
    def flow_processes():
        ...
- If you want distributed task execution (beyond a single process/machine), use an integration task runner like Dask or Ray:
    from prefect import flow
    from prefect_dask import DaskTaskRunner  # pip install prefect-dask

    @flow(task_runner=DaskTaskRunner(cluster_kwargs={"n_workers": 4}))
    def flow_dask():
        ...
Docs: Prefect Dask integration
A note on background tasks since you’re using .delay() and `.result_async()`:
- Background tasks run on a Prefect task worker, not on the flow’s task runner. In other words, task_runner= does not affect tasks launched via .delay(). Concurrency for those is governed by the task worker settings/runtime (see the sketch below).
- Docs: Background tasks in Prefect 3
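To make that distinction concrete, a small sketch (assumes a task worker elsewhere is serving fetch; the names are illustrative):

    from prefect import flow, task
    from prefect.task_runners import ProcessPoolTaskRunner

    @task
    def fetch(i):
        return i * 2

    @flow(task_runner=ProcessPoolTaskRunner(max_workers=4))
    def mixed():
        on_pool = fetch.submit(1)   # executed by this flow's process pool
        on_worker = fetch.delay(2)  # executed by a task worker, wherever it runs
        return on_pool.result(), on_worker.result()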
Quick summary:
- Flow called as a function: runs in the same process; no extra process/thread for the flow itself.
- Task concurrency: controlled by the flow’s task runner (default threads; you can set processes/Dask/etc.).
- Background tasks (.delay()): bypass the task runner and are executed by task workers.
If you’re trying to “fan out” across multiple machines, prefer:
- Background tasks with multiple task workers
- Or a distributed task runner like Dask connected to a cluster
If you share a bit more about your fan-out target (single host vs cluster, CPU vs I/O bound), I can recommend a concrete setup.

Marvin
10/09/2025, 4:31 PM
ProcessPoolTaskRunner uses a concurrent.futures.ProcessPoolExecutor under the hood. It lazily spawns worker processes up to max_workers and keeps them alive for the duration of the flow run.
- When a slot frees up, the next task is dispatched to an existing worker process; a new process is not started per task.
- The pool is created on first use and shut down when the flow (and thus the task runner context) finishes.
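That lifecycle is plain concurrent.futures behavior; you can observe it without Prefect (20 calls, at most 4 distinct worker PIDs):

    import os
    from concurrent.futures import ProcessPoolExecutor

    def worker_pid(_):
        # Report which worker process ran this call
        return os.getpid()

    if __name__ == "__main__":
        with ProcessPoolExecutor(max_workers=4) as pool:
            pids = list(pool.map(worker_pid, range(20)))
        # At most 4 unique PIDs: workers are reused, not spawned per task
        print(sorted(set(pids)))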
Config/caveats:
- `max_workers`: controls the pool size (defaults to CPU count if not set).
- Start method: Prefect doesn’t expose a setting; it uses Python’s default for your OS.
- Tasks and their inputs must be pickleable; module-level state in a worker process can persist across multiple tasks within the same flow, so avoid relying on mutable global state.
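A quick way to check an argument before it fails at dispatch time, using the stdlib pickle module directly:

    import pickle

    # Plain data and module-level functions pickle fine
    pickle.dumps({"rows": [1, 2, 3]})

    # Lambdas, open file handles, and locally defined closures do not
    try:
        pickle.dumps(lambda x: x + 1)
    except Exception as exc:  # PicklingError or AttributeError, depending on the object
        print(f"this argument could not be sent to a worker process: {exc}")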
Links:
- Prefect implementation: prefect.task_runners source
- Python ProcessPoolExecutor behavior: ProcessPoolExecutor docs

Syméon del Marmol
10/09/2025, 5:48 PM

Marvin
10/09/2025, 5:50 PM
- When you call a @flow inside a flow (a subflow), Prefect runs it in the same process and uses the same TaskRunner instance as the parent by default. You do not need to set task_runner= again for the subflow.
- If you decorate the subflow with its own task_runner=..., Prefect will use that runner for the duration of that subflow, then return to the parent’s runner afterward.
- This means pools are shared when inherited (e.g., the same process pool or thread pool), avoiding nested executors.
Example:

    from prefect import flow, task
    from prefect.task_runners import ThreadPoolTaskRunner, ProcessPoolTaskRunner

    @task
    def compute(x):
        return x * x

    @flow  # no task_runner set here -> will inherit from parent
    def child_flow(values):
        futures = [compute.submit(v) for v in values]
        return [f.result() for f in futures]

    @flow(task_runner=ProcessPoolTaskRunner(max_workers=4))
    def parent_flow():
        # child_flow will reuse the parent's process pool
        res1 = child_flow(range(10))

        # You can also define a subflow that overrides the inherited runner
        @flow(task_runner=ThreadPoolTaskRunner(max_workers=20))
        def thread_child(values):
            futures = [compute.submit(v) for v in values]
            return [f.result() for f in futures]

        res2 = thread_child(range(10))
        return res1, res2
Notes:
- Background tasks (.delay() / .result_async()) do not use the flow’s task runner; they run on task workers. So changing task_runner= on the flow does not affect those background tasks. Docs: Background tasks in Prefect 3
- Source references showing task_runner on flows and the runner classes:
- Flow decorator parameters (includes task_runner): prefect.flows source
- Task runners (Thread/Process): prefect.task_runners source
If you’re mixing subflows and background tasks, happy to help outline when to inherit the runner vs explicitly override it.

Syméon del Marmol
10/09/2025, 5:55 PM

Marvin
10/09/2025, 5:56 PM
1) Where background tasks run
- If you fan out with background tasks (.delay()), those tasks are executed by Prefect task workers, not by the flow’s task runner. They’ll run wherever your task workers are running (not necessarily in the same pod). See Background tasks in Prefect 3.
2) Specifying the internal task runner
- Set it on the flow decorator. This travels with your code and works the same whether your deployment runs on Kubernetes, Docker, VM, etc.
    from prefect import flow, task
    from prefect.task_runners import ProcessPoolTaskRunner

    @task
    def work(i):
        return i * i

    @flow(task_runner=ProcessPoolTaskRunner(max_workers=4))
    def my_flow():
        # Submit so these tasks run in child processes inside the pod
        futures = [work.submit(i) for i in range(10)]
        return [f.result() for f in futures]
- Subflows called from this flow inherit the parent’s task runner unless you explicitly override them on the subflow decorator.
- If you want distributed execution across nodes, use a distributed task runner like Dask:
    from prefect import flow
    from prefect_dask import DaskTaskRunner  # pip install prefect-dask

    @flow(task_runner=DaskTaskRunner(address="tcp://dask-scheduler:8786"))
    def dask_flow():
        ...
Docs: Prefect Dask integration
Practical tips on Kubernetes:
- CPU limits/requests matter. If your pod has a CPU limit of 2, set ProcessPoolTaskRunner(max_workers=2) to avoid oversubscription (see the sizing sketch below).
- The task runner’s processes/threads are created inside the pod, so make sure the container image includes any libraries your tasks need.
- If you rely on background tasks for fan-out, run multiple task workers (they can also be on Kubernetes) to consume that background work; changing the flow’s task_runner will not affect .delay() tasks.
Kubernetes deployment docs: Running flows on Kubernetes
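On the sizing point above: note that os.cpu_count() reports the node's CPUs, not the pod's cgroup CPU limit, so an explicit value matching the pod's limit is the most reliable. A fallback sketch (TASK_POOL_WORKERS is a hypothetical env var, not a Prefect setting):

    import os
    from prefect import flow
    from prefect.task_runners import ProcessPoolTaskRunner

    # Prefer an explicit value that matches the pod's CPU limit; fall back
    # to the visible CPU count, which may overstate what the pod can use.
    workers = int(os.environ.get("TASK_POOL_WORKERS", os.cpu_count() or 1))

    @flow(task_runner=ProcessPoolTaskRunner(max_workers=workers))
    def sized_flow():
        ...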
If you share your desired fan-out model (all work stays in the same pod vs spread across many pods/workers), I can suggest the concrete pattern: task_runner only, Dask, or background tasks with multiple workers.

Syméon del Marmol
10/10/2025, 7:44 AM
Do I have to use submit() to call a task, or can I call the task as a function?
When using submit(), how can I wait for that task to complete without blocking the thread? (using an async function)

Marvin
10/10/2025, 7:51 AM
- To run tasks concurrently on the flow’s task runner, use task.submit(...). Calling a task like a normal function (task(...)) runs it synchronously in-line; it won’t be scheduled on the runner and will block until it finishes.
- To wait for submitted tasks without blocking your async flow, await the futures’ async results with await fut.result_async(). For many futures, use asyncio.gather or asyncio.as_completed.
Examples:
- Direct call (synchronous, no runner concurrency):
    from prefect import flow, task

    @task
    def work(x):
        return x * x

    @flow
    def my_flow():
        # Runs immediately and blocks here
        res = work(3)
        return res
- Submit for concurrent execution on the task runner:
    from prefect import flow, task
    from prefect.task_runners import ProcessPoolTaskRunner
    import asyncio

    @task
    def work(x):
        return x * x

    @flow(task_runner=ProcessPoolTaskRunner(max_workers=4))
    async def my_flow():
        # Schedule 10 tasks concurrently on the flow's process pool
        futs = [work.submit(i) for i in range(10)]

        # Option A: wait for all without blocking the event loop
        results = await asyncio.gather(*(f.result_async() for f in futs))

        # Option B: consume in completion order
        for coro in asyncio.as_completed([f.result_async() for f in futs]):
            res = await coro
            ...

        return results
Notes:
- .submit() returns a PrefectFuture. Use await future.result_async() in async flows; use future.result() in sync flows.
- This is distinct from background tasks (.delay()), which bypass the flow’s task runner and run on task workers. Docs: Background tasks in Prefect 3
- Prefect futures source (including async APIs): prefect.futures source

Syméon del Marmol
10/10/2025, 9:33 AM

Marvin
10/10/2025, 9:35 AM
You’re right: task.submit(...) returns a PrefectConcurrentFuture that doesn’t expose `wait_async`/`result_async`. Those async helpers are available on distributed/background-style futures (e.g., from .delay() or certain distributed runners).
In an async flow, here are safe, non-blocking patterns to await concurrent futures:
Option A — await the underlying concurrent.futures.Future
- PrefectConcurrentFuture wraps a concurrent.futures.Future. You can await it by wrapping it with asyncio.
    import asyncio

    futs = [my_task.submit(i) for i in range(10)]

    # Await completion without blocking the event loop
    await asyncio.gather(*[asyncio.wrap_future(f.wrapped_future) for f in futs])

    # Now fetch results (this is quick and non-blocking because they're done)
    results = [f.result() for f in futs]
If your version exposes a private attribute (e.g., _wrapped_future) instead of wrapped_future, you can use that in the same way; otherwise use Option B.
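A defensive lookup that handles both spellings (both attribute names are assumptions to verify against your installed prefect.futures):

    import asyncio

    def as_awaitable(fut):
        # Find the underlying concurrent.futures.Future under either name
        inner = getattr(fut, "wrapped_future", None) or getattr(fut, "_wrapped_future", None)
        if inner is None:
            raise AttributeError("no wrapped concurrent.futures.Future found")
        return asyncio.wrap_future(inner)

    # usage inside an async flow: await asyncio.gather(*(as_awaitable(f) for f in futs))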
Option B — run the blocking result call in a thread
- This keeps the event loop free, at the cost of using a thread per wait.
    import asyncio

    futs = [my_task.submit(i) for i in range(10)]
    loop = asyncio.get_running_loop()
    results = await asyncio.gather(
        *[loop.run_in_executor(None, f.result) for f in futs]
    )
A couple of notes:
- Direct f.result() blocks; only call it after ensuring completion (Option A) or via run_in_executor (Option B).
- If you specifically want native async .result_async(), use background tasks (.delay()) or a distributed runner that returns a PrefectDistributedFuture (see the sketch below). Background tasks docs: Background tasks in Prefect 3
- Prefect futures source for reference: prefect.futures source
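For completeness, the background-task shape that does give a native async wait; a sketch assuming a task worker is serving do_work (as in the serve() snippet earlier in this thread):

    import asyncio
    from prefect import flow, task

    @task
    def do_work(x):
        return x * x

    @flow
    async def parent():
        # .delay() returns distributed-style futures that expose result_async()
        futs = [do_work.delay(i) for i in range(10)]
        return await asyncio.gather(*(f.result_async() for f in futs))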
If you want, I can tailor this to your exact version (3.4.14) and show a minimal snippet with your task + runner setup.