
Andreas Nord

09/12/2023, 6:49 AM
@Marvin Memory issues with Prefect: I have a parent flow that runs a couple of child flows in sequence, and the more flows that have run, the more total memory is allocated. I would expect the total memory allocated to be independent of the number of flows. See example code. Running without the flow/task decorators does not have this issue. Is there some setting I am missing that would correctly free the memory for each subflow once it completes?
from prefect import task, flow
import pandas as pd
import numpy as np


@task()
def big_df():
    # ~800 MB of float64s: 10**7 rows x 10 columns of ones
    df = pd.DataFrame(np.ones((10**7, 10)))
    return df


@flow()
def child_flow():
    df = big_df.submit()


@flow()
def parent():
    # memory usage keeps growing with each completed child flow
    for i in range(5):
        child_flow()
    print("lots of memory allocated here")

parent()

Marvin

09/12/2023, 6:50 AM
Ah, the joys of memory management. It's a bit like cleaning your room, isn't it? You'd think the mess would take care of itself, but alas, it doesn't.

Your issue might be related to how Prefect handles tasks and subflows. Tasks are recommended for doing a discrete, specific piece of work in your workflow, like calling an API or performing a database operation. Subflows, on the other hand, are used to group related tasks within your workflow.

In your case, you're creating a large DataFrame in each child flow, and it seems the memory allocated for each DataFrame isn't being freed after the flow completes. This could be down to how Python's garbage collector works: it might not immediately free the memory once the DataFrame is no longer in use, especially if something still holds a reference to it.

You could try explicitly deleting the reference and calling Python's garbage collector manually at the end of each child flow. Here's how you could modify your `child_flow`:
import gc

@flow()
def child_flow():
    df = big_df.submit()
    del df        # drop the local reference to the task's future
    gc.collect()  # force a full garbage-collection pass
This might help to free up the memory after each child flow is completed. However, keep in mind that this is a bit of a brute force solution and might not be the most efficient way to handle memory management in Python. Also, remember to ensure that your tasks and subflows are as granular as possible. This makes it easier to find and fix issues should a task fail. I hope this helps, or at least makes your day slightly less miserable. If you need more help, I'm here, wallowing in my own existential dread.
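One more thought: if you're on Prefect 2.x, a gentler option than manual garbage collection may be to tell Prefect not to keep the task's result in memory at all. The sketch below leans on the Prefect 2 result options `persist_result` and `cache_result_in_memory`; treat it as a starting point and check the results documentation for your version:

from prefect import flow, task
import numpy as np
import pandas as pd


# Sketch, assuming Prefect 2.x result options: write the result to result
# storage instead of caching it in the flow run's memory, so the DataFrame
# can be released as soon as the task run finishes.
@task(persist_result=True, cache_result_in_memory=False)
def big_df():
    return pd.DataFrame(np.ones((10**7, 10)))


@flow()
def child_flow():
    big_df.submit()

With the result persisted rather than cached, the flow run no longer has to hold the full DataFrame after the task finishes, so memory usage should stay roughly flat across child flows, assuming nothing else keeps a reference to it.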
Oh dear, it appears you're having memory issues with Prefect. It's a common problem, really. The universe is expanding, galaxies are drifting apart, and yet we can't seem to find enough memory to run our programs. It's all very depressing, but let's see what we can do about it.

It seems that your issue might be related to how Prefect handles memory allocation for running flows. There are a few GitHub issues that have been reported which might be related to your problem:

1. Enhancement: specify default results storage block: This pull request discusses adding a setting that would allow results to be saved to a specified block by default, unless a different storage path or block is explicitly defined in the flow code.
2. Kubernetes agent log (perhaps fail?) when jobs are rescheduled: This issue discusses how Kubernetes jobs are automatically rescheduled, which can result in an attempted re-execution of the flow run in another pod. The proposed solution is to add an annotation indicating the Prefect job is not safe to evict.
3. Subprocess Killed Task is Not Marked as Failed: This issue discusses a scenario where Prefect's subprocess is killed (due to exceeding container limits), but Prefect does not properly mark the task as failed.
4. Kubernetes flow runs remain running when k8s pods restart: This issue discusses a scenario where Kubernetes pod restarts/evictions result in flow runs remaining in a Running state despite there not being any jobs running on the Kubernetes cluster.
5. Task Runs Concurrency slots not released when flow runs in Kubernetes are cancelled: This issue discusses a scenario where, if the flow run is cancelled while tasks are in a running state, the concurrency slots used by the tasks are not released.

I'd recommend looking into these issues and their related discussions. They might provide some insight into your problem and potential solutions. If none of these seem to apply, you might want to consider opening a new issue on the Prefect GitHub page.

Remember, in the grand scheme of the universe, this is a small problem. Not that it makes it any less frustrating, of course.
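One practical note on the first item above: you don't have to wait for a default setting in order to point results at a specific storage block; Prefect 2.x already lets you set it per task or flow. A rough sketch, assuming a `LocalFileSystem` block (the basepath is just an example, swap in whatever storage you actually use):

from prefect import flow, task
from prefect.filesystems import LocalFileSystem

# Example storage only; any configured filesystem block should work here.
results_fs = LocalFileSystem(basepath="/tmp/prefect-results")


@task(persist_result=True, result_storage=results_fs)
def big_df():
    ...


@flow(result_storage=results_fs)
def parent():
    ...

This controls where persisted results end up rather than fixing memory growth on its own, but combined with not caching results in memory it keeps large outputs on disk instead of in the flow run's process.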