Robert Hales
08/02/2021, 1:59 PMmax_retries
on a task at flow run time?Kevin Kho
Robert Hales
08/02/2021, 2:04 PMKevin Kho
Robert Hales
08/02/2021, 2:08 PMKevin Kho
Kevin Kho
Robert Hales
08/02/2021, 2:15 PMSam Cook
08/02/2021, 5:24 PMprefect.context.get("task_run_count", 1)
If you pass the retry number as a parameter the retry conditional can managed internally to the flow/task, since raising the Prefect Retry signal will trigger a retry even if max_retries has been exceeded.
https://docs.prefect.io/api/latest/core/task.html#task-2Kevin Kho
Anna Geller (old account)
08/02/2021, 10:46 PM"""
First set the key:
from prefect.backend import set_key_value
key_value_uuid = set_key_value(key="variable_nr_retries", value=5)
"""
from datetime import timedelta
import prefect
from prefect import Flow, task
from prefect.backend import get_key_value
KEY_NAME = "variable_nr_retries"
@task(max_retries=2, retry_delay=timedelta(seconds=2))
def retry_test():
logger = prefect.context.get("logger")
run_count = prefect.context.get("task_run_count")
<http://logger.info|logger.info>("%s. TaskRun", run_count)
raise Exception("Failing to test retries...")
with Flow("retry-tester") as flow:
nr_retries = int(get_key_value(key="variable_nr_retries"))
retry_test(task_args=dict(max_retries=nr_retries))
if __name__ == "__main__":
flow.run()
Robert Hales
08/03/2021, 7:59 AM