Krzysztof Nawara
10/06/2020, 5:39 PM
Jim Crist-Harif
10/06/2020, 6:01 PM
> The bit I'm struggling the most with is invalidation of downstream tasks - if the task version gets bumped, downstreams cannot be read from cache.
Hi @Krzysztof Nawara, quick question - if an upstream task has changed its result, why would you not want to rerun a downstream task? We can likely make whatever caching scheme you want work, but Prefect's caching scheme is generally designed around passing data around, so if an upstream task changes we generally want to rerun the downstream task.
Joe Schmid
10/06/2020, 6:09 PM
Krzysztof Nawara
10/06/2020, 8:21 PM
Jim Crist-Harif
10/06/2020, 8:28 PM
With a target specified, if the templated target exists then it's used regardless of whether the inputs to the task have changed. Using a target is effectively creating your own caching scheme. If you omit the target and rely on Prefect cache validators, the inputs will be hashed and compared to determine if the cache can be used.
A target template can still work, if you can template the target appropriately so that the target path changes when the inputs change.
Krzysztof Nawara
10/06/2020, 8:37 PM
from datetime import timedelta

from prefect import task, Task, Flow
from prefect.engine.results import LocalResult  # needed for the flow-level result below


@task(cache_for=timedelta(days=1), cache_key="4")
def t1():
    print("executing t1")
    return 99


@task(cache_for=timedelta(days=1))
def t2(t1):
    print("executing t2")
    return t1 * 2


with Flow("tf", result=LocalResult(dir="/tmp/prefect/v1/", location="{task_name}")) as tf:
    t1 = t1()
    t2 = t2(t1)

s = tf.run()
I was expecting that changing cache_key on t1 and modifying its returned value would cause t2 to be rerun, without any additional modification to t2.
But it seems that only t1 gets rerun:
[2020-10-06 20:47:15] INFO - prefect.FlowRunner | Beginning Flow run for 'tf'
[2020-10-06 20:47:15] INFO - prefect.FlowRunner | Starting flow run.
[2020-10-06 20:47:15] INFO - prefect.TaskRunner | Task 't1': Starting task run...
[2020-10-06 20:47:15] WARNING - prefect.TaskRunner | Task 't1': can't use cache because it is now invalid
executing t1
[2020-10-06 20:47:15] INFO - prefect.TaskRunner | Task 't1': finished task run for task with final state: 'Cached'
[2020-10-06 20:47:15] INFO - prefect.TaskRunner | Task 't2': Starting task run...
[2020-10-06 20:47:15] INFO - prefect.TaskRunner | Task 't2': finished task run for task with final state: 'Cached'
[2020-10-06 20:47:15] INFO - prefect.FlowRunner | Flow run SUCCESS: all reference tasks succeeded
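The log above shows t2 staying Cached even though t1 was recomputed. One likely explanation, as far as I understand Prefect 0.x, is that a task with cache_for but no explicit cache_validator is validated on duration only, so its inputs are never compared. The plain-Python sketch below illustrates that difference; CachedState and both validator functions are hypothetical stand-ins, not Prefect's own classes or implementation.

```python
from dataclasses import dataclass
from datetime import datetime, timedelta
from typing import Any, Dict


@dataclass
class CachedState:
    """Hypothetical stand-in for a cached task result (not a Prefect class)."""
    value: Any
    inputs: Dict[str, Any]
    expires_at: datetime


def duration_only_valid(cached: CachedState, inputs: Dict[str, Any], now: datetime) -> bool:
    # Only the expiry time is checked; the current inputs are ignored.
    return now < cached.expires_at


def all_inputs_valid(cached: CachedState, inputs: Dict[str, Any], now: datetime) -> bool:
    # The cache must be unexpired AND the stored inputs must equal the current ones.
    return now < cached.expires_at and cached.inputs == inputs


now = datetime(2020, 10, 6, 20, 47)
cached_t2 = CachedState(value=198, inputs={"t1": 99}, expires_at=now + timedelta(days=1))
new_inputs = {"t1": 100}  # t1 was rerun and now returns a different value

stale_hit = duration_only_valid(cached_t2, new_inputs, now)  # cache reused despite new input
fresh_check = all_inputs_valid(cached_t2, new_inputs, now)   # cache rejected, t2 would rerun
```

Under this model, an input-comparing validator reruns t2 only when the value coming from t1 actually differs, which would match the behaviour reported in the next message.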
I added the all_inputs cache validator and now it seems to work, but only if I change both the key and the returned value.
I also added result=LocalResult(dir="/tmp/prefect/v3/", location="{task_name}") to my flow, but it seems that results only get written when a task gets recomputed - if it was read from cache, no writing occurs. Is there a way to disable this behaviour and always write? I didn't want to use target, fearing that it might interfere with the caching behaviour.
Jim Crist-Harif
10/06/2020, 9:27 PM
> but it seems that results only get written when task gets recomputed - if it was read from cache no writing occurs.
Sorry, I'm not sure I follow - isn't this the whole point of caching? If a task doesn't need to be run, then it doesn't need to write its result. When running with cloud/server, the cached value is stored in the result rather than in memory, so there's no way to get one without the other: either a task is run and the output is written, or the cached value is used and the task isn't run (and the value was previously written).
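The two outcomes described above (run and write, or reuse without writing) can be sketched as a small file-backed cache whose path is templated from the inputs, similar in spirit to a target that changes when the inputs change. This is an illustration in plain Python, not Prefect's implementation; target_path, run_with_target and the file naming scheme are all assumptions made for the example.

```python
import hashlib
import json
import os
import tempfile


def target_path(base_dir, task_name, inputs):
    # Hypothetical template: the path embeds a hash of the inputs,
    # so changed inputs map to a different file.
    digest = hashlib.sha256(json.dumps(inputs, sort_keys=True).encode()).hexdigest()[:12]
    return os.path.join(base_dir, f"{task_name}-{digest}.json")


def run_with_target(base_dir, task_name, fn, inputs):
    """Return (value, ran). The task body runs, and writes, only if the target is missing."""
    path = target_path(base_dir, task_name, inputs)
    if os.path.exists(path):
        # Target exists: reuse the stored value, skip the task, write nothing.
        with open(path) as f:
            return json.load(f), False
    value = fn(**inputs)
    with open(path, "w") as f:
        json.dump(value, f)
    return value, True


def double(t1):
    return t1 * 2


base = tempfile.mkdtemp()
first = run_with_target(base, "t2", double, {"t1": 99})    # target missing: runs and writes
second = run_with_target(base, "t2", double, {"t1": 99})   # same inputs: read back, no run
third = run_with_target(base, "t2", double, {"t1": 100})   # new input: new path, runs again
```

In this sketch a write only ever happens together with a run, and a changed upstream value forces a rerun because it changes the path being looked up.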
Krzysztof Nawara
10/07/2020, 7:23 AM