Valerio
03/11/2025, 12:48 PM
`@task(cache_policy=INPUTS+TASK_SOURCE)`, but this way, if the source of any function called inside the task changes, the cached result is used instead of the task being rerun. How can I solve this?
Marvin
03/11/2025, 12:48 PM
```python
import hashlib
import inspect

from prefect import task
from prefect.context import TaskRunContext

def custom_cache_key(context: TaskRunContext, inputs: dict) -> str:
    # Hash the source of the task and of each utility function it calls.
    # hashlib is used because hash() is salted per process for str/bytes.
    functions = (context.task.fn, your_utility_function, another_utility_function)
    sources = "".join(inspect.getsource(fn) for fn in functions)
    # Assumes the input values have a stable repr().
    payload = sources + repr(sorted(inputs.items()))
    return hashlib.sha256(payload.encode()).hexdigest()

@task(cache_key_fn=custom_cache_key)
def your_task(input_1, input_2):
    # Call your utility functions
    result = your_utility_function(input_1) + another_utility_function(input_2)
    return result
```
In this example, `custom_cache_key` generates a cache key that includes hashes of the source code of the task and the utility functions, as well as the task inputs. Adjust the function to include any other utility functions you use.
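If the list of utility functions grows, one option is to build the key function from a list of helpers instead of editing it by hand. The following is only a minimal sketch under stated assumptions: `make_cache_key_fn` and `source_fingerprint` are hypothetical names (not part of Prefect), the helpers are plain Python functions, and the input values have a stable `repr()`. It uses only the standard library and relies on `context.task.fn` as in the snippet above.

```python
import hashlib
import inspect
from typing import Any, Callable


def source_fingerprint(fn: Callable) -> bytes:
    """Return bytes that change when the body of `fn` is edited."""
    try:
        return inspect.getsource(fn).encode()
    except (OSError, TypeError):
        # Source unavailable (e.g. defined in a REPL): fall back to bytecode.
        return fn.__code__.co_code


def make_cache_key_fn(*helpers: Callable) -> Callable[[Any, dict], str]:
    """Build a cache key function covering the task and the listed helpers."""

    def cache_key_fn(context: Any, inputs: dict) -> str:
        digest = hashlib.sha256()
        # context.task.fn is the undecorated task function.
        for fn in (context.task.fn, *helpers):
            digest.update(source_fingerprint(fn))
        # Sort the inputs so the key does not depend on argument order.
        digest.update(repr(sorted(inputs.items())).encode())
        return digest.hexdigest()

    return cache_key_fn
```

The returned function could then be passed as `cache_key_fn=make_cache_key_fn(your_utility_function, another_utility_function)` in the `@task` decorator. Note that it only tracks the helpers you list explicitly; functions those helpers call in turn are not covered.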
For further reading on cache policies and custom cache key functions, you may find the Prefect documentation on task caching helpful.