
Jan Rouš

01/14/2021, 3:27 AM
I'm working on using Prefect to improve the structure and speed of our data pipeline. We are dealing with lots of data, O(10 GB), and I have noticed that when I use
LocalExecutor
the implicit in-memory caching eventually gets the ETL OOM-killed 😕 I'm using checkpointing, and some of the tasks explicitly write their results to disk where they are picked up by subsequent stages, so the in-memory caching is not strictly necessary. I have not found a way to tell Prefect to skip this, or at least not to eat all available memory while doing so. Any pointers on how I could solve this problem would be really helpful!

Zanie

01/14/2021, 4:14 PM
Hi! Basically, we don’t have an option right now to prevent task results from being held in memory until the flow completes. However, if you are writing values from tasks to disk and then loading them later, you’re already doing what I’d suggest. Generally, pass pointers to objects from task to task and store the objects themselves on disk or in Redis or something similar if memory is a concern.
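A minimal sketch of that pattern (pandas and a local scratch directory here are placeholders for illustration; file names are made up):
from pathlib import Path

import pandas as pd
from prefect import Flow, task

SCRATCH = Path("/tmp/etl-scratch")  # placeholder scratch location

@task
def extract(source: str) -> str:
    # Write the heavy object to disk and return only a lightweight path.
    SCRATCH.mkdir(parents=True, exist_ok=True)
    df = pd.read_csv(source)
    out = SCRATCH / (Path(source).stem + ".parquet")
    df.to_parquet(out)
    return str(out)

@task
def transform(path: str) -> str:
    # Load from the pointer, transform, and hand back another path.
    df = pd.read_parquet(path)
    out = path.replace(".parquet", ".clean.parquet")
    df.dropna().to_parquet(out)
    return str(out)

with Flow("pass-pointers") as flow:
    cleaned = transform(extract("data.csv"))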

Jan Rouš

01/14/2021, 5:49 PM
I see. In my situation, there's a very large number of relatively small computations that run the data through extract and transform functions and then drop the results to persistent storage afterwards. I think this is still not enough, because the individual results from extract can accumulate to the point of running out of memory. I'm assuming this workaround will need to be done manually, i.e. I can't simply say that the flow should use
result=GCSResult(...)
and thus bypass the in-memory caching?
It seems to me that having a way to control how the in-memory caching operates (e.g. setting memory limits, or providing a disk-based caching layer as an alternative or overflow space) could be fairly useful for avoiding problems like these.
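For concreteness, roughly what I mean (a sketch assuming GCSResult from prefect.engine.results; the bucket name is a placeholder):
from prefect import task
from prefect.engine.results import GCSResult

# Persist checkpointed task outputs to GCS. Per the discussion above,
# the in-memory result objects are still held until the flow run
# completes, so this alone does not bound memory usage.
@task(checkpoint=True, result=GCSResult(bucket="my-etl-bucket"))
def extract(source: str):
    ...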

Zanie

01/14/2021, 5:56 PM
We agree this would be useful 🙂 It’s non-trivial to do with our current implementation because we’d have to change a lot of how we’re interacting with Dask, but it’s definitely something we’re keeping in mind as more of our users run into this issue.

Jan Rouš

01/14/2021, 6:04 PM
Thanks. I would be curious to be kept in the loop about this. Until then, I will rework my flows to use references to on-disk/on-GCS dataframes.
Somewhat related: if I want to make artificial "aggregations" of existing tasks (e.g. link the three stages together as a single task), would the right approach be:
from prefect import task

@task
def A(x):
    ...

@task
def B(y):
    ...

@task
def Aggregate(x):
    # Call the wrapped task functions directly so the stages
    # execute as a single task at runtime.
    y = A.run(x)
    return B.run(y)
I would like to preserve the ability to break things apart once the return types are made lightweight.
To make sure I understand this issue: in the situation where we use the Dask executor, this should not be a problem, right?
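For reference, a sketch of how the aggregated task might be wired into a flow and run on Dask (assuming prefect.executors.DaskExecutor; the input list and scheduler address are placeholders):
from prefect import Flow
from prefect.executors import DaskExecutor

with Flow("aggregated-etl") as flow:
    results = Aggregate.map(["a.csv", "b.csv", "c.csv"])

# The executor is chosen at run time; note that result futures are
# still held for the duration of the flow run.
flow.run(executor=DaskExecutor(address="tcp://dask-scheduler:8786"))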

Zanie

01/14/2021, 6:41 PM
Your pseudocode for aggregating tasks looks correct.
Dask still stores all of the results somewhere, so it is possible to have OOM issues there as well, AFAIK. I think we hold on to all of the futures until the entire flow run has finished.

Jan Rouš

01/15/2021, 3:49 AM
Ah, yes, that makes sense. I did experience the Dask tasks crashing, so I guess that was probably why.