Hi :wave: I have a very simple ETL where I extract...
# ask-community
n
Hi 👋 I have a very simple ETL where I extract data from a REST API and save it to a DB. I notice that my RAM keeps increasing until the whole Flow has finished. Why is that? This happens despite me using depth-first execution in the flow, so I don’t really keep or need the data, but the Flow seems to hold it in memory. Can anyone please advise? :)
k
Hi, Prefect does not do garbage collection natively. I think you can garbage collect it after the DB upload. If the API call is one task and the DB upload is another task, it becomes a bit harder to do this. You’d need to persist it somewhere -> garbage collect -> retrieve -> upload -> garbage collect, to be precise.
n
I see; Thanks! Is there an example somewhere how I can set this up?
I ask because this is super crucial for me. Hundreds of GB pass through a single Flow, so I have to keep scheduling new ones because of this.
k
I don’t have a good example. It would be a matter of
import gc, del data, gc.collect()
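Something along these lines, as a rough, untested sketch of that persist -> collect -> retrieve -> upload pattern (fetch_from_api and write_to_db are just placeholders for your own REST call and DB upload):
Copy code
import gc
import json
import os
import tempfile

from prefect import Flow, task

@task
def extract(url):
    data = fetch_from_api(url)  # placeholder for your REST call
    # persist to disk so the task result is just a small path string
    fd, path = tempfile.mkstemp(suffix=".json")
    with os.fdopen(fd, "w") as f:
        json.dump(data, f)
    del data      # drop the in-memory copy
    gc.collect()
    return path

@task
def load(path):
    with open(path) as f:
        data = json.load(f)
    write_to_db(data)  # placeholder for your DB upload
    del data
    gc.collect()

with Flow("etl") as flow:
    load(extract("https://example.com/api"))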
n
So you are saying to just add a general garbage collection step at the end of my depth-first tree?
I guess this would work for a single worker but might fail with multiple workers? (2 or more)
k
Let me ask the team and get more info about garbage collection in the distributed setup.
n
Thank you 🙏 Finding a solution to this would literally save me a ton of time.
k
Ok so the recommendation of the team is that Prefect tends to hold data inefficiently. For jobs with bigger data like yours, the recommended approach is to use Prefect to orchestrate Dask and run the jobs there. Dask itself has better memory management and cleans up stuff that’s no longer needed.
We can’t clean up results from tasks currently
The best you might do is to combine the API query and DB upload into one bigger task, delete the data after the upload with Python’s garbage collection, and hopefully that frees things up.
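Roughly like this, as a sketch (again fetch_from_api and write_to_db stand in for your own code):
Copy code
import gc

from prefect import Flow, task

@task
def extract_and_load(url):
    data = fetch_from_api(url)  # placeholder for your REST call
    write_to_db(data)           # placeholder for your DB upload
    # drop the reference and force a collection so the big payload
    # is never kept around as a task result
    del data
    gc.collect()

with Flow("etl") as flow:
    extract_and_load("https://example.com/api")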
n
so I use that already:
Copy code
flow.storage = Local(directory=my_storage_dir)
flow.environment = LocalEnvironment(
    labels=[],
    executor=LocalDaskExecutor(scheduler="threads", num_workers=num_workers),
)
that’s what you mean, right?
A separate question - is there a plan to address this memory management in Prefect in the near future?
k
No, not the DaskExecutor (btw, environments are being deprecated). It would be more like using Prefect on a single core to drive spinning up the Dask cluster and submitting jobs to it, like https://docs.prefect.io/core/idioms/resource-manager.html
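Adapted from that page, roughly (just a sketch; process_chunk and chunk_urls are placeholders for your own work):
Copy code
from dask.distributed import Client
from prefect import Flow, resource_manager, task

@resource_manager
class DaskCluster:
    """Spin up a temporary Dask cluster for the duration of the flow run."""
    def __init__(self, n_workers):
        self.n_workers = n_workers

    def setup(self):
        return Client(n_workers=self.n_workers)

    def cleanup(self, client):
        client.close()

@task
def run_etl(client):
    # submit the heavy lifting to Dask; Dask's own memory management
    # frees intermediate results once they're no longer needed
    futures = client.map(process_chunk, chunk_urls)  # placeholders
    client.gather(futures)

with Flow("dask-etl") as flow:
    with DaskCluster(n_workers=4) as client:
        run_etl(client)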
n
This link seems to explain exactly how to do it, right? :) (I just skimmed it.) Will take a deeper look now.
k
It’s a known pain point for sure but I can’t give a promise as to when it would be solved. This would take some careful design to really handle all edge cases. Sorry about that.
👍 1
n
It’s good enough for me to know that it’s a known pain point for the team. Thanks for the help, Kevin! :) I will take a look at the link.