
Michael Kennely

12/17/2020, 8:10 PM
Hi all! I’m having a RAM issue when running one of my flows (diagram in thread). Essentially, I have a list of calls I need to make to a source system, and for each item in that list the flow does:
ping source system for dataset -> load dataset into a pandas DF -> store dataset as a file in S3 -> push contents of that file to our datastore
I’m using the LocalDaskExecutor to do DFE (depth-first execution) and that’s helped a ton, since the flow used to fail at the first step because it was doing breadth-first execution and holding all of the datasets in memory. I expected that once the task tree was completed for each dataset it’d release that memory, but that doesn’t seem to be the case. Am I doing something wrong/poorly, or is there a way to free up that memory once the terminal task has completed for each branch?
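
For reference, a flow like the one described here might look roughly like this in Prefect 0.x; the task names, flow name, and placeholder data are illustrative, not from the thread. The key point is that each mapped DataFrame is returned from one task to the next, so the executor has to hold it:

import pandas as pd
from prefect import Flow, task

@task
def fetch_dataset(call):
    # ping the source system for one dataset (placeholder DataFrame here)
    return pd.DataFrame({"call": [call]})

@task
def store_in_s3(df):
    # write the DataFrame to a file in S3 and return the object key
    return "s3://bucket/placeholder.csv"

@task
def push_to_datastore(s3_key):
    # push the contents of that file to the datastore
    pass

with Flow("per-dataset-pipeline") as flow:
    calls = ["dataset_a", "dataset_b"]  # stand-in for the real list of calls
    dfs = fetch_dataset.map(calls)      # each DataFrame is a task result the executor holds on to
    keys = store_in_s3.map(dfs)
    push_to_datastore.map(keys)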

Dylan

12/17/2020, 8:28 PM
Hi @Michael Kennely! Can you share a bit more info about your memory usage reporting here? What monitoring is indicating that memory isn’t being released over the course of the Flow Run?

Michael Kennely

12/17/2020, 8:37 PM
The “monitoring” is pretty back of the envelope 😅 As the flow is running I have the top command running on the machine that my agent is installed on, and there’s a process listed as prefect whose %MEM is continually increasing while the available memory on that machine continually decreases.
The pid for that process also matches the pid that’s returned in the logs when the flow is first kicked off, which leads me to believe that memory is not being released despite the various task trees “completing”.

Dylan

12/17/2020, 8:38 PM
Got it, thanks, that’s helpful!

Michael Kennely

12/17/2020, 8:39 PM
Obviously, there are easy solutions for this, but I didn’t want to just do something easy that wasn’t a good solution.
I could split this into 2 separate flows for load balancing, or similarly I could try a “parent” flow that kicks off sub-flows and splits the constant list between them. I could also simply increase the resources in the execution environment (I’m using a fairly small EC2 instance), but I’m thinking that those are knee-jerk reactions that aren’t really “future-proof”.
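
The parent-flow option mentioned here could be sketched roughly as below with Prefect 0.x’s StartFlowRun task; the flow and project names are placeholders, and the child flows are assumed to be registered separately, each handling part of the call list:

from prefect import Flow
from prefect.tasks.prefect import StartFlowRun

# Hypothetical child flows, each handling half of the call list
run_first_half = StartFlowRun(flow_name="datasets-part-1", project_name="my-project", wait=True)
run_second_half = StartFlowRun(flow_name="datasets-part-2", project_name="my-project", wait=True)

with Flow("datasets-parent") as parent_flow:
    run_first_half()
    run_second_half()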

Dylan

12/17/2020, 8:48 PM
Hey I haven’t forgotten about this, just having a quick discussion
👍 1
Hey hey, I quickly want to confirm: are you orchestrating Flow Runs with Prefect Server or Prefect Cloud?

Michael Kennely

12/17/2020, 8:55 PM
Prefect Cloud - on the Developer Tier

Dylan

12/17/2020, 8:55 PM
Local agent and run config?

Michael Kennely

12/17/2020, 8:59 PM
Local agent and
flow.environment = LocalEnvironment(
    executor=LocalDaskExecutor()
)

flow.storage = GitHub(redacted)
I use supervisor to keep the agent up persistently on the EC2 instance if that’s relevant/helpful

Zanie

12/17/2020, 9:23 PM
Hi! We’ve been talking about this as a team — it’s a bit of a tricky situation. Basically, we have to store the return values from every task so they can be used downstream. We can’t just “drop” this from some “intermediate” tasks because dask is managing the values. We’ll continue to think about this in the future, but because of this complexity we can’t promise a perfect solution right now. The best recommendation I have for this is to reduce the amount of data that needs to be passed between tasks. Passing large amounts of data from task to task won’t scale to a distributed setting anyway. I’d recommend moving your tasks that use the dataframe into a single task. Alternatively, you can write the dataframe to disk and pass the filepath to the next task.
🙌 1
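
A minimal sketch of the consolidation Zanie suggests, assuming Prefect 0.x and hypothetical helper functions standing in for the real source-system, S3, and datastore calls; the alternative of passing a filepath instead of the DataFrame is noted in the comments:

import pandas as pd
from prefect import Flow, task

# Hypothetical stand-ins for the real source-system, S3, and datastore calls
def fetch_from_source(call):
    return pd.DataFrame({"call": [call]})

def upload_to_s3(local_path):
    pass

def load_into_datastore(local_path):
    pass

@task
def process_dataset(call):
    # The whole lifecycle of one dataset lives inside a single task, so the
    # DataFrame is never returned to (and retained by) the executor.
    df = fetch_from_source(call)
    local_path = f"/tmp/{call}.csv"
    df.to_csv(local_path, index=False)
    upload_to_s3(local_path)
    load_into_datastore(local_path)
    # Alternative: return local_path (a small string) rather than the DataFrame
    # if a downstream task still needs to pick up where this one left off.

with Flow("per-dataset-pipeline-consolidated") as flow:
    calls = ["dataset_a", "dataset_b"]  # stand-in for the real list of calls
    process_dataset.map(calls)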

Michael Kennely

12/17/2020, 9:26 PM
Thanks for the replies @Zanie and @Dylan!! I greatly appreciate your time and attention on this, and I think the suggestions you gave are totally viable.
Your suggestion was spot on! I should have realized that passing datasets between tasks was no bueno 😅