# best-practices
k
I've been having memory usage build-up issues with long-running flows that call (via `.map`) many tasks (10k-100k). I'm using the `SequentialTaskRunner` and have noticed that the task runner keeps a `_results` dict of ALL task results. It even stores the results of tasks that are no longer "needed". What harm do I cause by running something like `get_run_context().task_runner._results = {}` when I know that the previous task results are not needed?
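In context, that workaround would look roughly like the minimal sketch below (`my_flow` is a hypothetical name, and `_results` is a private attribute, so this is not a supported API):
```python
from prefect import flow
from prefect.context import get_run_context
from prefect.task_runners import SequentialTaskRunner


@flow(task_runner=SequentialTaskRunner())
async def my_flow():
    ...  # run a batch of mapped tasks and consume their results
    # Drop the task runner's references to results that are no longer
    # needed. This reaches into a private attribute, so it may break
    # between Prefect versions.
    get_run_context().task_runner._results = {}
```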
z
Hm. The task runner could drop the reference to the results once they’re attached to a state.
I’m not sure what the implications are though. It’s definitely risky to clear it out.
k
Thank you for the response, Zanie! We tried setting `cache_result_in_memory` to `False` in the past and it moved the problem to disk usage. We kept getting `OSError: [Errno 28] No space left on device`.
Here's what our flow looks like:
```python
from prefect import flow
from prefect.task_runners import SequentialTaskRunner


@flow(
    name="Multi Greeter",
    task_runner=SequentialTaskRunner(),
)
async def multi_greeter():
    names = [
        "Kelvin",
        "Zanie",
        "Marvin",
    ] * 10  # Batch size of 30

    for batch in range(1000):  # Approx. number of batches
        # First set of mapped tasks
        greet_states = await greet.map(name=names, return_state=True)  # type: ignore
        greetings = [state.result() for state in greet_states if state.is_completed()]

        # Second set of mapped tasks that take previous results as input
        somethings_states = await get_something_from_greetings.map(message=greetings, return_state=True)  # type: ignore
        somethings = [state.result() for state in somethings_states if state.is_completed()]

        task_that_saves_to_database(somethings, return_state=True)
```
`greet`, `get_something_from_greetings`, and `task_that_saves_to_database` are all configured to use `cache_result_in_memory=False`.
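A minimal sketch of what such task definitions look like (the bodies here are placeholders; only the `cache_result_in_memory=False` argument on the decorator is the point):
```python
from prefect import task


# Placeholder task bodies; the relevant piece is cache_result_in_memory=False,
# which tells Prefect not to keep the return value in memory, so results must
# be persisted instead (which is where the disk usage above came from).
@task(cache_result_in_memory=False)
async def greet(name: str) -> str:
    return f"Hello, {name}!"


@task(cache_result_in_memory=False)
async def get_something_from_greetings(message: str) -> str:
    return message.upper()
```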
z
You can disable persistence and caching of the result in memory if you know a task result will not be needed downstream.
I don't think `cache_result_in_memory` accounts for the task runner's result-passing mechanism though, if you're using the sequential one.
k
So both `cache_result_in_memory` and `persist_result` would be set to `False`? Would that be fine when using `state.result()`? I was under the impression that whenever `state.result()` is used, at least one of those configurations must be `True`.
z
Yes, you could set them both to false. And yes, that would fail when you try to retrieve the result.
There’s an example in the doc I linked for that case.
k
Ah okay! Thank you!
z
I don’t think it solves your problem if you do need the result downstream though
One option would be to manage your results manually, i.e. put them into a global dictionary and clear them as needed — do not return any data from the task itself.
That’s kind of meh but if you’re both disk and memory constrained it’s a little tricky for us to know when it’s okay to release the result
We can’t know if you’ll use the task downstream since the DAG is dynamic
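A minimal sketch of that manual approach (the `RESULTS` dict and `record_greeting` task are hypothetical names; it assumes tasks run in the same process as the flow, as they do with `SequentialTaskRunner`):
```python
from prefect import flow, task
from prefect.task_runners import SequentialTaskRunner


# Hypothetical module-level store: tasks write into it instead of
# returning data, and the flow clears it once a batch is consumed.
RESULTS: dict[str, str] = {}


@task(persist_result=False, cache_result_in_memory=False)
async def record_greeting(name: str) -> None:
    RESULTS[name] = f"Hello, {name}!"


@flow(task_runner=SequentialTaskRunner())
async def multi_greeter():
    names = ["Kelvin", "Zanie", "Marvin"] * 10
    for _ in range(1000):
        await record_greeting.map(name=names, return_state=True)
        ...  # consume RESULTS for this batch (e.g. save to the database)
        RESULTS.clear()  # explicitly release the batch's data
```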
k
I just ran a quick test: I made both properties `False` and the above code worked fine. But when I changed
```python
messages = [state.result() for state in greet_states if state.is_completed()]
```
to
```python
messages = await asyncio.gather(
    *[
        state.result(fetch=True)
        for state in greet_states
        if state.is_completed()
    ]
)
```
It failed with `prefect.exceptions.MissingResult`.
The global dictionary makes a lot of sense
I think I'll refactor one flow and see what the performance is like
Hey @Zanie, I took your advice and refactored some of the flows to use `global` variables for responses. Essentially, the global variables are reset to empty lists / dicts whenever a new batch of mapped tasks begins. I'm also invoking a method that resets the `_results` dict on the task runner. The memory usage growth is much less than before, but there is still a steady increase: flows started at an average memory consumption of 10% and after 14 hours they're at 25%. I feel like I'm missing something; the barebones Python code shouldn't see such a linear increase in memory usage over time. I'd really appreciate any help with this matter! Thank you again for your guidance.
z
We are still tracking all of the task run states, so if you're spawning a ton of task runs the flow will be holding that information.
k
Hey @Zanie, I hope you're doing well! Is there a way I can let go of the memory used by the task run states? All of our flows are long-running and spawn many tasks over their runtimes. Without Prefect, a simple 0.25 vCPU / 512 MB ECS task would be sufficient, but because Prefect slowly stacks up memory for tracking task run states, we need to use much more expensive compute settings to ensure that it doesn't crash after running out of memory.
Is there a way to disable in-memory tracking of task run states, or maybe have it only persist to a database / secondary memory?
z
I think it’d be expensive to collect them all from the database at the end
Can you put groups of tasks in subflows? If you return a value from the subflow (i.e. `True`) then we should release all those task states from memory.
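A minimal sketch of that batching-into-subflows pattern (the `batch_subflow` name is just illustrative; `greet` and `task_that_saves_to_database` are the tasks from the flow earlier in the thread):
```python
from prefect import flow
from prefect.task_runners import SequentialTaskRunner


# Each call to batch_subflow gets its own task runner instance, and
# returning a plain value (True) instead of task states should let the
# parent flow release that batch's task run states from memory.
@flow(name="Batch Subflow", task_runner=SequentialTaskRunner())
async def batch_subflow(names: list[str]) -> bool:
    greet_states = await greet.map(name=names, return_state=True)
    greetings = [state.result() for state in greet_states if state.is_completed()]
    task_that_saves_to_database(greetings, return_state=True)
    return True


@flow(name="Multi Greeter")
async def multi_greeter():
    names = ["Kelvin", "Zanie", "Marvin"] * 10
    for _ in range(1000):  # Approx. number of batches
        await batch_subflow(names)
```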
k
Hmm, yes I think I can do that! I'm mapping tasks in batches of 10-50 (depending on the flow), so I could wrap that part inside a subflow. Would it also release the states from the task runner's `_results` dictionary?
z
Each subflow would get a new task runner instance which would shut down and be released, yeah.
k
That makes so much sense!!
Will keep you posted on the update
I'm wondering if specifying `task_runner=SequentialTaskRunner()` for the `batch_subflow` flow would change anything. It is still creating a new task runner instance for each call to `batch_subflow`, right?
j
@Kelvin DeCosta were you able to get the memory leak fixed via a subflow? I tried that and still have the same issue you experienced over time: an OOM due to memory build-up.
I also return True on that subflow
I think it is working; I was looking at CPU and not memory.