Steff
09/29/2025, 6:19 AM

@task(log_prints=True, persist_result=True, cache_key_fn=cache_key_fn_with_run_id)
def csvs_parallel_task():
    return pipeline_csvs_parallel.main()

@task(log_prints=True, persist_result=True, cache_key_fn=cache_key_fn_with_run_id)
def csvs_sequential_task():
    return pipeline_csvs_sequential.main()

@flow(log_prints=True, persist_result=True)
def extract_csvs():
    logger = get_run_logger()
    futures = [
        csvs_parallel_task.submit(),
        csvs_sequential_task.submit(),
    ]
    for f in futures:
        f.result()
    logger.info("All extract successful!")

@flow(log_prints=True)
def run_flow():
    extract_csvs()
    # extract_apis()
    # transform_all()
    # load_analytics()

if __name__ == "__main__":
    run_flow()
These tasks never get tagged "Cached" in Prefect, and when one fails (like the dlt pipeline_csvs_sequential), the other pipeline that succeeded is not cached either, and hence re-runs every time I click Retry on the flow in Prefect.

Marvin
09/29/2025, 6:24 AM

Your cache_key_fn is the culprit. If it includes the run id (e.g., cache_key_fn_with_run_id), every flow retry creates a brand-new flow run and new task run ids, which guarantees a different cache key each time. That prevents any cache hit across runs, so you’ll never see “Cached” in the UI and successful tasks will re-run on flow retries.
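For reference, a guess at what such a key function looks like (cache_key_fn_with_run_id is your code; this reconstruction just illustrates the failure mode):

from prefect.tasks import task_input_hash

def cache_key_fn_with_run_id(ctx, inputs):
    # Hypothetical reconstruction: ctx.task_run.id is new on every task run,
    # so this key can never repeat across runs and the cache never hits.
    return f"{task_input_hash(ctx, inputs)}:{ctx.task_run.id}"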
What to change
- Use a stable cache key that does NOT include the run id:
  - If your task inputs determine the result, use the built-in task_input_hash.
  - If your tasks have no inputs and you just want to reuse the last success, return a constant key per task (and optionally version it).
- Optionally add a cache_expiration so you don’t reuse results forever.
- Ensure results are persisted to durable storage if you want the cache to work across different machines/containers.
Example fix

from datetime import timedelta

from prefect import flow, task, get_run_logger
from prefect.tasks import task_input_hash

# Optional: shared storage if running across machines/containers
# from prefect.filesystems import LocalFileSystem
# shared_fs = LocalFileSystem(basepath="/mnt/prefect-results")

@task(
    log_prints=True,
    persist_result=True,
    cache_key_fn=task_input_hash,  # <- stable key based on inputs
    cache_expiration=timedelta(days=1),
    # result_storage=shared_fs,  # <- enable if needed
)
def csvs_parallel_task():
    return pipeline_csvs_parallel.main()

@task(
    log_prints=True,
    persist_result=True,
    cache_key_fn=task_input_hash,  # or: cache_key_fn=lambda *_: "csvs_sequential_v1"
    cache_expiration=timedelta(days=1),
    # result_storage=shared_fs,
)
def csvs_sequential_task():
    return pipeline_csvs_sequential.main()

@flow(log_prints=True, persist_result=True)
def extract_csvs():
    logger = get_run_logger()
    futures = [csvs_parallel_task.submit(), csvs_sequential_task.submit()]
    for f in futures:
        f.result()
    logger.info("All extract successful!")

@flow(log_prints=True)
def run_flow():
    extract_csvs()

if __name__ == "__main__":
    run_flow()
Notes
- Clicking “Retry” on a flow in the UI creates a new flow run. Without a stable cache key and persisted results, Prefect will re-run all tasks. With a stable key, the successful task should show “Cached” on the next run.
- If your tasks have no parameters, task_input_hash will produce the same key for that task’s inputs; Prefect also scopes caches by task source, so different tasks don’t collide.
- For cross-machine/container retries, configure result_storage (e.g., S3/GCS/Azure/Local on a shared volume) so Prefect can read previously persisted results (sketch below).
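A minimal sketch of shared result storage, assuming the generic RemoteFileSystem block and a placeholder bucket (s3:// paths require the s3fs package):

from prefect import task
from prefect.filesystems import RemoteFileSystem

# Placeholder bucket/path -- point this at storage all workers can reach.
s3_results = RemoteFileSystem(basepath="s3://my-bucket/prefect-results")

@task(persist_result=True, result_storage=s3_results)
def csvs_parallel_task():
    return pipeline_csvs_parallel.main()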
Docs
- Results & caching (overview)
- Task caching details
A couple quick questions to make sure this will work for you:
- Are you intentionally including the run id in the cache key? If not, swapping to task_input_hash or a constant per task should solve it.
- Are your flow retries running on the same machine/container, or do you need durable shared result storage?

Steff
09/29/2025, 6:42 AM

Marvin
09/29/2025, 6:44 AM

Option A: pass a time-bucketed run_key parameter and hash it with task_input_hash. Retries of the same scheduled run reuse the same input → cache hit; the next scheduled run has a different input → fresh run.
from datetime import timedelta

import pendulum
from prefect import flow, task, get_run_logger
from prefect.tasks import task_input_hash

@task(persist_result=True, cache_key_fn=task_input_hash, cache_expiration=timedelta(days=1))
def csvs_parallel_task(run_key: str):
    return pipeline_csvs_parallel.main()

@task(persist_result=True, cache_key_fn=task_input_hash, cache_expiration=timedelta(days=1))
def csvs_sequential_task(run_key: str):
    return pipeline_csvs_sequential.main()

@flow(log_prints=True, persist_result=True)
def extract_csvs(run_key: str | None = None):
    logger = get_run_logger()
    # e.g., scope cache to the day; use hour if you need per-hour runs
    run_key = run_key or pendulum.now("UTC").to_date_string()
    futures = [
        csvs_parallel_task.submit(run_key),
        csvs_sequential_task.submit(run_key),
    ]
    for f in futures:
        f.result()
    logger.info("All extract successful!")

- If your deployment already passes a parameter (like a partition date), use that as run_key; a deployment sketch follows below.
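For instance, a sketch assuming Prefect 2.10+, where flows expose serve(); the deployment name and cron schedule are placeholders:

# Daily deployment of the flow above; extract_csvs derives run_key itself
# when none is passed, and a backfill can supply one explicitly, e.g.
# parameters={"run_key": "2025-09-28"}.
extract_csvs.serve(name="extract-csvs-daily", cron="0 6 * * *")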
Option B: custom cache key that includes a time bucket (no extra task arg)
- Combine task_input_hash with the flow’s scheduled date (or hour) inside a custom key function so each schedule window is its own cache:

import pendulum
from prefect import task
from prefect.tasks import task_input_hash

def cache_key_by_day(ctx, inputs):
    base = task_input_hash(ctx, inputs)
    # use the task run's expected_start_time if available; fall back to "today"
    # (cache_key_fn receives a TaskRunContext, which has task_run, not flow_run)
    dt = (ctx.task_run.expected_start_time or pendulum.now("UTC")).date().isoformat()
    return f"{base}:{dt}"

@task(persist_result=True, cache_key_fn=cache_key_by_day)
def csvs_parallel_task():
    return pipeline_csvs_parallel.main()
Option C: time-based expiration
- Set cache_expiration to less than your schedule interval. Retries shortly after a failure still hit the cache; the next scheduled run occurs after expiration, so it recomputes.

@task(persist_result=True, cache_key_fn=task_input_hash, cache_expiration=timedelta(hours=1))
Option D: force a refresh on demand
- When you want to bypass the cache for a specific call:
csvs_parallel_task.with_options(refresh_cache=True).submit()
This does not require clearing storage; it just ignores the cache for that invocation and writes a fresh result.
On “flushing the cache” from code
- There isn’t a general “delete all cache entries” API you call at the end of a flow. The patterns above (partitioned keys, TTL, or refresh_cache) are the supported ways to control cache usage.
- If you truly need to purge results storage, you could delete the underlying stored results yourself (e.g., remove the objects from your S3/GCS/local path; a blunt sketch follows), but that’s typically unnecessary and more brittle.
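Assuming local result storage, where the path below is a placeholder for your storage block’s basepath:

import shutil
from pathlib import Path

# Placeholder path -- match it to your result storage basepath. This deletes
# every persisted result, so only do it if nothing else reads them.
results_dir = Path("/mnt/prefect-results")
if results_dir.exists():
    shutil.rmtree(results_dir)
    results_dir.mkdir(parents=True)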
Docs
- Task caching concepts
- Results and caching
If you share your schedule cadence (e.g., hourly/daily) and whether you already pass a date/partition param in your deployment, I can suggest the exact keying scheme to use.

Steff
09/29/2025, 7:30 AM

Marvin
09/29/2025, 7:31 AM

- In a TASK (when using retries= on the task): use prefect.runtime.task_run.run_count (or get_run_context().task_run.run_count)
- In a FLOW (when using retries= on the flow): use prefect.runtime.flow_run.run_count (or get_run_context().flow_run.run_count)
Examples
- Task-level:

from prefect import task, get_run_logger
from prefect.runtime import task_run

@task(retries=3)
def do_work():
    logger = get_run_logger()
    attempt = task_run.run_count  # 1 on first try, 2/3/... on retries
    is_retry = attempt > 1
    logger.info(f"Task attempt #{attempt} (retry? {is_retry})")
- Flow-level:

from prefect import flow, get_run_logger
from prefect.runtime import flow_run

@flow(retries=2)
def my_flow():
    logger = get_run_logger()
    attempt = flow_run.run_count  # 1 on first try, 2/... on retries
    logger.info(f"Flow attempt #{attempt}")
    ...
- Using context (works in both):

from prefect.context import get_run_context

ctx = get_run_context()
# TaskRunContext exposes task_run; FlowRunContext exposes flow_run
run = getattr(ctx, "task_run", None) or ctx.flow_run
attempt = run.run_count
is_retry = attempt > 1
Important distinctions
- This detects automatic retries of the same run (attempts 2, 3, …).
- If you click “Retry” in the UI, Prefect creates a new flow run; that new run’s flow_run.run_count will start at 1. There isn’t a built-in flag to tell “this new run came from a UI Retry” vs a fresh scheduled run. If you need to tag those, pass a parameter yourself when triggering runs programmatically (sketch below), or use naming conventions.
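A sketch of that tagging approach; the deployment name and triggered_by parameter are placeholders your flow would need to declare:

from prefect.deployments import run_deployment

# Hypothetical: pass a marker parameter so the flow can distinguish a
# manual trigger from a scheduled one.
run_deployment(
    name="my-flow/my-deployment",
    parameters={"triggered_by": "manual-retry"},
)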