Kelvin DeCosta
12/23/2022, 11:12 AM
from more_itertools import chunked

for tasks_inputs in chunked(long_list_of_tasks_inputs, 25):
    # specifying return_state=True,
    # since I don't want a failed task to fail the whole flow immediately
    await my_task.map(input=tasks_inputs, return_state=True)
This seems to work nicely.
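(For reference, `chunked` just yields successive fixed-size batches from an iterable; a stdlib-only sketch of the same behavior, using only `itertools`:)

```python
from itertools import islice

def chunked(iterable, n):
    # Yield successive lists of up to n items each,
    # mirroring more_itertools.chunked for reference.
    it = iter(iterable)
    while chunk := list(islice(it, n)):
        yield chunk

batches = list(chunked(range(7), 3))
# batches == [[0, 1, 2], [3, 4, 5], [6]]
```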
However, after some time, the infrastructure runs out of its 8 GB of memory. (Prefect doesn't update the flow state, but that isn't the issue right now.)
Looking at the infra memory usage graph, there is an almost linear increase in usage with time (as new tasks are running) until it reaches 89-99% and then crashes.
For more context, long_list_of_tasks_inputs is just a list of 22k strings, so it shouldn't be an issue.
From what I can tell, my_task doesn't return anything, so it shouldn't be hogging RAM.
Ideally, the memory usage should only reflect the variables used by the flow and tasks, and in the case of the task, these should be dropped by the garbage collector.
What do you think I could do to solve this?
• Should I mark my_task with cache_result_in_memory=False and persist_result=False as well?
• my_task logs certain statements. Could this affect the memory usage?
• Would using the _ throwaway variable help, i.e. _ = await my_task.map(input=tasks_inputs, return_state=True)?
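(For reference, the flags from the first bullet would go on the task decorator; a minimal sketch assuming Prefect 2.x, with a hypothetical task body:)

```python
from prefect import task

# cache_result_in_memory=False: don't keep the task's result object in RAM
# persist_result=False: don't write the result to result storage either
@task(cache_result_in_memory=False, persist_result=False)
async def my_task(input: str) -> None:
    # hypothetical body; the real task logs some statements and returns None
    ...
```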
Should I mark my_task with cache_result_in_memory=False and persist_result=False as well?
This massively improved the situation. What's weird is that my_task doesn't even return anything (i.e. None) and yet there was a huge build-up of memory used to cache its result. I find it a bit annoying that I need to explicitly set these flags, but I'm glad I found a solution.
Kyle Austin
03/03/2023, 3:10 PM