
    David Ojeda

    3 years ago
    I have some questions regarding the caching mechanism in Core when using the Dask executor: 1. Where are the inputs/parameters/results actually cached? In the Dask scheduler, the Dask worker, the Python code that launches the flow, or elsewhere? 2. Related to my first question, what is the lifetime of the cache (other than the duration set by the `cache_for` task parameter)? 3. When are the inputs/parameters/results cached? I came up with these questions because I am trying to leverage the cache for my case and I cannot manage to see a cache hit. I have some map tasks that run sequentially, one after the other, with about 500 elements per map. After waiting until about 200 tasks have finished on the first map, I purposely kill my workers, expecting the rerun to benefit from the cache, but the workers still redo each mapped task one by one. I ended up understanding what my problem was as I finished writing this ( 🦆 ): we were rolling our own kind of “cache” with our data backend and we raised a state that derives from Skipped, but result caching does not occur when the resulting state is Skipped. Even though I solved my problem, I am still posting these questions to be sure I understand the mechanism.
    Chris White

    3 years ago
    yea this is a great question; if you’re just using the engine, all caches persist in `prefect.context.caches`, which is a dictionary of task identifier -> list of cached states
    the cache is never cleared unless you manually clear it / restart your process
    in Cloud, the caches persist in the database
    and to be clear, some added hooks / features were merged last night w.r.t. caching, so you might see slightly different behavior on older versions of Core
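    To make the description above concrete, here is a minimal, self-contained sketch of an in-memory cache shaped like the one described: a dictionary mapping a task identifier to a list of cached entries, each with an expiry derived from a `cache_for`-style duration. It does not import Prefect and is not Prefect's implementation; the names `caches`, `update_cache` and `query_cache` and the entry layout are assumptions made purely for illustration.

```python
from datetime import datetime, timedelta

# Hypothetical stand-in for the dictionary described above:
# task identifier -> list of cached entries. It lives in process memory,
# so it disappears when the process restarts or the dict is cleared.
caches = {}


def update_cache(task_id, result, cache_for, now):
    """Append a cached entry for task_id that expires `cache_for` after `now`."""
    caches.setdefault(task_id, []).append(
        {"result": result, "expires": now + cache_for}
    )


def query_cache(task_id, now):
    """Return the first unexpired cached result for task_id, or None."""
    for entry in caches.get(task_id, []):
        if entry["expires"] > now:
            return entry["result"]
    return None
```

    In this toy model, nothing ever removes entries: expired ones are simply skipped at query time, which mirrors the point that the cache is only emptied by clearing it manually or restarting the process.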

    David Ojeda

    3 years ago
    Ah, cool, I am not using the bleeding edge, do you recommend updating now instead of waiting for a release ?
    … to be clear, the caching mechanism occurs on the worker then ? After there is a result, it gets cached there?
    Chris White

    3 years ago
    we’re actually planning on releasing first thing next week, so if you’re willing to wait that might be the best route
    well the cache is actually passed around and shared between the scheduler and the workers; updating the cache always occurs in the process where `flow.run` was called, whereas querying the cache occurs in the workers

    David Ojeda

    3 years ago
    Understood for the cache actions (updating / querying). Ok we will wait for the next release then !
    Chris White

    3 years ago
    awesome - I’ll ping you on here when it’s out!
    @Marvin archive “Questions on caching with Dask in Core”
    Brad

    3 years ago
    Hi guys, has there been any update to the caching mechanisms in the past few weeks? I’m interested in persisting results across runs to a local (or remote) cache. I would also like to be able to update a task’s status to cached based on the input parameters.

    David Ojeda

    3 years ago
    Sorry, I totally missed this message; I don’t know if there has been an update on this. I think the Prefect team has this kind of functionality in their Cloud version. In my case (Core), until recently I was persisting my cache locally by pickling the state results of a flow and loading them before running my flows. If you are still interested, I can copy some snippets for you next week (don’t hesitate to PM or mention me so I get a notification). That approach worked for a while, but only because I had a weird flow that made Dask die frequently. Now we have fixed our tasks/flows, they are more robust, and we do not need a cache mechanism so much. Where I really think the cache would be useful is to save the partial progress of a flow with many tasks. For example, when I have several maps (let’s say about a dozen with 1k elements each) and one of my Dask workers dies (let’s say that Kubernetes decided to evict or kill a pod). In that case, I lose a lot of time re-running the tasks that were lost because their results were saved on that particular worker. Unfortunately, I could not find an easy way to achieve this.
    Brad

    3 years ago
    Hey David, no problem! I’ve actually been doing the same thing. I’d be keen to see how you solved this, compared with what I did. I’m currently using a subclassed `TaskRunner`, overloading `check_task_is_cached` and `cache_result` to check if something exists in the cache and to write any results to it post-run; this was the only place I could get the info I needed to cache the tasks properly. I’m interested in caching tasks indefinitely so long as the input parameters are the same, so I take a tokenization of the task module + task name + task kwargs and hash a key from it, which I write to some backend (file system, S3, etc.)
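    A minimal sketch of the keying idea described above, assuming the task kwargs can be serialized to JSON (anything else falls back to its `repr`) and using the local file system as the backend. This is not Brad's actual code, and it leaves out the `TaskRunner` subclass entirely; `cache_key`, `read_cached` and `write_cached` are hypothetical names introduced here for illustration.

```python
import hashlib
import json
import pathlib
import pickle


def cache_key(module, name, kwargs):
    """Hash task module + task name + task kwargs into a stable hex key."""
    payload = json.dumps(
        {"module": module, "name": name, "kwargs": kwargs},
        sort_keys=True,   # make the key independent of kwarg ordering
        default=repr,     # assumption: repr() is stable for non-JSON values
    )
    return hashlib.sha256(payload.encode("utf-8")).hexdigest()


def read_cached(cache_dir, key):
    """Return (True, result) on a cache hit, or (False, None) on a miss."""
    path = pathlib.Path(cache_dir) / f"{key}.pickle"
    if not path.exists():
        return False, None
    with path.open("rb") as fh:
        return True, pickle.load(fh)


def write_cached(cache_dir, key, result):
    """Pickle a result under its key, creating the directory if needed."""
    path = pathlib.Path(cache_dir) / f"{key}.pickle"
    path.parent.mkdir(parents=True, exist_ok=True)
    with path.open("wb") as fh:
        pickle.dump(result, fh)
```

    Returning a hit flag alongside the value lets a cached `None` be told apart from a miss. Entries written this way never expire, which matches the "cache indefinitely while the inputs are the same" goal, but it also means a change to the task's code does not invalidate old results.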

    David Ojeda

    3 years ago
    Hi Bradley! Here is how we achieved that functionality. We first realized that the cache is used through a context argument and only in a scheduled run. So what we do is load the previous pickled cache before running a flow and pass it through a context. We thought about subclassing our own `TaskRunner`, but we decided to go this way (no particular reason). In code, it looks like this:
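    import pathlib

    import prefect

    # Note: logger, load_pickle and dump_pickle are not defined in this snippet.
    def execute_flow(func, func_kwargs, executor, context_args):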
        flow_parameters = func_kwargs.copy()
    
        # Create flow
        flow = func(**flow_parameters)  # type: prefect.core.Flow
        for p in list(flow_parameters):
            if p not in flow.parameters():
                flow_parameters.pop(p)
    
        # Read cached states from a local file
        cache_filename = (
                pathlib.Path(context_args['temp_dir']) /
                'cache' /
                'task_states.pickle'
        )
        context_args.setdefault('caches', {})
        try:
            logger.info('Trying to restore previous cache from %s', cache_filename)
            previous_cache = load_pickle(cache_filename, None) or {}
            logger.info('Restored cache had %d elements', len(previous_cache))
            context_args['caches'] = previous_cache
        except Exception:  # avoid a bare except, which would also swallow KeyboardInterrupt
            logger.warning('Could not read cache at %s', cache_filename, exc_info=True)
    
        with prefect.context(**context_args) as context:
            flow_state = flow.run(parameters=flow_parameters,
                                  executor=executor,
                                  run_on_schedule=True)
            cache = context.caches
    
        # Save cached states to a local file
        try:
            logger.info('Saving cache with %d elements to %s', len(cache), cache_filename)
            dump_pickle(cache, cache_filename)
        except Exception:  # avoid a bare except, which would also swallow KeyboardInterrupt
            logger.warning('Could not save cache to %s', cache_filename, exc_info=True)
    
        return flow, flow_state
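    The snippet above calls `load_pickle` and `dump_pickle` without showing them. A minimal sketch of what such helpers could look like, assuming the signatures implied by the calls (`load_pickle(filename, default)` and `dump_pickle(obj, filename)`); these are hypothetical stand-ins, not the original helpers.

```python
import pathlib
import pickle


def load_pickle(filename, default=None):
    """Return the unpickled object, or `default` if the file does not exist."""
    path = pathlib.Path(filename)
    if not path.exists():
        return default
    with path.open('rb') as fh:
        return pickle.load(fh)


def dump_pickle(obj, filename):
    """Pickle `obj` to `filename`, creating parent directories if needed."""
    path = pathlib.Path(filename)
    path.parent.mkdir(parents=True, exist_ok=True)
    with path.open('wb') as fh:
        pickle.dump(obj, fh)
```

    Since unpickling can execute arbitrary code, a cache file like this should only be loaded from a location you trust.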