Title
n

Newskooler

05/13/2021, 4:24 PM
Hi 👋 I have a very simple ETL where I extract data from a Rest API and save it to a DB. I notice that my RAM keep increasing until the whole Flow has finished. Why is that? This happens despite me using a depth first search in the flow, so I don’t really keep or need the data, but the Flow keeps it in memory it seems. Can anyone please advise?: )
k

Kevin Kho

05/13/2021, 4:27 PM
Hi, Prefect does not do garbage collection natively. I think you can garbage collect it after the DB upload. If the api is one task and the DB upload is one task, it becomes a bit harder to do this. You’d need to persist it somewhere -> garbage collect -> retrieve -> upload -> garbage collect to be precise.
n

Newskooler

05/13/2021, 4:29 PM
I see; Thanks! Is there an example somewhere how I can set this up?
I ask because for me this is super crucial. Hundreds of GB pass over a single flow, so I have to keep scheduling new ones because of this.
k

Kevin Kho

05/13/2021, 4:38 PM
I dont have a good example. It would be a matter of
import gc, del data, gc.collect()
n

Newskooler

05/13/2021, 4:38 PM
so you are saying just to add a general garbage collection task at the end of my depth tree?
I guess this would work for a single working but may fail with multiple workers? (2 or more)
k

Kevin Kho

05/13/2021, 4:41 PM
Let me ask the team and get more info about garbage collection in the distributed setup.
n

Newskooler

05/13/2021, 4:41 PM
Thank you 🙏 Finding a solution to this would literary save me a ton of time.
k

Kevin Kho

05/13/2021, 4:49 PM
Ok so the recommendation of the team is that Prefect tends to hold data inefficiently. For jobs with bigger data like yours, the recommended approach is to use Prefect to orchestrate Dask and run the jobs there. Dask itself has better memory management and cleans up stuff that’s no longer needed.
We can’t clean up results from tasks currently
The best you might do is to combine the api query and db upload task into one bigger task. delete the data after upload with the Python garbage collection and hopefully that frees things up.
n

Newskooler

05/13/2021, 4:51 PM
so I use that already:
flow.storage = Local(directory=my_storage_dir)
    flow.environment = LocalEnvironment(
        labels=[], executor=LocalDaskExecutor(scheduler="threads", num_workers=num_workers),
    )
that’s what you mean, right?
A separate question - is there a plan for this memory management of prefect to be addressed in the near future?
k

Kevin Kho

05/13/2021, 4:56 PM
No for the DaskExecutior (btw environments are being deprecated). It would be more like using Prefect on a single core to drive spinning up the Dask Cluster and submitting jobs like https://docs.prefect.io/core/idioms/resource-manager.html
n

Newskooler

05/13/2021, 4:57 PM
This link seems to explain exactly how to do it, right? : ) (I just skimmed it now) Will take a deeper look now.
k

Kevin Kho

05/13/2021, 4:58 PM
It’s a known pain point for sure but I can’t give a promise as to when it would be solved. This would take some careful design to really handle all edge cases. Sorry about that.
đź‘Ť 1
n

Newskooler

05/13/2021, 5:01 PM
It’s good enough for me to know that it’s a known pain point to the team. Thanks for the help, Kevin! : ) I will take a loot at the link.