Kiran
07/06/2025, 8:11 PM

Marvin
07/06/2025, 8:11 PM
Although you've set `cache_policy=DEFAULT`, you haven't provided a `cache_key_fn`, which is essential for caching to work. All the cache key functions in your code are commented out. Without a cache key function, each task run is considered unique and won't use cached results.
Here's how to fix it:
```python
from prefect import flow, task
from datetime import datetime
from prefect.cache_policies import DEFAULT


@task(
    cache_key_fn=lambda ctx, _: f"{ctx.flow_run.name}-{ctx.task_run.name}-{datetime.utcnow().strftime('%Y-%m-%d')}",
    cache_policy=DEFAULT,
    persist_result=True,
    retry_delay_seconds=60,
    retries=3,
    log_prints=True
)
def my_daily_task_1():
    print("Executing task_1")
    return "Daily task result_1, custom result"


@task(
    cache_key_fn=lambda ctx, _: f"{ctx.flow_run.name}-{ctx.task_run.name}-{datetime.utcnow().strftime('%Y-%m-%d')}",
    cache_policy=DEFAULT,
    persist_result=True,
    retry_delay_seconds=60,
    retries=3,
    log_prints=True
)
def my_daily_task_2():
    print("Executing task_2")
    return "Daily task result_2, custom result"


@flow(log_prints=True, persist_result=True)
def caching_test_flow():
    task_1_result = my_daily_task_1()
    print(f"task 1 result is {task_1_result}")
    task_2_result = my_daily_task_2()
    print(f"task 2 result is {task_2_result}")
```
A few important points about caching:
1. The `cache_key_fn` is crucial - it determines when to use cached results. Tasks with the same cache key will reuse results.
2. The default cache policy (`DEFAULT`) will cache results for 24 hours.
3. For retries specifically, caching behavior is different from regular task runs:
- If a task fails and retries, it will always re-execute (not use cache)
- Caching is meant for subsequent flow runs, not for retries within the same flow run
4. If you want to ensure tasks only run once per day regardless of retries, you might want to use a simpler cache key that only includes the date:
```python
cache_key_fn=lambda _, __: datetime.utcnow().strftime("%Y-%m-%d")
```
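For example, a minimal sketch (the task name and return value are illustrative) applying that date-only key to a full task definition, following the same decorator arguments as your code:
```python
from datetime import datetime
from prefect import task
from prefect.cache_policies import DEFAULT


@task(
    # Every call on the same UTC date produces the same key,
    # so the first successful result is reused for the rest of the day.
    cache_key_fn=lambda _, __: datetime.utcnow().strftime("%Y-%m-%d"),
    cache_policy=DEFAULT,
    persist_result=True,
    retries=3,
    retry_delay_seconds=60,
)
def run_once_per_day():
    print("Executing the daily work")
    return "result computed once per UTC day"
```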
Remember that caching and retries serve different purposes:
- Retries handle temporary failures within a single flow run
- Caching prevents redundant execution across different flow runs
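As a rough sketch of that distinction (the task, flow, and return values here are made up): running the same flow twice on one day executes the task body only the first time, while `retries` only matters if the body raises.
```python
from datetime import datetime
from prefect import flow, task
from prefect.cache_policies import DEFAULT


@task(
    cache_key_fn=lambda _, __: datetime.utcnow().strftime("%Y-%m-%d"),
    cache_policy=DEFAULT,
    persist_result=True,
    retries=3,
    retry_delay_seconds=60,
)
def fetch_daily_report():
    print("Executing fetch_daily_report")  # printed only on the first run of the day
    return "report contents"


@flow(log_prints=True)
def report_flow():
    print(fetch_daily_report())


if __name__ == "__main__":
    report_flow()  # task body runs, result is persisted under today's key
    report_flow()  # same key, so the cached result is returned without re-executing
```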
If you're specifically trying to handle retries, the `retries` and `retry_delay_seconds` parameters you've set are correct, but caching won't affect retry behavior.
Kiran
07/06/2025, 8:13 PM

Marvin
07/06/2025, 8:14 PM
1. A manually retried flow run gets a new `flow_run.name`. Since your commented-out cache keys include `ctx.flow_run.name`, each retry would generate a new cache key and thus execute the task again.
2. For manual retries where you want to use cached results, you should use a cache key that depends only on stable values like the date or task name. Here's the modified code:
```python
from prefect import flow, task
from datetime import datetime
from prefect.cache_policies import DEFAULT


@task(
    # This cache key will remain stable across manual retries on the same day
    cache_key_fn=lambda ctx, _: f"{ctx.task_run.name}-{datetime.utcnow().strftime('%Y-%m-%d')}",
    cache_policy=DEFAULT,
    persist_result=True,
    retry_delay_seconds=60,
    retries=3,
    log_prints=True
)
def my_daily_task_1():
    print("Executing task_1")
    return "Daily task result_1, custom result"


@task(
    cache_key_fn=lambda ctx, _: f"{ctx.task_run.name}-{datetime.utcnow().strftime('%Y-%m-%d')}",
    cache_policy=DEFAULT,
    persist_result=True,
    retry_delay_seconds=60,
    retries=3,
    log_prints=True
)
def my_daily_task_2():
    print("Executing task_2")
    return "Daily task result_2, custom result"


@flow(log_prints=True, persist_result=True)
def caching_test_flow():
    task_1_result = my_daily_task_1()
    print(f"task 1 result is {task_1_result}")
    task_2_result = my_daily_task_2()
    print(f"task 2 result is {task_2_result}")
```
The key changes are:
1. Use a cache key that only includes the task name and date: `f"{ctx.task_run.name}-{datetime.utcnow().strftime('%Y-%m-%d')}"`
2. Remove `flow_run.name` from the cache key, since it changes with each retry
With these changes:
- Tasks will cache results for the entire day
- Manual retries will use cached results if they exist
- Different tasks (task_1 vs task_2) will still have different cache keys
- The cache will reset at midnight UTC
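To make that concrete, here is a tiny illustrative sketch (the flow run and task run names are invented) of the keys each style would produce on the original run versus a manual retry:
```python
from datetime import datetime

today = datetime.utcnow().strftime("%Y-%m-%d")
task_run_name = "my_daily_task_1-0"  # hypothetical task run name

# Old style: the flow run name differs between the original run and the manual
# retry, so the keys differ and the cached result is never reused.
old_key_original = f"quiet-falcon-{task_run_name}-{today}"
old_key_retry = f"brisk-badger-{task_run_name}-{today}"

# New style: no flow run name, so both runs compute the same key and hit the cache.
new_key = f"{task_run_name}-{today}"
```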
If you want even simpler caching that only depends on the date:
```python
cache_key_fn=lambda _, __: datetime.utcnow().strftime("%Y-%m-%d")
```
This would make all tasks with this cache key share the same cache on a given day.
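For instance, a quick sketch (task bodies are illustrative) of two tasks sharing one date-only key - whichever runs first populates the cache and the other returns that same value:
```python
from datetime import datetime
from prefect import flow, task
from prefect.cache_policies import DEFAULT


def date_only_key(context, parameters):
    # Same key for every task that uses this function on a given UTC date
    return datetime.utcnow().strftime("%Y-%m-%d")


@task(cache_key_fn=date_only_key, cache_policy=DEFAULT, persist_result=True, log_prints=True)
def task_a():
    print("Executing task_a")
    return "computed by task_a"


@task(cache_key_fn=date_only_key, cache_policy=DEFAULT, persist_result=True, log_prints=True)
def task_b():
    print("Executing task_b")  # expected to be skipped on the same day
    return "computed by task_b"


@flow(log_prints=True)
def shared_cache_flow():
    print(task_a())
    print(task_b())  # expected to print task_a's cached result
```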
Remember that the `DEFAULT` cache policy expires after 24 hours. If you need different expiration behavior, you can set a `cache_expiration` (a `timedelta`) on the task, or build a custom policy on top of `prefect.cache_policies.CachePolicy`.
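For example, a minimal sketch using the task-level `cache_expiration` argument (the one-hour window and task name are arbitrary assumptions for illustration):
```python
from datetime import datetime, timedelta
from prefect import task


@task(
    cache_key_fn=lambda _, __: datetime.utcnow().strftime("%Y-%m-%d"),
    cache_expiration=timedelta(hours=1),  # cached result is treated as stale after an hour
    persist_result=True,
)
def refreshed_hourly():
    # With the date-based key above, this body re-executes at most once per hour,
    # even though the key itself only changes at midnight UTC.
    return "recomputed when the cached value expires"
```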