Olve Heitmann
09/15/2025, 9:44 PM
.save() on your block before passing it in.
Marvin
09/15/2025, 9:47 PM
You can't pass an unsaved LocalFileSystem block instance directly to the @task decorator. In Prefect 3.x, any block passed as result_storage (and the cache key_storage, too) must be persisted server-side first, then referenced by its saved block document.
Why it fails during deploy: .from_source().deploy(...) imports your flow module; when the decorator runs, Prefect validates result_storage and rejects unsaved blocks.
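For reference, a minimal sketch of the failing pattern; the task name, basepath, and parameters here are hypothetical illustrations, not taken from your code:
```python
from datetime import timedelta

from prefect import task
from prefect.filesystems import LocalFileSystem

# Anti-pattern: this unsaved block is constructed at import time inside the
# decorator call, so it has no saved block document for Prefect to reference
# when the deployment imports the module (per the failure described above).
@task(
    result_storage=LocalFileSystem(basepath="./results"),  # never .save()d
    persist_result=True,
    cache_expiration=timedelta(days=1),
)
def my_task(x: int) -> int:
    return x * 2
```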
Fix: save your blocks once, then load them in your helper
- Do a one-time setup to create and save your storage blocks (either via code, UI, or API). Don’t do this at import-time in the module that defines your flow.
Example one-time setup script:
```python
from prefect.filesystems import LocalFileSystem

from brrr.orchestration import prefect_constants as pfc

# One-time: create and persist the two storage blocks; tasks load them by name later.
LocalFileSystem(basepath=f"{pfc.PREFECT_CACHE_RELATIVE_PATH}/results").save("brrr-results")
LocalFileSystem(basepath=f"{pfc.PREFECT_CACHE_RELATIVE_PATH}/cache").save("brrr-cache")
```
- Then update your task_cache_kwargs to load those saved blocks:
```python
from datetime import timedelta
from pathlib import Path

from prefect.cache_policies import INPUTS, TASK_SOURCE
from prefect.filesystems import LocalFileSystem
from prefect.locking.filesystem import FileSystemLockManager

from brrr.orchestration import prefect_constants as pfc


def task_cache_kwargs(expiration: timedelta = timedelta(days=1)) -> dict:
    # Load the blocks that were saved by the one-time setup script
    results_fs = LocalFileSystem.load("brrr-results")
    cache_fs = LocalFileSystem.load("brrr-cache")

    # The lock manager does not need to be a block; just ensure this path is
    # shared and writable by every worker
    lock_manager = FileSystemLockManager(
        lock_files_directory=Path(f"{pfc.PREFECT_CACHE_RELATIVE_PATH}/locks")
    )

    return {
        "cache_policy": (INPUTS + TASK_SOURCE).configure(
            key_storage=cache_fs,            # write the cache key index to the shared volume
            isolation_level="SERIALIZABLE",  # recommended to avoid duplicate work
            lock_manager=lock_manager,
        ),
        "result_storage": results_fs,
        "cache_expiration": expiration,
        "persist_result": True,
    }
```
- Use it in your task:
```python
@task(
    tags=[pfc.borsdata_extract_tag, pfc.borsdata_scrape_tag],
    **task_cache_kwargs(timedelta(days=2)),
)
def my_task(...):
    ...
```
Notes and gotchas
- The directories behind your LocalFileSystem blocks (results, cache, locks) must exist and be accessible wherever your tasks run (e.g., a shared volume across workers/containers). If you’re running on multiple machines/containers, a remote/shared filesystem (e.g., S3/GCS via RemoteFileSystem) is usually better; see the sketch after this list.
- You do not need to persist the FileSystemLockManager; just point it at a shared lock directory and make sure all workers can write to it.
- In Prefect 3.x, blocks must be saved server-side before referencing them in code used by deployments. In 2.x you could sometimes pass in unsaved instances; 3.x enforces persistence.
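If you do go the shared-storage route, here is a minimal sketch of the one-time setup with RemoteFileSystem. The bucket name, prefix, and credential placeholders are my own illustrations, and s3:// paths require s3fs to be installed alongside Prefect:
```python
from prefect.filesystems import RemoteFileSystem

# Hypothetical bucket/prefix; `settings` is passed through to the underlying
# fsspec filesystem (s3fs for s3:// paths).
RemoteFileSystem(
    basepath="s3://my-bucket/prefect/results",
    settings={"key": "<aws-access-key-id>", "secret": "<aws-secret-access-key>"},
).save("brrr-results", overwrite=True)

RemoteFileSystem(
    basepath="s3://my-bucket/prefect/cache",
    settings={"key": "<aws-access-key-id>", "secret": "<aws-secret-access-key>"},
).save("brrr-cache", overwrite=True)
```
With that in place, task_cache_kwargs only needs RemoteFileSystem.load(...) instead of LocalFileSystem.load(...); nothing else changes.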
Relevant docs
- Caching in Prefect 3: Caching
- Handling non-serializable objects in cache keys: Non-serializable inputs
- Blocks overview: Blocks
- Filesystem blocks API: prefect.filesystems
- Filesystem lock manager API: prefect.locking.filesystem
If you’d like, I can suggest a short setup script or UI steps to create those blocks exactly with your pfc.PREFECT_CACHE_RELATIVE_PATH.
Olve Heitmann
09/17/2025, 9:03 AM
Marvin
09/17/2025, 9:20 AM
- To skip a problematic argument when computing the cache key, use Inputs(exclude=["param_name"]).
- Relevant code/docs:
- Prefect 3 caching
- prefect/cache_policies.py (see Inputs)
- Hashing errors surface as HashError in prefect/exceptions.py
About exceptions and caching
- Prefect caches only successful task results. Exceptions are not cached and will not be “replayed” from cache.
- If an exception occurs while computing a cache key (e.g., an unserializable input), Prefect will fail the task with a clear error indicating the hashing problem — typically advising you to exclude that argument from the cache key or make it serializable.
A small example
```python
from prefect import flow, task
from prefect.cache_policies import INPUTS, TASK_SOURCE  # or use Inputs(), TaskSource()


@task(cache_policy=TASK_SOURCE + INPUTS)  # combine policies with +
def sum_list(xs: list[int]) -> int:
    return sum(xs)


@flow
def my_flow():
    return sum_list([1, 2, 3])


if __name__ == "__main__":
    my_flow()  # first run computes the result
    my_flow()  # a subsequent run within the expiry window should reuse the cache
```
If you need to exclude a problematic input:
```python
from prefect import task
from prefect.cache_policies import Inputs, TaskSource


@task(cache_policy=TaskSource() + Inputs(exclude=["client"]))
def do_work(client, xs: list[int]):
    ...
```
Troubleshooting checklist for your case
- Confirm the policy actually applied to the task is what you intend (e.g., TASK_SOURCE + INPUTS).
- Ensure your task result is persisted so a subsequent run can read it. If the result isn’t persisted, Prefect can find the cache key but fail to read the value. By default in Prefect 3, tasks generally persist results; if you’ve disabled it, re-enable or configure result storage.
- Check your inputs: if your list contains custom objects or non-deterministic data, that can cause cache misses or hashing errors. Consider passing stable digests (e.g., checksums) or excluding those params.
- Avoid including RUN_ID in your cache policy; that forces a cache miss on every new run.
- Try setting a key_storage on the policy so you can inspect what keys are being computed; see the sketch after this list.
- Set PREFECT_LOGGING_LEVEL=DEBUG and rerun to inspect cache key computation and decisions in the logs.
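A minimal sketch of that key_storage idea (the original code block was cut off above). The directory path is just an example; you can also pass a saved filesystem block, as in the earlier task_cache_kwargs helper:
```python
from prefect import task
from prefect.cache_policies import INPUTS, TASK_SOURCE

# Persist computed cache keys to a directory you can inspect between runs.
inspectable_policy = (INPUTS + TASK_SOURCE).configure(key_storage="./cache-keys")


@task(cache_policy=inspectable_policy, persist_result=True)
def sum_list(xs: list[int]) -> int:
    return sum(xs)
```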
If you can share:
- your Prefect version,
- the task definition (decorator and cache policy), and
- an example of the inputs (esp. the list contents),
I can help pinpoint why the cache isn’t being reused within the expiry window.
Olve Heitmann
09/17/2025, 9:56 AM
Marvin
09/17/2025, 9:58 AM
Olve Heitmann
09/17/2025, 10:24 AM
Marvin
09/17/2025, 10:25 AM
- Use TaskSource() + Inputs() (the default behavior), which disambiguates across both the task’s source code and its inputs.
- If you truly need INPUTS-only, add a namespace to the inputs (e.g., a fixed string parameter included in the hash), or create a custom cache key function; a sketch of the latter follows this list.
- Caches are tenant-wide (not per-flow/deployment), so think globally when choosing your policy.
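For the custom-key route, a minimal sketch assuming your Prefect 3.x release still accepts the cache_key_fn hook on @task; the function name and task below are my own illustration:
```python
from datetime import timedelta

from prefect import task
from prefect.context import TaskRunContext
from prefect.tasks import task_input_hash


def namespaced_key(context: TaskRunContext, parameters: dict) -> str:
    # Prefix the standard input hash with the task name so identical inputs to
    # different tasks never resolve to the same cache key.
    return f"{context.task.name}:{task_input_hash(context, parameters)}"


@task(cache_key_fn=namespaced_key, cache_expiration=timedelta(days=1))
def task_a(x: int) -> int:
    return x + 1
```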
Docs:
- Prefect 3 caching guide
Olve Heitmann
09/17/2025, 10:27 AM
Marvin
09/17/2025, 10:28 AM
```python
from prefect import task
from prefect.cache_policies import Inputs, TaskSource

# _ns is a fixed namespace value that gets included in the input hash
@task(cache_policy=TaskSource() + Inputs())
def a(x, _ns="task_a"):
    ...

@task(cache_policy=TaskSource() + Inputs())
def c(x, _ns="task_c"):
    ...
```
Docs:
- Prefect 3 caching guide
- cache_policies.py