# ask-community
k
hey @Marvin

```python
from prefect import flow, task
from datetime import datetime
from prefect.cache_policies import DEFAULT

@task(
    #cache_key_fn=lambda _, __: datetime.utcnow().strftime("%Y-%m-%d"),
    #cache_key_fn=lambda ctx, _: f"{ctx.task_run.name}-{datetime.utcnow().strftime('%Y-%m-%d')}",
    #cache_key_fn=lambda ctx, _: f"{ctx.flow_run.name}-{datetime.utcnow().strftime('%Y-%m-%d')}",
    #cache_key_fn=lambda ctx, _: f"{ctx.flow_run.name}-{ctx.task_run.name}-{datetime.utcnow().strftime('%Y-%m-%d')}",
    cache_policy=DEFAULT,
    persist_result=True,
    retry_delay_seconds=60,  # Wait 60 seconds before retrying on failure
    retries=3,               # Allow 3 retries if the task fails
    log_prints=True
)
def my_daily_task_1():
    print("Executing task_1")
    # Your task logic here
    #raise ValueError("custom value error")
    return "Daily task result_1, custom result"
    # except Exception as e:
    #     print(f"Task_1 failed: {e}")

@task(
    #cache_key_fn=lambda _, __: datetime.utcnow().strftime("%Y-%m-%d"),
    #cache_key_fn=lambda ctx, _: f"{ctx.task_run.name}-{datetime.utcnow().strftime('%Y-%m-%d')}",
    #cache_key_fn=lambda ctx, _: f"{ctx.flow_run.name}-{datetime.utcnow().strftime('%Y-%m-%d')}",
    #cache_key_fn=lambda ctx, _: f"{ctx.flow_run.name}-{ctx.task_run.name}-{datetime.utcnow().strftime('%Y-%m-%d')}",
    cache_policy=DEFAULT,
    persist_result=True,
    retry_delay_seconds=60,  # Wait 60 seconds before retrying on failure
    retries=3,               # Allow 3 retries if the task fails
    log_prints=True
)
def my_daily_task_2():
    print("Executing task_2")
    # Your task logic here
    #raise ValueError("custom value error")
    return "Daily task result_2, custom result"
    # except Exception as e:
    #     print(f"Task_2 failed: {e}")
    #     raise Value

@flow(log_prints=True, persist_result=True)
def caching_test_flow():
    #result = my_daily_task_2()
    #print(f"Task result: {result}")
    task_1_result = my_daily_task_1()
    print(f"task 1 result is {task_1_result}")
    task_2_result = my_daily_task_2()
    print(f"task 2 result is {task_2_result}")
```

Why is the code running again instead of caching for the retry?
m
thought for 12.5 seconds
Let me help you understand why your tasks aren't being cached as expected. The issue lies in how you're implementing the caching mechanism. While you've set `cache_policy=DEFAULT`, you haven't provided a `cache_key_fn`, which is essential for caching to work. All the cache key functions in your code are commented out. Without a cache key function, each task run is considered unique and won't use cached results. Here's how to fix it:
```python
from prefect import flow, task
from datetime import datetime
from prefect.cache_policies import DEFAULT

@task(
    cache_key_fn=lambda ctx, _: f"{ctx.flow_run.name}-{ctx.task_run.name}-{datetime.utcnow().strftime('%Y-%m-%d')}",
    cache_policy=DEFAULT,
    persist_result=True,
    retry_delay_seconds=60,
    retries=3,
    log_prints=True
)
def my_daily_task_1():
    print("Executing task_1")
    return "Daily task result_1, custom result"

@task(
    cache_key_fn=lambda ctx, _: f"{ctx.flow_run.name}-{ctx.task_run.name}-{datetime.utcnow().strftime('%Y-%m-%d')}",
    cache_policy=DEFAULT,
    persist_result=True,
    retry_delay_seconds=60,
    retries=3,
    log_prints=True
)
def my_daily_task_2():
    print("Executing task_2")
    return "Daily task result_2, custom result"

@flow(log_prints=True, persist_result=True)
def caching_test_flow():
    task_1_result = my_daily_task_1()
    print(f"task 1 result is {task_1_result}")
    task_2_result = my_daily_task_2()
    print(f"task 2 result is {task_2_result}")
A few important points about caching:

1. The `cache_key_fn` is crucial: it determines when to use cached results. Tasks with the same cache key will reuse results.
2. The default cache policy (`DEFAULT`) will cache results for 24 hours.
3. For retries specifically, caching behavior is different from regular task runs: if a task fails and retries, it will always re-execute (not use the cache). Caching is meant for subsequent flow runs, not for retries within the same flow run.
4. If you want to ensure tasks only run once per day regardless of retries, you might want to use a simpler cache key that only includes the date (shown below):
```python
cache_key_fn=lambda _, __: datetime.utcnow().strftime("%Y-%m-%d")
```
Remember that caching and retries serve different purposes:

- Retries handle temporary failures within a single flow run.
- Caching prevents redundant execution across different flow runs.

If you're specifically trying to handle retries, the `retries` and `retry_delay_seconds` parameters you've set are correct, but caching won't affect retry behavior.
k
@Marvin I mean the manual retry done by me
m
thought for 12.5 seconds
Ah, now I understand - you're talking about manually retrying a flow run from the Prefect UI or CLI, and you're seeing the tasks execute again instead of using cached results. Let me explain why this happens and how to fix it:

1. When you manually retry a flow run, it creates a new flow run with a new `flow_run.name`. Since your commented-out cache keys include `ctx.flow_run.name`, each retry would generate a new cache key and thus execute the task again.
2. For manual retries where you want to use cached results, you should use a cache key that depends only on stable values like the date or task name.

Here's the modified code:
```python
from prefect import flow, task
from datetime import datetime
from prefect.cache_policies import DEFAULT

@task(
    # This cache key will remain stable across manual retries on the same day
    cache_key_fn=lambda ctx, _: f"{ctx.task_run.name}-{datetime.utcnow().strftime('%Y-%m-%d')}",
    cache_policy=DEFAULT,
    persist_result=True,
    retry_delay_seconds=60,
    retries=3,
    log_prints=True
)
def my_daily_task_1():
    print("Executing task_1")
    return "Daily task result_1, custom result"

@task(
    cache_key_fn=lambda ctx, _: f"{ctx.task_run.name}-{datetime.utcnow().strftime('%Y-%m-%d')}",
    cache_policy=DEFAULT,
    persist_result=True,
    retry_delay_seconds=60,
    retries=3,
    log_prints=True
)
def my_daily_task_2():
    print("Executing task_2")
    return "Daily task result_2, custom result"

@flow(log_prints=True, persist_result=True)
def caching_test_flow():
    task_1_result = my_daily_task_1()
    print(f"task 1 result is {task_1_result}")
    task_2_result = my_daily_task_2()
    print(f"task 2 result is {task_2_result}")
The key changes are:

1. Use a cache key that only includes the task name and date: `f"{ctx.task_run.name}-{datetime.utcnow().strftime('%Y-%m-%d')}"`
2. Remove `flow_run.name` from the cache key, since it changes with each retry.

With these changes:

- Tasks will cache results for the entire day.
- Manual retries will use cached results if they exist.
- Different tasks (task_1 vs task_2) will still have different cache keys.
- The cache will reset at midnight UTC.

If you want even simpler caching that only depends on the date:
```python
cache_key_fn=lambda _, __: datetime.utcnow().strftime("%Y-%m-%d")
```
This would make all tasks with this cache key share the same cache on a given day. Remember that the `DEFAULT` cache policy expires after 24 hours. If you need different expiration behavior, set the `cache_expiration` parameter on the task (it takes a `datetime.timedelta`).