https://prefect.io logo
Title
m

Matthias

06/10/2020, 8:01 AM
Hi, i am running my flows on a RemoteDask Environment. I can monitor how the memory usage accumulates over time (with every scheduled run) until at one point the memory is full, dask becomes unresponsive and logs this:
distributed.worker - WARNING - Memory use is high but worker has no data to store to disk.  Perhaps some other process is leaking memory?  Process memory: 3.43 GB -- Worker memory limit: 4.18 GB
All the runs complete successfully and I have the feeling, but still the memory used does not get released. I am not really sure where to start debugging. Is there a way to force a memory release? The only other option I currently see is to force a dask-worker restart after the flow run finishes, but that feels very hacky.
j

Jim Crist-Harif

06/10/2020, 4:25 PM
Hi Matthias, there are a few reasons this could be occurring: • A bug in your flow code. Do your tasks create resources (e.g. db connections, boto client objects, etc...) that may not be being cleaned up properly? • The default Python malloc implementation actually performs fairly poorly in the presence of both Threads and large buffers (e.g. numpy, pandas things (this behavior shows up more with pandas, but still happens with numpy)). Over time this can fragment the memory. The warning from dask you're seeing checks the RSS, so it won't know the difference between fragmented memory and memory in use. Does your workload create a lot of numpy or pandas objects? Are you using theaded workers? • A bug in prefect, keeping objects around longer than they're needed. This is possible, I've only just started digging into perf optimizations for how Prefect uses Dask. • A bug in Dask doing the same. This is much less likely, but also possible. If your flow code looks correct and you don't see a place for memory to be leaking, the next thing I'd do is inspect what's in memory on the worker. A short script to collect this info (note that this requires [pympler](https://github.com/pympler/pympler):
def count_objects():
    from pympler import muppy
    from collections import defaultdict
    from sys import getsizeof
    from distributed.utils import typename
    objs = muppy.get_objects()
    counts = defaultdict(int)
    sizes = defaultdict(int)
    for o in objs:
        k = typename(type(o))
        counts[k] += 1
        try:
            sizes[k] += getsizeof(o)
        except:
            pass
    return counts, sizes

# Run count_objects on the worker in question
output = client.run(count_objects, workers=[worker_address])
counts, sizes = output[worker_address]

# you can then analyze the output to see what's taking up space.
# You might use pandas to do this
df = pd.DataFrame.from_dict(sizes, orient="index", columns=['sizes'])
print(df.sizes.nlargest(100))
A few questions, assuming this isn't due to a bug in your code: • Approximately ow long is the worker running before the message appears, both in time and in number of flows? • Are your workloads memory intensive? Are you creating large or many pandas or numpy objects? • Does this seem to happen more frequently with certain flows or operations? What kind of tasks are you running?
m

Matthias

06/10/2020, 9:19 PM
Hi @Jim Crist-Harif , appreciate your detailed response. I am only going to be able to check all of this next week, but I‘ll get back to you (public holiday in Germany). So far I can give the following: the flow is doing ETL with a lot of data. Parts of it are running through Pandas objects. In the end everything is written to a db. It might certainly be, that my code causes this by leaving connections open, etc. I‘ll go through it. It is a memory intensive flow, but it seems to me, that all the flows I run leave some memory occupied after finishing. I need to do some tests to see what the extent is. I had checkpointing turned on, then it was after ~8 runs (= 8 hours). I turned of checkpointing (also as it occupied a lot of storage space) for all tasks, since then it changed to being ~20 runs (=20 hours). Hope this helps already.