# ask-community
Hey everyone, we've been running out of memory on our servers using Prefect with the Dask task runner. I've narrowed the code down to the example below:
```python
import numpy as np
import pandas as pd
from prefect import flow, task
from prefect_dask import DaskTaskRunner


@task
def do_nothing(df):
    return df


@flow(task_runner=DaskTaskRunner())
def test():
    num_rows = 1000000
    num_columns = 10

    # Create a medium-sized DataFrame
    df = pd.DataFrame(np.random.rand(num_rows, num_columns), columns=[f"Column_{i}" for i in range(num_columns)])
    for _ in range(5):
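        # Submit the task and block on its result; I'd expect the worker
        # to free the DataFrame's memory once each future resolves.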
        a = do_nothing.submit(df)
        a.result()


if __name__ == "__main__":
    test()
```
When I run this, I would expect the Dask worker's memory to drop after each submitted task completes, but instead it keeps climbing with every loop iteration. When the flow finishes, the worker (if kept alive) just holds the memory and never releases it. Does anyone have any clue as to why this might be happening?
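For anyone reproducing this: one way to watch per-worker memory between iterations is to point a plain `Client` at the cluster. This assumes the flow runs against a pre-started cluster via `DaskTaskRunner(address=...)`; the scheduler address below is illustrative:
```python
# Sketch: poll per-worker resident memory from outside the flow.
# Assumes a pre-started cluster that the flow connects to with
# DaskTaskRunner(address=...); the address here is illustrative.
from dask.distributed import Client

client = Client("tcp://127.0.0.1:8786")
for addr, info in client.scheduler_info()["workers"].items():
    print(addr, info["metrics"]["memory"])  # worker RSS in bytes
```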
I've tried setting `MALLOC_TRIM_THRESHOLD_` to 0, per this document, but no luck: https://distributed.dask.org/en/latest/worker-memory.html#automatically-trim-memory
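The same page also suggests trimming manually with `client.run`; a sketch of that (it assumes Linux/glibc workers and a reachable `Client`; the address is illustrative):
```python
# From the "manually trim memory" section of the dask page above.
# Assumes the workers run on Linux with glibc.
import ctypes

from dask.distributed import Client


def trim_memory() -> int:
    libc = ctypes.CDLL("libc.so.6")
    return libc.malloc_trim(0)


client = Client("tcp://127.0.0.1:8786")  # illustrative scheduler address
client.run(trim_memory)  # run on every worker
```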