https://prefect.io logo
k

Kyle Flanagan

01/14/2021, 5:21 PM
I think there's something I'm missing about states that I just can't figure out. I want to send a notification of a Task failure but ONLY if the task has exceeded its retries. I can't use is_failed(), because I'd get a notification for each failure, same for is_retrying(). I could look at the run_count and if it's on its very last retry go ahead and send a failure notice, but that's not necessarily true (the final attempt may succeed). I thought maybe is_finished() would only be True on the final execution, but that's not the case. Is there a way to know if a task failed and exceeded its retries? (I'm using a task state handler for this)
đź‘€ 1
z

Zanie

01/14/2021, 5:37 PM
Hi @Kyle Flanagan, is this what you’re looking for?
Copy code
import prefect
from prefect import task, Flow
import pendulum

MAX_RETRIES = 5


def state_handler(task, old_state, new_state):
    if new_state.is_failed() and prefect.context.get("task_run_count", 0) > MAX_RETRIES:
        print("NOTIFY!")


@task(
    max_retries=MAX_RETRIES,
    retry_delay=pendulum.duration(milliseconds=1),
    state_handlers=[state_handler],
)
def task_that_fails():
    print(prefect.context.get("task_run_count"))
    raise Exception("FAILED")


with Flow("retry-count") as flow:
    task_that_fails()

flow.run()
k

Kyle Flanagan

01/14/2021, 6:01 PM
Thank you for the guidance. The prefect context is what I needed! Tweaked your example just a bit to use the task's max retries, since, in my case, the Task is defined in a separate module than the handler (for any future readers this may help):
Copy code
def state_handler(task, old_state, new_state):
    if new_state.is_failed() and prefect.context.get("task_run_count", 1) > task.max_retries:
        handle_failure(task)
        return new_state
z

Zanie

01/14/2021, 6:42 PM
Wonderful! That’s definitely a better implementation.