# ask-community
r
Another question concerning result caching. Let's say we run a flow and there are some failures caused by bugs. Assuming that these bugs only affect the tasks that failed, we would want to fix those bugs locally, reregister the flow and rerun only those tasks that have not been successful before. So I configured the task results to be cached based on all inputs for 48 h as follows:
```python
import datetime

import prefect
from prefect import task
from prefect.engine.results import S3Result

version = "0_1_17"

s3_result_battery_id = S3Result(
    bucket="accure-prefect-results",
    location=f"{{flow_name}}/{version}/{{task_name}}/{{battery_id}}.prefect",
)


@task(
    result=s3_result_battery_id,
    # max_retries=max_retry,
    # retry_delay=retry_delay,
    # timeout=300,
    log_stdout=True,
    tags=["analysis", "task"],
    cache_validator=prefect.engine.cache_validators.all_parameters,
    cache_for=datetime.timedelta(hours=48),
)
def analyze_battery(battery_id):
    ...
```
However, it seems like the failed task runs aren't rerun either: since the inputs have not changed, the results are read from the cache. How do I explicitly tell Prefect to rerun those tasks which have previously failed? Or am I missing something?

Bonus:
• Is that actually a good philosophy?
• I thought of introducing the package release version (simply "version" in the code snippet above) into the mix
  ◦ so that one can control, via the package release version, when to rerun all tasks
  ◦ e.g. due to some feature enhancements in an entirely new flow run, as opposed to just some bugfixes within the same flow release
• note: package release version != prefect flow version
• In general we are not yet quite sure how to best handle versioning and staging with prefect 🤔
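[Editor's note] The versioning idea above can be sketched with plain string templating, independent of Prefect: the release version is baked into the target/result path, so bumping it changes every path at once and invalidates all cached targets. The flow, task, and battery names below are placeholders, not real identifiers from this project.

```python
# The package release version is injected at import time; Prefect fills
# {flow_name}/{task_name}/{battery_id} at runtime.
version = "0_1_17"
template = f"{{flow_name}}/{version}/{{task_name}}/{{battery_id}}.prefect"

rendered = template.format(
    flow_name="analysis_flow", task_name="compute", battery_id="b42"
)
print(rendered)  # analysis_flow/0_1_17/compute/b42.prefect
```

Bumping `version` to `"0_1_18"` renders a fresh path for every task, so no old targets are found and everything reruns.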
Ah, rereading the docs on output caching based on a file target, it seems like targets should be used instead of results to handle those cases? So are both targets and results created in that case? How do I specify the location of the target? In "Using Result targets for efficient caching" it sounds like both results and targets are stored in the same place:

"If you provide a location to a task's Result and a target, then the target will be used as the location of the result."
Are there more docs on this? Right now I would try something like:
```python
import datetime

import prefect
from prefect import task
from prefect.engine.results import S3Result

version = "0_1_17"

location_path = f"{{flow_name}}/{version}/{{task_name}}/{{battery_id}}.prefect"

s3_result_battery_id = S3Result(
    bucket="accure-prefect-results",
    location=location_path,
)


@task(
    target=location_path,
    result=s3_result_battery_id,
    cache_validator=prefect.engine.cache_validators.all_inputs,
    cache_for=datetime.timedelta(hours=48),
)
def analyze_battery(battery_id):
    ...
```
But that does not quite feel right 🤔
k
Are you mapping the task that is being stored?
On the bonus, you can up the Python package version if your project is a Python library
No, there aren’t more docs unfortunately. We certainly need more around caching and persisting.
I ask about the mapped task because someone else mentioned a similar issue last week
r
yes, it is a mapped task
Thanks for answering the philosophical part 🙂 Wanted to make sure to not do something totally off.
So is there a way to tell prefect to only rerun the failing tasks? Basically making these checks:
• result already there?
  ◦ No -> run task
  ◦ Yes
    ▪︎ failed state?
      • Yes -> run task
      • No
        ◦ inputs all the same?
        ◦ No -> run task
Is it doable with targets? How would I identify whether the targets were created? If that is not doable with targets, maybe it makes sense to implement it as a cache_validator in a merge request?
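[Editor's note] The checklist above boils down to a small predicate, sketched here in plain Python with no Prefect involved. `should_rerun` is a hypothetical helper for illustration, not a Prefect API:

```python
def should_rerun(result_exists: bool, last_state: str, inputs_changed: bool) -> bool:
    """Mirror of the checklist in the message above (hypothetical helper)."""
    if not result_exists:       # result already there? No -> run task
        return True
    if last_state == "failed":  # failed state? Yes -> run task
        return True
    return inputs_changed       # inputs all the same? No -> run task

print(should_rerun(False, "success", False))  # True: no result yet
print(should_rerun(True, "failed", False))    # True: previous run failed
print(should_rerun(True, "success", False))   # False: cached result reused
```

A custom cache_validator could encode the inputs check, but the "failed state" branch needs knowledge of the previous task-run state, which is what targets effectively provide (no target file is written for a failed run).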
Ah, one moment
targets solved the issue 🙂 So I guess this could be reformulated into a documentation improvement issue. In general, some scenario-based documentation would be appreciated.
k
I think there is a related issue. There is something wrong with the cache where cached results of a mapped task seem to be used when retrying the failed ones.
I’ll make a more specific issue for this
Oh man targets solved it for you? That’s good to know. Will try to replicate on my end.
r
Yes, seems like it works with targets. However:
• documentation could be improved
• I am still not 100% clear whether results are now also stored within the target or not stored at all?
• I am not sure whether I always would want to rerun the failed tasks, so it might make sense to leave it to the user with a parameter rerun_states=["failed", "cancelled", ...] that by default is empty?
k
Yes on docs. I believe they are stored. Why would you not want to rerun the failed task? Just trying to get a better sense.
r
I am not sure 🤔
In most cases one probably wants to rerun the failed tasks 👍
k
Cuz we have tools to change it to success if you don’t want to re-run
r
You mean via interactive API?
k
State handlers for programmatic control. Also you can try-except then raise SUCCESS?
r
Ah, yeah, got it 👍 Or set other custom states, I guess?
One last question: Is there a way to circumvent caching, e.g. with a parameter that can be set from prefect UI? Otherwise one would have to delete the targets I guess, which would be a bit manual 🤔
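[Editor's note] One possible shape for cache-busting from the UI, purely as a sketch: if a flow parameter can be referenced in the target template, changing its value in the UI changes the rendered path, so the old target is no longer found. Both the `cache_key` parameter name and the availability of `{parameters[...]}` in target templating are assumptions here, not confirmed behavior:

```python
# Hypothetical scheme: a flow parameter in the target template.
target_template = "{flow_name}/{parameters[cache_key]}/{task_name}/{battery_id}.prefect"

rendered = target_template.format(
    flow_name="analysis_flow",
    parameters={"cache_key": "run-2"},
    task_name="compute",
    battery_id="b42",
)
print(rendered)  # analysis_flow/run-2/compute/b42.prefect
```

Setting `cache_key` to a new value from the UI would then force a rerun without manually deleting targets.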
k
I think you can raise an issue for that one and the core team will see it
Is the goal to invalidate the current cache?
r
Yes, also, I am not sure whether target and results are working perfectly yet. Will keep you posted.
But we can "close" this thread 🙂
Thanks for the help @Kevin Kho!
k
Sure @Robin!
I revisited this and it looks like it should work. Will give it a shot tomorrow.