
Equipe AI HOC

01/04/2021, 1:06 PM
Hello! A question about caching: I've seen `cache_for`, `cache_validator` and `cache_key` in the docs. From what I understood, these will not cache between flow runs (different processes) unless you configure `target`, `result` and `checkpoint`. Is that correct? When I configure these last three "persistence" options, results are cached between flow runs, but they don't seem to work together with `cache_validator` (or with any of the first three "cache" options, for that matter). Is that correct? Are these independent mechanisms? What I want to achieve, put simply, is to cache between flow runs/processes and have a function (`cache_validator`) check whether the cache/file is still valid.

Kyle Moon-Wright

01/04/2021, 6:40 PM
Hey @Equipe AI HOC, your conclusions about the usage of the cache are all correct, and you should be able to cache between flow runs. However, to use the `cache_validator` you must configure a duration for the `cache_for` kwarg, so if you share some code we can try to determine the issue you were encountering.
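Why a validator alone cannot carry a cache from one run to the next can be shown with a small pure-Python model of the in-memory cache the docs describe. This is a sketch under stated assumptions, not Prefect's code: the `InMemoryCache` class, its `lookup`/`save` methods, and the idea that candidate entries sit in a plain per-process dict are all hypothetical. Here `key` plays the role of `cache_key` and `cache_for` sets each entry's expiration.

```python
from datetime import datetime, timedelta
from typing import Any, Callable, Dict, List, Optional, Tuple

# A cached entry in this model: (value, expiration time).
Entry = Tuple[Any, datetime]
Validator = Callable[[Entry, dict, dict], bool]


class InMemoryCache:
    """Hypothetical model: cached entries exist only in the current process's memory."""

    def __init__(self) -> None:
        self._store: Dict[str, List[Entry]] = {}

    def lookup(self, key: str, validator: Validator, inputs: dict, parameters: dict) -> Optional[Any]:
        # The validator can only approve entries that already exist in memory.
        for entry in self._store.get(key, []):
            if validator(entry, inputs, parameters):
                return entry[0]
        return None  # no valid candidate: the task has to run

    def save(self, key: str, value: Any, cache_for: timedelta) -> None:
        self._store.setdefault(key, []).append((value, datetime.now() + cache_for))


def always_valid(entry: Entry, inputs: dict, parameters: dict) -> bool:
    return True


# First process: the store starts empty, so even an always-True validator finds nothing.
process_one = InMemoryCache()
first = process_one.lookup("test", always_valid, {"a": 1, "b": 2}, {})
process_one.save("test", 3, timedelta(seconds=3600))
second = process_one.lookup("test", always_valid, {"a": 1, "b": 2}, {})

# A second process gets its own, empty store.
process_two = InMemoryCache()
third = process_two.lookup("test", always_valid, {"a": 1, "b": 2}, {})
```

In this model `first` and `third` are `None` and only `second` returns `3`: the validator is never the bottleneck in a fresh process, because there is nothing for it to look at.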

Equipe AI HOC

01/04/2021, 8:43 PM
Thanks for the reply. If I have the following task:
```python
from datetime import timedelta
from prefect import task

@task(cache_key="test", cache_for=timedelta(seconds=3600), cache_validator=lambda a, b, c: True)
def cache_test(a, b):
    return a + b
```
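The lambda above takes three positional arguments. As far as we understand Prefect 1.x's convention, a cache validator is called with the cached state, the task's inputs, and the flow parameters, and Prefect ships ready-made ones in `prefect.engine.cache_validators` (for example `duration_only` and `all_inputs`). The sketch below writes that kind of check as a named function; `CachedState` and its field names are hypothetical stand-ins, not Prefect's `Cached` class, so the example runs without Prefect.

```python
from dataclasses import dataclass, field
from datetime import datetime, timedelta
from typing import Any, Dict


@dataclass
class CachedState:
    """Hypothetical stand-in for a cached task state; field names are assumptions."""

    result: Any
    expiration: datetime
    cached_inputs: Dict[str, Any] = field(default_factory=dict)


def inputs_unchanged_and_fresh(state: CachedState, inputs: dict, parameters: dict) -> bool:
    """Accept the cached state only if it has not expired and used the same inputs."""
    if datetime.now() >= state.expiration:
        return False  # past its cache_for window
    return state.cached_inputs == inputs  # parameters are ignored in this sketch


now = datetime.now()
fresh = CachedState(result=3, expiration=now + timedelta(hours=1), cached_inputs={"a": 1, "b": 2})
stale = CachedState(result=3, expiration=now - timedelta(seconds=1), cached_inputs={"a": 1, "b": 2})
```

With these objects, `inputs_unchanged_and_fresh(fresh, {"a": 1, "b": 2}, {})` accepts the entry, while changed inputs or the expired `stale` entry are rejected.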
And the flow (`load` is just a namespace):
```python
from prefect import Flow

# "load" is the user's own namespace containing cache_test
with Flow("Cache Test") as flow:
    r1 = load.cache_test(1, 2)
flow.run()
```
The output of 2 consecutive local runs will be:
```
[2021-01-04 17:44:49-0300] INFO - prefect.FlowRunner | Beginning Flow run for 'Cache Test'
[2021-01-04 17:44:49-0300] INFO - prefect.TaskRunner | Task 'cache_test': Starting task run...
[2021-01-04 17:44:49-0300] WARNING - prefect.TaskRunner | Task 'cache_test': Can't use cache because it is now invalid
[2021-01-04 17:44:49-0300] INFO - prefect.TaskRunner | Task 'cache_test': Finished task run for task with final state: 'Cached'
[2021-01-04 17:44:49-0300] INFO - prefect.FlowRunner | Flow run SUCCESS: all reference tasks succeeded
```

```
[2021-01-04 17:45:02-0300] INFO - prefect.FlowRunner | Beginning Flow run for 'Cache Test'
[2021-01-04 17:45:02-0300] INFO - prefect.TaskRunner | Task 'cache_test': Starting task run...
[2021-01-04 17:45:02-0300] WARNING - prefect.TaskRunner | Task 'cache_test': Can't use cache because it is now invalid
[2021-01-04 17:45:02-0300] INFO - prefect.TaskRunner | Task 'cache_test': Finished task run for task with final state: 'Cached'
[2021-01-04 17:45:02-0300] INFO - prefect.FlowRunner | Flow run SUCCESS: all reference tasks succeeded
```
The cache is always invalid, regardless of the `cache_validator` I use (I've set one that always returns `True`). That makes sense per the documentation, which says it is an in-memory cache and that to persist the cache between processes/runs you have to configure storage via `target`, `checkpoint` and `result`. Now, if you have a task like:
```python
from prefect import task
from prefect.engine.results import LocalResult

@task(target="test2", checkpoint=True, result=LocalResult(dir=config["wd"] + "cache"))
def cache_test2(a, b):
    return a + b
```
And flow:
```python
from prefect import Flow

with Flow("Cache Test 2") as flow2:
    r1 = load.cache_test2(1, 2)
flow2.run()
```
The output of two consecutive runs is:
```
[2021-01-04 17:40:59-0300] INFO - prefect.FlowRunner | Beginning Flow run for 'Cache Test 2'
[2021-01-04 17:40:59-0300] INFO - prefect.TaskRunner | Task 'cache_test2': Starting task run...
[2021-01-04 17:40:59-0300] INFO - prefect.TaskRunner | Task 'cache_test2': Finished task run for task with final state: 'Success'
[2021-01-04 17:40:59-0300] INFO - prefect.FlowRunner | Flow run SUCCESS: all reference tasks succeeded
```

```
[2021-01-04 17:41:07-0300] INFO - prefect.FlowRunner | Beginning Flow run for 'Cache Test 2'
[2021-01-04 17:41:07-0300] INFO - prefect.TaskRunner | Task 'cache_test2': Starting task run...
[2021-01-04 17:41:07-0300] INFO - prefect.TaskRunner | Task 'cache_test2': Finished task run for task with final state: 'Cached'
[2021-01-04 17:41:08-0300] INFO - prefect.FlowRunner | Flow run SUCCESS: all reference tasks succeeded
```
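A rough model of why the `target` version survives across processes: the decision is whether a file already exists at the target location, and the file outlives the process that wrote it. This is a stdlib-only sketch; `run_with_target` is hypothetical and the pickle file format is an assumption, not what `LocalResult` necessarily writes.

```python
import os
import pickle
import tempfile
from typing import Any, Callable, Tuple


def run_with_target(fn: Callable[..., Any], target_path: str, *args: Any) -> Tuple[Any, str]:
    """Return (value, state_name), reusing the file at target_path if it exists."""
    if os.path.exists(target_path):
        with open(target_path, "rb") as f:
            return pickle.load(f), "Cached"
    value = fn(*args)
    with open(target_path, "wb") as f:
        pickle.dump(value, f)  # "checkpoint" the result so later runs can reuse it
    return value, "Success"


def cache_test2(a: int, b: int) -> int:
    return a + b


cache_dir = tempfile.mkdtemp()
target = os.path.join(cache_dir, "test2")
run1 = run_with_target(cache_test2, target, 1, 2)
run2 = run_with_target(cache_test2, target, 1, 2)
```

`run1` mirrors the first log (`'Success'`) and `run2` the second (`'Cached'`). Note that nothing in this model looks at the file's age or at the inputs: once the file exists, it is reused.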
So the cache worked! Great! The problem appears when I try to mix these decorator arguments, as in:
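```python
from datetime import timedelta
from prefect import task
from prefect.engine.results import LocalResult

@task(cache_for=timedelta(seconds=1), cache_validator=lambda a, b, c: False,
      target="test3", checkpoint=True, result=LocalResult(dir=config["wd"] + "cache"))
def cache_test3(a, b):
    return a + b
```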
The output is always the same, in every run:
```
[2021-01-04 18:06:59-0300] INFO - prefect.FlowRunner | Beginning Flow run for 'Cache Test 3'
[2021-01-04 18:06:59-0300] INFO - prefect.TaskRunner | Task 'cache_test3': Starting task run...
[2021-01-04 18:06:59-0300] INFO - prefect.TaskRunner | Task 'cache_test3': Finished task run for task with final state: 'Cached'
[2021-01-04 18:06:59-0300] INFO - prefect.FlowRunner | Flow run SUCCESS: all reference tasks succeeded
```
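One way to read these logs is that when a `target` is configured, the existence check on the target decides the outcome and the `cache_for`/`cache_validator` path is never consulted. The model below encodes that reading; it is an interpretation of the observed output, not a copy of Prefect's task runner, and `decide_state` is a hypothetical function.

```python
from typing import Callable, List, Optional

calls: List[str] = []


def decide_state(has_target: bool, target_exists: bool,
                 validator: Optional[Callable[[], bool]] = None) -> str:
    """Model of the observed behavior: a configured target takes precedence over the validator."""
    if has_target:
        # Only the existence of the target file matters; the validator is never called.
        return "Cached" if target_exists else "Run"
    if validator is not None and validator():
        return "Cached"
    return "Run"


def never_valid() -> bool:
    calls.append("validator called")
    return False  # like `lambda a, b, c: False` in cache_test3


with_target = decide_state(has_target=True, target_exists=True, validator=never_valid)
without_target = decide_state(has_target=False, target_exists=False, validator=never_valid)
```

Under this reading, the always-`False` validator only runs in the no-target case; with a target whose file exists, the result is `'Cached'` every time, matching the log above.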
It always uses the cache and never invalidates it, regardless of `cache_for` or `cache_validator`. I want to persist the cache to a file (so it survives between runs/processes) and let Prefect handle the validation. But these seem to be separate mechanisms, and I can't use both at once, which means I would have to implement the cache validation logic myself instead of using the `cache_*` decorator arguments. Is that so? Sorry for the long post!
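If the validation needed is purely time-based, one option that stays within `target` is to put a time bucket into the target's filename: a new period yields a new filename, the old file is simply not found, and the task runs again. Prefect 1.x can template target strings with context values (for example `{task_name}` or `{today}`, which would give a daily expiry); the exact keys available should be checked in the docs. The sketch reproduces the idea in plain Python; `render_target`, the `{bucket}` placeholder, and the naming scheme are hypothetical.

```python
from datetime import datetime


def render_target(template: str, task_name: str, now: datetime, bucket_hours: int = 1) -> str:
    """Fill a target template so that all runs in the same time bucket share one filename."""
    bucket_start = now.replace(hour=(now.hour // bucket_hours) * bucket_hours,
                               minute=0, second=0, microsecond=0)
    return template.format(task_name=task_name, bucket=bucket_start.strftime("%Y-%m-%dT%H"))


template = "{task_name}/{bucket}.prefect"  # hypothetical naming scheme
a = render_target(template, "cache_test3", datetime(2021, 1, 4, 17, 44))
b = render_target(template, "cache_test3", datetime(2021, 1, 4, 17, 59))
c = render_target(template, "cache_test3", datetime(2021, 1, 4, 18, 6))
```

Runs at 17:44 and 17:59 share `cache_test3/2021-01-04T17.prefect`, while the 18:06 run gets a new name and recomputes. Old files are not deleted by this scheme, so they accumulate on disk.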

Kyle Moon-Wright

01/04/2021, 9:26 PM
No, thanks for the detail! I'll need to get additional insight on this, but at first glance I think your conclusion is correct: these are separate mechanisms, and perhaps the best course of action is to use only the `target` for persistence with a custom result validator, dropping the `cache_validator` altogether.
I’ll get back to you with some more insight on this.
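Along the lines of relying on file persistence alone, the validity check can be done by hand before deciding whether to reuse the stored result. A minimal sketch, assuming the rule is "reuse the file only if a user-supplied check approves it"; `cached_call`, `younger_than`, and the pickle format are hypothetical helpers, not Prefect APIs, and how this would be wired into a task (for example, called inside a plain task without `target`) is left as an assumption.

```python
import os
import pickle
import tempfile
import time
from typing import Any, Callable, List, Tuple


def cached_call(fn: Callable[..., Any], path: str, is_valid: Callable[[str], bool], *args: Any) -> Any:
    """Reuse the pickled result at `path` if it exists and `is_valid(path)` approves it."""
    if os.path.exists(path) and is_valid(path):
        with open(path, "rb") as f:
            return pickle.load(f)
    value = fn(*args)  # missing or rejected file: recompute and overwrite
    with open(path, "wb") as f:
        pickle.dump(value, f)
    return value


def younger_than(max_age_s: float) -> Callable[[str], bool]:
    """Build a validator that accepts files modified less than `max_age_s` seconds ago."""
    def check(path: str) -> bool:
        return time.time() - os.path.getmtime(path) < max_age_s
    return check


calls: List[Tuple[int, int]] = []


def cache_test3(a: int, b: int) -> int:
    calls.append((a, b))  # record real executions so reuse is observable
    return a + b


path = os.path.join(tempfile.mkdtemp(), "test3")
first = cached_call(cache_test3, path, younger_than(3600), 1, 2)   # no file yet: computes
second = cached_call(cache_test3, path, younger_than(3600), 1, 2)  # fresh file: reused
third = cached_call(cache_test3, path, lambda p: False, 1, 2)      # rejected: recomputes
```

All three calls return `3`, but `cache_test3` only actually runs twice: the validator plays the role the user wanted `cache_validator` to play, applied to a file that persists between processes.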