Kelvin DeCosta
12/23/2022, 11:12 AM
from more_itertools import chunked
for tasks_inputs in chunked(long_list_of_tasks_inputs, 25):
    # specifying return_state=True,
    # since I don't want a failed task to fail the whole flow immediately
    await my_task.map(input=tasks_inputs, return_state=True)
This seems to work nicely.
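[Editor's note: the batching pattern above can be reproduced without Prefect. Below is a minimal stdlib stand-in for more_itertools.chunked, with made-up inputs in place of the real long_list_of_tasks_inputs — an illustration only, not the library's implementation.]

```python
# Minimal stdlib stand-in for more_itertools.chunked (illustration only;
# the real library handles more options and edge cases).
from itertools import islice

def chunked(iterable, n):
    """Yield successive lists of at most n items from iterable."""
    it = iter(iterable)
    while batch := list(islice(it, n)):
        yield batch

# Hypothetical inputs standing in for long_list_of_tasks_inputs
long_list_of_tasks_inputs = [f"input-{i}" for i in range(103)]
batches = list(chunked(long_list_of_tasks_inputs, 25))
# 103 items -> 4 full batches of 25 plus a final batch of 3
```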
However, after some time, the infrastructure runs out of its 8 GB of memory. (Prefect also doesn't update the flow state, but that isn't the issue right now.)
Looking at the infra memory usage graph, there is an almost linear increase in usage over time (as new tasks run) until it reaches 89-99%, and then the process crashes.
For more context, long_list_of_tasks_inputs is just a list of 22k strings, which shouldn't be an issue.
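[Editor's note: a rough back-of-envelope measurement supports that claim; with made-up strings standing in for the real inputs, 22k short strings occupy only a few MB on CPython.]

```python
import sys

# Hypothetical stand-ins for the real 22k input strings
inputs = [f"task-input-{i:05d}" for i in range(22_000)]

# Shallow size of the list object plus the size of each string object
total_bytes = sys.getsizeof(inputs) + sum(sys.getsizeof(s) for s in inputs)
total_mb = total_bytes / 1024 ** 2
# On CPython this comes to a few MB at most, nowhere near 8 GB
```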
From what I can tell, my_task doesn't return anything and so it shouldn't be hogging RAM.
Ideally, the memory usage should only reflect the variables used by the flow and tasks, and in the case of the task, these should be dropped by the garbage collector.
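[Editor's note: the garbage-collection reasoning above can be illustrated with a toy example — objects are only collectable once nothing holds a strong reference to them, so a hidden result cache (simulated here by a plain list; FakeResult is a made-up stand-in) is enough to keep every "result" alive.]

```python
import gc
import weakref

class FakeResult:
    """Hypothetical stand-in for a task's cached result."""

results_cache = []   # simulates an in-memory result cache
trackers = []        # weak references let us observe collection

for _ in range(5):
    r = FakeResult()
    trackers.append(weakref.ref(r))
    results_cache.append(r)  # the cache keeps a strong reference

del r
gc.collect()
alive_while_cached = sum(t() is not None for t in trackers)  # all 5 alive

results_cache.clear()        # drop the cache's references
gc.collect()
alive_after_clear = sum(t() is not None for t in trackers)   # now 0
```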
What do you think I could do to solve this?
• Should I mark my_task with cache_result_in_memory=False and persist_result=False as well?
• my_task logs certain statements. Could this affect the memory usage?
• Would using the _ buffer help? e.g. _ = await my_task.map(input=tasks_inputs, return_state=True)?

Kelvin DeCosta
12/23/2022, 11:13 AM

Kelvin DeCosta
12/27/2022, 3:40 PM
> Should I mark my_task with cache_result_in_memory=False and persist_result=False as well?
This massively improved the situation. What's weird is that my_task doesn't even return anything (i.e. None), and yet there was a huge build-up of memory used to cache its result. I find it a bit annoying that I need to set these flags explicitly, but I'm glad I found a solution.

Kyle Austin
03/03/2023, 3:10 PM