Andreas Nord
09/11/2023, 6:38 PM
Jack P
09/12/2023, 12:30 AM
Andreas Nord
09/12/2023, 6:43 AM
Adam Azzam
09/12/2023, 11:22 AM
Andreas Nord
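09/12/2023, 11:24 AM
from prefect import flow, task_runners

# big_df is the task that builds the large DataFrame
# (it is defined in the full snippets later in this thread)
@flow(task_runner=task_runners.SequentialTaskRunner)
def child_flow():
    df = big_df.submit()

@flow(task_runner=task_runners.SequentialTaskRunner)
def parent():
    for i in range(1):
        child_flow()
    print("lots of memory allocated here")

parent()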
Jack P
09/12/2023, 11:35 AM
from prefect import flow, task

@flow(cache_result_in_memory=False)
def foo():
    return "pretend this is large data"

For tasks there are two settings (they might apply to flows too, but I don't have a test set up to check):

@task(persist_result=False, cache_result_in_memory=False)
def bar():
    return "pretend this is biiiig data"
Caching of results in memory
Andreas Nord
09/12/2023, 11:54 AM
Jack P
09/12/2023, 11:55 AM
big_df.submit().result()?
(4001261235, 4001587381) of memory usage (I think it's in bytes and the right number is the peak, but I honestly haven't used tracemalloc in a while).
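For reading the tuples quoted in this thread: `tracemalloc.get_traced_memory()` returns `(current, peak)` in bytes, counting only allocations made since `tracemalloc.start()`. Below is a minimal standard-library sketch of that behaviour, not code from the thread; the `fmt_bytes` helper and the 50 MiB buffer are illustrative assumptions.

```python
import tracemalloc


def fmt_bytes(n: float) -> str:
    """Hypothetical helper: render a byte count in binary units."""
    for unit in ("B", "KiB", "MiB", "GiB"):
        if n < 1024 or unit == "GiB":
            return f"{n:.1f} {unit}"
        n /= 1024


tracemalloc.start()
buf = bytearray(50 * 1024 * 1024)  # allocate roughly 50 MiB
current_live, peak_live = tracemalloc.get_traced_memory()
del buf  # release the buffer again
current_after, peak_after = tracemalloc.get_traced_memory()
tracemalloc.stop()

# "current" drops once the buffer is freed, while "peak" keeps the high-water mark.
print("while allocated:", fmt_bytes(current_live), fmt_bytes(peak_live))
print("after del:      ", fmt_bytes(current_after), fmt_bytes(peak_after))
```

Read this way, the left number in (4001261235, 4001587381) is roughly 3.7 GiB still allocated at the time of the call, and the right number is the highest value reached since tracing started.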
When I run this, I get about (826042347, 5485995247): a lower left number but a higher right one, so I don't know if this solves it. The left number here is the same one I get when I run range(1). Adding return_state=True also seems to make it a bit slower. I don't know if this is a solution or even the right direction, just sharing findings. Let me know what you think.
from prefect import task, flow
import pandas as pd
import numpy as np
import tracemalloc

@task(cache_result_in_memory=False)
def big_df():
    df = pd.DataFrame(np.ones((10**7, 10)))
    return df

@flow()
def child_flow():
    df = big_df.submit(return_state=True).result()

@flow()
def parent():
    for i in range(5):
        child_flow()
    print("lots of memory allocated here")

parent()
(26057819, 5486067324), which appears to be a really high peak but much lower current memory... progress, haha.
(I don't think the task runner is doing anything here; only the second cache_result_in_memory=False, the one on the flow, is.)
from prefect import task, flow
from prefect.task_runners import SequentialTaskRunner
import pandas as pd
import numpy as np
import tracemalloc

@task(cache_result_in_memory=False)
def big_df():
    df = pd.DataFrame(np.ones((10**7, 10)))
    return df

@flow(
    cache_result_in_memory=False,
    task_runner=SequentialTaskRunner(),
)
def child_flow():
    df = big_df.submit(return_state=True).result()

@flow()
def parent():
    tracemalloc.start()
    print(tracemalloc.get_traced_memory())
    for i in range(5):
        child_flow()
    print("lots of memory allocated here")
    print("\n\n\n\n")
    print(tracemalloc.get_traced_memory())
    tracemalloc.stop()

parent()
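One way to separate "memory that piles up across child runs" from "a high but temporary peak" is to take a reading after every iteration and reset the peak in between. The sketch below uses only the standard library and is not the thread's code: `fake_child` is a hypothetical stand-in for `child_flow`, `measure` is an illustrative helper, and `tracemalloc.reset_peak()` needs Python 3.9 or newer.

```python
import gc
import tracemalloc


def fake_child() -> None:
    """Hypothetical stand-in for child_flow: allocates ~20 MiB, then drops it."""
    data = bytearray(20 * 1024 * 1024)
    data[0] = 1  # touch the buffer; it is freed when the function returns


def measure(fn, runs: int = 5):
    """Return one (current, peak) tuple per run, with the peak reset before each run."""
    readings = []
    tracemalloc.start()
    try:
        for _ in range(runs):
            tracemalloc.reset_peak()  # peak now covers this run only (Python 3.9+)
            fn()
            gc.collect()  # collect unreachable reference cycles before reading
            readings.append(tracemalloc.get_traced_memory())
    finally:
        tracemalloc.stop()
    return readings


readings = measure(fake_child)
for i, (current, peak) in enumerate(readings):
    print(f"run {i}: current={current / 2**20:.1f} MiB, peak={peak / 2**20:.1f} MiB")
```

If `current` grows by about one DataFrame per run, something is probably still holding references to earlier results. If `current` stays flat and only `peak` is high, the memory is being released between runs and the peak reflects a single run's working set.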
Andreas Nord
09/12/2023, 1:23 PM