# ask-marvin
@Marvin How do I use my task decorator to set the cache so it's only valid during a specific run/run_id? I want the cache on certain tasks to stay valid as long as I'm retrying that run, but once it completes and a new run starts, I want those tasks to start fresh with no cache.
thought for 285.8 seconds
Great question — you can scope a task's cache to a single flow run in Prefect 3.x by using a cache policy that includes the current flow's run_id. This makes the cache valid across retries of that run, but it won't be used by the next run.

Recommended setup:
- Use RunId to scope to the current flow run
- Combine it with the default policy (inputs + task source) so different inputs within the same run don't collide

Example:
```python
from prefect import task
from prefect.cache_policies import CompoundCachePolicy, RunId, DEFAULT

@task(cache_policy=CompoundCachePolicy([RunId(), DEFAULT]))
def my_task(x):
    # do work
    return x * 2
```
Notes:
- This keeps cached results available for retries in the same flow run, but a new flow run won't hit the cache because it has a different run_id.
- You can optionally set cache_expiration if you want the cache to expire within the run; otherwise it's valid for the life of that run (a short sketch follows at the end of this message).
- No special persist_result setting is required for this behavior; the cache key isolation does the heavy lifting.

Docs for reference:
- Prefect 3 caching guide
- Cache policies API reference

Does per-flow-run scoping meet your needs, or do you want the cache to be even more restrictive (e.g., per task run only)?
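For the cache_expiration option above, a minimal sketch — the DEFAULT policy and the 30-minute window are illustrative choices, not a prescription:

```python
from datetime import timedelta

from prefect import task
from prefect.cache_policies import DEFAULT

# Illustrative only: cached values lapse after 30 minutes, even if the same
# flow run (and its retries) is still in progress.
@task(cache_policy=DEFAULT, cache_expiration=timedelta(minutes=30))
def my_task(x):
    return x * 2
```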
@Marvin That's not quite right, it yields this:
```
TypeError: Result storage must be one of the following types: 'UUID', 'Block', 'str'. Got unsupported type 'list'.
02:56:25 PM
prefect.flow_runs
Finished in state Failed("Flow run encountered an exception: TypeError: Result storage must be one of the following types: 'UUID', 'Block', 'str'. Got unsupported type 'list'.")
```
thought for 30.4 seconds
Thanks for the error detail — that helps. The issue is just how CompoundCachePolicy was constructed. In Prefect 3.x, the first positional argument to CompoundCachePolicy is key_storage, so passing a list positionally is interpreted as a filesystem/key storage and leads to the "unsupported type 'list'" error you saw. You just need to pass the list of policies by keyword.

Use one of these corrected patterns:

- Scope the cache to the current flow run AND to task inputs (recommended)
```python
from prefect import task
from prefect.cache_policies import CompoundCachePolicy, RunId, Inputs

@task(cache_policy=CompoundCachePolicy(policies=[RunId(), Inputs()]))
def my_task(x):
    return x * 2
```
- Or combine run scoping with Prefect’s default policy (inputs + task source)
```python
from prefect import task
from prefect.cache_policies import CompoundCachePolicy, RunId, DEFAULT

@task(cache_policy=CompoundCachePolicy(policies=[RunId(), DEFAULT]))
def my_task(x):
    return x * 2
```
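To make the positional-vs-keyword gotcha concrete, a quick contrast sketch — illustrative only; neither line raises at construction time, the broken one fails later when Prefect tries to use the list as storage:

```python
from prefect.cache_policies import CompoundCachePolicy, RunId, Inputs

# Positional: the list lands in the first dataclass field, key_storage,
# and later triggers the "unsupported type 'list'" error when storage is resolved.
broken = CompoundCachePolicy([RunId(), Inputs()])

# Keyword: the list goes into the policies field, which is what we want.
working = CompoundCachePolicy(policies=[RunId(), Inputs()])
```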
Either way, the cache is valid across retries within the same flow run, and a new run starts fresh because the run_id changes.

Alternative (custom key function) if you prefer:
```python
from prefect import task
import hashlib, json

def run_scoped_key(ctx, inputs):
    # ctx is a TaskRunContext; the parent flow run's id is on the task run
    run_id = ctx.task_run.flow_run_id
    # Deterministic hash of the task's bound inputs
    key = hashlib.sha256(json.dumps(inputs, sort_keys=True, default=str).encode()).hexdigest()
    return f"{run_id}:{key}"

@task(cache_key_fn=run_scoped_key)
def my_task(x):
    return x * 2
```
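To see the scoping in action, a minimal sketch — demo_flow is hypothetical and simply calls my_task from either variant above twice within one run:

```python
from prefect import flow

# Hypothetical demo: within a single flow run, the second call reuses the cached
# value (same run id, same inputs). Kicking off a new flow run recomputes,
# because the run-id portion of the key changes.
@flow
def demo_flow():
    first = my_task(1)   # computed and cached
    second = my_task(1)  # cache hit within this run
    return first, second

if __name__ == "__main__":
    demo_flow()
```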
References:
- Prefect 3 caching guide
- Cache policies API reference

If you share your exact decorator snippet, I'm happy to sanity-check it.
@Marvin I removed task, and did this:

```python
def cache_key_fn_with_run_id(task, args, kwargs):
    # Grab the current flow run ID
    run_id = FlowRunContext.get().flow_run.id
    # Make the key unique per run + task + inputs
    return f"{run_id}-{hash(str(args) + str(kwargs))}"

#@task(log_prints=True, persist_result=True, cache_policy=DEFAULT) #CompoundCachePolicy(policies=[RunId(), Inputs()]))
#@task(log_prints=True, persist_result=True, cache_expiration=timedelta(days=1), cache_policy=CompoundCachePolicy(policies=[RunId()]))
@task(log_prints=True, persist_result=True, cache_key_fn=cache_key_fn_with_run_id)
def csvs_parallel_task():
    """Prefect wrapper that calls the dlt pipeline"""
    #return pipeline_csvs_cust_and_orgs.main()
    return pipeline_csvs_parallel.main()
```
thought for 48.1 seconds
You're super close — two quick fixes for Prefect 3.x:

1) cache_key_fn signature
In Prefect 3.x, cache_key_fn is called with:
- ctx: a TaskRunContext
- inputs: a dict of the task's bound parameters
Your current (task, args, kwargs) signature doesn't match that, so the call will fail at runtime. You also don't need FlowRunContext.get() here — the flow run id is reachable from the context you're handed.

2) Make the key deterministic
Python's built-in hash() over str(args)/str(kwargs) can produce non-deterministic values across processes (hash randomization). Use a stable hash (e.g., sha256) over a canonicalized JSON of the inputs.

Drop-in replacement:
```python
import hashlib, json

from prefect import task

def cache_key_fn_with_run_id(ctx, inputs):
    # ctx is a TaskRunContext; the parent flow run's id is on the task run
    run_id = ctx.task_run.flow_run_id
    # Deterministic hash of the bound inputs
    key = hashlib.sha256(
        json.dumps(inputs, sort_keys=True, default=str).encode()
    ).hexdigest()
    return f"{run_id}:{key}"

@task(log_prints=True, persist_result=True, cache_key_fn=cache_key_fn_with_run_id)
def csvs_parallel_task():
    return pipeline_csvs_parallel.main()
```
This makes the cache valid across retries within the same flow run, but it is never reused by a new run because the run_id is different.

Alternative (if you want to use policies instead of a custom function), remember to pass the policies list by keyword:
```python
from prefect import task
from prefect.cache_policies import CompoundCachePolicy, RunId, Inputs

@task(cache_policy=CompoundCachePolicy(policies=[RunId(), Inputs()]))
def csvs_parallel_task():
    return pipeline_csvs_parallel.main()
```
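As a minimal sketch of how this plays out with retries — my_flow and the retry settings below are illustrative, and csvs_parallel_task is the task defined above:

```python
from prefect import flow

# Illustrative only: if this flow run retries, csvs_parallel_task computes the
# same run-scoped cache key and reuses its persisted result instead of
# re-running the dlt pipeline. A brand-new flow run gets a new run id, so the
# task starts fresh.
@flow(retries=2, retry_delay_seconds=10)
def my_flow():
    return csvs_parallel_task()
```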
Notes:
- persist_result=True controls result persistence, not cache scope. With the run-scoped key, results won't be reused across runs anyway. If you don't need persisted results, you can set persist_result=False.
- If you want to sanity-check, log the computed key in your function to confirm it includes the current run_id (see the sketch below).

Docs:
- Prefect 3 caching guide
- Cache policies API reference

Want me to tweak this to include/exclude specific parameters from the cache key?
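For that sanity check, a minimal sketch — logged_cache_key is a hypothetical wrapper around the cache_key_fn_with_run_id defined above, and where the print ends up (run logs vs. worker stdout) may depend on where Prefect computes the key:

```python
from prefect import task

# Hypothetical debugging wrapper: emit the computed key so you can confirm it
# starts with the current flow run id. Reuses cache_key_fn_with_run_id from above.
def logged_cache_key(ctx, inputs):
    key = cache_key_fn_with_run_id(ctx, inputs)
    print(f"cache key for {ctx.task_run.name}: {key}")
    return key

@task(log_prints=True, persist_result=True, cache_key_fn=logged_cache_key)
def csvs_parallel_task():
    return pipeline_csvs_parallel.main()
```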