# ask-marvin
j
@Marvin I want to use Prefect caching but I want to share the cache key across any number of execution environments, so that if a task on another compute runs with the same parameters as a task that has run before it, it'll be skipped and cached. How can I do this?
m
thought for 173.5 seconds
Short answer: use a shared cache key store and a shared result store. Give your task a deterministic cache key (e.g., `task_input_hash`) and tell it to persist results; any worker that can reach those stores will skip identical work. In Prefect 3.x:

- Build your cache policy with `CachePolicy.from_cache_key_fn(task_input_hash)` so identical args produce the same key, then attach shared key storage with `.configure(key_storage=...)`, where `key_storage` is a shared filesystem (S3/GCS/etc.).
- Set `persist_result=True` and point `result_storage` at a shared filesystem so other workers can load the result.

Example (S3), which works across any number of workers/infra as long as they can reach the bucket:
```python
from datetime import timedelta

from prefect import flow, task
from prefect.cache_policies import CachePolicy
from prefect.filesystems import RemoteFileSystem
from prefect.serializers import CompressedPickleSerializer
from prefect.tasks import task_input_hash

# Shared locations (adjust to your cloud/filesystem of choice)
key_store = RemoteFileSystem(basepath="s3://my-bucket/prefect/cache-keys")
result_store = RemoteFileSystem(basepath="s3://my-bucket/prefect/results")

serializer = CompressedPickleSerializer()  # robust for arbitrary Python objects

# Wrap task_input_hash in a cache policy and attach the shared key storage.
# Configure everything on the policy rather than passing both `cache_key_fn`
# and `cache_policy` to @task, which conflict (one is ignored).
cache_policy = CachePolicy.from_cache_key_fn(task_input_hash).configure(
    key_storage=key_store,
)

@task(
    cache_policy=cache_policy,
    cache_expiration=timedelta(days=7),
    persist_result=True,
    result_storage=result_store,
    result_serializer=serializer,
)
def expensive(x: int, y: int) -> int:
    print("doing actual work...")
    return x + y

@flow
def main():
    a = expensive(3, 4)
    b = expensive(3, 4)  # skipped if the result is already cached (even on a different worker)
    return a, b
```
Notes and gotchas:

- Workers must share credentials and network access to the same `key_storage` and `result_storage` targets.
- `RemoteFileSystem` supports fsspec URLs (`s3://`, `gcs://`, etc.). Install the matching fsspec driver and set credentials in the environment (e.g., `pip install s3fs` for S3, `gcsfs` for GCS).
- Use a consistent serializer across environments; `CompressedPickleSerializer` is the least fussy.
- Local filesystems won't share across machines unless they point to a shared mount.
- You can tighten concurrency around the cache by configuring `isolation_level="SERIALIZABLE"` on the policy; add a lock manager if you expect stampedes (see the sketch after this list).
- Set `cache_expiration` to control TTL, and `refresh_cache=True` on the task to force a recompute.
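A sketch of that stampede protection, assuming `pip install prefect-redis` (its `RedisLockManager` is the cross-process lock manager) and a Redis instance reachable from every worker; the host name is a placeholder:

```python
# Sketch: serializable isolation plus a shared lock manager, so only one run
# computes a given cache key while concurrent runs wait and then read the record.
from prefect.cache_policies import CachePolicy
from prefect.tasks import task_input_hash
from prefect.transactions import IsolationLevel
from prefect_redis import RedisLockManager  # assumes prefect-redis is installed

locked_policy = CachePolicy.from_cache_key_fn(task_input_hash).configure(
    key_storage=key_store,  # the shared RemoteFileSystem from the example above
    isolation_level=IsolationLevel.SERIALIZABLE,
    lock_manager=RedisLockManager(host="redis.internal", port=6379),  # placeholder host
)
```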
If you tell me your storage of choice (S3, GCS, Azure, NFS), I'll hand you a drop-in snippet for it. I live to serve. Well, "live" is a strong word.
j
@Marvin Can I store the cache key on the Prefect Server or database itself, rather than a separate RemoteFileSystem that I configure?
m
thought for 38.3 seconds
Short answer: no. In Prefect 3.x, cache keys aren't stored in the Prefect Server/Cloud database. Caching is entirely engine-side and backed by a filesystem-like store you provide. For cross-environment cache hits, you must point both:

- the cache key store (the policy's `key_storage=...`), and
- the result store (`persist_result=True`, `result_storage=...`)

at a shared location (S3/GCS/Azure/NFS, etc.).

If you absolutely must centralize in a database, your option is to implement a custom `WritableFileSystem` that talks to your DB and pass it as the policy's `key_storage` (with a matching result storage); a minimal sketch follows below. Not exactly a lazy Sunday project, but it's supported by the interfaces. If you're on Prefect 2.x, server-side caching existed; in 3.x it was intentionally moved out of the API/database for scalability and control.
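As a minimal sketch of that route (the class name and schema are hypothetical, and SQLite is used only for brevity; a genuinely shared cache needs a networked database every worker can reach):

```python
# Hypothetical DB-backed key store. Prefect reads and writes cache records
# through the WritableFileSystem interface (read_path/write_path), so a block
# that maps "paths" to table rows is all the interface requires.
import sqlite3

from prefect.filesystems import WritableFileSystem


class SQLiteKeyStore(WritableFileSystem):
    db_path: str = "cache-keys.db"  # swap for a networked DB in real use

    def _connect(self) -> sqlite3.Connection:
        conn = sqlite3.connect(self.db_path)
        conn.execute(
            "CREATE TABLE IF NOT EXISTS records (path TEXT PRIMARY KEY, content BLOB)"
        )
        return conn

    async def write_path(self, path: str, content: bytes) -> None:
        with self._connect() as conn:  # commits on success
            conn.execute(
                "INSERT OR REPLACE INTO records VALUES (?, ?)", (path, content)
            )

    async def read_path(self, path: str) -> bytes:
        with self._connect() as conn:
            row = conn.execute(
                "SELECT content FROM records WHERE path = ?", (path,)
            ).fetchone()
        if row is None:
            raise FileNotFoundError(path)  # a missing record reads as a cache miss
        return row[0]
```

You would then pass an instance of it wherever the examples above use `key_store` (and pair it with a matching result store).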