George Shishorin

01/24/2021, 10:04 PM
Dear Prefect Community, Prefect has some great features for caching task results, which make them convenient to reuse. At the same time, I have run into a problem with how Prefect handles RAM during a flow run. The main question: is it possible to remove task results and thus free RAM while the flow is running? Most of our flows contain fairly complex computing logic, and combining several functions into one task is not an option (atomicity and readability would be lost). Here is an example of a Prefect vs. "non-Prefect" flow and its RAM consumption.
Flow logic:
1. create a
2. sleep (3 sec)
3. remove a
4. create b
5. sleep (3 sec)
6. remove b
7. sleep (3 sec)
Code for Prefect flow:
from memory_profiler import profile
from prefect import Flow
from prefect import task
from time import sleep

with Flow('test flow') as flow:

    @task(
        name="a_task",
        tags=["etc"]
    )
    def a_task():
        @profile(precision=3)
        def _func_run():
            return [1] * (12 ** 8)

        sleep(3)

        return _func_run()

    a = a_task()

    del a

    a = None


    @task(
        name="b_task",
        tags=["etc"],
    )
    def b_task():
        @profile(precision=3)
        def _func_run():
            return [2] * (10 ** 8)

        sleep(3)

        return _func_run()

    b = b_task(upstream_tasks=[a])

    del b
    b = None


def flow_run():
    flow.run()
    sleep(3)


if __name__ == '__main__':
    flow_run()
Code for non-Prefect flow:
from memory_profiler import profile
from time import sleep


@profile(precision=3)
def func_run():

    def a_run():
        a = [1] * (12 ** 8)

        sleep(3)

        return a

    a = a_run()

    del a

    def b_run():

        b = [2] * (10 ** 8)

        sleep(3)

        return b

    b = b_run()

    del b

    return None

if __name__ == '__main__':
    func_run()
    sleep(3)
P.S. Memory profiling was run via `python -m mprof run script.py`, followed by `python -m mprof plot`.
Memory profiling for prefect flow:
Memory profiling for nonprefect flow:

Spencer

01/24/2021, 10:20 PM
There seems to be a misunderstanding of how Prefect executes flows. The `del` statements have no effect on memory during the flow run: inside the `with Flow(...)` block, `a` and `b` are task objects used to build the graph, not the values the tasks return.
If you don't want the Result to stay around, don't return any values from the task. Prefect encapsulates the return values in `Result`s in case of retries and to pass the values along to downstream tasks.
Memory profiling for Prefect flow without returning values:
For what it's worth, doing the `del` and setting to `None` will simply screw up the dependency graph. The `None` is converted to a `ConstantTask` and made a dependency for `b_task`.
You can see the values that are returned originally by inspecting the result from `flow.run()`. This is where the memory is hanging around.
flowrun = flow.run()
print(flowrun.result)
You'll see both task `Result` objects.
flowrun.result[a].result  # <-- the return value
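The retention described above can be illustrated without Prefect. The following is a minimal sketch in plain Python, not Prefect's actual internals: `Payload`, `run_and_store`, and the `results` dict are hypothetical stand-ins for a task's return value and for the mapping that a flow run keeps. It shows that `del` only removes a name, and that the object stays alive while another reference to it exists.

```python
import gc
import weakref


class Payload:
    """Stand-in for a large task return value (plain lists cannot be weak-referenced)."""

    def __init__(self, size):
        self.data = [1] * size


def run_and_store(results, key, size):
    """Mimic a runner that keeps each task's return value in a results mapping."""
    value = Payload(size)
    results[key] = value
    return value


results = {}                         # hypothetical stand-in for the stored results
a = run_and_store(results, "a_task", 1000)
probe = weakref.ref(a)               # lets us check whether the object is still alive

del a                                # removes only the local name
gc.collect()
alive_after_del = probe() is not None    # still referenced by `results`

results.pop("a_task")                # drop the last strong reference
gc.collect()
alive_after_pop = probe() is not None    # nothing references the object any more
```

Under these assumptions, the memory can only be reclaimed once whatever holds the stored result lets go of it, which is why deleting the name in the flow definition changes nothing.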

George Shishorin

01/24/2021, 11:06 PM
@Spencer, thank you for the answer! To be more precise, my question is about this case.
Flow logic:
1. create dataframe a
2. create dataframe b
3. create dataframe c by merging a and b
4. how can a and b be deleted from memory at this step?
from memory_profiler import profile
from prefect import Flow
from prefect import task
from time import sleep

import pandas as pd


with Flow('test flow') as flow:

    @task(
        name="a_task",
        tags=["etc"]
    )
    def a_task():
        @profile(precision=3)
        def _func_run():
            return pd.DataFrame({'A': [1] * (12 ** 5)}).reset_index()

        sleep(3)

        return _func_run()

    a = a_task()


    @task(
        name="b_task",
        tags=["etc"],
    )
    def b_task():
        @profile(precision=3)
        def _func_run():
            return pd.DataFrame({'B': [2] * (10 ** 6)}).reset_index()

        sleep(3)

        return _func_run()

    b = b_task(upstream_tasks=[a])


    @task(
        name="Merge",
        tags=["pandas"],
    )
    def merge(left, right, how, columns):
        """Merge given dataframes."""
        return pd.merge(left=left, right=right, how=how, on=columns)


    c = merge(left=a, right=b, how='left', columns='index')


def flow_run():
    flow.run()
    sleep(3)


if __name__ == '__main__':
    flow_run()
profiling:

Spencer

01/24/2021, 11:07 PM
Write the dataframes to disk. As long as the dataframes are returned from the task, they will be attached to the Flow result.
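One way to read this suggestion, sketched in plain Python so it runs without Prefect or pandas: each step writes its output to disk and returns only the file path, so what gets stored as the result is a short string rather than the data itself. The helper names (`save_to_disk`, `load_from_disk`, `make_a`, `make_b`, `merge_from_disk`) and the pickle format are assumptions for illustration. In a real flow these functions would be decorated with `@task`, and with pandas one would more likely use `DataFrame.to_pickle` / `pd.read_pickle` or `DataFrame.to_parquet`.

```python
import os
import pickle
import tempfile


def save_to_disk(obj, directory, name):
    """Pickle `obj` into `directory` and return the file path."""
    path = os.path.join(directory, f"{name}.pkl")
    with open(path, "wb") as fh:
        pickle.dump(obj, fh)
    return path


def load_from_disk(path):
    """Load a previously pickled object."""
    with open(path, "rb") as fh:
        return pickle.load(fh)


def make_a(directory):
    rows = [{"index": i, "A": 1} for i in range(3)]
    return save_to_disk(rows, directory, "a")  # return the path, not the data


def make_b(directory):
    rows = [{"index": i, "B": 2} for i in range(2)]
    return save_to_disk(rows, directory, "b")


def merge_from_disk(left_path, right_path, directory):
    """Left-join the two row lists on "index" and store the result on disk."""
    left = load_from_disk(left_path)
    right_by_index = {row["index"]: row for row in load_from_disk(right_path)}
    merged = [{**row, **right_by_index.get(row["index"], {})} for row in left]
    return save_to_disk(merged, directory, "c")


with tempfile.TemporaryDirectory() as tmp:
    a_path = make_a(tmp)
    b_path = make_b(tmp)
    c_path = merge_from_disk(a_path, b_path, tmp)
    merged_rows = load_from_disk(c_path)  # read back only to inspect the outcome
```

Only the three path strings travel between steps here; the intermediate files can be deleted once the merge has run.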

George Shishorin

01/24/2021, 11:12 PM
So there is no way to detach task results while the flow is running?

Spencer

01/24/2021, 11:17 PM
I think that would break a lot of assumptions about how flow runs work. Ultimately, Prefect retains the results in order to pass the values along the flow and make them available after the flow run. Maybe there's a way to reach into the Flow object from inside a task, but that's pretty dicey.

George Shishorin

01/25/2021, 9:04 AM
@Chris White, @nicholas @Zanie