When creating a state handler, how can I tell when...
# ask-community
y
When creating a state handler, how can I tell when a task fails the last retry? I tried
def my_state_handler(obj, old_state, new_state):
    if new_state.is_finished() and new_state.is_failed():
        # send notification
        pass
but this did not work. I want to send a notification only if the last retry fails.
k
Hey @YD, the context has a variable called
task_run_count
. Maybe you can use that and compare against max_retries?
y
the context is not populated when
new_state.is_finished() and new_state.is_failed() and not new_state.is_retrying()
is True but I am looking into it
k
You can use
prefect.context.get("task_run_count")
inside a state handler
y
this
new_state.context.get('task_run_count') == obj.max_retries
is already true in the state before it fails for the last time
I can't use the
old_state
either, since there are a couple of intermediate states between when it starts the last retry and when it fails
k
A bit unclear why you can’t compare them after
new_state.is_failed()
like this:
from prefect import task, Flow
import prefect
from datetime import timedelta

def mystatehandler(task, old_state, new_state):
    if new_state.is_failed():
        logger = prefect.context.get("logger")
        logger.info("RUN COUNT")
        logger.info(prefect.context.get("task_run_count"))
        logger.info("MAX RETRIES")
        logger.info(task.max_retries)

@task(max_retries=3, retry_delay=timedelta(0), state_handlers=[mystatehandler])
def abc(x):
    raise ValueError()
    return x

@task
def log(x):
    return x

with Flow("test") as flow:
    a = abc(1)
    log(a)

flow.run()
You can also pull the values before you check the state:
def mystatehandler(task, old_state, new_state):
    logger = prefect.context.get("logger")
    logger.info("RUN COUNT")
    logger.info(prefect.context.get("task_run_count"))
    logger.info("MAX RETRIES")
    logger.info(task.max_retries)
    if new_state.is_failed():
        logger.info("I FAILED")
y
Wouldn't this log
logger.info("I FAILED")
every time
new_state.is_failed()
is True? and
new_state.is_failed()
is True for any retry that fails.
k
Yep, you’re right, it would. You could use the
task_run_count
and
max_retries
in your condition, or use
is_finished
also.
y
If you are referring to something like
if new_state.context.get('task_run_count') == obj.max_retries and new_state.is_failed():
this does not work
k
new_state.context.get("task_run_count")
returns None, I think. You probably want
prefect.context.get("task_run_count")
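Putting the thread together, a handler along these lines may do what was asked. It is a minimal sketch, not a confirmed fix: the names `is_final_failure` and `notify_on_final_failure` are made up for illustration, and it assumes that `task_run_count` starts at 1 and that a retry is scheduled only while the count is at most `max_retries`. Under that assumption the last failure has a count greater than `max_retries`, which would match the observation above that the count equals `max_retries` one attempt before the final failure. Logging the values as shown earlier is a good way to check this on your Prefect version.

```python
def is_final_failure(run_count, max_retries, failed):
    """Return True only when a failed attempt has no retries left.

    Assumption: attempts are numbered from 1, so a task with max_retries=3
    runs at most 4 times and its last attempt has run_count == 4.
    """
    return failed and run_count > max_retries


def notify_on_final_failure(task, old_state, new_state):
    # Imported here so the helper above can be exercised without Prefect installed.
    import prefect

    # Read the count from the global context, not from new_state.context.
    run_count = prefect.context.get("task_run_count", 1)
    if is_final_failure(run_count, task.max_retries, new_state.is_failed()):
        logger = prefect.context.get("logger")
        logger.info("Final attempt failed; send the notification here")
    # Returning new_state leaves the state transition unchanged.
    return new_state
```

It would be attached the same way as in the example above, with `state_handlers=[notify_on_final_failure]` on the task.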