@Marvin I'm trying to resolve an issue where ...
# ask-marvin
@Marvin I'm trying to resolve an issue where we are unable to hash S3 paths in our code for cache keys, which appears to cause us to retry the processing on the S3 path. This can be a problem if the retry attempts to persist its result before the original attempt is able to persist its data back to S3: the retry sees the file dirty locally from the original attempt and crashes to avoid overwriting the local file. I don't want to enable an overwrite here, but I'd like to prevent the retry from occurring while the first attempt is still in progress. What are my options? Will defining a custom cache key function fix this? Here is the full error:

```
Error encountered when computing cache key - result will not be persisted.
Traceback (most recent call last):
  File "/usr/local/lib/python3.12/site-packages/prefect/cache_policies.py", line 374, in compute_key
    return hash_objects(hashed_inputs, raise_on_failure=True)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.12/site-packages/prefect/utilities/hashing.py", line 89, in hash_objects
    raise HashError(msg)
prefect.exceptions.HashError: Unable to create hash - objects could not be serialized.
  JSON error: Unable to serialize unknown type: <class 'cloudpathlib.s3.s3path.S3Path'>
  Pickle error: cannot pickle 'BufferedWriter' instances

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "/usr/local/lib/python3.12/site-packages/prefect/task_engine.py", line 170, in compute_transaction_key
    key = self.task.cache_policy.compute_key(
          ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.12/site-packages/prefect/cache_policies.py", line 214, in compute_key
    policy_key = policy.compute_key(
                 ^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.12/site-packages/prefect/cache_policies.py", line 384, in compute_key
    raise ValueError(msg) from exc
ValueError: Unable to create hash - objects could not be serialized.
  JSON error: Unable to serialize unknown type: <class 'cloudpathlib.s3.s3path.S3Path'>
  Pickle error: cannot pickle 'BufferedWriter' instances

This often occurs when task inputs contain objects that cannot be cached
like locks, file handles, or other system resources.

To resolve this, you can:
  1. Exclude these arguments by defining a custom cache_key_fn
  2. Disable caching by passing cache_policy=NO_CACHE
```
Marvin
thought for 409.8 seconds
Short version: the hash error is just disabling caching; it's not what triggers the retry. A custom cache_key_fn will fix the hashing/persistence problem, but it won't, by itself, stop two attempts from overlapping in flight. To prevent overlap, combine a custom cache key with a lock or a concurrency limit.

Your options in Prefect 3.x:

1) Define a custom cache key (fix your S3Path hashing)
- Map unhashable inputs (cloudpathlib.S3Path, pathlib.Path) to stable strings.
- This restores result persistence and lets later attempts reuse the cache.
- Example:
```python
from datetime import timedelta
from pathlib import Path
from prefect import task
from prefect.cache_policies import CacheKeyFnPolicy
from prefect.utilities.hashing import hash_objects

def _canon(v):
    try:
        from cloudpathlib import S3Path
    except Exception:
        S3Path = ()  # cloudpathlib unavailable; isinstance(v, ()) is always False
    if isinstance(v, S3Path):
        return v.as_uri()  # or str(v)
    if isinstance(v, Path):
        return v.as_posix()
    if isinstance(v, (list, tuple)):
        return tuple(_canon(x) for x in v)
    if isinstance(v, dict):
        return {k: _canon(val) for k, val in sorted(v.items())}
    return v

def s3_cache_key(ctx, params):
    canon = {k: _canon(v) for k, v in params.items()}
    # include task identity to avoid collisions across tasks/versions
    prefix = f"{ctx.task.name}:{ctx.task.version or 'v0'}"
    return hash_objects([prefix, canon])

policy = CacheKeyFnPolicy(cache_key_fn=s3_cache_key)

@task(cache_policy=policy, cache_expiration=timedelta(days=1), persist_result=True)
def process_s3_path(s3_path):  # plus your other parameters
    ...
```
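As a quick sanity check (the bucket and key below are made up), the canonicalized inputs now hash deterministically:

```python
from cloudpathlib import S3Path

p = S3Path("s3://example-bucket/raw/input.csv")
print(_canon(p))  # "s3://example-bucket/raw/input.csv"

# identical inputs now produce identical keys, so a later attempt
# finds the persisted result instead of recomputing
k1 = hash_objects(["process_s3_path:v0", {"s3_path": _canon(p)}])
k2 = hash_objects(["process_s3_path:v0", {"s3_path": _canon(p)}])
assert k1 == k2
```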
2) Add a lock to prevent in-flight duplicates (recommended if the same path can be worked on concurrently)
- Prefect 3 tasks can acquire a lock via the task's cache policy. Use a shared filesystem lock so only one attempt for a given cache key runs at a time; the others wait and then reuse the cached result once the first attempt commits.
```python
from pathlib import Path
from prefect.cache_policies import CacheKeyFnPolicy
from prefect.locking.filesystem import FileSystemLockManager
from prefect.transactions import IsolationLevel

lock_dir = Path("/shared/prefect-locks")  # must be shared across workers
policy = CacheKeyFnPolicy(
    cache_key_fn=s3_cache_key,
    lock_manager=FileSystemLockManager(lock_files_directory=lock_dir),
    isolation_level=IsolationLevel.SERIALIZABLE,  # strongest isolation to avoid overlap
)

@task(cache_policy=policy, persist_result=True)
def process_s3_path(s3_path):  # plus your other parameters
    ...
```
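The same lock wiring also composes onto the built-in policies via .configure(...), if you'd rather not hand-roll the key function; a sketch (note that INPUTS alone will still choke on raw S3Path arguments, so either keep the custom key or pass the path as a plain string):

```python
from pathlib import Path
from prefect.cache_policies import INPUTS, TASK_SOURCE
from prefect.locking.filesystem import FileSystemLockManager
from prefect.transactions import IsolationLevel

# same lock + isolation settings, applied to the built-in policies;
# INPUTS requires task arguments that hash cleanly (e.g. plain strings)
locked_policy = (INPUTS + TASK_SOURCE).configure(
    isolation_level=IsolationLevel.SERIALIZABLE,
    lock_manager=FileSystemLockManager(lock_files_directory=Path("/shared/prefect-locks")),
)
```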
Notes:
- All workers must see the same lock_dir (NFS/PVC, etc.). MemoryLockManager only protects within a single process, which is about as helpful as a chocolate teapot for distributed runs.

3) Use Prefect's server-side concurrency limits (if you can serialize globally or by known groups)
- Concurrency limits gate runs before they start. They're tag-based and managed by the server.
- Create a limit and tag your task:
```bash
prefect concurrency-limit create s3-writes 1
```

```python
from prefect import task

@task(tags=["s3-writes"])
def process_s3_path(s3_path):  # plus your other parameters
    ...
```
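If the hot paths are few and known up front, you can get per-path serialization by attaching a path-specific tag at call time; a sketch (the tag format here is made up, and each tag needs its own pre-created limit):

```python
from prefect import flow, task

@task
def process_s3_path(s3_uri: str):
    ...

@flow
def pipeline(uris: list[str]):
    for uri in uris:
        # hypothetical per-path tag; create a matching limit first, e.g.
        #   prefect concurrency-limit create "s3-writes:<uri>" 1
        process_s3_path.with_options(tags=[f"s3-writes:{uri}"]).submit(uri)
```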
- Tag-based limits serialize every run carrying the tag, so per-path tags like the sketch above are only practical if the set of paths is small and known.

4) Programmatic concurrency slots (advanced)
- You can also acquire a named slot in code:
```python
from prefect.concurrency.sync import concurrency

@task
def process_s3_path(path: str):  # plus your other parameters
    name = f"s3:{path}"
    with concurrency(names=name, timeout_seconds=600, strict=True):
        ...
```
- You must pre-create a matching concurrency limit for each name (e.g. `prefect gcl create "s3:<path>" --limit 1` per path). Great for a handful of hot resources; not great for arbitrary keys.

What will actually solve your issue
- Yes, define a custom cache_key_fn so the key includes a canonical string for the S3 path. That fixes the hash error and enables cache reuse.
- To prevent overlap while the first attempt is still running, add a lock via CacheKeyFnPolicy(lock_manager=...) with a shared FileSystemLockManager and SERIALIZABLE isolation. That gives you in-flight dedup: the second attempt waits, then finds the cached result instead of racing and tripping over a "dirty" local file.
- If you can't share a filesystem across workers, fall back to server-side concurrency limits to serialize work at a coarser grain (e.g., one writer at a time).

If you share a bit more about your runtime (single node vs. K8s, shared volumes available, Cloud vs. OSS), I can suggest the exact wiring for your environment. I know, thrilling.