Panda
09/28/2023, 2:19 PM
Jake Kaplan
09/28/2023, 2:28 PM
quote and see if that makes a difference?
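from prefect import flow, task
from prefect.utilities.annotations import quote

@task
def my_task(df):
    ...

@flow
def my_flow():
    # quote() marks df so Prefect does not introspect it when resolving task inputs
    my_task(quote(df))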
Panda
09/28/2023, 2:31 PM
Panda
09/28/2023, 3:02 PM
Jake Kaplan
09/28/2023, 3:04 PM
@task succeeds and with it OOMs?
Jake Kaplan
09/28/2023, 3:05 PM
Panda
09/28/2023, 3:07 PM
Panda
09/28/2023, 3:12 PM
fake_upsert.fn( instead it doesn't have that level of fluctuation
import tracemalloc

import numpy as np
import pandas as pd
from prefect import task, flow


@task
def create_large_df():
    df = pd.DataFrame(np.ones((10**7, 10)))
    return df


@task()
def fake_upsert(df: pd.DataFrame):
    print(df.size)


@flow
def fake_flow():
    tracemalloc.start()
    df = create_large_df()
    memory_start, _ = tracemalloc.get_traced_memory()
    print(f"START: {memory_start/1000}")
    for i in range(50):
        fake_upsert(df)
        memory, _ = tracemalloc.get_traced_memory()
        print(f"ITERATION {i}: {(memory - memory_start)/1000}")
    memory_end, _ = tracemalloc.get_traced_memory()
    print(f"END: {(memory_end - memory_start)/1000}")
    tracemalloc.stop()


fake_flow()
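To make the comparison between calling the task and calling the undecorated function repeatable, the measurement loop could be pulled into a small helper. This is only a sketch using the standard library: `traced_growth`, `leaky`, and `clean` are hypothetical names that do not appear in the thread, and the two stand-in functions exist purely to show what retained versus released memory looks like in the output.

```python
import tracemalloc
from typing import Any, Callable, List


def traced_growth(fn: Callable[..., Any], *args: Any, repeats: int = 5) -> List[float]:
    """Call fn(*args) `repeats` times and return, per iteration, the traced
    memory growth in kB relative to the point just before the first call."""
    # Only start/stop tracing here if nothing else is tracing already.
    started_here = not tracemalloc.is_tracing()
    if started_here:
        tracemalloc.start()
    try:
        baseline, _peak = tracemalloc.get_traced_memory()
        growth: List[float] = []
        for _ in range(repeats):
            fn(*args)
            current, _peak = tracemalloc.get_traced_memory()
            growth.append((current - baseline) / 1000)
        return growth
    finally:
        if started_here:
            tracemalloc.stop()


# Hypothetical stand-ins: one keeps a copy of its input alive, one does not.
_retained: list = []


def leaky(payload: bytes) -> None:
    _retained.append(bytearray(payload))  # copy stays referenced after the call


def clean(payload: bytes) -> None:
    len(payload)  # touches the input but retains nothing


if __name__ == "__main__":
    payload = bytes(1_000_000)
    print("leaky:", traced_growth(leaky, payload))
    print("clean:", traced_growth(clean, payload))
```

Inside a flow, the same helper could presumably be given `fake_upsert` and then `fake_upsert.fn` with the same `df`, so the two growth curves can be compared side by side. Steady growth across iterations suggests something is holding references; a flat curve suggests the memory is being released.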
Panda
09/28/2023, 3:17 PM
Jake Kaplan
09/28/2023, 4:47 PM
Jake Kaplan
09/28/2023, 4:47 PM
Panda
09/28/2023, 4:52 PM