# ask-community
f
Hey 🙂, I have a question regarding caching. There are some tasks in my flow for which I cache the results using GCSResult. If I set the cache_for parameter to 30 minutes, does this mean that every 30 minutes this task is rerun and cached again? My downstream task fails because this upstream task contains a token (valid for 1 hour), so it seems as if this is not happening in my case.
m
Hello Fabrice! To narrow down the issue, did you specify a cache_validator, or only the cache_for parameter?
f
Hi @Mariia Kerimova! Only the cache_for parameter 😀
k
Hey @Fabrice Toussaint, are you calling the same task again downstream? A task with cache_for set will not re-run automatically on its own.
f
Hey @Kevin Kho, I am rerunning a flow multiple times, so yes, the task is called downstream multiple times 🙂
How can I make the cached task rerun every 30 minutes when it is used downstream, then?
k
I see. Do you know if you’re pulling the old token when you try this?
f
It is pulling the cached (old?) token it seems, even after the 30 minutes
@task(cache_for=dt.timedelta(minutes=15),
      result=GCSResult(bucket="XXX",
                       location=r"/{today}/{flow_run_id}/{task_run_id}.prefect_result"))
this is what I use
k
Can you try using a cache_key instead?
@task(cache_for=datetime.timedelta(hours=1), cache_key="my-key")
f
Does it matter what value I fill in for the cache_key? I thought it was set based on the task ID.
k
It doesn’t, as long as it’s the same across flow runs so that they pull from the same location.
f
I will give that a go
Thanks a lot already 🙂
k
Yeah no problem. I suspect that output caching doesn’t really play well with persisting results.
f
It doesn't work
It seems as if the result is set to None?
TypeError: 'NoneType' object is not iterable
This is what I receive
@task(cache_for=dt.timedelta(minutes=15), cache_key='shipment_analysis')
k
I’ll make a working example later for you.
f
@Kevin Kho did you have time to make an example already perhaps? 🙂
k
from prefect import Flow, task, Parameter
import prefect
import datetime
from prefect.engine.results import LocalResult

@task(cache_for=datetime.timedelta(minutes=5),
      result=LocalResult(location="test/{flow_run_id}/{task_run_id}.prefect_result"), 
      cache_key="my-key")
def cache_task(x):
    logger = prefect.context.get("logger")
    logger.info(x)
    return x

@task
def next_task(x):
    logger = prefect.context.get("logger")
    logger.info(f"This is the next task {x}")
    return x    

with Flow("cache-flow1") as flow:
    x = Parameter("x", "12345")
    c = cache_task(x)
    next_task(c)

flow.register('tutorial')
This mostly works for me, though I get another error related to the cache.
I tested this by running it, and then running it again with a different parameter. The old value was returned. I ran it again after 5 minutes with a new parameter, and the new value was returned.
I have gotten more insight from the team about caching. A target is a form of caching where the task checks whether the file already exists and uses it if it is there. You need to add a timestamp to the file name of the target to “invalidate” it.
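To illustrate the timestamp-in-the-file-name idea, here is a rough plain-Python sketch. It is not how Prefect implements targets; target_path, run_with_target, and fetch_token are hypothetical helpers, and the flooring logic assumes the window length divides 60 minutes evenly.

```python
import datetime as dt
import pickle
import tempfile
from pathlib import Path


def target_path(cache_dir, name, now, window_minutes=30):
    """Build a file name that changes once per time window."""
    # Floor the timestamp to the start of its window (assumes window divides 60)
    minute = (now.minute // window_minutes) * window_minutes
    bucket = now.replace(minute=minute, second=0, microsecond=0)
    return Path(cache_dir) / f"{name}-{bucket:%Y%m%dT%H%M}.pkl"


def run_with_target(fn, cache_dir, name, now, window_minutes=30):
    """Reuse the stored result if the target file exists, else run fn and store it."""
    path = target_path(cache_dir, name, now, window_minutes)
    if path.exists():
        return pickle.loads(path.read_bytes())
    value = fn()
    path.write_bytes(pickle.dumps(value))
    return value


calls = []


def fetch_token():
    # Stand-in for the expensive task; counts how often it really runs
    calls.append(1)
    return f"token-{len(calls)}"


with tempfile.TemporaryDirectory() as d:
    t0 = dt.datetime(2021, 1, 1, 12, 5)
    first = run_with_target(fetch_token, d, "token", t0)
    # 12:15 falls in the same 30-minute window, so the file is reused
    second = run_with_target(fetch_token, d, "token", t0 + dt.timedelta(minutes=10))
    # 12:35 falls in a new window, so the file name differs and the task re-runs
    third = run_with_target(fetch_token, d, "token", t0 + dt.timedelta(minutes=30))

print(first, second, third)  # token-1 token-1 token-2
```

Because the window start is part of the file name, an old file is never overwritten or deleted. It simply stops matching once the clock moves into the next window.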
cache_for, cache_key, and cache_validator are the other approach, an alternative to targets. The script I gave you should work.