
Henrik Väisänen

01/28/2021, 11:39 AM
Hey, I use Prefect + Dask distributed, and in my code the Prefect flow is executed multiple times consecutively. The problem is that in the Dask profiler I see the memory usage keep growing linearly, although each execution (Prefect flow run) can be considered a "clean run" and the amount of used memory should stay constant. Has anyone tackled a similar issue?

Zanie

01/28/2021, 3:50 PM
Hi @Henrik Väisänen, how are you running your flow multiple times?

Henrik Väisänen

01/28/2021, 4:05 PM
Hey, a basic for loop. Within the loop I define the flow and then call flow.run(). After the flow has finished, I take the result and start the next iteration. I suspect that my libraries may be leaking memory, but is it possible that leaked memory persists between runs?

Zanie

01/28/2021, 4:06 PM
What result are you taking? If you’re storing a reference to the result then python can’t garbage collect it.
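To illustrate the point about references: in CPython an object is freed as soon as its last reference disappears, so a results list that holds whole state objects keeps every run's data alive, while one that holds only the extracted values does not. A minimal sketch; `FakeState` is a hypothetical class standing in for a Prefect flow-run state, and the `weakref` probes only report whether each object is still alive.

```python
import weakref


class FakeState:
    """Hypothetical stand-in for a flow-run state that carries a large payload."""

    def __init__(self, value):
        self.value = value
        self.payload = bytearray(1_000_000)  # ~1 MB the caller does not need


def run_loop(keep_whole_state):
    stored = []
    probes = []  # weak references do not keep their target alive
    for i in range(3):
        state = FakeState(i)
        probes.append(weakref.ref(state))
        # Either keep the whole state (and its payload) or only the small value.
        stored.append(state if keep_whole_state else state.value)
    del state  # drop the loop variable's reference to the last state
    alive = sum(probe() is not None for probe in probes)
    return stored, alive


_, alive_with_states = run_loop(keep_whole_state=True)
values, alive_with_values = run_loop(keep_whole_state=False)
print(alive_with_states, alive_with_values, values)  # 3 0 [0, 1, 2]
```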

Henrik Väisänen

01/28/2021, 4:34 PM
Thanks for your advice. I just pass flow_state.result[foo].result to a function, which then stores the value in an array.

Zanie

01/28/2021, 4:35 PM
Perhaps try explicitly running `del flow_state` at the end of your for loop. I’m a bit surprised it’s not being garbage collected.
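`del flow_state` removes a single name. CPython frees the object right away only if nothing else refers to it; objects that sit in a reference cycle stay alive until the cyclic garbage collector runs, which `gc.collect()` triggers explicitly. A minimal sketch of that difference; `Node` is a hypothetical self-referencing class, not a Prefect type, and automatic collection is paused so the result is deterministic.

```python
import gc
import weakref


class Node:
    """Hypothetical object that refers to itself, forming a reference cycle."""

    def __init__(self):
        self.self_ref = self
        self.payload = bytearray(1_000_000)


was_enabled = gc.isenabled()
gc.disable()  # stop the automatic collector from running mid-demo
try:
    node = Node()
    probe = weakref.ref(node)
    del node  # removes the name, but the cycle keeps the refcount above zero
    alive_after_del = probe() is not None
    gc.collect()  # the cycle detector finds and frees the unreachable Node
    alive_after_collect = probe() is not None
finally:
    if was_enabled:
        gc.enable()

print(alive_after_del, alive_after_collect)  # True False
```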

Henrik Väisänen

01/28/2021, 5:01 PM
I tried it, but it made no difference. The Dask workers retain their allocated memory, and it keeps growing after I start the program again. Only restarting the workers seems to help.

Zanie

01/28/2021, 5:02 PM
I’ll reach out to someone on our team with more dask experience. You could try acquiring a dask client and restarting the workers at the end of the flow run — see https://distributed.dask.org/en/latest/memory.html#aggressively-clearing-data
Also, just to confirm, you’re not holding on to instances of the `Flow` class for more than one iteration of your loop, right?
Are you persisting dask collections within your flows?

Henrik Väisänen

01/28/2021, 5:29 PM
Thank you. Yes, I'm not holding any instances, and I isolated the flow so that it should be destroyed properly. Also, I'm not calling any Dask operations like persist() myself; everything is done through the Prefect API.
@Zanie Thank you again. The library used inside the task was leaking. Memory use stabilized after fixing that.

Zanie

01/28/2021, 7:05 PM
Great to hear!