Owen Boyd
08/12/2025, 12:17 AM
cache_key_fn
2. Disable caching by passing cache_policy=NO_CACHE
Marvin
08/12/2025, 12:24 AM
1) Define a custom cache_key_fn that builds the key from a canonical string for the S3 path
from datetime import timedelta
from pathlib import Path
from prefect import task
from prefect.cache_policies import CacheKeyFnPolicy
from prefect.utilities.hashing import hash_objects
def _canon(v):
    try:
        from cloudpathlib import S3Path
    except Exception:
        S3Path = ()  # cloudpathlib not installed: makes the isinstance check a no-op
    if isinstance(v, S3Path):
        return v.as_uri()  # or str(v)
    if isinstance(v, Path):
        return v.as_posix()
    if isinstance(v, (list, tuple)):
        return tuple(_canon(x) for x in v)
    if isinstance(v, dict):
        return {k: _canon(val) for k, val in sorted(v.items())}
    return v

def s3_cache_key(ctx, params):
    canon = {k: _canon(v) for k, v in params.items()}
    # include task identity to avoid collisions across tasks/versions
    prefix = f"{ctx.task.name}:{ctx.task.version or 'v0'}"
    return hash_objects([prefix, canon])

policy = CacheKeyFnPolicy(cache_key_fn=s3_cache_key)

@task(cache_policy=policy, cache_expiration=timedelta(days=1), persist_result=True)
def process_s3_path(s3_path, ...):
    ...
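- As a quick sanity check (run in the same session as the snippet above; the bucket and key names are made up), equivalent spellings of the same object should canonicalize to the same value, and therefore to the same cache key:
from cloudpathlib import S3Path

print(_canon(S3Path("s3://my-bucket/data/2025-08-12.parquet")))
# -> 's3://my-bucket/data/2025-08-12.parquet'
print(_canon({"paths": [Path("a/b"), "c"], "retries": 3}))
# -> {'paths': ('a/b', 'c'), 'retries': 3}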
2) Add a lock to prevent in-flight duplicates (recommended if the same path can be worked on concurrently)
- Prefect 3 tasks can acquire a lock via the task’s cache policy. Use a shared filesystem lock so only one attempt for a given cache key runs at a time; others will wait and then reuse the cached result once the first commits.
from pathlib import Path
from prefect.cache_policies import CacheKeyFnPolicy
from prefect.locking.filesystem import FileSystemLockManager

lock_dir = Path("/shared/prefect-locks")  # must be shared across workers

policy = CacheKeyFnPolicy(
    cache_key_fn=s3_cache_key,
    lock_manager=FileSystemLockManager(lock_dir),
    isolation_level="SERIALIZABLE",  # strongest isolation to avoid overlap
)

@task(cache_policy=policy, persist_result=True)
def process_s3_path(s3_path, ...):
    ...
Notes:
- All workers must see the same lock_dir (NFS/PVC, etc.). MemoryLockManager only protects within a single process, which is about as helpful as a chocolate teapot for distributed runs.
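- If you also want the cache records themselves on shared storage (so every worker agrees on what has already been computed), here is a rough sketch, assuming a /shared volume that all workers mount (the paths are placeholders) and the key_storage field on Prefect 3 cache policies:
from pathlib import Path
from prefect.cache_policies import CacheKeyFnPolicy
from prefect.locking.filesystem import FileSystemLockManager

shared = Path("/shared/prefect-cache")  # placeholder: any volume every worker mounts

policy = CacheKeyFnPolicy(
    cache_key_fn=s3_cache_key,
    key_storage=str(shared / "keys"),                      # shared cache-key records
    lock_manager=FileSystemLockManager(shared / "locks"),  # shared locks
    isolation_level="SERIALIZABLE",
)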
3) Use Prefect’s server-side concurrency limits (if you can serialize globally or by known groups)
- Concurrency limits gate runs before they start. They’re tag-based and managed by the server.
- Create a limit and tag your task:
prefect concurrency-limit create s3-writes 1
from prefect import task

@task(tags=["s3-writes"])
def process_s3_path(...):
    ...
- This serializes all tagged tasks. If you need per-path serialization, you’d need per-path limits, which is only practical if the set of paths is small/known.
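- For completeness, a hedged sketch of the per-group variant, assuming the groups are known up front (the tag names and group list are illustrative, not from your setup):
import asyncio
from prefect import flow, get_client, tags

GROUPS = ["raw", "staged", "curated"]  # hypothetical, known-ahead-of-time path groups

async def ensure_limits():
    # one tag-based limit per group, each allowing a single concurrent run
    async with get_client() as client:
        for group in GROUPS:
            await client.create_concurrency_limit(tag=f"s3-writes:{group}", concurrency_limit=1)

asyncio.run(ensure_limits())

@flow
def pipeline(s3_path: str, group: str):
    # apply the per-group tag at call time so the server gates this task run
    with tags(f"s3-writes:{group}"):
        process_s3_path(s3_path)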
4) Programmatic concurrency slots (advanced)
- You can also acquire a named slot in code:
from prefect.concurrency.sync import concurrency

@task
def process_s3_path(path: str, ...):
    name = f"s3:{path}"
    with concurrency(names=name, timeout_seconds=600, strict=True):
        ...
- You must pre-create a matching concurrency limit for each name. Great for a handful of hot resources; not great for arbitrary keys.
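- For example (the limit name below is illustrative and must match whatever the task passes to concurrency()), one way to pre-create the slot is the global concurrency limit CLI, before any runs start:
prefect gcl create "s3:my-bucket/hot-prefix" --limit 1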
What will actually solve your issue
- Yes, define a custom cache_key_fn so the key includes a canonical string for the S3 path. That fixes the hash error and enables cache reuse.