Andreas Nord

09/11/2023, 6:38 PM
Hi! Memory issues with Prefect: I have a parent flow that runs a couple of child flows in sequence, and the more flows that have run, the more total memory is allocated. I would expect the total memory allocated to be independent of the number of flows. See example code. Running without flow/task decorators does not have this issue. Is there some setting I am missing that would correctly free the memory for each subflow once it has completed?

Jack P

09/12/2023, 12:30 AM
Hey @Andreas Nord, just chiming in, but I have no idea.. I wonder if we would need to run literally gc.collect in order for the df to be erased, or del df.. Seems excessive though.. Hoping someone can provide insight, as I am very curious 🙏

Andreas Nord

09/12/2023, 6:43 AM
Yes for sure it should not be necessary to do that
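For reference, the `del` / `gc.collect()` workaround mentioned above looks roughly like the sketch below. It uses only the standard library; `Payload` and `release_demo` are hypothetical names, and the reference cycle is constructed on purpose to show the one case where `del` alone is not enough. Whether this helps with Prefect depends on what still holds a reference to the result, which this thread does not establish.

```python
import gc
import weakref


class Payload:
    """Hypothetical stand-in for a large object such as a DataFrame."""

    def __init__(self, n_bytes):
        self.data = bytearray(n_bytes)


def release_demo():
    """Return (alive_after_del, alive_after_collect) for an object in a reference cycle."""
    was_enabled = gc.isenabled()
    gc.disable()  # keep the automatic collector out of the way so the demo is deterministic
    try:
        obj = Payload(1_000_000)
        obj.self_ref = obj  # reference cycle: reference counting alone cannot free it
        probe = weakref.ref(obj)  # lets us observe whether the object still exists

        del obj  # removes our name, but the cycle keeps the object alive
        alive_after_del = probe() is not None

        gc.collect()  # the cycle collector finds and frees the unreachable cycle
        alive_after_collect = probe() is not None
        return alive_after_del, alive_after_collect
    finally:
        if was_enabled:
            gc.enable()


if __name__ == "__main__":
    print(release_demo())
```

Without a cycle, dropping the last reference frees the object immediately and `gc.collect()` changes nothing, so an explicit collection only matters when something cyclic (or something else entirely) is still holding on to the data.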

Adam Azzam

09/12/2023, 11:22 AM
Curious to see if it’s fixed by switching to a sequential task runner? https://docs.prefect.io/latest/concepts/task-runners/

Andreas Nord

09/12/2023, 11:24 AM
Like this?
```python
@flow(task_runner=task_runners.SequentialTaskRunner)
def child_flow():
    df = big_df.submit()


@flow(task_runner=task_runners.SequentialTaskRunner)
def parent():
    for i in range(1):
        child_flow()
    print("lots of memory allocated here")

parent()
```
There is no difference

Jack P

09/12/2023, 11:35 AM
@Andreas Nord does this answer it -> https://docs.prefect.io/2.11.4/concepts/results/?h=memor#caching-of-results-in-memory
```python
@flow(cache_result_in_memory=False)
def foo():
    return "pretend this is large data"
```
and for tasks there are two options (they might apply to flows too, but I don't have a test set up):
```python
@task(persist_result=False, cache_result_in_memory=False)
def bar():
    return "pretend this is biiiig data"
```
^Should link to the section named "Caching of results in memory"

Andreas Nord

09/12/2023, 11:54 AM
Yeah, I tried this. If I set both persist_result=False and cache_result_in_memory=False, it works, but then I cannot use my task results within the same flow, so it's not usable

Jack P

09/12/2023, 11:55 AM
even if you do `big_df.submit().result()`?
oh i see, one sec lemme play around with it.. stand-by
So with your original code, I had around (4001261235, 4001587381) of memory usage (I think it's in bytes and the right number is the peak, but I honestly haven't used tracemalloc in a bit..). When I run this, I get about (826042347, 5485995247), which is a lower left number but a higher right one... so I don't know if this solves it. The left number here is the same one I get when I run range(1), and adding return_state=True seems to make it a bit slower too.. I don't know if this is a solution or even the right direction, just sharing findings, lemme know what you think
```python
from prefect import task, flow
import pandas as pd
import numpy as np

import tracemalloc


@task(cache_result_in_memory=False)
def big_df():
    df = pd.DataFrame(np.ones((10**7, 10)))
    return df


@flow()
def child_flow():
    df = big_df.submit(return_state=True).result()


@flow()
def parent():
    for i in range(5):
        child_flow()
    print("lots of memory allocated here")

parent()
```
This has (26057819, 5486067324), which appears to be a really high peak but lower memory.. progress haha (I don't think the task runner is doing anything here, just the second cache_result_in_memory=False is)
```python
from prefect import task, flow
from prefect.task_runners import SequentialTaskRunner
import pandas as pd
import numpy as np

import tracemalloc


@task(cache_result_in_memory=False)
def big_df():
    df = pd.DataFrame(np.ones((10**7, 10)))
    return df


@flow(
    cache_result_in_memory=False,
    task_runner=SequentialTaskRunner(),
)
def child_flow():
    df = big_df.submit(return_state=True).result()


@flow()
def parent():
    tracemalloc.start()

    print(tracemalloc.get_traced_memory())

    for i in range(5):
        child_flow()
    print("lots of memory allocated here")

    print("\n\n\n\n")

    print(tracemalloc.get_traced_memory())

    tracemalloc.stop()

parent()
```
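On reading the pairs printed above: `tracemalloc.get_traced_memory()` returns a `(current, peak)` tuple in bytes, so the right-hand number is the peak traced size since tracing started. A minimal standard-library sketch with no Prefect involved (the `measure` helper and the buffer size are made up for illustration):

```python
import tracemalloc


def measure(n_bytes: int = 10_000_000):
    """Allocate and release a buffer; return (current, peak) while held and after release."""
    tracemalloc.start()
    buf = bytearray(n_bytes)  # stand-in for a large DataFrame
    held = tracemalloc.get_traced_memory()  # (current, peak) while buf is alive
    del buf  # drop the only reference so the buffer is freed
    released = tracemalloc.get_traced_memory()  # current drops, peak stays high
    tracemalloc.stop()
    return held, released


if __name__ == "__main__":
    held, released = measure()
    print("while held:    current=%d peak=%d" % held)
    print("after release: current=%d peak=%d" % released)
```

A low left number next to a high right number therefore means the memory was released by the time of the measurement, but was all allocated at once at some earlier point.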

Andreas Nord

09/12/2023, 1:23 PM
With peak you mean peak memory usage? In my case I need to prioritize the peak because my prefect process is crashing because of the memory usage.
Thanks for having a look! These things should be more clearly documented...
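If the peak is what matters, one way to see whether it grows with the number of child flows is to reset the peak before each iteration and record it afterwards. The sketch below is standard library only and assumes Python 3.9+ for `tracemalloc.reset_peak()`; `peak_per_iteration` is a hypothetical helper, and the `bytearray` merely stands in for one child flow's work. Note that tracemalloc reports memory allocated through Python's allocators, which is not the same thing as the process's resident memory as seen by the operating system.

```python
import tracemalloc


def peak_per_iteration(n_iters: int = 3, n_bytes: int = 5_000_000):
    """Return the traced peak (in bytes) observed within each iteration."""
    peaks = []
    tracemalloc.start()
    for _ in range(n_iters):
        tracemalloc.reset_peak()  # Python 3.9+: peak restarts from the current traced size
        buf = bytearray(n_bytes)  # stand-in for one child flow's work
        del buf  # release before the next iteration
        _, peak = tracemalloc.get_traced_memory()
        peaks.append(peak)
    tracemalloc.stop()
    return peaks


if __name__ == "__main__":
    for i, peak in enumerate(peak_per_iteration()):
        print(f"iteration {i}: peak={peak} bytes")
```

If memory is released between iterations, the per-iteration peaks stay roughly flat; peaks that climb from one iteration to the next suggest that something is still holding on to earlier results.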