George Shishorin
01/24/2021, 10:04 PM
from memory_profiler import profile
from prefect import Flow
from prefect import task
from time import sleep
with Flow('test flow') as flow:

    @task(
        name="a_task",
        tags=["etc"]
    )
    def a_task():
        """Sleep 3 s, then allocate and return a list of 12**8 ones.

        The allocation happens inside a ``@profile``-decorated helper so
        memory_profiler reports its line-by-line memory usage.
        """
        @profile(precision=3)
        def _func_run():
            return [1] * (12 ** 8)

        sleep(3)
        return _func_run()

    # Keep this task reference: it is what wires a_task in as b_task's
    # upstream dependency below. Rebinding it (``del a`` / ``a = None``)
    # does not release the task's return value; that value is held by the
    # state returned from ``flow.run()``. Passing ``None`` in
    # ``upstream_tasks`` also made Prefect add a ConstantTask(None)
    # dependency instead of a_task.
    a = a_task()

    @task(
        name="b_task",
        tags=["etc"],
    )
    def b_task():
        """Sleep 3 s, then allocate and return a list of 10**8 twos (profiled)."""
        @profile(precision=3)
        def _func_run():
            return [2] * (10 ** 8)

        sleep(3)
        return _func_run()

    # Run b_task only after a_task has finished.
    b = b_task(upstream_tasks=[a])
def flow_run():
    """Run ``flow`` a single time, then idle for three seconds.

    The ``State`` returned by ``flow.run()`` is not kept. The idle period
    keeps the process alive after the run, presumably so the memory
    profiler also records usage once the flow has finished.
    """
    flow.run()
    idle_seconds = 3
    sleep(idle_seconds)


if __name__ == "__main__":
    flow_run()
Code for the non-Prefect version of the flow:
@profile(precision=3)
def func_run():
    """Plain-Python baseline: allocate two large lists one after another.

    Each list is built by a nested helper that holds it for three seconds
    before returning it. The caller then drops its only reference with
    ``del`` before the next allocation starts. Always returns ``None``.
    """

    def a_run():
        ones = [1] * (12 ** 8)
        sleep(3)  # keep the list alive for a while before handing it back
        return ones

    def b_run():
        twos = [2] * (10 ** 8)
        sleep(3)
        return twos

    a = a_run()
    del a  # drop the first list before building the second one

    b = b_run()
    del b

    return None


if __name__ == "__main__":
    func_run()
    sleep(3)
P.S. Memory profiling is run via:
python -m mprof run script.py
python -m mprof plot
Spencer
01/24/2021, 10:20 PM
`del` and setting to `None` will simply screw up the dependency graph: the `None` is converted to a `ConstantTask` and made a dependency for `b_task`. The task results are held by the state that `flow.run()` returns, and this is where the memory is hanging around:
# Keep the state returned by the run so each task's result can be inspected.
flowrun = flow.run()
task_results = flowrun.result
print(task_results)
You'll see both task `Result` objects.
flowrun.result[a].result  # <-- the return value
George Shishorin
01/24/2021, 11:06 PM
from memory_profiler import profile
from prefect import Flow
from prefect import task
from time import sleep
import pandas as pd
with Flow('test flow') as flow:

    @task(name="a_task", tags=["etc"])
    def a_task():
        """Wait three seconds, then build the left-hand frame (profiled).

        Returns a DataFrame of 12**5 rows with columns ``index`` and ``A``.
        """
        @profile(precision=3)
        def _make_left():
            left_frame = pd.DataFrame({'A': [1] * (12 ** 5)})
            return left_frame.reset_index()

        sleep(3)
        return _make_left()

    a = a_task()

    @task(name="b_task", tags=["etc"])
    def b_task():
        """Wait three seconds, then build the right-hand frame (profiled).

        Returns a DataFrame of 10**6 rows with columns ``index`` and ``B``.
        """
        @profile(precision=3)
        def _make_right():
            right_frame = pd.DataFrame({'B': [2] * (10 ** 6)})
            return right_frame.reset_index()

        sleep(3)
        return _make_right()

    # b_task is ordered after a_task, although it does not consume a_task's output.
    b = b_task(upstream_tasks=[a])

    @task(name="Merge", tags=["pandas"])
    def merge(left, right, how, columns):
        """Join two DataFrames via ``pd.merge``; ``columns`` is forwarded as ``on``."""
        return pd.merge(left=left, right=right, how=how, on=columns)

    # Left-join the two frames on the column created by reset_index().
    c = merge(left=a, right=b, how='left', columns='index')
def flow_run():
    """Execute the flow once and then pause for three seconds.

    Nothing is returned; the state produced by ``flow.run()`` is dropped
    right away. The pause presumably gives the memory profiler time to
    sample the process after the run.
    """
    flow.run()
    pause = 3
    sleep(pause)


if "__main__" == __name__:
    flow_run()
Spencer
01/24/2021, 11:07 PM
George Shishorin
01/24/2021, 11:12 PM
Spencer
01/24/2021, 11:17 PM
George Shishorin
01/25/2021, 9:04 AM