YD
08/09/2021, 11:18 PMdef my_state_handler(obj, old_state, new_state):
if new_state.is_finished() and new_state.is_failed():
# send notification
but this did not work
I want to send a notification only if the last retry failKevin Kho
task_run_count
. Maybe you can use that and compare against max_retries?YD
08/09/2021, 11:33 PMnew_state.is_finished() and new_state.is_failed() and not new_state.is_retrying()
is True
but I am looking into itKevin Kho
prefect.context.get("task_run_count")
inside a state handlerYD
08/09/2021, 11:37 PMnew_state.context.get('task_run_count') == obj.max_retries
is true in the state before it fails in the last timeYD
08/09/2021, 11:42 PMold_state
as well, since there are couple of intermediate steps between when when is starts the last retry and when it failsKevin Kho
new_state.is_failed()
like this:
from prefect import task, Flow
import prefect
from datetime import timedelta
def mystatehandler(task, old_state, new_state):
if new_state.is_failed():
logger = prefect.context.get("logger")
<http://logger.info|logger.info>("RUN COUNT")
<http://logger.info|logger.info>(prefect.context.get("task_run_count"))
<http://logger.info|logger.info>("MAX RETRIES")
<http://logger.info|logger.info>(task.max_retries)
@task(max_retries=3, retry_delay=timedelta(0), state_handlers=[mystatehandler])
def abc(x):
raise ValueError()
return x
@task
def log(x):
return x
with Flow("test") as flow:
a = abc(1)
log(a)
flow.run()
Kevin Kho
def mystatehandler(task, old_state, new_state):
logger = prefect.context.get("logger")
<http://logger.info|logger.info>("RUN COUNT")
<http://logger.info|logger.info>(prefect.context.get("task_run_count"))
<http://logger.info|logger.info>("MAX RETRIES")
<http://logger.info|logger.info>(task.max_retries)
if new_state.is_failed():
<http://logger.info|logger.info>("I FAILED")
YD
08/09/2021, 11:52 PM<http://logger.info|logger.info>("I FAILED")
every time new_state.is_failed()
is True?
and new_state.is_failed()
is True for any retry that fail.Kevin Kho
task_run_count
and max_retries
in your condition is_finished
also.YD
08/09/2021, 11:58 PMif new_state.context.get('task_run_count') == obj.max_retries and new_state.is_failed()
this does not workKevin Kho
new_state.context.get("task_run_count")
returns None I think. I think you want prefect.context.get("task_run_count")