Kelvin DeCosta
03/15/2023, 12:12 PM
I'm using .map() to run many tasks (10k-100k).
I'm using the SequentialTaskRunner and have noticed that the task runner keeps a _results dict of ALL task results. It even stores the results of tasks that are no longer "needed".
What harm do I cause by running something like get_run_context().task_runner._results = {} when I know that the previous task results are not needed?
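A minimal sketch of the workaround being asked about, assuming Prefect 2.x; the flow and task names here are illustrative, and _results is a private attribute, so clearing it is unsupported and may break anything that still expects those results:

from prefect import flow, task
from prefect.context import get_run_context
from prefect.task_runners import SequentialTaskRunner

@task
def greet(name: str) -> str:
    return f"Hello, {name}!"

@flow(task_runner=SequentialTaskRunner())
def many_greetings():
    for _ in range(1000):
        states = greet.map(name=["Kelvin", "Zanie", "Marvin"], return_state=True)
        values = [s.result() for s in states if s.is_completed()]
        # ... use `values` for this batch ...
        # Unsupported workaround under discussion: drop the runner's private
        # result cache once this batch's results have been consumed.
        get_run_context().task_runner._results = {}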
Zanie
Kelvin DeCosta
03/15/2023, 2:05 PM
I've set cache_result_in_memory to False in the past, and it moved the problem to disk usage.
Kept getting OSError: Code 28 No space on device.
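For reference, the configuration being described might look like the sketch below (task name illustrative, assuming Prefect 2 defaults); with in-memory caching disabled, results are persisted instead, by default to local storage under ~/.prefect/storage, which is where the disk pressure comes from:

from prefect import task

# With cache_result_in_memory=False the return value is not kept in RAM; instead
# it is persisted (by default to ~/.prefect/storage) so it can still be fetched
# later. For 10k-100k mapped runs, that trades memory growth for disk growth.
@task(cache_result_in_memory=False)
def greet(name: str) -> str:
    return f"Hello, {name}!"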
Kelvin DeCosta
03/15/2023, 2:09 PM
@flow(
    name="Multi Greeter",
    task_runner=SequentialTaskRunner(),
)
async def multi_greeter():
    names = [
        "Kelvin",
        "Zanie",
        "Marvin",
    ] * 10  # Batch size of 30
    for batch in range(1000):  # Approx number of batches
        # First set of mapped tasks
        greet_states = await greet.map(name=names, return_state=True)  # type: ignore
        greetings = [state.result() for state in greet_states if state.is_completed()]
        # Second set of mapped tasks that take previous results as input
        somethings_states = await get_something_from_greetings.map(message=greetings, return_state=True)  # type: ignore
        somethings = [state.result() for state in somethings_states if state.is_completed()]
        task_that_saves_to_database(somethings, return_state=True)
Kelvin DeCosta
03/15/2023, 2:10 PM
greet, get_something_from_greetings, and task_that_saves_to_database are all configured to use cache_result_in_memory=False.
Zanie
cache_result_in_memory accounts for the task runner result passing mechanism though if you're using the sequential one
Kelvin DeCosta
03/15/2023, 2:16 PM
cache_result_in_memory and persist_result would be set to False? Would that be fine when using state.result()?
I was under the impression that whenever state.result() is used, we must have either of those configurations set to True.
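A sketch of the combination being asked about, assuming Prefect 2.x and illustrative task/flow names; if the value is neither cached in memory nor persisted, a later state.result() may have nothing left to retrieve and can raise MissingResult:

from prefect import flow, task
from prefect.exceptions import MissingResult

@task(cache_result_in_memory=False, persist_result=False)
def greet(name: str) -> str:
    return f"Hello, {name}!"

@flow
def demo():
    state = greet(name="Kelvin", return_state=True)
    try:
        value = state.result()  # nothing cached in memory, nothing persisted
    except MissingResult:
        value = None  # there is nowhere left to fetch the value from
    return value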
Zanie
Kelvin DeCosta
03/15/2023, 2:19 PM
Zanie
Kelvin DeCosta
03/15/2023, 2:21 PM
I set both to False and the above code worked fine.
But when I changed
messages = [state.result() for state in greet_states if state.is_completed()]
to
messages = asyncio.gather(
    *[
        state.result(fetch=True)
        for state in greet_states
        if state.is_completed()
    ]
)
it failed with prefect.exceptions.MissingResult.
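For comparison, a self-contained sketch of that gather pattern with default result settings, assuming Prefect 2.x and an illustrative greet task; asyncio.gather(...) itself has to be awaited, and with cache_result_in_memory=False and persist_result=False there is nothing for fetch=True to retrieve, which is consistent with the MissingResult above:

import asyncio

from prefect import flow, task

@task
async def greet(name: str) -> str:
    return f"Hello, {name}!"

@flow
async def gather_greetings():
    greet_states = await greet.map(name=["Kelvin", "Zanie", "Marvin"], return_state=True)
    # asyncio.gather(...) only builds an awaitable; it must itself be awaited,
    # and state.result(fetch=True) returns a coroutine inside an async flow.
    messages = await asyncio.gather(
        *[
            state.result(fetch=True)
            for state in greet_states
            if state.is_completed()
        ]
    )
    return messages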
Kelvin DeCosta
03/15/2023, 2:40 PM
Kelvin DeCosta
03/15/2023, 2:41 PM
Kelvin DeCosta
03/16/2023, 10:00 AM
I'm now using global variables for the responses.
Essentially, the global variables are reset to empty lists / dicts whenever a new batch of mapped tasks begins.
I'm also invoking a method that resets the _results dict on the task runner.
The memory usage growth is much less than before, but there is still a steady increase.
Flows started at an average memory consumption of 10% and after 14 hours they're at 25%.
I feel like I'm missing something.
The bare-bones Python code shouldn't see such a linear increase in memory usage over time.
I'd really appreciate any help with this matter!
Thank you again for your guidance.
Zanie
Kelvin DeCosta
03/30/2023, 6:42 AM
Kelvin DeCosta
03/30/2023, 6:43 AM
Zanie
…True) then we should release all those task states from memory.
Kelvin DeCosta
03/30/2023, 5:34 PM
Does that include the _results dictionary?
Zanie
Kelvin DeCosta
03/31/2023, 7:30 AM
Kelvin DeCosta
03/31/2023, 7:30 AM
Kelvin DeCosta
03/31/2023, 10:34 AM
I'm not sure if setting task_runner=SequentialTaskRunner() for the batch_subflow flow would change anything. It is still creating a new task runner instance for each call to batch_subflow, right?
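A sketch of the batch_subflow pattern under discussion, assuming Prefect 2.x; the flow and task bodies are illustrative, and whether each call gets a fresh task runner instance (and therefore a fresh _results cache) is exactly the open question here:

from prefect import flow, task
from prefect.task_runners import SequentialTaskRunner

@task
async def greet(name: str) -> str:
    return f"Hello, {name}!"

@flow(task_runner=SequentialTaskRunner())
async def batch_subflow(names: list[str]) -> None:
    # One batch's worth of mapped task runs lives and dies inside this subflow run.
    states = await greet.map(name=names, return_state=True)
    # ... consume the states / pass the results on to the next tasks ...

@flow
async def multi_greeter():
    names = ["Kelvin", "Zanie", "Marvin"] * 10
    for _ in range(1000):
        # If each batch_subflow run uses its own task runner, anything that
        # runner cached (including _results) can be garbage collected when
        # the subflow run finishes.
        await batch_subflow(names)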
John Horn
06/28/2023, 11:32 PM
John Horn
06/28/2023, 11:33 PM
John Horn
06/28/2023, 11:46 PM