Robin
05/05/2021, 3:46 PM
import datetime
from prefect import task
from prefect.engine.cache_validators import all_parameters
from prefect.engine.results import S3Result
# package release version, baked into the result location
version = "0_1_17"
s3_result_battery_id = S3Result(
    bucket="accure-prefect-results",
    # doubled braces stay as placeholders to be filled in at runtime
    location=f"{{flow_name}}/{version}/{{task_name}}/{{battery_id}}.prefect",
)
@task(
    result=s3_result_battery_id,
    # max_retries=max_retry,
    # retry_delay=retry_delay,
    # timeout=300,
    log_stdout=True,
    tags=["analysis", "task"],
    cache_validator=all_parameters,
    cache_for=datetime.timedelta(hours=48),
)
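A note on the location template used in the snippet above: inside an f-string, doubled braces are emitted as literal single braces, so only {version} is filled in immediately and the other placeholders are left for later. The sketch below shows this in plain Python, assuming the remaining placeholders are later filled by str.format-style substitution; the flow name, task name, and battery id values are made up for illustration.

```python
version = "0_1_17"

# Only {version} is interpolated here; each {{...}} becomes a literal {...}.
template = f"{{flow_name}}/{version}/{{task_name}}/{{battery_id}}.prefect"

# Hypothetical runtime values, for illustration only.
rendered = template.format(
    flow_name="battery-flow",
    task_name="analyze",
    battery_id=42,
)

print(template)
print(rendered)
```

Because the release version is part of the rendered path, bumping it yields a new location for every task and battery id.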
However, it seems that the failed task runs aren't rerun either: the inputs have not changed, so the results are read from the cache.
How can I explicitly tell Prefect to rerun tasks that have previously failed? Or am I missing something?
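To make the desired behaviour concrete, here is a minimal plain-Python sketch of an input-keyed cache that stores only successful results, so a call that raised is executed again the next time it is requested with the same inputs. This is not Prefect's implementation or API; SuccessOnlyCache, analyze, and the failure pattern are hypothetical names invented for illustration.

```python
from typing import Any, Callable, Dict, Tuple


class SuccessOnlyCache:
    """Toy input-keyed cache (hypothetical, not part of Prefect)."""

    def __init__(self) -> None:
        self._store: Dict[Tuple, Any] = {}
        self.calls = 0  # how many times the wrapped function actually ran

    def run(self, fn: Callable[..., Any], **inputs: Any) -> Any:
        key = (fn.__name__, tuple(sorted(inputs.items())))
        if key in self._store:
            return self._store[key]  # cache hit: same inputs, earlier success
        self.calls += 1
        result = fn(**inputs)  # if this raises, nothing is stored
        self._store[key] = result
        return result


attempts = {"n": 0}


def analyze(battery_id: int) -> str:
    """Hypothetical task that fails on its first attempt only."""
    attempts["n"] += 1
    if attempts["n"] == 1:
        raise RuntimeError("transient failure")
    return f"ok-{battery_id}"


cache = SuccessOnlyCache()
try:
    cache.run(analyze, battery_id=7)  # first attempt fails, not cached
except RuntimeError:
    pass
second = cache.run(analyze, battery_id=7)  # executed again, succeeds
third = cache.run(analyze, battery_id=7)  # served from the cache
```

The design choice is to write to the cache only after the function returns, so failures never occupy a cache slot.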
Bonus:
• Is that actually a good philosophy?
• I thought of introducing the package release version (simply "version" in the code snippet above) into the mix
◦ so that one can control with the package release version when to rerun all tasks
◦ e.g. due to some feature enhancements in an entirely new flow run, as opposed to just some bugfixes within the same flow release
• note: package release version != Prefect flow version
• In general we are not yet quite sure how to best handle versioning and staging with Prefect 🤔
Robin
05/05/2021, 4:07 PM
targets should be used instead of results to handle those cases?
So are there both targets and results created in that case? How do I specify the location of the target?
In "Using Result targets for efficient caching" it sounds like both results and targets are stored in the same place:
"If you provide a location to a task's Result and a target then the target will be used as the location of the result."
Are there more docs on this? Right now I would try something like:
import datetime
version = "0_1_17"
location_path = f"{{flow_name}}/{version}/{{task_name}}/{{battery_id}}.prefect"
s3_result_battery_id = S3Result(
    bucket="accure-prefect-results",
    location=location_path,
)
@task(
    target=location_path,
    result=s3_result_battery_id,
    cache_validator=prefect.engine.cache_validators.all_inputs,
    cache_for=datetime.timedelta(hours=48),
)
But that does not quite feel right 🤔
Kevin Kho
Robin
05/05/2021, 5:24 PM
Robin
05/05/2021, 5:28 PM
Robin
05/05/2021, 5:35 PM
Robin
05/05/2021, 5:42 PM
Robin
05/05/2021, 5:46 PM
scenario-based documentation.
Kevin Kho
Robin
05/05/2021, 5:52 PM
rerun_states=["failed", "cancelled", ...] that by default is empty?
Kevin Kho
Robin
05/05/2021, 5:54 PM
Robin
05/05/2021, 5:55 PM
Kevin Kho
Robin
05/05/2021, 5:56 PM
Kevin Kho
try-except then raise SUCCESS?
Robin
05/05/2021, 5:58 PM
Robin
05/05/2021, 5:58 PM
Kevin Kho
Robin
05/05/2021, 6:04 PM
Robin
05/05/2021, 6:06 PM
Robin
05/05/2021, 6:07 PM
Kevin Kho