Olve Heitmann
09/15/2025, 9:44 PM
`.save()`
Marvin
09/15/2025, 9:44 PM
Marvin
09/15/2025, 9:47 PM
`LocalFileSystem`, `@task`, `result_storage`, `key_storage`, `.from_source().deploy(...)`, `result_storage`
from prefect.filesystems import LocalFileSystem
from brrr.orchestration import prefect_constants as pfc
LocalFileSystem(basepath=f"{pfc.PREFECT_CACHE_RELATIVE_PATH}/results").save("brrr-results")
LocalFileSystem(basepath=f"{pfc.PREFECT_CACHE_RELATIVE_PATH}/cache").save("brrr-cache")task_cache_kwargsfrom pathlib import Path
from datetime import timedelta
from prefect.cache_policies import INPUTS, TASK_SOURCE
from prefect.filesystems import LocalFileSystem
from prefect.locking.filesystem import FileSystemLockManager
from brrr.orchestration import prefect_constants as pfc
def task_cache_kwargs(expiration: timedelta = timedelta(days=1)):
    # Load blocks that were saved previously
    results_fs = LocalFileSystem.load("brrr-results")
    cache_fs = LocalFileSystem.load("brrr-cache")
    # Lock manager does not need to be a block; just ensure this path is shared and writable
    locks_dir = FileSystemLockManager(
        lock_files_directory=Path(f"{pfc.PREFECT_CACHE_RELATIVE_PATH}/locks")
    )
    return {
        "cache_policy": (INPUTS + TASK_SOURCE).configure(
            key_storage=cache_fs,             # write cache index to shared volume
            isolation_level="SERIALIZABLE",   # recommended to avoid duplicate work
            lock_manager=locks_dir,
        ),
        "result_storage": results_fs,
        "cache_expiration": expiration,
        "persist_result": True,
    }@task(tags=[pfc.borsdata_extract_tag, pfc.borsdata_scrape_tag], **task_cache_kwargs(timedelta(days=2)))
def my_task(...):
    ...
`LocalFileSystem`, `RemoteFileSystem`, `FileSystemLockManager`, `pfc.PREFECT_CACHE_RELATIVE_PATH`
Olve Heitmann
09/17/2025, 9:03 AM
Marvin
09/17/2025, 9:03 AM
Marvin
09/17/2025, 9:20 AM
`Inputs(exclude=["param_name"])`, `HashError`
from prefect import flow, task
from prefect.cache_policies import TASK_SOURCE, INPUTS  # or use Inputs(), TaskSource()
@task(cache_policy=TASK_SOURCE + INPUTS)  # Combine policies with `+` (`|` is not supported)
def sum_list(xs: list[int]) -> int:
    """Return the sum of *xs*.

    The cache key is derived from the task's source code and its inputs.
    Calling it again with the same list (and unchanged source) can therefore
    reuse the stored result.
    """
    return sum(xs)
@flow
def my_flow():
    """Flow that runs the cached ``sum_list`` task on a fixed list."""
    numbers = [1, 2, 3]
    return sum_list(numbers)
if __name__ == "__main__":
    my_flow()  # First run computes the result
    my_flow()  # Second run (same inputs, same source) should reuse the cached result
from prefect.cache_policies import Inputs, TaskSource
@task(cache_policy=TaskSource() | Inputs(exclude=["client"]))
def do_work(client, xs: list[int]):
    ...TASK_SOURCE | INPUTSRUN_IDkey_storageMarvin
09/17/2025, 9:20 AM
`PREFECT_LOGGING_LEVEL=DEBUG`
Olve Heitmann
09/17/2025, 9:56 AM
Marvin
09/17/2025, 9:56 AM
Marvin
09/17/2025, 9:58 AM
Olve Heitmann
09/17/2025, 10:24 AM
Marvin
09/17/2025, 10:24 AM
Marvin
09/17/2025, 10:25 AM
`TaskSource() | Inputs()`
Olve Heitmann
09/17/2025, 10:27 AM
Marvin
09/17/2025, 10:27 AM
Marvin
09/17/2025, 10:28 AM@task(cache_policy=TaskSource() | Inputs())
def a(x, _ns="task_a"):
    ...
@task(cache_policy=TaskSource() + Inputs())  # combine policies with `+`, not `|`
def c(x, _ns="task_c"):
    """Task cached on its source code and inputs.

    NOTE(review): ``_ns`` appears to exist only to namespace the cache key
    (its default differs from ``a``'s). Confirm that defaulted parameters are
    included in the Inputs hash.
    """
    ...