Kyle Flanagan
01/14/2021, 5:21 PM
Zanie
import prefect
from prefect import task, Flow
import pendulum
# Retry budget handed to the task decorator as ``max_retries``.
MAX_RETRIES: int = 5
def state_handler(task, old_state, new_state):
    """Print "NOTIFY!" once a failed task has used up its retries.

    Args:
        task: The task whose state changed; its ``max_retries`` attribute is
            the retry budget the current run count is compared against.
        old_state: The state being left (unused).
        new_state: The state being entered; must provide ``is_failed()``.

    Returns:
        ``new_state``, unchanged.
    """
    if new_state.is_failed():
        # NOTE(review): the fallback of 1 assumes Prefect counts task runs
        # starting at 1 -- confirm against the Prefect version in use.
        run_count = prefect.context.get("task_run_count", 1)
        # Compare against the task's own budget rather than a module-level
        # constant so the handler also works for tasks with other limits.
        if run_count > task.max_retries:
            print("NOTIFY!")
    # Hand the state back explicitly instead of implicitly returning None.
    return new_state
@task(
    state_handlers=[state_handler],
    retry_delay=pendulum.duration(milliseconds=1),
    max_retries=MAX_RETRIES,
)
def task_that_fails():
    """Print the ``task_run_count`` value from the Prefect context, then raise.

    Raises:
        Exception: Always, with the message "FAILED".
    """
    run_count = prefect.context.get("task_run_count")
    print(run_count)
    raise Exception("FAILED")
# Register the always-failing task in a flow named "retry-count".
with Flow("retry-count") as flow:
    task_that_fails()

# Run the flow.
flow.run()
Kyle Flanagan
# Posted 01/14/2021, 6:01 PM (timestamp from the chat transcript)
def state_handler(task, old_state, new_state):
    """Call ``handle_failure(task)`` once a failed task is out of retries.

    Args:
        task: The task whose state changed; its ``max_retries`` attribute is
            compared against the current run count.
        old_state: The state being left (unused).
        new_state: The state being entered; must provide ``is_failed()``.

    Returns:
        ``new_state``, unchanged.
    """
    if new_state.is_failed():
        # Falls back to 1 when "task_run_count" is absent from the context.
        run_count = prefect.context.get("task_run_count", 1)
        if run_count > task.max_retries:
            handle_failure(task)
    return new_state
Zanie