ASHIK
08/11/2025, 2:10 PM

Marvin
08/11/2025, 2:11 PM

Track overall process memory (RSS): psutil
pip install psutil
import psutil
from prefect import flow, task, get_run_logger

def rss_mb():
    return psutil.Process().memory_info().rss / (1024**2)

@task
def do_work():
    log = get_run_logger()
    log.info(f"RSS at start: {rss_mb():.1f} MiB")
    # ... your code that might allocate memory ...
    data = [bytearray(10_000_000) for _ in range(10)]
    log.info(f"RSS after work: {rss_mb():.1f} MiB")
    return len(data)

@flow
def my_flow():
    return do_work()

if __name__ == "__main__":
    my_flow()
Find where allocations come from: tracemalloc
- Built into Python; shows top allocation sites by file/line.
- Great for “who allocated this?” moments.
import tracemalloc
from prefect import flow, task, get_run_logger

@task
def heavy_task():
    log = get_run_logger()
    tracemalloc.start()
    # ... your code ...
    data = [bytearray(5_000_000) for _ in range(20)]
    snap = tracemalloc.take_snapshot()
    for stat in snap.statistics("lineno")[:5]:
        log.info(str(stat))
    tracemalloc.stop()

@flow
def my_flow():
    heavy_task()

if __name__ == "__main__":
    my_flow()
Line-by-line memory usage: memory_profiler
- Easiest way to see growth per line.
- Use the CLI for whole-script profiling, or the decorator for specific functions.
- Note: if your tasks run in separate worker processes (e.g., Dask/external infrastructure), profile in the process where the code executes (see the memory_usage sketch after this section).
pip install -U memory_profiler
mprof run python my_flow.py
mprof plot # opens a plot of memory over time
Or annotate specific functions:
from memory_profiler import profile

@profile
def your_function():
    ...
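For the worker-process caveat above, here is a minimal sketch using memory_profiler's memory_usage() helper inside the task itself, so sampling happens in whatever process actually runs the task (the workload function is illustrative):

import psutil  # not required; shown only if you also want RSS logging
from memory_profiler import memory_usage
from prefect import flow, task, get_run_logger

def workload():
    # stand-in for the code you want to measure
    return [bytearray(10_000_000) for _ in range(10)]

@task
def profiled_work():
    log = get_run_logger()
    # memory_usage samples this process's memory (in MiB) while workload() runs
    samples = memory_usage((workload, (), {}), interval=0.1)
    log.info(f"peak during workload: {max(samples):.1f} MiB")

@flow
def my_flow():
    profiled_work()

if __name__ == "__main__":
    my_flow()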
Production-grade, deep allocation tracing: Memray (Bloomberg)
- Powerful flamegraphs and allocation traces, low overhead for production-like runs.
- Excellent for leaks and unexpected growth.
pip install memray
memray run -o run.bin python my_flow.py
memray flamegraph run.bin # produces an HTML flamegraph
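If the task runs on a remote worker where you can't wrap the whole interpreter with memray run, Memray also has a Python API (memray.Tracker) you can use around just the task body. A minimal sketch, with the output path purely illustrative (write it somewhere the worker can persist, then run memray flamegraph on the file afterwards):

import memray
from prefect import flow, task

@task
def heavy_task():
    # records every allocation made inside this block to heavy_task.bin
    with memray.Tracker("heavy_task.bin"):
        data = [bytearray(5_000_000) for _ in range(20)]
        return len(data)

@flow
def my_flow():
    heavy_task()

if __name__ == "__main__":
    my_flow()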
System/process metrics in containers/workers
- If you’re running Prefect tasks in containers or on Kubernetes, logging RSS with psutil (as above) still works inside the container; the memory limit itself is enforced via cgroups, so compare RSS against that limit to see how close you are to an OOM kill (sketch below).
- For long-lived workers, you can emit periodic memory logs from tasks or a small background thread to spot trends (there’s an example further down).
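A minimal sketch of that container comparison, assuming the standard cgroup v2 and v1 mount points (your base image or runtime may expose them differently):

import psutil

def container_memory_limit_bytes():
    # cgroup v2 exposes memory.max; cgroup v1 exposes memory/memory.limit_in_bytes
    for path in ("/sys/fs/cgroup/memory.max",
                 "/sys/fs/cgroup/memory/memory.limit_in_bytes"):
        try:
            raw = open(path).read().strip()
        except FileNotFoundError:
            continue
        if raw == "max":
            return None  # v2: no limit configured
        limit = int(raw)
        if limit < 1 << 60:  # v1 reports a huge sentinel value when unlimited
            return limit
    return None

def log_memory_headroom():
    rss = psutil.Process().memory_info().rss
    limit = container_memory_limit_bytes()
    if limit:
        print(f"RSS {rss / 1024**2:.1f} MiB of {limit / 1024**2:.1f} MiB limit ({100 * rss / limit:.0f}%)")
    else:
        print(f"RSS {rss / 1024**2:.1f} MiB (no cgroup limit detected)")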
Other useful tools
- Scalene (CPU + memory sampling): https://github.com/plasma-umass/scalene
- Fil (great for leak hunting): https://pythonspeed.com/fil/
- Python tracemalloc docs: https://docs.python.org/3/library/tracemalloc.html
- memory_profiler: https://github.com/pythonprofilers/memory_profiler
- Memray: https://bloomberg.github.io/memray/
- psutil: https://psutil.readthedocs.io/
Tips
- Reproduce with a smaller dataset or a focused task where possible.
- For leaks, run multiple iterations in a loop and watch RSS; if it climbs without returning, you’ve got a leak (or an intentional cache).
- Pandas/NumPy can copy more than you expect; watch chained operations (see the short example after these tips).
- Set PYTHONMALLOC=debug to catch some allocator-level issues in CPython.
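A minimal illustration of that Pandas point (column names and sizes are arbitrary): each step of a chained expression returns a new DataFrame, and tracemalloc's peak shows the intermediate copies you never see in the final result:

import numpy as np
import pandas as pd
import tracemalloc

df = pd.DataFrame({"a": np.random.rand(1_000_000), "b": np.random.rand(1_000_000)})
print(f"base frame: {df.memory_usage(deep=True).sum() / 1024**2:.1f} MiB")

tracemalloc.start()
# filter -> assign -> reset_index each return a new DataFrame
result = df[df["a"] > 0.5].assign(c=lambda d: d["a"] * d["b"]).reset_index(drop=True)
current, peak = tracemalloc.get_traced_memory()
print(f"after chain: current {current / 1024**2:.1f} MiB, peak {peak / 1024**2:.1f} MiB")
tracemalloc.stop()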
If you share:
- How you’re running (local script vs Prefect flow; single process vs distributed),
- The library stack you suspect (Pandas, NumPy, requests, etc.),
- And whether it’s a leak or a one-time spike,
…I’ll give you a targeted plan and a minimal repro that fits your setup. I know, thrilling.

ASHIK
08/11/2025, 2:12 PM

Marvin
08/11/2025, 2:14 PM

1) Quick leak probe: run the suspect code in a loop and watch RSS
pip install psutil objgraph
import gc, time, psutil, tracemalloc, objgraph

def rss_mb():
    return psutil.Process().memory_info().rss / (1024**2)

def suspect():
    # TODO: call the code you think leaks, e.g. your task body
    data = [bytearray(5_000_000) for _ in range(5)]
    return len(data)

tracemalloc.start()
snap_start = tracemalloc.take_snapshot()

for i in range(1, 11):
    suspect()
    gc.collect()
    time.sleep(0.1)  # give allocators a moment
    print(f"iter {i}: RSS {rss_mb():.1f} MiB")

snap_end = tracemalloc.take_snapshot()

print("\nTop allocation diffs:")
for stat in snap_end.compare_to(snap_start, "lineno")[:10]:
    print(stat)

print("\nObject growth:")
objgraph.show_growth(limit=10)
Interpretation:
- If RSS rises across iterations and doesn’t settle after gc.collect(), you likely have a leak (or a cache).
- The tracemalloc “diffs” point to files/lines accumulating allocations.
- objgraph.show_growth() can reveal types that keep increasing (follow-up sketch below).
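If show_growth keeps flagging one type, objgraph can also draw what still references a few live instances. A small sketch (the type name and output file are illustrative; rendering the graph requires graphviz to be installed):

import objgraph

# grab a few live instances of the suspicious type...
suspects = objgraph.by_type("MySuspiciousType")[:3]
# ...and draw the reference chains keeping them alive
objgraph.show_backrefs(suspects, max_depth=5, filename="backrefs.png")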
2) Deep dive if needed
- Memray (HTML flamegraphs of long-lived allocations):
  - Install: pip install memray
  - Run: memray run -o run.bin python your_script.py
  - Visualize: memray flamegraph run.bin
  - Docs: https://bloomberg.github.io/memray/
- Fil (excellent for leak attribution, esp. NumPy/Pandas): https://pythonspeed.com/fil/
- If C extensions are involved, leaks may live outside Python objects; prefer Memray/Fil in that case (native-mode example below).
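For those C-extension cases, Memray's native mode records the C/C++ call stacks alongside the Python ones (the --native flag; file names are illustrative):
  - Run: memray run --native -o run.bin python your_script.py
  - Visualize: memray flamegraph run.bin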
Peak/steady-state measurement (how big does it get, and where)
- Quick whole-run view: memory_profiler’s mprof
  - Install: pip install -U memory_profiler
  - Run: mprof run python your_script.py
  - Plot: mprof plot
  - Docs: https://github.com/pythonprofilers/memory_profiler
- Line-by-line hotspots:
  - Decorator:

    from memory_profiler import profile

    @profile
    def heavy():
        ...

- Sampling profiler with CPU + memory: Scalene
  - Install: pip install scalene
  - Run: scalene your_script.py
  - Repo: https://github.com/plasma-umass/scalene
- Production traces: Memray (again). Great to confirm peak phases and which call stacks allocate most.
Using these with Prefect 3.x
- Local flow runs: just run the snippets inside your @task or @flow. Log RSS before/after suspicious blocks.
- Distributed/external execution (e.g., DaskTaskRunner, containers, Kubernetes):
  - Profile inside the task function so the profiler runs in the actual worker process.
  - For trends, add periodic memory logging in your tasks/flows:
import psutil, threading, time
from prefect import flow, task

def rss_mb():
    return psutil.Process().memory_info().rss / (1024**2)

def start_mem_logger(interval=5):
    stop = threading.Event()

    def loop():
        while not stop.is_set():
            print(f"[MEM] RSS={rss_mb():.1f} MiB")
            time.sleep(interval)

    t = threading.Thread(target=loop, daemon=True)
    t.start()
    return stop

@task
def work():
    stop = start_mem_logger(5)
    try:
        # do work
        time.sleep(30)
    finally:
        stop.set()

@flow
def run():
    work()

if __name__ == "__main__":
    run()
- If you suspect the orchestrator rather than your code, reproduce the same workload outside Prefect to isolate it. Or run the “leak probe” loop as a single Prefect task repeated N times (sketch below).
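A minimal sketch of that repeated-task probe (the suspect() body and the iteration count are placeholders for your own code):

import gc
import psutil
from prefect import flow, task, get_run_logger

def rss_mb():
    return psutil.Process().memory_info().rss / (1024**2)

def suspect():
    # TODO: the code you think leaks
    return [bytearray(5_000_000) for _ in range(5)]

@task
def leak_probe(iterations: int = 10):
    log = get_run_logger()
    for i in range(1, iterations + 1):
        suspect()
        gc.collect()
        log.info(f"iter {i}: RSS {rss_mb():.1f} MiB")

@flow
def probe_flow():
    leak_probe(10)

if __name__ == "__main__":
    probe_flow()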
Useful docs
- tracemalloc: https://docs.python.org/3/library/tracemalloc.html
- psutil: https://psutil.readthedocs.io/
If you can share:

Marvin
08/11/2025, 2:14 PM