#prefect-community

Zach Jablons

03/09/2023, 11:22 PM
Hi there, I'm trying to figure out how I would leverage the caching functionality to ensure that any dependent tasks are re-run after a parent is re-run. 🧵
The test code is here:
from prefect import flow, task                                          
from prefect.tasks import task_input_hash                               
                                                                        
@task(cache_key_fn=task_input_hash, refresh_cache=True)                 
def a():                                                                
    print("executing a")                                                
    return 1                                                            
                                                                        
@task(cache_key_fn=task_input_hash)                                     
def b(result_from_a):                                                   
    print("executing b")                                                
    return result_from_a + 2                                            
                                                                        
@flow                                                                   
def test1():                                                            
    res_a = a()                                                         
    return b(res_a)                                                     
                                                                        
if __name__ == "__main__":                                              
    print(test1())
The behavior I'm hoping to see is that, when running this, no matter what the cached state of `b` is, it reruns because `a` was rerun.
However, that isn't what happens:
18:23:25.064 | INFO    | prefect.engine - Created flow run 'pearl-seahorse' for flow 'test1'
18:23:25.205 | INFO    | Flow run 'pearl-seahorse' - Created task run 'a-0' for task 'a'
18:23:25.206 | INFO    | Flow run 'pearl-seahorse' - Executing 'a-0' immediately...
executing a
18:23:25.281 | INFO    | Task run 'a-0' - Finished in state Completed()
18:23:25.308 | INFO    | Flow run 'pearl-seahorse' - Created task run 'b-0' for task 'b'
18:23:25.309 | INFO    | Flow run 'pearl-seahorse' - Executing 'b-0' immediately...
18:23:25.358 | INFO    | Task run 'b-0' - Finished in state Cached(type=COMPLETED)
18:23:25.401 | INFO    | Flow run 'pearl-seahorse' - Finished in state Completed()
3
I can see that `a` is re-run, but `b` is not - and the result of course still uses the old value from `b`.
I've looked into the `TaskRunContext` object and I don't see anything that would give me this information that I could use to incorporate into the hash.
But I might be missing something - is it possible to support this functionality?

Tim-Oliver

03/10/2023, 8:18 AM
I think `b` is not re-run because the result of task `a` is always `1`. So it does not matter if `a` is re-run. The input to `b` stays the same, which results in the same `cache_key`, which keeps `b` from being executed again.
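To make the point above concrete, here is a minimal sketch of an input-based cache key. `input_hash_sketch` is a hypothetical stand-in, not Prefect's actual `task_input_hash` implementation; it only assumes that the key is derived from the task's identity and its argument values.

```python
import hashlib
import json


def input_hash_sketch(task_name: str, arguments: dict) -> str:
    """Hypothetical stand-in for an input-based cache key (not Prefect's code)."""
    payload = json.dumps({"task": task_name, "args": arguments}, sort_keys=True)
    return hashlib.sha256(payload.encode()).hexdigest()


# `a` ran twice but returned 1 both times, so `b` sees identical inputs.
key_first_run = input_hash_sketch("b", {"result_from_a": 1})
key_second_run = input_hash_sketch("b", {"result_from_a": 1})

print(key_first_run == key_second_run)  # True

# A different upstream value would produce a different key.
print(input_hash_sketch("b", {"result_from_a": 2}) == key_first_run)  # False
```

Under that assumption, whether `a` executed or was served from cache never enters the key for `b`; only the value `a` returned does.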

Zach Jablons

03/10/2023, 3:19 PM
Yeah, I realize that - I was wondering if there was a way to force `b` to be re-run based on `a` being re-run, in case e.g. `a` produces side effects that are not part of `b`
It's not a big deal, although I haven't tried to run large objects through the caching hash - my other question is more important to me
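One possible workaround, not confirmed anywhere in this thread, is to have the upstream step return a fresh token alongside its value, so that any input-based key for the downstream step changes whenever the upstream step actually executes. The sketch below simulates this in plain Python; `cache`, `calls`, and `run_token` are hypothetical names, and the in-memory dict only stands in for a real cache store. Whether this carries over cleanly to `task_input_hash` in Prefect is an untested assumption.

```python
import uuid

cache = {}  # hypothetical in-memory stand-in for a cache-key store
calls = []  # records which functions actually executed


def a():
    """Upstream step that always re-runs; returns its value plus a fresh token."""
    calls.append("a")
    return {"value": 1, "run_token": uuid.uuid4().hex}


def b(result_from_a):
    """Downstream step cached on its full input, token included."""
    key = ("b", result_from_a["value"], result_from_a["run_token"])
    if key not in cache:
        calls.append("b")
        cache[key] = result_from_a["value"] + 2
    return cache[key]


first = b(a())
second = b(a())  # new token from `a`, so the key misses and `b` runs again

print(first, second)  # 3 3
print(calls)  # ['a', 'b', 'a', 'b']
```

The trade-off is that `b` can then only hit the cache when `a` itself was served from cache and returned its earlier token, which is the behavior asked for here.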