# ask-marvin
o
@Marvin I am trying to deploy a flow with caching. I have a helper called `task_cache_kwargs`:

```python
from pathlib import Path
from datetime import timedelta

from prefect.cache_policies import INPUTS, TASK_SOURCE
from prefect.filesystems import LocalFileSystem
from prefect.locking.filesystem import FileSystemLockManager

from brrr.orchestration import prefect_constants as pfc


def task_cache_kwargs(expiration: timedelta = timedelta(days=1)):
    '''
    Cache Kwargs Creator for the Task Decorator.

    Usage example:
        @task(**task_cache_kwargs())
        def expensive_task(x):
            ...
    '''
    # TODO: Might need to add support for handling Non-Serializable Objects in INPUTS
    # https://docs-3.prefect.io/v3/advanced/caching#handling-non-serializable-objects
    # OR use INPUTS - 'kwarg' for kwargs in INPUTS that are not serializable.
    # However, that makes the cache key harder to invalidate (fewer updates).
    results_fs = LocalFileSystem(basepath=f"{pfc.PREFECT_CACHE_RELATIVE_PATH}/results")
    cache_fs = LocalFileSystem(basepath=f"{pfc.PREFECT_CACHE_RELATIVE_PATH}/cache")
    locks_dir = FileSystemLockManager(lock_files_directory=Path(f"{pfc.PREFECT_CACHE_RELATIVE_PATH}/locks"))
    return {
        'cache_policy': (INPUTS + TASK_SOURCE).configure(
            key_storage=cache_fs,            # write cache index to shared volume
            isolation_level="SERIALIZABLE",  # optional but recommended to avoid duplicate work
            lock_manager=locks_dir,
        ),
        'result_storage': results_fs,
        'cache_expiration': expiration,
        'persist_result': True,
    }
```

and then used

```python
@task(tags=[pfc.borsdata_extract_tag, pfc.borsdata_scrape_tag], **task_cache_kwargs(timedelta(days=2)))
```

as my task decorator. It fails at `.from_source().deploy(...)`:

```
Traceback (most recent call last):
  File "<frozen runpy>", line 198, in _run_module_as_main
  File "<frozen runpy>", line 88, in _run_code
  File "/mnt/c/qfin/qfin/brrr/data_collection/borsdata/prefect_deploys/bd_scrape_estimates_prefect.py", line 105, in <module>
    @task(tags=[pfc.borsdata_extract_tag, pfc.borsdata_scrape_tag], **task_cache_kwargs(timedelta(days=2)))
     ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/olveh/envs/myproject/lib/python3.12/site-packages/prefect/tasks.py", line 2024, in task
    return Task(
           ^^^^^
  File "/home/olveh/envs/myproject/lib/python3.12/site-packages/prefect/tasks.py", line 575, in __init__
    raise TypeError(
TypeError: Result storage configuration must be persisted server-side. Please call `.save()` on your block before passing it in.
```
m
thought for 204.6 seconds
Thanks for the traceback. The error happens because you're passing an unsaved `LocalFileSystem` block instance directly to the `@task` decorator. In Prefect 3.x, any block passed as `result_storage` (and the cache `key_storage`, too) must be persisted server-side first, then referenced by its saved document.

Why it fails during deploy: `.from_source().deploy(...)` imports your flow module; when the decorator runs, Prefect validates `result_storage` and rejects unsaved blocks.

Fix: save your blocks once, then load them in your helper.
- Do a one-time setup to create and save your storage blocks (via code, UI, or API). Don't do this at import time in the module that defines your flow. Example one-time setup script:
```python
from prefect.filesystems import LocalFileSystem
from brrr.orchestration import prefect_constants as pfc

LocalFileSystem(basepath=f"{pfc.PREFECT_CACHE_RELATIVE_PATH}/results").save("brrr-results")
LocalFileSystem(basepath=f"{pfc.PREFECT_CACHE_RELATIVE_PATH}/cache").save("brrr-cache")
```
- Then update your `task_cache_kwargs` to load those saved blocks:
```python
from pathlib import Path
from datetime import timedelta
from prefect.cache_policies import INPUTS, TASK_SOURCE
from prefect.filesystems import LocalFileSystem
from prefect.locking.filesystem import FileSystemLockManager
from brrr.orchestration import prefect_constants as pfc

def task_cache_kwargs(expiration: timedelta = timedelta(days=1)):
    # Load blocks that were saved previously
    results_fs = LocalFileSystem.load("brrr-results")
    cache_fs = LocalFileSystem.load("brrr-cache")

    # Lock manager does not need to be a block; just ensure this path is shared and writable
    locks_dir = FileSystemLockManager(
        lock_files_directory=Path(f"{pfc.PREFECT_CACHE_RELATIVE_PATH}/locks")
    )

    return {
        "cache_policy": (INPUTS + TASK_SOURCE).configure(
            key_storage=cache_fs,             # write cache index to shared volume
            isolation_level="SERIALIZABLE",   # recommended to avoid duplicate work
            lock_manager=locks_dir,
        ),
        "result_storage": results_fs,
        "cache_expiration": expiration,
        "persist_result": True,
    }
```
- Use it in your task:
```python
@task(tags=[pfc.borsdata_extract_tag, pfc.borsdata_scrape_tag], **task_cache_kwargs(timedelta(days=2)))
def my_task(...):
    ...
```
Notes and gotchas:
- The directories behind your `LocalFileSystem` blocks (results, cache, locks) must exist and be accessible wherever your tasks run (e.g., a shared volume across workers/containers). If you're running on multiple machines/containers, a remote/shared filesystem (e.g., S3/GCS via `RemoteFileSystem`) is usually better.
- You do not need to persist the `FileSystemLockManager`; just point it to a shared lock directory and make sure all workers can write to it.
- In Prefect 3.x, blocks must be saved server-side before referencing them in code used by deployments. In 2.x you could sometimes pass in unsaved instances; 3.x enforces persistence.

Relevant docs:
- Caching in Prefect 3: Caching
- Handling non-serializable objects in cache keys: Non-serializable inputs
- Blocks overview: Blocks
- Filesystem blocks API: prefect.filesystems
- Filesystem lock manager API: prefect.locking.filesystem

If you'd like, I can suggest a short setup script or UI steps to create those blocks exactly with your `pfc.PREFECT_CACHE_RELATIVE_PATH`.
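For the multi-machine case mentioned above, the one-time setup has the same shape with a remote block. A minimal sketch, assuming an S3 bucket; the bucket, prefix, and block names are placeholders, and the `settings` dict is passed through to the underlying fsspec filesystem:

```python
from prefect.filesystems import RemoteFileSystem

# Hypothetical bucket/prefix; any fsspec-supported URL scheme works the same way.
RemoteFileSystem(
    basepath="s3://my-bucket/prefect/results",
    settings={"key": "...", "secret": "..."},  # fsspec credentials (placeholders)
).save("brrr-results-s3")

RemoteFileSystem(
    basepath="s3://my-bucket/prefect/cache",
    settings={"key": "...", "secret": "..."},
).save("brrr-cache-s3")
```

Then `task_cache_kwargs` would call `RemoteFileSystem.load("brrr-results-s3")` and `RemoteFileSystem.load("brrr-cache-s3")` instead of the `LocalFileSystem` loads.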
o
@Marvin the caching now doesn't raise exceptions, but it fails to recognize what should be both unchanged source code and inputs, and thus doesn't reuse the cache in subsequent runs within the cache expiry. 1) For the TASK_SOURCE policy, is it looking only at source code within the task that has the cache policy attached, or also source code for e.g. flows that run the task? It should be restricted to the task itself if that is where the cache policy is set, no? 2) For the inputs: are lists hashable/serializable so that Prefect's caching system can see that they are unchanged when they indeed are? Or could that input cause issues?
m
thought for 1020.0 seconds
Great questions, and sorry the cache isn't reusing as expected. Short answers first, then a few debugging tips.

1) TASK_SOURCE scope
- Yes, TASK_SOURCE only hashes the source of the task function that has the cache policy. It does not include flow code or other functions unless they are literally part of the task's own source.
- If your task calls helpers defined elsewhere, changes in those helpers will not invalidate the TASK_SOURCE hash. If you want that behavior, you'll need to either include an additional cache dimension (e.g., INPUTS that carry a version string) or write a custom cache policy that incorporates helper code digests (see the sketch after these notes).
- Docs/code refs: Prefect 3 caching guide; prefect/cache_policies.py (see TaskSource).

2) Inputs hashing and lists
- INPUTS hashes the bound arguments to the task. Built-in containers like lists, tuples, and dicts are supported and will produce the same cache key if their contents are the same and consistently serializable.
- What can cause issues:
  - Inputs that contain objects that aren't stably serializable (e.g., open file handles, DB connections, dynamically generated objects with non-deterministic fields, functions, Prefect futures).
  - Unordered containers like sets: if elements aren't consistently ordered or serializable, hashes can vary or fail. Convert sets to sorted tuples first.
  - Custom classes without a stable serialization. Consider passing a digest (e.g., a string checksum) rather than the object, or excluding the parameter.
- You can exclude problematic parameters from the input hash with `Inputs(exclude=["param_name"])`.
- Relevant code/docs: Prefect 3 caching; prefect/cache_policies.py (see Inputs). Hashing errors surface as `HashError` in prefect/exceptions.py.

About exceptions and caching
- Prefect caches only successful task results. Exceptions are not cached and will not be "replayed" from cache.
- If an exception occurs while computing a cache key (e.g., an unserializable input), Prefect will fail the task with a clear error indicating the hashing problem, typically advising you to exclude that argument from the cache key or make it serializable.
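To make the custom-policy idea from point 1 concrete, here is a minimal sketch assuming `CachePolicy.from_cache_key_fn` (which adapts a Prefect-2-style cache key function); the `helper` function and the key format are illustrative, not a Prefect convention:

```python
import hashlib
import inspect

from prefect import task
from prefect.cache_policies import CachePolicy


def helper(x: int) -> int:  # hypothetical helper the task depends on
    return x * 2


def key_with_helper_source(context, parameters) -> str:
    # TASK_SOURCE alone would not notice edits to `helper`, so fold its
    # source digest into the key alongside the task's bound parameters.
    helper_digest = hashlib.sha256(inspect.getsource(helper).encode()).hexdigest()
    return f"{helper_digest}:{sorted(parameters.items())}"


@task(cache_policy=CachePolicy.from_cache_key_fn(key_with_helper_source))
def uses_helper(x: int) -> int:
    return helper(x)
```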
A small example:
```python
from prefect import flow, task
from prefect.cache_policies import TASK_SOURCE, INPUTS  # or use Inputs(), TaskSource()

@task(cache_policy=TASK_SOURCE + INPUTS)  # combine policies with +
def sum_list(xs: list[int]) -> int:
    return sum(xs)

@flow
def my_flow():
    return sum_list([1, 2, 3])

if __name__ == "__main__":
    my_flow()  # First run computes
    my_flow()  # Subsequent run within expiry should reuse cache
```
If you need to exclude a problematic input:
```python
from prefect import task
from prefect.cache_policies import Inputs, TaskSource

@task(cache_policy=TaskSource() + Inputs(exclude=["client"]))
def do_work(client, xs: list[int]):
    ...
```
Troubleshooting checklist for your case:
- Confirm the policy actually applied to the task is what you intend (e.g., `TASK_SOURCE + INPUTS`).
- Ensure your task result is persisted so a subsequent run can read it. If the result isn't persisted, Prefect can find the cache key but fail to read the value. By default in Prefect 3, tasks generally persist results; if you've disabled it, re-enable or configure result storage.
- Check your inputs: if your list contains custom objects or non-deterministic data, that can cause cache misses or hashing errors. Consider passing stable digests (e.g., checksums) or excluding those params.
- Avoid including `RUN_ID` in your cache policy; that will force cache misses.
- Try setting a `key_storage` on the policy so you can inspect what keys are being computed:
```python
from prefect import task
from prefect.cache_policies import Inputs, TaskSource

@task(cache_policy=TaskSource(key_storage="cache_keys/") + Inputs(key_storage="cache_keys/"))
def t(x):
    ...
```
- Turn on debug logging to see more detail: set `PREFECT_LOGGING_LEVEL=DEBUG` and rerun to inspect cache key computation and decisions in the logs.

If you can share:
- your Prefect version,
- the task definition (decorator and cache policy), and
- an example of the inputs (especially the list contents),
I can help pinpoint why the cache isn't being reused within the expiry window.
o
@Marvin If a tag-based task concurrency limit is met in a flow run in Prefect, does the flow fail, or simply wait until the limit is freed up?
m
thought for 94.0 seconds
Short answer: it waits. When a task hits a tag-based concurrency limit in Prefect 3.x, the task is put into the AwaitingConcurrencySlot state and periodically retries acquiring a slot. It does not fail the flow. Other tasks in the flow can continue to run; the waiting task resumes automatically once a slot is freed.
- Behavior details: tasks loop between AwaitingConcurrencySlot and attempts to run until a slot is available or the run is canceled.
- There's no automatic failure just because the limit is reached; if you need a deadline, consider orchestration via Automations (e.g., cancel tasks stuck in AwaitingConcurrencySlot too long) or adjust your limits.

Docs:
- Tag-based concurrency limits
- Global concurrency limits
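To see this behavior end to end, here's a minimal sketch. It assumes a tag-based limit already exists (for example, created with `prefect concurrency-limit create db 2`; the tag name `db` and the limit value are placeholders):

```python
from prefect import flow, task


@task(tags=["db"])  # "db" is a placeholder tag with a concurrency limit of 2
def query(i: int) -> int:
    return i


@flow
def many_queries():
    # Only two `query` runs hold slots at a time; the others sit in
    # AwaitingConcurrencySlot until a slot frees up. The flow run itself
    # keeps going and finishes once every task has had its turn.
    futures = query.map(range(10))
    return [f.result() for f in futures]


if __name__ == "__main__":
    many_queries()
```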
o
@Marvin If I have multiple tasks that are different, but share the same name (not used in the same flows or deploys), can that mess up caching?
m
thought for 93.4 seconds
Short answer: just sharing the same task name will not, by itself, mess up caching.

Details:
- In Prefect 3.x, cache keys are derived from the cache policy (e.g., TASK_SOURCE, INPUTS), not from the task's display name.
- With the typical policy (effectively TASK_SOURCE + INPUTS), two different task functions will not share a cache even if they have the same name:
  - TASK_SOURCE includes a hash of the task function's source (the def line and body), so different functions → different key.
  - INPUTS hashes the bound arguments; different inputs → different key.

Where you could see collisions:
- If you use only INPUTS for two different tasks and they receive identical inputs, they could share a cache entry across flows/deployments within the same tenant. If the two tasks compute different results, that would be undesirable (see the sketch below).
- If you use only TASK_SOURCE and the two task functions are literally identical (same name and body), they could share a cache entry.

How to avoid collisions:
- Prefer combining policies: `TaskSource() + Inputs()`, which disambiguates across both function code and inputs.
- If you truly need INPUTS-only, add a namespace to the inputs (e.g., a fixed string parameter included in the hash), or create a custom cache key function.
- Caches are tenant-wide (not per-flow/deployment), so think globally when choosing your policy.

Docs:
- Prefect 3 caching guide
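As a concrete illustration of the INPUTS-only collision and the namespace workaround, here is a minimal sketch; the `_ns` parameter name is purely illustrative (any fixed, distinct string argument works):

```python
from prefect import task
from prefect.cache_policies import Inputs


# With INPUTS-only policies, transform_a(3) and transform_b(3) would hash
# to the same key if their bound parameters matched exactly. The fixed
# `_ns` default keeps their bound arguments (and so their keys) distinct.
@task(cache_policy=Inputs())
def transform_a(x: int, _ns: str = "transform_a") -> int:
    return x + 1


@task(cache_policy=Inputs())
def transform_b(x: int, _ns: str = "transform_b") -> int:
    return x * 2
```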
o
@Marvin I use both inputs and task source, so that is fine. My question was more: if task "a" named "b" runs, then task "c" also named "b" runs, and I again run task "a", could it have issues finding its old cache?
m
thought for 32.2 seconds
Short answer: no. With TASK_SOURCE + INPUTS, running another task with the same name will not disrupt task "a"'s cache.

Why:
- Cache keys come from the cache policy, not the task's display name.
- TASK_SOURCE hashes the task function's source. Different functions → different hash → different cache keys.
- INPUTS adds the bound arguments to the key. Different inputs → different key.
- Therefore, task "c" (same name, different function and/or inputs) won't overwrite or hide task "a"'s cached value. When you run task "a" again, it will look up its own key and find its cache as long as it hasn't expired.

When keys could collide:
- If task "a" and task "c" have identical function source and identical inputs, they will share the same cache entry. That's usually fine (shared reuse), but if you prefer strict separation, include a fixed "namespace" value in the inputs for each task.

Example to force separation:
```python
from prefect import task
from prefect.cache_policies import Inputs, TaskSource


@task(cache_policy=TaskSource() + Inputs())
def a(x, _ns="task_a"):
    ...

@task(cache_policy=TaskSource() + Inputs())
def c(x, _ns="task_c"):
    ...
```
Docs:
- Prefect 3 caching guide
- cache_policies.py