# ask-community
Hello prefect, I have a flow with the following task:
from random import randint
from time import sleep

from prefect import task

@task
def mem_n_slp(x):
    sleep(x)
    # Make the task fail randomly with about a 5% chance
    if randint(0, 100) < 5:
        raise ValueError("Random failure")
    return x
When I call this task like this inside a pod with 0.5 GB of RAM, everything works fine:
A = mem_n_slp.map(1000 * [1] + 1000 * [2])
But when I feed its results into identical downstream tasks, the pod runs out of memory:
A = mem_n_slp.map(1000 * [1] + 1000 * [2])
B = mem_n_slp.map(A)
C = mem_n_slp.map(B)
D = mem_n_slp.map(C)
E = mem_n_slp.map(D)
I have searched a lot to find out what is going on, but without success. I even set MALLOC_TRIM_THRESHOLD_=0, but without significant results. I suspect prefect/dask is keeping some metadata of completed tasks in memory even when it is no longer needed, but I have not found anything related in the docs. I am registering the flow with:
current_flow.executor = dask.LocalDaskExecutor(num_workers=250)
current_flow.register(project_name=project_name)
I am using Prefect 1.4.0.
Could someone explain to me what could be the cause? Thanks!
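For reference, here is a rough way to check the suspicion above outside of Prefect. It is only a sketch using the standard-library tracemalloc module: FakeState, run_stage and retained_bytes are hypothetical names made up for illustration and are not Prefect or Dask APIs. It compares how much memory stays allocated when the per-task objects of every upstream stage are kept versus when only the latest results are kept.

```python
import tracemalloc


class FakeState:
    """Hypothetical stand-in for per-task bookkeeping; NOT a Prefect class."""

    def __init__(self, result):
        self.result = result
        self.message = "Task run succeeded."
        self.context = {"task_name": "mem_n_slp", "map_index": result}


def run_stage(inputs):
    # One mapped stage: one bookkeeping object per input element.
    return [FakeState(x) for x in inputs]


def retained_bytes(keep_all_stages, n_items=2000, n_stages=5):
    """Return the bytes still allocated after n_stages chained stages."""
    tracemalloc.start()
    kept = []
    inputs = list(range(n_items))
    for _ in range(n_stages):
        states = run_stage(inputs)
        if keep_all_stages:
            # Assumption under test: every upstream state stays referenced.
            kept.append(states)
        inputs = [s.result for s in states]
    del states  # drop the last stage's objects unless they are in `kept`
    current, _peak = tracemalloc.get_traced_memory()
    tracemalloc.stop()
    return current


if __name__ == "__main__":
    print("all stages kept:  ", retained_bytes(True), "bytes")
    print("only last results:", retained_bytes(False), "bytes")
```

If the real flow behaves like the first case, memory would be expected to grow with the number of chained mapped stages rather than with the size of a single stage. The same tracemalloc calls could in principle be wrapped around a local flow run to see which allocations survive between stages.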