
Krzysztof Nawara

10/06/2020, 5:39 PM
Hey, nice to meet everyone 🙂 I've got a question about caching in Prefect - I'm trying to build an ML pipeline which can take advantage of caching, but can also selectively recompute some of the nodes (e.g. because logic changed, or a new node was added). For that I was planning to keep track of a version for each task and make it part of the cache key/target location. The bit I'm struggling with the most is invalidation of downstream tasks - if a task's version gets bumped, its downstreams cannot be read from cache. So far I've looked at _cache_key_ and _cache_validator_. If validators could access the states of the current execution of the upstream tasks, they would be able to return False if the state of at least one of the upstreams is different from Cached. Really neat solution, but from what I'm seeing cache_validator only gets access to the previous state of the current task and the values of its inputs, not their states. I also looked at Result and target, but they seem even more restrictive, at least until PIN 16 is fully implemented. The only workaround I can think of is making the version of each upstream task part of the cache key for the task. This is going to get tedious real fast unless there is a way to propagate that version through the pipeline and generate the cache keys/targets. But this feels a lot like a hack, so before attempting that I wanted to ask if the experts here can think of a better option. Thank you very much in advance, Chris
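To be concrete, the hack I'm imagining would look roughly like this (a rough sketch only - task names and version numbers below are just illustrative):
Copy code
from datetime import timedelta

from prefect import Flow, task

# hypothetical per-task versions, bumped by hand whenever a task's logic changes
VERSIONS = {"load": 1, "train": 3}

# each cache key embeds the task's own version plus the versions of its
# upstreams, so bumping any upstream version also invalidates this task's cache
@task(cache_for=timedelta(days=1),
      cache_key="load-v{}".format(VERSIONS["load"]))
def load():
    return [1, 2, 3]

@task(cache_for=timedelta(days=1),
      cache_key="train-v{}-load-v{}".format(VERSIONS["train"], VERSIONS["load"]))
def train(data):
    return sum(data)

with Flow("versioned-cache") as flow:
    train(load())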

Jim Crist-Harif

10/06/2020, 6:01 PM
The bit I'm struggling with the most is invalidation of downstream tasks - if a task's version gets bumped, its downstreams cannot be read from cache.
Hi @Krzysztof Nawara, quick question - if an upstream task has changed its result, why would you not want to rerun a downstream task? We can likely make whatever caching scheme you want work, but Prefect's caching is generally designed around passing data between tasks, so if an upstream task changes we generally want to rerun the downstream task.

Joe Schmid

10/06/2020, 6:09 PM
@Krzysztof Nawara FYI, I talked about similar functionality in this very old github issue: https://github.com/PrefectHQ/prefect/issues/1509 We haven't implemented that idea and I haven't revisited it. Prefect's caching has been much improved since then, but I've not looked at it closely.

Krzysztof Nawara

10/06/2020, 8:21 PM
@Jim Crist-Harif Sorry, I probably didn't explain it clearly. The behaviour you are describing is exactly what I'm going for - if an upstream task has changed its results, the downstream needs to be rerun. In the case of the Result and target setup, the downstream doesn't care, it just reads the checkpoint. It seems that the same applies to cache_for & cache_key. Is that not the expected behaviour? Because that would mean what I'm looking for is already in Prefect and I must be doing something wrong.
@Joe Schmid Thanks for the reference, I'm reading through it right now, although it'll take some time to digest it

Jim Crist-Harif

10/06/2020, 8:28 PM
If you have a `target` specified, then if the templated target exists it's used regardless of whether the inputs to the task have changed. Using a `target` is effectively creating your own caching scheme. If you omit the target and rely on Prefect cache validators, the inputs will be hashed and compared to determine if the cache can be used.
But you could also use a `target` template, if you can template the target appropriately so that the target path changes whenever the inputs change.
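For example, something like this rough sketch (it assumes task inputs are available for templating the target, as suggested above; the task name, paths and values are made up, and local runs need checkpointing enabled for results to be written):
Copy code
from prefect import Flow, Parameter, task
from prefect.engine.results import LocalResult

# hypothetical example: the target path embeds the input value, so a new
# input means the target file isn't found and the task runs again
@task(
    target="double-{x}.pkl",   # assumes inputs can be used in target templates
    checkpoint=True,           # local runs also need PREFECT__FLOWS__CHECKPOINTING=true
    result=LocalResult(dir="/tmp/prefect/targets/"),
)
def double(x):
    print("executing double")
    return x * 2

with Flow("target-demo") as flow:
    x = Parameter("x", default=21)
    double(x)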

Krzysztof Nawara

10/06/2020, 8:37 PM
Inputs are hashed? That sounds like exactly what I need - I need to test it. Thanks!
👍 1
Somehow I was so stuck on the idea that the cache should rely only on cache_key that I completely missed that 😄
I made this toy example to test this behaviour:
Copy code
from datetime import timedelta

from prefect import task, Flow
from prefect.engine.results import LocalResult

@task(cache_for=timedelta(days=1), cache_key="4")
def t1():
    print("executing t1")
    return 99

@task(cache_for=timedelta(days=1))
def t2(t1):
    print("executing t2")
    return t1*2

with Flow("tf", result=LocalResult(dir="/tmp/prefect/v1/", location="{task_name}")) as tf:
    t1 = t1()
    t2 = t2(t1)

s = tf.run()
I was expecting that after changing cache_key on t1 and modifying its returned value, t2 would be rerun without any additional modification to it. But it seems that only t1 gets rerun:
Copy code
[2020-10-06 20:47:15] INFO - prefect.FlowRunner | Beginning Flow run for 'tf'
[2020-10-06 20:47:15] INFO - prefect.FlowRunner | Starting flow run.
[2020-10-06 20:47:15] INFO - prefect.TaskRunner | Task 't1': Starting task run...
[2020-10-06 20:47:15] WARNING - prefect.TaskRunner | Task 't1': can't use cache because it is now invalid
executing t1
[2020-10-06 20:47:15] INFO - prefect.TaskRunner | Task 't1': finished task run for task with final state: 'Cached'
[2020-10-06 20:47:15] INFO - prefect.TaskRunner | Task 't2': Starting task run...
[2020-10-06 20:47:15] INFO - prefect.TaskRunner | Task 't2': finished task run for task with final state: 'Cached'
[2020-10-06 20:47:15] INFO - prefect.FlowRunner | Flow run SUCCESS: all reference tasks succeeded
Hm, I added the `all_inputs` cache validator and now it seems to work, but only if I change both the key and the returned value.
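For reference, the fix was roughly this (a sketch of the downstream task from the toy example above, with the validator added):
Copy code
from datetime import timedelta

from prefect import task
from prefect.engine.cache_validators import all_inputs

# all_inputs compares the task's current inputs against the inputs that
# produced the cached state; if any input differs, the cache is not used
@task(cache_for=timedelta(days=1), cache_validator=all_inputs)
def t2(t1):
    print("executing t2")
    return t1 * 2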
Okay, so the caching part is working. Now I'm trying to also save the results for inspection by external tools (example: train a model first, then evaluate it and save the scores as a pandas dataframe for further analysis). So I added `result=LocalResult(dir="/tmp/prefect/v3/", location="{task_name}")` to my flow, but it seems that results only get written when a task gets recomputed - if it was read from cache, no writing occurs. Is there a way to disable this behaviour and always write? I didn't want to use target, fearing that it might interfere with the caching behaviour.

Jim Crist-Harif

10/06/2020, 9:27 PM
but it seems that results only get written when a task gets recomputed - if it was read from cache, no writing occurs.
Sorry, I'm not sure I follow - isn't this the whole point of caching? If a task doesn't need to be run, then it doesn't need to write its result. When running with Cloud/Server, the cached value is stored in the result rather than in memory, so there's no way to get one without the other: either the task is run and its output is written, or the cached value is used and the task isn't run (and the value was previously written).
What are you hoping/expecting to happen here?

Krzysztof Nawara

10/07/2020, 7:23 AM
Good point, let me take a step back maybe I'm making things unnecessarily complicated. 1. I started with a simple local flow which runs my experiments, and then I use flow.get_tasks() to access scores and other statistics for those models 2. But it started taking a long time. And when I only add a new model, I don't need to recompute all the rest - so I want to cache it. 3. But I still want to have complete set of results for analysis, for all of the models (e.g. to plot all of them on a single plot to compare their performance). 4. If I want to have persistent cache, I need to use server. But unlike flow.run() when submitting to server you can't simply get state of the pipeline (with all task results) back, so they need to be written to external location. EDIT: If I just tell all runs to save in the same folder, I'd have exactly the behaviour I need, so it's all good