
David Ojeda

07/11/2019, 4:27 PM
I have some questions regarding the caching mechanism in Core when using the Dask executor:
1. Where are the inputs/parameters/results actually cached? In the Dask scheduler, the Dask workers, the Python process that launches the flow, or elsewhere?
2. Related to my first question, what is the lifetime of the cache (other than the duration set by the cache_for task parameter)?
3. When are the inputs/parameters/results cached?
I came up with these questions because I am trying to leverage the cache for my use case and I cannot manage to see a cache hit. I have some mapped tasks that run sequentially after one another, with about 500 elements per map. After waiting until about ~200 tasks of the first map have finished, I purposely kill my workers to watch them redo all of the work and benefit from the cache, but they still redo each mapped task one by one.
I ended up understanding what my problem was as I finished writing this ( 🦆 ): we were rolling our own kind of “cache” with our data backend and raising a state that derives from Skipped, but result caching does not occur when the resulting state is Skipped. Even though I solved my problem, I am still posting these questions to be sure I understand the mechanism.
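For reference, this is roughly the kind of setup I have in mind; a minimal sketch, assuming a mapped task with cache_for running on a Dask executor (the task body, the one-hour duration and the scheduler address are placeholders, not our real code):

import datetime

from prefect import Flow, task
from prefect.engine.cache_validators import all_inputs
from prefect.engine.executors import DaskExecutor


@task(cache_for=datetime.timedelta(hours=1), cache_validator=all_inputs)
def transform(x):
    # placeholder for our real per-element work
    return x * 2


with Flow('cached-map') as flow:
    results = transform.map(list(range(500)))

state = flow.run(executor=DaskExecutor(address='tcp://dask-scheduler:8786'))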

Chris White

07/11/2019, 4:54 PM
yeah, this is a great question; if you’re just using the engine, all caches persist in prefect.context.caches, which is a dictionary of task identifier -> list of cached states
the cache is never cleared unless you manually clear it / restart your process
in Cloud, the caches persist in the database
and to be clear, some added hooks / features were merged last night w.r.t. caching, so you might see slightly different behavior on older versions of Core
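as a rough sketch of what that looks like in Core only (the ten-minute duration is arbitrary, and I’m seeding an empty caches dict in the context just so it’s easy to inspect after the run):

import datetime

import prefect
from prefect import Flow, task


@task(cache_for=datetime.timedelta(minutes=10))
def add_one(x):
    return x + 1


with Flow('inspect-cache') as flow:
    result = add_one(1)

# run inside an explicit context so the cache dict is visible afterwards
with prefect.context(caches={}) as ctx:
    flow.run()
    print(ctx.caches)  # task identifier -> list of cached states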

David Ojeda

07/11/2019, 5:09 PM
Ah, cool. I am not using the bleeding edge; do you recommend updating now instead of waiting for a release?
… to be clear, the caching mechanism occurs on the worker then? After there is a result, it gets cached there?

Chris White

07/11/2019, 5:10 PM
we’re actually planning on releasing first thing next week, so if you’re willing to wait that might be the best route
well the cache is actually passed around and shared between the scheduler and the workers; updating the cache always occurs in the process where flow.run was called, querying the cache occurs in the workers

David Ojeda

07/11/2019, 5:14 PM
Understood regarding the cache actions (updating / querying). OK, we will wait for the next release then!

Chris White

07/11/2019, 5:14 PM
awesome - I’ll ping you on here when it’s out!
@Marvin archive “Questions on caching with Dask in Core”

Brad

08/23/2019, 11:23 AM
Hi guys, has there been any update to the caching mechanisms in the past few weeks? I’m interested in persisting results across runs to a local (or remote) cache. I would also like to be able to update a task’s status to Cached based on the input parameters.

David Ojeda

09/20/2019, 9:00 PM
Sorry, I totally missed this message; I don’t know if there has been an update on this. I think the Prefect team has this kind of functionality in their Cloud version.
In my case (Core), until recently I was persisting my cache locally by pickling the state results of a flow and loading them before running my flows. If you are still interested, I can copy some snippets for you next week (don’t hesitate to PM or mention me so I get a notification).
That approach worked for a while, but only because I had a weird flow that made Dask die frequently. Now we have fixed our tasks/flows so they are more robust, and we don’t need a cache mechanism so much.
Where I really think the cache would be useful is to save the partial progress of a flow with many tasks. For example, when I have several maps (let’s say about a dozen with 1k elements each) and one of my Dask workers dies (say, because Kubernetes decided to evict or kill a pod), I lose a lot of time re-running the tasks whose results were saved on that particular worker. Unfortunately, I could not find an easy way to achieve this.

Brad

09/21/2019, 3:57 AM
Hey David, no problem! I’ve actually been doing the same thing. I’d be keen to see how you solved this, compared with what I did. I’m currently using a subclassed TaskRunner, overloading check_task_is_cached and cache_result to check whether something exists in the cache and to write any results to it post-run; this was the only place I could get the info I needed to cache the tasks properly. I’m interested in caching tasks indefinitely so long as the input parameters are the same, so I take a tokenization of the task module + task name + task kwargs and hash a key from it, which I write to some backend (file system, s3, etc.)
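roughly, the skeleton looks like this; a simplified sketch, assuming the (state, inputs) signatures those two hooks have on the Core version I’m on (they may differ between versions), with a naive repr-based hash and a local-filesystem path standing in for the real tokenization and backend, and with the wiring of the runner into the flow run omitted:

import hashlib
import pickle
from pathlib import Path

from prefect.engine.state import Cached
from prefect.engine.task_runner import TaskRunner

CACHE_DIR = Path('/tmp/prefect-cache')  # placeholder backend (file system, s3, ...)


class PersistentCacheTaskRunner(TaskRunner):
    def _cache_key(self, inputs):
        # naive stand-in for the real tokenization: task module + task name + kwargs
        token = repr((self.task.__module__, self.task.name, sorted(inputs.items())))
        return hashlib.sha256(token.encode()).hexdigest()

    def check_task_is_cached(self, state, inputs):
        path = CACHE_DIR / self._cache_key(inputs)
        if path.exists():
            # short-circuit the run with a Cached state loaded from the backend
            return Cached(result=pickle.loads(path.read_bytes()))
        return super().check_task_is_cached(state, inputs)

    def cache_result(self, state, inputs):
        state = super().cache_result(state, inputs)
        if state.is_successful():
            # write the result to the backend post-run, keyed by the input hash
            CACHE_DIR.mkdir(parents=True, exist_ok=True)
            (CACHE_DIR / self._cache_key(inputs)).write_bytes(pickle.dumps(state.result))
        return state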

David Ojeda

09/23/2019, 8:24 AM
Hi Bradley! Here is how we achieved that functionality. We first realized that the cache is used through a context argument and only in a scheduled run. So what we do is load the previous pickled cache before running a flow and pass it in through a context. We thought about subclassing our own TaskRunner, but we decided to go this way (no particular reason). In code, it looks like this:
import pathlib

import prefect

# load_pickle, dump_pickle and logger are helpers defined elsewhere in our code base


def execute_flow(func, func_kwargs, executor, context_args):
    flow_parameters = func_kwargs.copy()

    # Create flow and drop kwargs that are not flow parameters
    flow = func(**flow_parameters)  # type: prefect.core.Flow
    for p in list(flow_parameters):
        if p not in flow.parameters():
            flow_parameters.pop(p)

    # Read cached states from a local file
    cache_filename = (
        pathlib.Path(context_args['temp_dir']) /
        'cache' /
        'task_states.pickle'
    )
    context_args.setdefault('caches', {})
    try:
        logger.info('Trying to restore previous cache from %s', cache_filename)
        previous_cache = load_pickle(cache_filename, None) or {}
        logger.info('Restored cache had %d elements', len(previous_cache))
        context_args['caches'] = previous_cache
    except Exception:
        logger.warning('Could not read cache at %s', cache_filename, exc_info=True)

    with prefect.context(**context_args) as context:
        flow_state = flow.run(parameters=flow_parameters,
                              executor=executor,
                              run_on_schedule=True)
        cache = context.caches

    # Save cached states to a local file
    try:
        logger.info('Saving cache with %d elements to %s', len(cache), cache_filename)
        dump_pickle(cache, cache_filename)
    except Exception:
        logger.warning('Could not save cache to %s', cache_filename, exc_info=True)

    return flow, flow_state