# ask-community
p
Hello! I am currently running into issues when using @task in a flow. We iterate over a lot of dates and pass a very large pandas DataFrame to a task for each date, and what I notice is that after about the 10th day the flow gets OOMKilled; if we don't use tasks, there doesn't seem to be an issue. The problem is that our upsert method is currently a task. My assumption is that the flow keeps track of the parameters sent to each task and effectively runs out of memory by holding every one of our DataFrames. Is there a way to avoid this, or any information on this kind of thing? 🙂
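For concreteness, the shape of the flow is roughly the sketch below (upsert and backfill are placeholder names, not our real code); each iteration builds a large DataFrame and hands it to an upsert task.

import pandas as pd
from prefect import task, flow


@task
def upsert(df: pd.DataFrame):
    # stand-in for our real database upsert task
    print(df.size)


@flow
def backfill(dates: list[str]):
    for date in dates:
        # in the real flow this is a very large DataFrame loaded for the date
        df = pd.DataFrame({"date": [date], "value": [1.0]})
        upsert(df)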
j
Hey, can you try annotating your task parameters with quote and see if that makes a difference?
from prefect.utilities.annotations import quote

@task
def my_task(df):
    ...

@flow
def my_flow():
    df = ...  # your large DataFrame
    # quote() tells Prefect not to inspect the argument
    my_task(quote(df))
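For your actual flow that would just mean wrapping the argument at the call site inside the date loop, e.g. something like your_upsert_task(quote(df)); the task definition itself shouldn't need to change.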
p
I'll give it a shot and get back to you. I think I need to create a simple test case for this, as it's quite slow to re-run all of these! Can I ask what this does?
I have just tried this and unfortunately it hasn't helped 😞
j
That keeps Prefect from inspecting the task params for futures. I've seen it make task startup take a while. Do you happen to have an MRE that causes this, where running without @task succeeds and with it OOMs?
If so, it would be great if you could file an issue here: https://github.com/PrefectHQ/prefect/issues. What you're describing is definitely not expected, but it's tough to say what is happening without more information.
p
I'll get an example over to you, bear with me 🙂 It won't be the exact thing that gets OOMKilled, but it'll be the same principle.
This showcases the problem I run into: if you look at the prints (ignoring the df.size print), the current memory goes up relatively consistently on each task run. This seems to be what's causing our problem; since we run a lot of these very quickly, it builds up. If you run fake_upsert.fn(df) instead (variant shown after the snippet), it doesn't have that level of fluctuation.
import tracemalloc
import pandas as pd
import numpy as np
from prefect import task, flow


@task
def create_large_df():
    # 10**7 rows x 10 columns of float64 is roughly 800 MB
    df = pd.DataFrame(np.ones((10**7, 10)))
    return df


@task
def fake_upsert(df: pd.DataFrame):
    # stand-in for the real upsert; only reads the DataFrame
    print(df.size)


@flow
def fake_flow():
    tracemalloc.start()
    df = create_large_df()
    memory_start, _ = tracemalloc.get_traced_memory()
    print(f"START: {memory_start/1000}")

    for i in range(50):
        fake_upsert(df)
        memory, _ = tracemalloc.get_traced_memory()
        print(f"ITERATION {i}: {(memory - memory_start)/1000}")

    memory_end, _ = tracemalloc.get_traced_memory()
    print(f"END: {(memory_end - memory_start)/1000}")
    tracemalloc.stop()


fake_flow()
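And this is roughly what I mean by the fake_upsert.fn() comparison, using the same definitions as above: the same loop, but calling the plain function that @task wraps, so the Prefect task engine isn't involved.

@flow
def fake_flow_fn():
    tracemalloc.start()
    df = create_large_df()
    memory_start, _ = tracemalloc.get_traced_memory()

    for i in range(50):
        # .fn is the original Python function underneath the @task decorator
        fake_upsert.fn(df)
        memory, _ = tracemalloc.get_traced_memory()
        print(f"ITERATION {i}: {(memory - memory_start)/1000}")

    tracemalloc.stop()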
Although, when I reduce the DataFrame size here I can't tell whether the size itself is what's causing the issue; it just seems to be the tasks themselves that cause the increase in memory.
j
Hm, it doesn't seem like it's totally proportional to the parameter size?
Tasks run in worker threads, which explains some memory overhead, but not enough to explain this on its own.
p
Hmm, yeah, it's hard to figure out what the culprit is. Do you have any examples of pushing multiple large datasets up to the database within a single flow? I wonder if we've just written it badly.
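One idea I'm toying with, based on my earlier guess that the flow holds on to every DataFrame it passes to a task, is to do the load and the upsert for each date inside a single task, so the flow itself never receives a reference to the DataFrame. Very rough sketch, with load_for_date and upsert_to_db as stand-ins for our real helpers:

import pandas as pd
from prefect import task, flow


def load_for_date(date: str) -> pd.DataFrame:
    # stand-in: really this reads a very large DataFrame for the given date
    return pd.DataFrame({"date": [date], "value": [1.0]})


def upsert_to_db(df: pd.DataFrame) -> None:
    # stand-in for the real database upsert
    print(df.size)


@task
def load_and_upsert(date: str) -> None:
    df = load_for_date(date)
    upsert_to_db(df)
    # nothing is returned, so the flow never holds the DataFrame itself


@flow
def backfill(dates: list[str]):
    for date in dates:
        load_and_upsert(date)

No idea yet whether that actually avoids the growth, it's just where my head is at.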