# ask-community
Jonathan Wright:
Hello. I would like to understand my options for task output caching. Are my only options for caching (which can be invalidated when inputs/parameters change):
• Memory, when running Prefect Core locally
• `PrefectResult`, when running in Server or Cloud
Or is it possible to use a different backend such as S3? Ideally I would like to use a combination of `target` and `cache_for`. So far I've not got this to work*, is this supported?

*The target file is written and used by subsequent flow runs, but it is never invalidated.
import datetime

from prefect import task
from prefect.engine.cache_validators import all_parameters
from prefect.engine.results import S3Result

result_location_and_target = "cache/{project_name}/{flow_name}/{task_name}.prefect_result"

s3result = S3Result(bucket="bucket-name", location=result_location_and_target)

@task(
    cache_for=datetime.timedelta(minutes=10),
    cache_validator=all_parameters,
    checkpoint=True,
    target=result_location_and_target,
    result=s3result,
)
def my_task():
    # placeholder task body
    ...
Kevin Kho:
Hey @Jonathan Wright, you can add some form of timestamp to the `target` to "invalidate" it. For example, adding the current date would invalidate it after a day. You can then use `S3Result` and a target instead of a location.
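A minimal sketch of that suggestion (Prefect 1.x; the bucket name is a placeholder, and it assumes the `{today}` context value is available for target templating):

```python
import datetime

from prefect import Flow, task
from prefect.engine.results import S3Result

# Placeholder bucket; "{today}" comes from prefect.context, so the target
# path changes once per day and yesterday's cached file is no longer reused.
daily_target = "cache/{flow_name}/{task_name}/{today}.prefect_result"


@task(checkpoint=True, target=daily_target, result=S3Result(bucket="bucket-name"))
def expensive_task():
    return f"computed at {datetime.datetime.now()}"


with Flow("daily-target-cache") as flow:
    expensive_task()
```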
Jonathan Wright:
Hi Kevin, thank you for the quick reply. Yes, I'm sure that would work. What I would really like is for the cache to be invalidated when the task function parameter(s) change. I could try to include these parameters in the S3 path as well. However, it looks like this behaviour is provided by Prefect's `cache_validator` functions, and I was wondering if I could make use of those in combination with S3.
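For reference, a sketch of the "parameters in the path" idea mentioned above; it assumes flow parameters are exposed to target templating via the `parameters` dict in `prefect.context`, and the bucket name is again a placeholder:

```python
import datetime

from prefect import Flow, Parameter, task
from prefect.engine.results import S3Result

# Hypothetical template: the value of the flow parameter "x" becomes part of
# the object key, so each distinct value maps to its own cached file.
param_target = "cache/{flow_name}/{task_name}/x-{parameters[x]}.prefect_result"


@task(checkpoint=True, target=param_target, result=S3Result(bucket="bucket-name"))
def cache_this(x):
    return f"The value {x} was cached at {datetime.datetime.now()}"


with Flow("param-in-target") as flow:
    x = Parameter("x")
    cache_this(x)
```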
Kevin Kho:
Ah, gotcha. You can use `cache_for` with `cache_validator` with the S3 result. Don't use `target`; that is a file-based caching mechanism. Just use the result `location`. Use the `all_inputs` validator here: https://docs.prefect.io/api/latest/engine/cache_validators.html#functions
I think `all_parameters` is for Prefect parameters, not task inputs.
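Putting that advice together, a rough sketch of the suggested setup (the bucket name and location template are placeholders, not anything prescribed by Prefect):

```python
import datetime

from prefect import Flow, Parameter, task
from prefect.engine.cache_validators import all_inputs
from prefect.engine.results import S3Result


@task(
    cache_for=datetime.timedelta(minutes=10),
    cache_validator=all_inputs,  # cache is reused only while the task inputs match
    checkpoint=True,
    result=S3Result(
        bucket="bucket-name",
        location="cache/{flow_name}/{task_name}/{task_run_id}.prefect_result",
    ),
)
def cache_this(x):
    return f"The value {x} was computed at {datetime.datetime.now()}"


with Flow("cache-validator-example") as flow:
    x = Parameter("x")
    cache_this(x)
```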
Jonathan Wright:
Great. I thought I'd tried that. I'll give it another go now I know I can use an S3 result with a cache validator 👍

Hi @Kevin Kho, caching is not working as I expect with an S3Result. I've included a simple example below. Please can you see if anything is wrong or if I've misunderstood? Thank you. The result is written to S3, but each time I execute this script I get `Can't use cache because it is now invalid` and the task is run again. Environment variables:
export PREFECT__LOGGING__LEVEL=DEBUG
export PREFECT__FLOWS__CHECKPOINTING=true
import datetime

from prefect import Flow, Parameter, context, task
from prefect.engine.cache_validators import all_inputs
from prefect.engine.results import S3Result

@task(
    cache_for=datetime.timedelta(hours=1),
    cache_validator=all_inputs,
    result=S3Result(bucket="my-bucket"),
    checkpoint=True,
)
def cache_this(x):

    # make it clear in the log if task is being executed or skipped
    context.logger.info("\n\nTask was run!!!!!!!\n")

    return f"The value {x} was cached at {datetime.datetime.now()}"


with Flow("cache test") as flow:
    x = Parameter(name="x")
    cached_value = cache_this(x)

state = flow.run(parameters={"x": 42})
print(state.result[cached_value]._result.__dict__)
Looking at the content of what is serialized to S3, it is clear why this doesn't work when running locally. This is all that was stored:
The value 42 was cached at 2021-07-15 10:17:26.133250
Since no flow Parameters or task inputs were cached, there's clearly no way the cache_validators would have the information required to know whether this cached value is valid or not. I was hoping the duration cache might work, since the S3 object does have a timestamp.
I notice that if I run this flow in Prefect Server the cached value is used. I imagine this only works because Prefect is storing metadata about my flow run in its database. Is this correct?
Which leads me back to my first question. When working locally, it appears I have to run subsequent flow runs in the same Python session so that flow metadata is available in memory, despite the fact I'm caching results to external storage (S3). I had assumed switching to external storage would remove this requirement.
I can see now why I was having some success with `target`, since it doesn't rely on flow metadata in memory or in the Prefect database. But the only control over validation is what I choose to include in the object path. Maybe in future the framework could append the required flow metadata to my serialized result so that the built-in cache_validators can be used.
Kevin Kho:
I see what you are saying. Everything you are saying is pretty much right. Yes, you are right that caching will not work without a backend (Cloud or Server). The cache relies on searching previous tasks in the database with a `Cached` state. When working locally, `target` would work because it is file-based, while subsequent flow runs using `flow.run()` will not see the cache.
Are you using `flow.run()` in production, or is this just for testing? The thing is, I'm not sure the metadata belongs in the `Result` in general, because some users don't serialize their results. Think of writing out a DataFrame as a CSV or Parquet file, and then that file being opened in a downstream process.
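A hypothetical illustration of that kind of result, where the checkpointed file is a plain CSV that downstream tooling opens directly, so there is no obvious place for Prefect to stash cache metadata (pandas and the paths here are illustrative only):

```python
import pandas as pd

from prefect import Flow, task
from prefect.engine.results import LocalResult
from prefect.engine.serializers import PandasSerializer


@task(
    checkpoint=True,
    target="outputs/{flow_name}/{task_name}.csv",  # plain CSV, no Prefect metadata inside
    result=LocalResult(dir=".", serializer=PandasSerializer(file_type="csv")),
)
def build_report():
    # Hypothetical data; any downstream process can open the CSV file directly.
    return pd.DataFrame({"value": [1, 2, 3]})


with Flow("plain-file-result") as flow:
    build_report()
```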
Jonathan Wright:
`flow.run()` is just for testing, but it's still very useful to have caching at this stage of the SDLC. I've adjusted my dev environment setup accordingly. You're right, you can overlook my last comment; having state metadata mixed up with persisted results would likely be hard to manage. As I previously mentioned, what I struggled to understand even after reading the docs, and which is now clear, is why the Python memory was still required despite choosing to cache my results in external storage. Thank you for the help and keep up the good work.
Kevin Kho:
Yes, the docs around this are definitely a pain point. Even the two separate mechanisms of `target` and `cache` are unclear. Will raise this with the team. Thanks for the understanding!