Equipe AI HOC
01/04/2021, 1:06 PM
cache_for, cache_validator, cache_key. From what I understood, these will not cache between flow runs (different processes) unless you configure target, result and checkpoint. Is that correct? When I configure these last 3 "persistence" options, it caches between flow runs, but they don't seem to work together with cache_validator (or any of the first 3 "cache" options, for that matter). Is that correct? Are these independent mechanisms? What I want to achieve, put simply, is to cache between flow runs/processes and have a function (cache_validator) to check whether the cache/file is still valid.
Kyle Moon-Wright
01/04/2021, 6:40 PM
cache_validator, you must configure a duration for the cache_for kwarg, so maybe if you provide some code we can determine the issue you were encountering.
Equipe AI HOC
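01/04/2021, 8:43 PM
from datetime import timedelta
from prefect import task

# The validator ignores its three arguments and always reports the cache as valid.
@task(cache_key="test", cache_for=timedelta(seconds=3600), cache_validator=lambda a, b, c: True)
def cache_test(a, b):
    return a + b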
And the flow (load. is just a namespace):
with Flow("Cache Test") as flow:
    r1 = load.cache_test(1, 2)
flow.run()
The output of 2 consecutive local runs will be:
[2021-01-04 17:44:49-0300] INFO - prefect.FlowRunner | Beginning Flow run for 'Cache Test'
[2021-01-04 17:44:49-0300] INFO - prefect.TaskRunner | Task 'cache_test': Starting task run...
[2021-01-04 17:44:49-0300] WARNING - prefect.TaskRunner | Task 'cache_test': Can't use cache because it is now invalid
[2021-01-04 17:44:49-0300] INFO - prefect.TaskRunner | Task 'cache_test': Finished task run for task with final state: 'Cached'
[2021-01-04 17:44:49-0300] INFO - prefect.FlowRunner | Flow run SUCCESS: all reference tasks succeeded
[2021-01-04 17:45:02-0300] INFO - prefect.FlowRunner | Beginning Flow run for 'Cache Test'
[2021-01-04 17:45:02-0300] INFO - prefect.TaskRunner | Task 'cache_test': Starting task run...
[2021-01-04 17:45:02-0300] WARNING - prefect.TaskRunner | Task 'cache_test': Can't use cache because it is now invalid
[2021-01-04 17:45:02-0300] INFO - prefect.TaskRunner | Task 'cache_test': Finished task run for task with final state: 'Cached'
[2021-01-04 17:45:02-0300] INFO - prefect.FlowRunner | Flow run SUCCESS: all reference tasks succeeded
The cache is always invalid, regardless of the cache_validator I use (I've set one that always returns True). That makes sense per the documentation, which says it is an in-memory cache and that, if you want to persist the cache between processes/runs, you have to configure storage via target, checkpoint and result.
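As an illustration of the in-memory behaviour described above, here is a minimal plain-Python sketch. It is not Prefect's implementation: InMemoryCache, get_or_run and the "ran"/"reused" labels are hypothetical names chosen for this example. It only shows why a cache held in a process-local dictionary cannot survive into a second process, whatever the validator returns.

```python
from datetime import datetime, timedelta


class InMemoryCache:
    """Toy per-process cache (hypothetical; not a Prefect class)."""

    def __init__(self):
        self._store = {}  # cache_key -> (value, expiration time)

    def get_or_run(self, cache_key, fn, cache_for, validator):
        now = datetime.now()
        entry = self._store.get(cache_key)
        if entry is not None:
            value, expires_at = entry
            # Reuse only while the entry is unexpired and the validator agrees.
            if now < expires_at and validator(value):
                return value, "reused"
        value = fn()
        self._store[cache_key] = (value, now + cache_for)
        return value, "ran"


one_hour = timedelta(seconds=3600)
always_valid = lambda value: True

cache = InMemoryCache()
first = cache.get_or_run("test", lambda: 1 + 2, one_hour, always_valid)
second = cache.get_or_run("test", lambda: 1 + 2, one_hour, always_valid)

# A new process starts with an empty dictionary, simulated here by a new object.
new_process_cache = InMemoryCache()
third = new_process_cache.get_or_run("test", lambda: 1 + 2, one_hour, always_valid)
```

Within one object the second call reuses the stored value; the fresh object has nothing to validate, so the function runs again.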
Now if you have a task like:
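from prefect.engine.results import LocalResult

# The result is written to a file named "test2" under the configured cache directory.
@task(target="test2", checkpoint=True, result=LocalResult(dir=config["wd"] + "cache"))
def cache_test2(a, b):
    return a + b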
And flow:
with Flow("Cache Test 2") as flow2:
    r1 = load.cache_test2(1, 2)
flow2.run()
The output of 2 consecutive runs will be:
[2021-01-04 17:40:59-0300] INFO - prefect.FlowRunner | Beginning Flow run for 'Cache Test 2'
[2021-01-04 17:40:59-0300] INFO - prefect.TaskRunner | Task 'cache_test2': Starting task run...
[2021-01-04 17:40:59-0300] INFO - prefect.TaskRunner | Task 'cache_test2': Finished task run for task with final state: 'Success'
[2021-01-04 17:40:59-0300] INFO - prefect.FlowRunner | Flow run SUCCESS: all reference tasks succeeded
[2021-01-04 17:41:07-0300] INFO - prefect.FlowRunner | Beginning Flow run for 'Cache Test 2'
[2021-01-04 17:41:07-0300] INFO - prefect.TaskRunner | Task 'cache_test2': Starting task run...
[2021-01-04 17:41:07-0300] INFO - prefect.TaskRunner | Task 'cache_test2': Finished task run for task with final state: 'Cached'
[2021-01-04 17:41:08-0300] INFO - prefect.FlowRunner | Flow run SUCCESS: all reference tasks succeeded
Meaning the cache worked! Great!
The problem I am having is when I try to mix these options, as in:
@task(cache_for=timedelta(seconds=1), cache_validator=lambda a, b, c: False,
      target="test3", checkpoint=True, result=LocalResult(dir=config["wd"] + "cache"))
def cache_test3(a, b):
    return a + b
The output is always (in any run):
[2021-01-04 18:06:59-0300] INFO - prefect.FlowRunner | Beginning Flow run for 'Cache Test 3'
[2021-01-04 18:06:59-0300] INFO - prefect.TaskRunner | Task 'cache_test3': Starting task run...
[2021-01-04 18:06:59-0300] INFO - prefect.TaskRunner | Task 'cache_test3': Finished task run for task with final state: 'Cached'
[2021-01-04 18:06:59-0300] INFO - prefect.FlowRunner | Flow run SUCCESS: all reference tasks succeeded
It always caches or uses the cache and never invalidates it, regardless of cache_for or cache_validator.
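One possible reading of these logs, offered as an assumption rather than a confirmed description of Prefect's internals, is that the existence of the target file is enough for the result to be reused, and the validator is never consulted on that path. The plain-Python sketch below models only that assumed behaviour; run_with_target and its labels are hypothetical names.

```python
import json
import os
import tempfile


def run_with_target(target_path, fn, validator=None):
    """Existence-only check: `validator` is accepted but never consulted (assumed model)."""
    if os.path.exists(target_path):
        with open(target_path) as f:
            return json.load(f), "reused"
    value = fn()
    with open(target_path, "w") as f:
        json.dump(value, f)
    return value, "ran"


with tempfile.TemporaryDirectory() as workdir:
    target = os.path.join(workdir, "test3")
    never_valid = lambda value: False
    run1 = run_with_target(target, lambda: 1 + 2, never_valid)
    run2 = run_with_target(target, lambda: 1 + 2, never_valid)
```

Under this model the second call reuses the file even though the validator always returns False, which matches the symptom reported here.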
I want to persist the cache in a file (so it survives between runs/processes) and let Prefect handle the validation. But these seem to be separate mechanisms; I can't use both at once. That would mean I have to implement the cache validation logic myself instead of using the cache_* options. Is that so?
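For reference, hand-written validation of a file-backed cache could look roughly like the plain-Python sketch below. It does not use Prefect at all; cached_to_file, younger_than and the JSON file format are assumptions made for this example, with the file's modification time standing in for a cache_for-style expiry.

```python
import json
import os
import tempfile
import time


def younger_than(max_age_seconds):
    """Build a validator that accepts files modified within the last max_age_seconds."""
    def is_valid(path):
        return (time.time() - os.path.getmtime(path)) < max_age_seconds
    return is_valid


def cached_to_file(path, fn, is_valid):
    """Reuse the file at `path` only if it exists and `is_valid(path)` is true."""
    if os.path.exists(path) and is_valid(path):
        with open(path) as f:
            return json.load(f), "reused"
    value = fn()
    with open(path, "w") as f:
        json.dump(value, f)
    return value, "ran"


with tempfile.TemporaryDirectory() as workdir:
    path = os.path.join(workdir, "cache_test3.json")
    never_valid = lambda p: False
    fresh = cached_to_file(path, lambda: 1 + 2, younger_than(3600))
    again = cached_to_file(path, lambda: 1 + 2, younger_than(3600))
    rejected = cached_to_file(path, lambda: 1 + 2, never_valid)
```

The file persists between processes, and the validator decides on every call whether it may still be reused.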
Sorry for the long post!
Kyle Moon-Wright
01/04/2021, 9:26 PM
Kyle Moon-Wright
01/04/2021, 9:27 PM