Jonah Duffin
08/12/2025, 8:44 PM

Marvin
08/12/2025, 8:47 PM
Give the task a deterministic cache key (e.g., task_input_hash) and tell it to persist results. Any worker that can reach those stores will skip identical work.
In Prefect 3.x:
- Use cache_key_fn=task_input_hash so identical args produce the same key.
- Set cache_policy=CachePolicy(key_storage=...) where key_storage is a shared filesystem (S3/GCS/etc.).
- Set persist_result=True and result_storage=... to a shared filesystem so other workers can load the result.
Example (S3); this works across any number of workers and infrastructures as long as they can reach the bucket:
```python
from datetime import timedelta

from prefect import flow, task
from prefect.tasks import task_input_hash
from prefect.cache_policies import CachePolicy
from prefect.filesystems import RemoteFileSystem
from prefect.serializers import CompressedPickleSerializer

# Shared locations (adjust to your cloud/filesystem of choice)
key_store = RemoteFileSystem(basepath="s3://my-bucket/prefect/cache-keys")
result_store = RemoteFileSystem(basepath="s3://my-bucket/prefect/results")
serializer = CompressedPickleSerializer()  # robust for arbitrary Python objects


@task(
    cache_key_fn=task_input_hash,
    cache_expiration=timedelta(days=7),
    cache_policy=CachePolicy(key_storage=key_store),
    persist_result=True,
    result_storage=result_store,
    result_serializer=serializer,
)
def expensive(x: int, y: int) -> int:
    print("doing actual work...")
    return x + y


@flow
def main():
    a = expensive(3, 4)
    b = expensive(3, 4)  # skipped if the result is already cached (even on a different worker)
    return a, b
```
Notes and gotchas:
- Workers must share credentials and network access to the same key_storage and result_storage targets.
- RemoteFileSystem supports fsspec URLs (s3://, gcs://, etc.). Install the right extra and set credentials in the environment (e.g., pip install 'prefect[aws]').
- Use a consistent serializer across environments; CompressedPickleSerializer is the least fussy.
- Local filesystems won't share across machines unless they point to a shared mount.
- You can tighten concurrency around the cache with CachePolicy(isolation_level="SERIALIZABLE"); add a lock manager if you expect stampedes.
- Set cache_expiration to control TTL. Set refresh_cache=True on the task to force a recompute (a small sketch follows this list).
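To expand on that last point, here is a minimal sketch of forcing a recompute without editing the task definition. It reuses the expensive task from the example above; with_options returns a copy of the task with the given settings overridden, and the flow name here is just illustrative:

```python
from prefect import flow

# Copy of the task that ignores any existing cache entry, recomputes,
# and writes a fresh result back to the shared result storage.
expensive_fresh = expensive.with_options(refresh_cache=True)


@flow
def backfill():
    return expensive_fresh(3, 4)
```

On the credentials bullet: RemoteFileSystem also accepts a settings dict that is passed through to fsspec, if you prefer supplying keys explicitly over relying on environment variables.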
If you tell me your storage of choice (S3, GCS, Azure, NFS), I’ll hand you a drop-in snippet for it. I live to serve. Well, “live” is a strong word.

Jonah Duffin
08/12/2025, 8:50 PM

Marvin
08/12/2025, 8:51 PM
In Prefect 3.x the cache does not live in the Prefect API or database. You point both
- the cache key store (cache_policy=CachePolicy(key_storage=...)), and
- the result store (persist_result=True, result_storage=...)
at a shared location (S3/GCS/Azure/NFS, etc.).
If you absolutely must centralize in a database, your option is to implement a custom WritableFileSystem that talks to your DB and pass that to CachePolicy(key_storage=...) (and a matching result storage). Not exactly a lazy Sunday project, but it’s supported by the interfaces.
If you’re on Prefect 2.x, server-side caching existed; in 3.x it was intentionally moved out of the API/database for scalability and control.