Thread
#prefect-community
    Christoph Wiese

    1 year ago
    hello one and all! I’m trying to analyse the memory consumption of one of my flows, but I cannot figure out how to add guppy to it. My attempt was to create two tasks: one to create the hpy object and another to print it. I added dependencies to make the object creator the first task in the flow and the printer the last. The heap is actually printed, but only for the task that created the hpy object. Does anybody know how I can have the hpy() object cover the entire flow? (Or if there is an alternative to using guppy, that’d be fine as well.)
    Dylan

    1 year ago
    Hi @Christoph Wiese! Can you share your Flow’s configuration code (Run Config and Executor) and a rough outline of your Flow? I’ve not used guppy before but I’m happy to take a quick look at the docs
    Christoph Wiese

    1 year ago
    Hi @Dylan 👋 I’m using UniversalRun and the LocalDaskExecutor:
    flow.run_config = UniversalRun(labels=[f"data-{self._config.env}"])
    and
    self._executor = LocalDaskExecutor(scheduler="threads", num_workers=15)
    The flow is relatively simple: it syncs files between an SFTP server and an S3 bucket. First it gets a secret from AWSSecretsManager and uses that to list files on an SFTP server via a task I wrote using paramiko; a separate task lists all files in an S3 bucket. Then I compare both file lists and map every file that is not yet in the S3 bucket to an SFTP download task, which is followed by an S3Upload task.
    I’d be happy to share more code if it’d be helpful 🙂
    Dylan

    1 year ago
    Ahh. I’m not sure how guppy works, but I do know that your tasks are likely executed in new threads created by the executor. You’ll need to make sure guppy is run inside each task
    from guppy import hpy; h=hpy()
    I suspect that needs to be called in each task you’re interested in
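    Since the original question left the door open to alternatives to guppy, the stdlib tracemalloc module can do the same per-task snapshotting; the helper name and the simulated workload below are made up for illustration. In a real Prefect flow the same calls would go inside each @task-decorated function, since each task may run in its own executor thread:

    ```python
    # Sketch: per-task memory snapshots with tracemalloc (a stdlib alternative to guppy).
    # report_memory is a hypothetical helper; the byte allocations stand in for a task's work.
    import tracemalloc

    def report_memory(label):
        # Take a snapshot and sum the size of all currently tracked allocations.
        snapshot = tracemalloc.take_snapshot()
        total = sum(stat.size for stat in snapshot.statistics("filename"))
        print(f"{label}: {total / 1024:.1f} KiB currently allocated")
        return total

    tracemalloc.start()

    data = [bytes(1024) for _ in range(1000)]  # simulate a task allocating ~1 MiB
    after_alloc = report_memory("after allocation")

    del data  # drop the only reference so the memory can be freed
    after_free = report_memory("after free")

    assert after_free < after_alloc
    ```

    Calling report_memory at the start and end of each task would show per-task growth, which is roughly what running hpy() inside each task would give you.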
    Christoph Wiese

    1 year ago
    I tried that as well and that works for the individual tasks, which each seem fine on their own, but the overall flow keeps accumulating memory until it eventually crashes the system 😬
    Dylan

    1 year ago
    Have you tried adding more memory 😉
    Kidding
    Christoph Wiese

    1 year ago
    Am I right in assuming that the memory of mapped tasks is freed up once that child and all its dependent children have finished their runs?
    Dylan

    1 year ago
    What agent are you using to run this?
    Christoph Wiese

    1 year ago
    Yeah, that is actually what I’m playing with now, switching to ECSRun and trying a bigger instance 😅
    The original agent was a Fargate task with 1 vCPU and 3 GB RAM; my latest attempt was
    flow.run_config = ECSRun(
        labels=[f"data-{self._config.env}"], cpu=2048, memory=16384
    )
    that’s 2 vCPUs in AWS parlance
    Dylan

    1 year ago
    Gotcha
    Christoph Wiese

    1 year ago
    and just for reference, I’m syncing about 25 GB of data in about 2,500 files
    i.e. that’s about 5k children in the flow
    one child for the download, one for the upload
    Dylan

    1 year ago
    So task results from all tasks (mapped or not) remain in memory for the duration of the flow run
    Christoph Wiese

    1 year ago
    !
    woah, that explains it
    my understanding was that flows should be designed such that we could easily transition to running them on a cluster, i.e. directly pass data between tasks instead of using local files
    did I get that wrong?
    should I be doing a flow of flows here, where I map chunks of the workload to child flows? (Though I’d find that a bit excessive, since effectively I’d be mapping my workload twice, once on the flow level and then on the task level)
    Dylan

    1 year ago
    When I’m dealing with a large volume of data, I often put everything into cloud storage and have tasks pass references to it between each other, so that my flow only holds the data I’m actively working on in memory
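    The reference-passing pattern described above can be sketched without any cloud SDK; a dict stands in for the S3 bucket here, and the task and key names are invented for illustration. In a real flow these would be boto3 put_object/get_object calls inside Prefect tasks:

    ```python
    # Sketch: tasks return small references (object keys) instead of payloads,
    # so the flow runner only ever retains strings, not the 25 GB of file data.
    object_store = {}  # stand-in for an S3 bucket

    def download_task(filename):
        payload = b"x" * 1024            # pretend this came from the SFTP server
        key = f"staging/{filename}"
        object_store[key] = payload      # upload to storage immediately...
        return key                       # ...and return only the reference

    def upload_task(key):
        payload = object_store[key]      # fetch by reference when needed
        # ...write payload to its final destination...
        return len(payload)

    keys = [download_task(f"file_{i}.csv") for i in range(3)]  # "mapped" downloads
    sizes = [upload_task(k) for k in keys]                     # "mapped" uploads

    print(keys)   # the retained task results are just short strings
    print(sizes)
    ```

    The retained results grow with the number of files, not their size, which is what keeps a 2,500-file sync from exhausting memory.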
    Christoph Wiese

    1 year ago
    hm, I really misunderstood the point of the task result snapshots then
    I thought they should contain the data and the S3 result handler would upload them so that I could later resume if need be
    but understood, in that case I actually need to do the download and upload in one task
    Dylan

    1 year ago
    Hi Christoph, I’m juggling a few conversations at the moment but I’m also trying to dig in further. My understanding of how task results do or don’t get released from memory isn’t stellar, and I want to make sure I’m giving you the best information that I can
    Let me discuss with the team a bit more and try to provide some more insight
    Christoph Wiese

    1 year ago
    that would be great, I’ll wait for that before I begin re-writing my flows 🙂
    Hi @Dylan, just wondering if you had a chance to talk to the team about the way memory is managed for a flow?
    Dylan

    1 year ago
    Not just yet, will get back to you 👍
    Christoph Wiese

    1 year ago
    @Dylan just wondering if there is an update? I don’t mean to rush you, just have a ticket blocked by this and I’m wondering how to proceed
    Dylan

    1 year ago
    Hey @Christoph Wiese
    Got a little more clarity here
    The short answer is that Prefect doesn’t do memory management itself, so you see the same behavior you would if you wrote the script yourself. Variables are garbage collected once Python determines there are no more references to them; all task return values are referenced via state objects and therefore live in memory for the duration of the flow run. Running a workflow on a Dask cluster helps because each task runs on a Dask worker, so any intermediate memory usage within the task (that isn’t referenced in its final return value) gets cleaned up more aggressively, but the final return value is still collected back into memory in the main flow runner process.
    If you’re using a LocalDaskExecutor, using process workers will clean up intermediate memory but thread workers will not
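    The retention behavior described here can be demonstrated with nothing but the stdlib; the Result class and run_task below are stand-ins for Prefect state objects and tasks, and a weak reference lets us observe exactly when a result is actually freed:

    ```python
    # Sketch: as long as something (here, the `states` list) references a task's
    # return value, Python's garbage collector cannot free it, no matter how
    # long ago the task finished.
    import gc
    import weakref

    class Result:
        def __init__(self, payload):
            self.payload = payload

    def run_task():
        return Result(bytearray(1024 * 1024))  # a task returning ~1 MiB

    states = [run_task() for _ in range(3)]  # flow runner retains every result
    probe = weakref.ref(states[0])

    gc.collect()
    assert probe() is not None   # still referenced by `states`: memory retained

    states.clear()               # drop the references (flow run ends)
    gc.collect()
    assert probe() is None       # only now can the results be freed
    ```

    This is why switching executors or adding RAM only delays the crash: the fix is to shrink what each task returns, not where it runs.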
    Christoph Wiese

    1 year ago
    That is very helpful, thanks @Dylan!
    Dylan

    1 year ago
    Anytime!