Steff
09/26/2025, 12:40 PM

Marvin
09/26/2025, 12:45 PM
- Use RunId to scope the cache key to the current flow run
- Combine it with the default policy (inputs + task source) so different inputs within the same run don’t collide
Example:
from prefect import task
from prefect.cache_policies import CompoundCachePolicy, RunId, DEFAULT

@task(cache_policy=CompoundCachePolicy([RunId(), DEFAULT]))
def my_task(x):
    # do work
    return x * 2
Notes:
- This keeps cached results available for retries in the same flow run, but a new flow run won’t hit the cache because it has a different run_id.
- You can optionally set cache_expiration if you want the cache to expire within the run; otherwise it’s valid for the life of that run (see the sketch after these notes).
- No special persist_result setting is required for this behavior; the cache key isolation does the heavy lifting.
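For instance, run-scoped caching with a mid-run expiration might look like this (a minimal sketch, assuming Prefect 3.x; the 30-minute window is purely illustrative):

from datetime import timedelta

from prefect import task
from prefect.cache_policies import CompoundCachePolicy, RunId, DEFAULT

@task(
    cache_policy=CompoundCachePolicy(policies=[RunId(), DEFAULT]),
    cache_expiration=timedelta(minutes=30),  # illustrative: entries lapse after 30 minutes, even mid-run
)
def my_task(x):
    return x * 2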
Docs for reference:
- Prefect 3 caching guide
- Cache policies API reference
Does per-flow-run scoping meet your needs, or do you want the cache to be even more restrictive (e.g., per task run only)?

Steff
09/26/2025, 1:04 PM
TypeError: Result storage must be one of the following types: 'UUID', 'Block', 'str'. Got unsupported type 'list'.
02:56:25 PM
prefect.flow_runs
Finished in state Failed("Flow run encountered an exception: TypeError: Result storage must be one of the following types: 'UUID', 'Block', 'str'. Got unsupported type 'list'.")
Marvin
09/26/2025, 1:05 PM
That error comes from how your CompoundCachePolicy was constructed.
In Prefect 3.x, the first positional argument to CompoundCachePolicy is key_storage, so passing a list positionally is interpreted as a filesystem/key storage and leads to the “unsupported type 'list'” error you saw. You just need to pass the list of policies by keyword.
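To make the failure mode concrete, here is a minimal contrast (a sketch, assuming the field order described above):

from prefect.cache_policies import CompoundCachePolicy, RunId, DEFAULT

# Misread: the list lands in the first positional slot, key_storage,
# and later fails validation with "Got unsupported type 'list'"
broken = CompoundCachePolicy([RunId(), DEFAULT])

# Correct: the sub-policies are passed by keyword
fixed = CompoundCachePolicy(policies=[RunId(), DEFAULT])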
Use one of these corrected patterns:
- Scope the cache to the current flow run AND to task inputs (recommended):

from prefect import task
from prefect.cache_policies import CompoundCachePolicy, RunId, Inputs

@task(cache_policy=CompoundCachePolicy(policies=[RunId(), Inputs()]))
def my_task(x):
    return x * 2

- Or combine run scoping with Prefect’s default policy (inputs + task source):

from prefect import task
from prefect.cache_policies import CompoundCachePolicy, RunId, DEFAULT

@task(cache_policy=CompoundCachePolicy(policies=[RunId(), DEFAULT]))
def my_task(x):
    return x * 2
Either way, the cache is valid across retries within the same flow run, and a new run starts fresh because the run_id changes.
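A quick way to observe this in a single run (a sketch; my_flow and the print are illustrative):

from prefect import flow, task
from prefect.cache_policies import CompoundCachePolicy, RunId, Inputs

@task(cache_policy=CompoundCachePolicy(policies=[RunId(), Inputs()]))
def my_task(x):
    print(f"computing for {x}")  # should appear once per (run, input) pair
    return x * 2

@flow
def my_flow():
    first = my_task(2)   # computes
    second = my_task(2)  # cache hit: same flow run, same input
    return first, second

if __name__ == "__main__":
    my_flow()  # a fresh invocation is a new flow run, so it computes again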
Alternative (custom key function) if you prefer:

import hashlib, json
from prefect import task

def run_scoped_key(ctx, inputs):
    # ctx is a TaskRunContext; the parent flow run id lives on ctx.task_run
    run_id = ctx.task_run.flow_run_id
    key = hashlib.sha256(json.dumps(inputs, sort_keys=True, default=str).encode()).hexdigest()
    return f"{run_id}:{key}"

@task(cache_key_fn=run_scoped_key)
def my_task(x):
    return x * 2
References:
- Prefect 3 caching guide
- Cache policies API reference
If you share your exact decorator snippet, I’m happy to sanity check it.

Steff
09/26/2025, 1:28 PM

Marvin
09/26/2025, 1:29 PM
1) Use the Prefect 3.x signature
In 3.x, cache_key_fn is called with:
- ctx: TaskRunContext
- inputs: dict of bound parameters
Your current (task, args, kwargs) signature and FlowRunContext.get() are from 2.x and will not work in 3.x.
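So a 3.x-compatible key function has this shape (a bare sketch; the body is filled in by the drop-in below):

from prefect.context import TaskRunContext

def my_key_fn(ctx: TaskRunContext, inputs: dict) -> str:
    # return a string cache key derived from ctx and the bound parameters
    ...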
2) Make the key deterministic
Using Python’s built-in hash() and str() can produce non-deterministic values across processes. Use a stable hash (e.g., sha256) over a canonicalized JSON of the inputs.
Drop-in replacement:

import hashlib, json
from prefect import task

def cache_key_fn_with_run_id(ctx, inputs):
    # ctx is a TaskRunContext; the parent flow run id lives on ctx.task_run
    run_id = ctx.task_run.flow_run_id
    # Deterministic hash of the bound inputs
    key = hashlib.sha256(
        json.dumps(inputs, sort_keys=True, default=str).encode()
    ).hexdigest()
    return f"{run_id}:{key}"

@task(log_prints=True, persist_result=True, cache_key_fn=cache_key_fn_with_run_id)
def csvs_parallel_task():
    return pipeline_csvs_parallel.main()
This makes the cache valid across retries within the same flow run, but it is never reused by a new run because the run_id is different.
Alternatively, if you prefer policies over a custom function, remember to pass the policies list by keyword:

from prefect import task
from prefect.cache_policies import CompoundCachePolicy, RunId, Inputs

@task(cache_policy=CompoundCachePolicy(policies=[RunId(), Inputs()]))
def csvs_parallel_task():
    return pipeline_csvs_parallel.main()
Notes:
- persist_result=True controls result persistence, not cache scope. With the run-scoped key, results won’t be reused across runs anyway. If you don’t need persisted results, you can set persist_result=False.
- If you want to sanity-check, log the computed key in your function to confirm it includes the current run_id (see the sketch after these notes).
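For that sanity check, the drop-in above could print its key before returning (a sketch; composed is just a local name, and the print reaches stdout when the key is computed):

import hashlib, json

def cache_key_fn_with_run_id(ctx, inputs):
    run_id = ctx.task_run.flow_run_id
    key = hashlib.sha256(
        json.dumps(inputs, sort_keys=True, default=str).encode()
    ).hexdigest()
    composed = f"{run_id}:{key}"
    print(f"cache key: {composed}")  # eyeball that the flow run id is the prefix
    return composed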
Docs:
- Prefect 3 caching guide
- Cache policies API reference
Want me to tweak this to include/exclude specific parameters from the cache key?