<@ULVA73B9P> I'm struggling getting my tasks cache...
# ask-marvin
s
@Marvin I'm struggling getting my tasks cached - ever. This is my code:
@task(log_prints=True, persist_result=True, cache_key_fn=cache_key_fn_with_run_id)
def csvs_parallel_task():
    return pipeline_csvs_parallel.main()

@task(log_prints=True, persist_result=True, cache_key_fn=cache_key_fn_with_run_id)
def csvs_sequential_task():
    return pipeline_csvs_sequential.main()

@flow(log_prints=True, persist_result=True)
def extract_csvs():
    logger = get_run_logger()

    futures = [
        csvs_parallel_task.submit(),
        csvs_sequential_task.submit()
    ]
    
    for f in futures:
        f.result()

    logger.info("All extract successful!")

@flow(log_prints=True)
def run_flow():
    extract_csvs()
    #extract_apis()
    #transform_all()
    #load_analytics()

if __name__ == "__main__":
    run_flow()
These tasks never get tagged "Cached" in Prefect, and when one fails (like the dlt "pipeline_csvs_sequential"), the other pipeline that succeeded is not cached and is therefore re-run every time I click Retry on the flow in Prefect.
m
Thanks for sharing the snippet. I see what's going on. Short answer: your cache_key_fn is the culprit. If it includes the run id (e.g., cache_key_fn_with_run_id), every flow retry creates a brand-new flow run and new task run ids, which guarantees a different cache key each time. That prevents any cache hit across runs, so you'll never see "Cached" in the UI and successful tasks will re-run on flow retries.

What to change
- Use a stable cache key that does NOT include the run id:
  - If your task inputs determine the result, use the built-in task_input_hash.
  - If your tasks have no inputs and you just want to reuse the last success, return a constant key per task (and optionally version it).
- Optionally add a cache_expiration so you don't reuse results forever.
- Ensure results are persisted to durable storage if you want cache to work across different machines/containers.

Example fix
from datetime import timedelta
from prefect import flow, task, get_run_logger
from prefect.tasks import task_input_hash
# Optional: shared storage if running across machines/containers
# from prefect.filesystems import LocalFileSystem
# shared_fs = LocalFileSystem(basepath="/mnt/prefect-results")

@task(
    log_prints=True,
    persist_result=True,
    cache_key_fn=task_input_hash,              # <- stable key based on inputs
    cache_expiration=timedelta(days=1),
    # result_storage=shared_fs,                # <- enable if needed
)
def csvs_parallel_task():
    return pipeline_csvs_parallel.main()

@task(
    log_prints=True,
    persist_result=True,
    cache_key_fn=task_input_hash,              # or: cache_key_fn=lambda *_: "csvs_sequential_v1"
    cache_expiration=timedelta(days=1),
    # result_storage=shared_fs,
)
def csvs_sequential_task():
    return pipeline_csvs_sequential.main()

@flow(log_prints=True, persist_result=True)
def extract_csvs():
    logger = get_run_logger()
    futures = [csvs_parallel_task.submit(), csvs_sequential_task.submit()]
    for f in futures:
        f.result()
    logger.info("All extract successful!")

@flow(log_prints=True)
def run_flow():
    extract_csvs()

if __name__ == "__main__":
    run_flow()
Notes
- Clicking "Retry" on a flow in the UI creates a new flow run. Without a stable cache key and persisted results, Prefect will re-run all tasks. With a stable key, the successful task should show "Cached" on the next run.
- If your tasks have no parameters, task_input_hash will produce the same key for that task's inputs; Prefect also scopes caches by task source so different tasks don't collide.
- For cross-machine/container retries, configure result_storage (e.g., S3/GCS/Azure/Local on a shared volume) so Prefect can read previously persisted results.

Docs
- Results & caching (overview)
- Task caching details

A couple quick questions to make sure this will work for you:
- Are you intentionally including the run id in the cache key? If not, swapping to task_input_hash or a constant per task should solve it.
- Are your flow retries running on the same machine/container, or do you need durable shared result storage?
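For the "constant key per task" suggestion, a minimal sketch follows. It assumes Prefect calls the cache key function with the task run context and the parameters dict, which is the signature task_input_hash uses. make_constant_cache_key and the version strings are hypothetical names for illustration, not Prefect APIs.

```python
from typing import Any, Callable, Dict, Optional


def make_constant_cache_key(
    task_name: str, version: str = "v1"
) -> Callable[[Any, Dict[str, Any]], Optional[str]]:
    """Build a cache key function that ignores run ids and inputs.

    Hypothetical helper: every run of the task gets the same key until
    `version` is bumped, which is what lets a later run hit the cache.
    """

    def cache_key_fn(context: Any, parameters: Dict[str, Any]) -> Optional[str]:
        # Neither the context nor the parameters are used, so the key is stable.
        return f"{task_name}:{version}"

    return cache_key_fn


# Intended use (assumption): @task(persist_result=True, cache_key_fn=parallel_key)
parallel_key = make_constant_cache_key("csvs_parallel")
sequential_key = make_constant_cache_key("csvs_sequential", version="v2")
```

Bumping the version string is a cheap way to invalidate the old entry without touching result storage.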
s
@Marvin Ok, that fixed it. However, the next time the main flow runs (from Deployment), the tasks are still Cached. I need them to re-run every new run, not just on retries. Can I flush the cache for these flows/tasks manually in code, when completed somehow? Or what do you recommend?
m
Got it. You want cache hits for retries, but fresh runs for new scheduled executions. You have a few good options; pick the one that best fits your schedule cadence and needs:

Option A (recommended): add a run "partition" input so each scheduled run gets a new cache key
- Pass a logical run key (e.g., a date or batch id) into your tasks and keep task_input_hash. Retries of the same scheduled run reuse the same input → cache hit; the next scheduled run has a different input → fresh run.
from datetime import timedelta
import pendulum
from prefect import flow, task, get_run_logger
from prefect.tasks import task_input_hash

@task(persist_result=True, cache_key_fn=task_input_hash, cache_expiration=timedelta(days=1))
def csvs_parallel_task(run_key: str):
    return pipeline_csvs_parallel.main()

@task(persist_result=True, cache_key_fn=task_input_hash, cache_expiration=timedelta(days=1))
def csvs_sequential_task(run_key: str):
    return pipeline_csvs_sequential.main()

@flow(log_prints=True, persist_result=True)
def extract_csvs(run_key: str | None = None):
    logger = get_run_logger()
    # e.g., scope cache to the day; use hour if you need per-hour runs
    run_key = run_key or pendulum.now("UTC").to_date_string()

    futures = [
        csvs_parallel_task.submit(run_key),
        csvs_sequential_task.submit(run_key),
    ]
    for f in futures:
        f.result()
    logger.info("All extract successful!")
- If your deployment already passes a parameter (like a partition date), use that as run_key.

Option B: custom cache key that includes a time bucket (no extra task arg)
- Combine task_input_hash with the flow's scheduled date (or hour) inside a custom key function so each schedule window is its own cache:
from datetime import datetime, timezone
from prefect import task
from prefect.runtime import flow_run
from prefect.tasks import task_input_hash

def cache_key_by_day(ctx, inputs):
    base = task_input_hash(ctx, inputs)
    # ctx is a TaskRunContext, which has no flow_run attribute, so read the
    # scheduled start time from prefect.runtime; fall back to "today"
    scheduled = flow_run.scheduled_start_time or datetime.now(timezone.utc)
    return f"{base}:{scheduled.date().isoformat()}"

@task(persist_result=True, cache_key_fn=cache_key_by_day)
def csvs_parallel_task():
    return pipeline_csvs_parallel.main()
Option C: time-based expiration
- Set cache_expiration to be less than your schedule interval. Retries shortly after failure still hit cache; the next scheduled run occurs after expiration, so it recomputes.
@task(persist_result=True, cache_key_fn=task_input_hash, cache_expiration=timedelta(hours=1))
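To make the timing in Option C concrete, here is a plain-Python sketch of the expiration check. It does not call Prefect; cache_is_valid and the timestamps are made up for illustration, and it assumes an hourly schedule with a 45-minute expiration.

```python
from datetime import datetime, timedelta, timezone


def cache_is_valid(cached_at: datetime, now: datetime, expiration: timedelta) -> bool:
    """Return True while a cached result is younger than its expiration."""
    return now - cached_at < expiration


expiration = timedelta(minutes=45)  # assumed: shorter than the hourly schedule
cached_at = datetime(2024, 1, 1, 9, 0, tzinfo=timezone.utc)

retry_time = cached_at + timedelta(minutes=10)  # retry shortly after a failure
next_run = cached_at + timedelta(hours=1)       # next scheduled run

print(cache_is_valid(cached_at, retry_time, expiration))  # True: retry reuses the result
print(cache_is_valid(cached_at, next_run, expiration))    # False: next run recomputes
```

The trade-off is that a retry attempted after the expiration window also recomputes, so this option suits cases where retries happen soon after the failure.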
Option D: force a refresh on demand
- When you want to bypass cache for a specific call:
csvs_parallel_task.with_options(refresh_cache=True).submit()
This does not require clearing storage; it just ignores the cache for that invocation and writes a fresh result.

On "flushing the cache" from code
- There isn't a general "delete all cache entries" API you call at the end of a flow. The patterns above (partitioned keys, TTL, or refresh_cache) are the supported ways to control cache usage.
- If you truly need to purge results storage, you could delete the underlying stored results yourself (e.g., remove object(s) from your S3/GCS/local path), but that's typically unnecessary and more brittle.

Docs
- Task caching concepts
- Results and caching

If you share your schedule cadence (e.g., hourly/daily) and whether you already pass a date/partition param in your deployment, I can suggest the exact keying scheme to use.
s
@Marvin Is there a way to detect if the run is a retry, in code?
m
Yes, you can detect automatic retry attempts in code via the run attempt counter.
- In a TASK: use prefect.runtime.task_run.run_count (or get_run_context().task_run.run_count)
- In a FLOW (when using retries= on the flow): use prefect.runtime.flow_run.run_count (or get_run_context().flow_run.run_count)

Examples
- Task-level:
from prefect import task, get_run_logger
from prefect.runtime import task_run

@task(retries=3)
def do_work():
    logger = get_run_logger()
    attempt = task_run.run_count  # 1 on first try, 2/3/... on retries
    is_retry = attempt > 1
    logger.info(f"Task attempt #{attempt} (retry? {is_retry})")
- Flow-level:
from prefect import flow, get_run_logger
from prefect.runtime import flow_run

@flow(retries=2)
def my_flow():
    logger = get_run_logger()
    attempt = flow_run.run_count  # 1 on first try, 2/... on retries
    logger.info(f"Flow attempt #{attempt}")
    ...
- Using context (works in both):
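from prefect.context import TaskRunContext, get_run_context

# get_run_context() returns a TaskRunContext inside a task and a
# FlowRunContext inside a flow; only the former has a task_run attribute
ctx = get_run_context()
run = ctx.task_run if isinstance(ctx, TaskRunContext) else ctx.flow_run
attempt = run.run_count
is_retry = attempt > 1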
Important distinctions
- This detects automatic retries of the same run (attempts 2, 3, ...).
- If you click "Retry" in the UI, Prefect creates a new flow run; that new run's flow_run.run_count will start at 1. There isn't a built-in flag to tell "this new run came from a UI Retry" vs a fresh scheduled run. If you need to tag those, pass a parameter yourself when triggering runs programmatically, or use naming conventions.