I have a task that sleeps for a given number of seconds and fails randomly about 5% of the time:

from random import randint
from time import sleep
from prefect import task

@task
def mem_n_slp(x):
    sleep(x)
    # Make the task fail randomly with a 5% chance
    if randint(0, 100) < 5:
        raise ValueError("Random failure")
    return x
When I call this task in a pod with 0.5 GB of RAM like this, everything works fine:
A = mem_n_slp.map(1000 * [1] + 1000 * [2])
but when I feed its results into downstream instances of the same task, the memory is no longer enough:
A = mem_n_slp.map(1000 * [1] + 1000 * [2])
B = mem_n_slp.map(A)
C = mem_n_slp.map(B)
D = mem_n_slp.map(C)
E = mem_n_slp.map(D)
I have searched a lot to find out what is going on, but without success. I even set MALLOC_TRIM_THRESHOLD_=0, but it made no significant difference. I suspect Prefect/Dask is keeping some metadata of completed tasks in memory even when they are no longer needed, but I have not found anything related in the docs.
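In case it is relevant, this is roughly how the environment variable reaches the pod; the KubernetesRun run config below is an illustrative sketch rather than my exact setup:

from prefect.run_configs import KubernetesRun

# Illustrative only: push MALLOC_TRIM_THRESHOLD_=0 into the flow-run pod so the
# Dask workers inherit it; memory_limit mirrors the 0.5 GB pod mentioned above.
current_flow.run_config = KubernetesRun(
    env={"MALLOC_TRIM_THRESHOLD_": "0"},
    memory_limit="512Mi",
)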
I am registering the flow with:
current_flow.executor = dask.LocalDaskExecutor(num_workers=250)
current_flow.register(project_name=project_name)
I am using Prefect 1.4.0.
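For completeness, here is a minimal sketch of how everything is wired together; the flow name is made up and the imports are my reconstruction, but the rest matches the snippets above:

from prefect import Flow
from prefect.executors import LocalDaskExecutor

with Flow("mem_n_slp_flow") as current_flow:
    A = mem_n_slp.map(1000 * [1] + 1000 * [2])
    B = mem_n_slp.map(A)
    C = mem_n_slp.map(B)
    D = mem_n_slp.map(C)
    E = mem_n_slp.map(D)

current_flow.executor = LocalDaskExecutor(num_workers=250)
current_flow.register(project_name=project_name)  # project_name is defined elsewhere in my script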
Could someone explain to me what could be the cause? Thanks!