# ask-community
c
hello one and all I’m trying to analyse the memory consumption of one of my flows, but I cannot figure out how to add guppy to my flow. My attempt was to create two tasks: one to create the hpy object and another to print it. I added dependencies to make the object creator the first task in the flow and the printer the last task in the flow. The heap is actually printed, but only for the task that created the hpy object. Does anybody know how I can have the hpy() object cover the entire flow? (or if there is an alternative to using guppy, that’d be fine as well)
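Roughly, my attempt looks like this (simplified, the task and flow names are just placeholders):
from guppy import hpy
from prefect import Flow, task

@task
def create_heap_tracker():
    # first task in the flow: create the guppy heap object
    return hpy()

@task
def print_heap(h):
    # last task in the flow: print what the heap object reports
    print(h.heap())

with Flow("sftp-s3-sync") as flow:
    h = create_heap_tracker()
    # ... the actual sync tasks are wired in between via dependencies ...
    print_heap(h)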
d
Hi @Christoph Wiese! Can you share your Flow’s configuration code (Run Config and Executor) and a rough outline of your Flow? I’ve not used guppy before but I’m happy to take a quick look at the docs
c
Hi @Dylan 👋 I’m using UniversalRun and the LocalDask Executor:
flow.run_config = UniversalRun(labels=[f"data-{self._config.env}"])
and
self._executor = LocalDaskExecutor(scheduler="threads", num_workers=15)
The flow is relatively simple: it syncs files between an SFTP server and an S3 bucket. First it gets a secret from AWSSecretsManager, uses that to list files on an SFTP server via a task I wrote using paramiko, a separate task lists all files in an S3 bucket, then I compare both file lists and map all files that are not yet in the S3 bucket to an SFTP download task, which is followed by an S3Upload task
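In rough outline (my own task names, bodies omitted):
from prefect import Flow, unmapped

with Flow("sftp-s3-sync") as flow:
    creds = get_sftp_credentials()                    # secret from AWSSecretsManager
    sftp_files = list_sftp_files(creds)               # custom paramiko task
    s3_files = list_s3_files()                        # keys already in the bucket
    missing = diff_file_lists(sftp_files, s3_files)   # on the SFTP server but not yet in S3
    data = download_from_sftp.map(missing, credentials=unmapped(creds))
    upload_to_s3.map(data=data, key=missing)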
I’d be happy to share more code if it’d be helpful 🙂
d
Ahh. I’m not sure how guppy works, but I do know that your tasks are likely executed in new threads created by the executor. You’ll need to make sure guppy is run inside each task
from guppy import hpy; h=hpy()
I suspect that needs to be called in each task you’re interested in
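Something along these lines (untested, fetch_file is just a stand-in for your download logic):
from guppy import hpy
from prefect import task

@task
def download_from_sftp(path, credentials):
    # create the heap object inside the task so it runs in the executor's worker thread
    h = hpy()
    data = fetch_file(path, credentials)  # stand-in for the actual paramiko download
    print(h.heap())                       # heap usage as seen from within this task
    return data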
c
I tried that as well and it works for the individual tasks, which seem fine individually - but the overall flow keeps accumulating memory until it eventually crashes the system 😬
d
Have you tried adding more memory 😉
Kidding
c
Am I right in assuming that the memory of mapped tasks is freed up once that child and all its dependent children have finished their runs?
d
What agent are you using to run this?
c
Yeah, that is actually what I’m playing with now, switching to ECSRun and trying a bigger instance 😅
The original agent ran on Fargate with 1 vCPU and 3 GB of RAM - my latest attempt was
flow.run_config = ECSRun(
    labels=[f"data-{self._config.env}"], cpu=2048, memory=16384
)
that’s 2 vCPUs in AWS parlance
d
Gotcha
c
and just for reference, I’m syncing about 25 GB of data in about 2500 files
i.e. that’s about 5k children in the flow
one child for the download, one for the upload
d
So Task results from all tasks (mapped or not) remain in memory for the duration of the flow
c
!
woah, that explains it
my understanding was that flows should be designed such that we could easily transition to running them on a cluster, i.e. directly pass data between tasks instead of using local files
did I get that wrong?
should I be doing a flow of flows here, where I map chunks of the workload to child flows? (Though I’d find that a bit excessive, since effectively I’d be mapping my workload twice, once on the flow level and then on the task level)
d
When I’m dealing with a large volume of data, I often put everything into cloud storage and pass references to it between tasks, so the flow only ever holds the data it’s actively working on in memory and tasks hand each other references rather than the data itself
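For example, something like this (rough sketch, the names are made up):
import boto3
from prefect import task

@task
def produce(bucket, key):
    data = build_large_payload()   # made-up helper for whatever creates the big object
    boto3.client("s3").put_object(Bucket=bucket, Key=key, Body=data)
    return key                     # only this small reference lives in the flow run

@task
def consume(bucket, key):
    # load the data only where it's actually needed, inside the task
    data = boto3.client("s3").get_object(Bucket=bucket, Key=key)["Body"].read()
    return summarize(data)         # made-up helper; return something small, not the data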
c
hm, I really misunderstood the point of the task result snapshots then
I thought they should contain the data and the S3 result handler would upload them so that I could later resume if need be
but understood, in that case I actually need to do the download and upload in one task
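i.e. something like this (sketch; open_sftp would be a small paramiko helper I still need to write):
import boto3
from prefect import task

@task
def sync_one_file(sftp_path, credentials, bucket):
    # download from SFTP and upload to S3 inside the same task,
    # returning only the key so nothing large outlives the task
    with open_sftp(credentials) as sftp:
        data = sftp.open(sftp_path).read()
    key = sftp_path.lstrip("/")
    boto3.client("s3").put_object(Bucket=bucket, Key=key, Body=data)
    return key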
d
Hi Christoph, I’m juggling a few conversations at the moment but I’m also trying to dig in further. My understanding of how task results do or don’t get released from memory isn’t stellar, and I want to make sure I’m giving you the best information that I can
Let me discuss with the team a bit more and try to provide some more insight
c
that would be great, I’ll wait for that before I begin re-writing my flows 🙂
👍 1
Hi @Dylan, just wondering if you had a chance to talk to the team about the way memory is managed for a flow?
d
Not just yet, will get back to you 👍
👌🏻 1
c
@Dylan just wondering if there is an update? I don’t mean to rush you, just have a ticket blocked by this and I’m wondering how to proceed
d
Hey @Christoph Wiese
Got a little more clarity here
The short answer is that Prefect doesn’t do memory management of its own, so you’ll see the same behavior as if you wrote the script yourself. Variables are garbage collected once Python determines there are no more references to them, and all task return values are referenced via state objects, so they live in memory for the duration of the flow run. Running a workflow on a dask cluster helps because any intermediate memory used within your task (anything that isn’t referenced by your final return value) lives on a dask worker and is cleaned up more aggressively, but the final return value still gets collected into memory back in the main flow runner process
If you’re using a LocalDaskExecutor, using process workers will clean up intermediate memory but thread workers will not
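e.g. swapping your executor line for something like:
self._executor = LocalDaskExecutor(scheduler="processes", num_workers=15)
(the caveat being that task inputs and outputs then have to be serializable, since they cross process boundaries)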
c
That is very helpful, thanks @Dylan!
d
Anytime!