YD
08/09/2021, 11:18 PMdef my_state_handler(obj, old_state, new_state):
    if new_state.is_finished() and new_state.is_failed():
        # send notificationKevin Kho
task_run_countYD
08/09/2021, 11:33 PMnew_state.is_finished() and new_state.is_failed() and not new_state.is_retrying()Kevin Kho
prefect.context.get("task_run_count")YD
08/09/2021, 11:37 PMnew_state.context.get('task_run_count') == obj.max_retriesYD
08/09/2021, 11:42 PMold_stateKevin Kho
new_state.is_failed()from prefect import task, Flow
import prefect
from datetime import timedelta
def mystatehandler(task, old_state, new_state):
    if new_state.is_failed():
        logger = prefect.context.get("logger")
        <http://logger.info|logger.info>("RUN COUNT")
        <http://logger.info|logger.info>(prefect.context.get("task_run_count"))
        <http://logger.info|logger.info>("MAX RETRIES")
        <http://logger.info|logger.info>(task.max_retries)
@task(max_retries=3, retry_delay=timedelta(0), state_handlers=[mystatehandler])
def abc(x):
    raise ValueError()
    return x
@task
def log(x):
    return x
with Flow("test") as flow:
    a = abc(1)
    log(a)
flow.run()Kevin Kho
def mystatehandler(task, old_state, new_state):
    logger = prefect.context.get("logger")
    <http://logger.info|logger.info>("RUN COUNT")
    <http://logger.info|logger.info>(prefect.context.get("task_run_count"))
    <http://logger.info|logger.info>("MAX RETRIES")
    <http://logger.info|logger.info>(task.max_retries)
    if new_state.is_failed():
        <http://logger.info|logger.info>("I FAILED")YD
08/09/2021, 11:52 PM<http://logger.info|logger.info>("I FAILED")new_state.is_failed()new_state.is_failed()Kevin Kho
task_run_countmax_retriesis_finishedYD
08/09/2021, 11:58 PMif new_state.context.get('task_run_count') == obj.max_retries and new_state.is_failed()Kevin Kho
new_state.context.get("task_run_count")prefect.context.get("task_run_count")