David Ojeda
07/11/2019, 4:27 PM
cache_for
task parameter)
3. when are the i/p/r cached?
I came up with these questions because I am trying to leverage the cache for my case and I cannot manage to see a cache hit. I have some map tasks that are sequential to each other, with about 500 elements per map. After I wait until there are ~200 finished tasks on the first map, I purposely kill my workers, expecting them to redo all of the work and profit from the cache, but they still do each mapped task one by one.
I ended up understanding what my problem was as I finished writing this ( 🦆 ): we were rolling our own kind of “cache” with our data backend and we raised a state that derives from Skipped, but result caching does not occur when the resulting state is Skipped.
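A toy sketch of the gotcha David hit — these are not Prefect's real classes, just an illustration of why a state derived from Skipped slips past the cache-write step when the runner only caches genuine successes:

```python
# Toy model -- NOT Prefect's actual classes -- illustrating why a
# Skipped-derived state is never written to the result cache.
class State:
    def is_skipped(self):
        return isinstance(self, Skipped)

class Success(State):
    def __init__(self, result=None):
        self.result = result

class Skipped(Success):  # in Prefect, Skipped derives from Success
    pass

cache = {}

def cache_result(task_id, state):
    # Hypothetical runner logic: only cache plain successes
    if isinstance(state, Success) and not state.is_skipped():
        cache[task_id] = state.result

cache_result("task-a", Success(result=42))   # cached
cache_result("task-b", Skipped(result=42))   # NOT cached, despite having a result
print(sorted(cache))  # -> ['task-a']
```

Even though the Skipped-derived state carries a result, the cache-write branch never runs for it, which matches the behavior described above.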
Even though I solved my problem, I am still posting these questions to be sure I understand the mechanism.
Chris White
prefect.context.caches
which is a dictionary of task identifier -> list of cached states
David Ojeda
07/11/2019, 5:09 PM
Chris White
flow.run
was called, querying the cache occurs in the workers
David Ojeda
07/11/2019, 5:14 PM
Chris White
Marvin
08/05/2019, 9:37 PM
Brad
08/23/2019, 11:23 AM
David Ojeda
09/20/2019, 9:00 PM
Brad
09/21/2019, 3:57 AM
TaskRunner
, overloading check_task_is_cached
and cache_result
to check if something exists in the cache and write any results to it post-run; this was the only place I could get the info I needed to cache the tasks properly. I’m interested in caching tasks indefinitely as long as the input parameters are the same, so I take a tokenization of the task module + task name + task kwargs and hash a key from it, which I write to some backend (file system, S3, etc.)
David Ojeda
09/23/2019, 8:24 AM
TaskRunner
, but we decided to go this way (no particular reason).
In code, it looks like this:
import pathlib

import prefect

# logger, load_pickle and dump_pickle are defined elsewhere in our codebase

def execute_flow(func, func_kwargs, executor, context_args):
    flow_parameters = func_kwargs.copy()

    # Create flow
    flow = func(**flow_parameters)  # type: prefect.core.Flow

    # Drop any kwargs that are not actual parameters of this flow
    for p in list(flow_parameters):
        if p not in flow.parameters():
            flow_parameters.pop(p)

    # Read cached states from a local file
    cache_filename = (
        pathlib.Path(context_args['temp_dir']) /
        'cache' /
        'task_states.pickle'
    )
    context_args.setdefault('caches', {})
    try:
        logger.info('Trying to restore previous cache from %s', cache_filename)
        previous_cache = load_pickle(cache_filename, None) or {}
        logger.info('Restored cache had %d elements', len(previous_cache))
        context_args['caches'] = previous_cache
    except Exception:
        logger.warning('Could not read cache at %s', cache_filename, exc_info=True)

    with prefect.context(**context_args) as context:
        flow_state = flow.run(parameters=flow_parameters,
                              executor=executor,
                              run_on_schedule=True)
        cache = context.caches

    # Save cached states to a local file
    try:
        logger.info('Saving cache with %d elements to %s', len(cache), cache_filename)
        dump_pickle(cache, cache_filename)
    except Exception:
        logger.warning('Could not save cache to %s', cache_filename, exc_info=True)

    return flow, flow_state
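The `load_pickle` and `dump_pickle` helpers above are not shown; a plausible implementation, assuming `load_pickle` takes a path plus a default to return on a missing file and `dump_pickle` creates parent directories as needed, might be:

```python
import pathlib
import pickle

def load_pickle(path, default=None):
    # Hypothetical helper: return the unpickled contents of `path`,
    # or `default` if the file does not exist yet.
    path = pathlib.Path(path)
    if not path.exists():
        return default
    with path.open('rb') as f:
        return pickle.load(f)

def dump_pickle(obj, path):
    # Hypothetical helper: pickle `obj` to `path`, creating parent
    # directories (e.g. the 'cache' folder above) if necessary.
    path = pathlib.Path(path)
    path.parent.mkdir(parents=True, exist_ok=True)
    with path.open('wb') as f:
        pickle.dump(obj, f)
```

With helpers shaped like these, the first run finds no cache file and starts fresh, while later runs restore the pickled state dictionary before `flow.run` is called.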