Jitesh Khandelwal
03/04/2021, 5:52 PM
Zanie
state_handler
import prefect
from prefect import task, Flow
import pendulum
from prefect.engine.state import Skipped
# Retry budget: passed to the task as `max_retries` and used by the state
# handler as the run-count threshold past which a failure becomes Skipped.
MAX_RETRIES = 5
def state_handler(task, old_state, new_state):
    """Replace a failed state with ``Skipped`` once the retry budget is spent.

    Args:
        task: The task whose state is changing (not used here).
        old_state: The state being left (not used here).
        new_state: The state being entered.

    Returns:
        ``Skipped("Exceeded retry count")`` when ``new_state`` is failed and
        the ``task_run_count`` found in ``prefect.context`` is greater than
        ``MAX_RETRIES``; otherwise ``new_state`` unchanged.
    """
    # Anything that is not a failure passes through untouched.
    if not new_state.is_failed():
        return new_state

    # The run count defaults to 0 when the context does not provide one.
    run_count = prefect.context.get("task_run_count", 0)
    if run_count > MAX_RETRIES:
        return Skipped("Exceeded retry count")
    return new_state
@task(
    state_handlers=[state_handler],
    retry_delay=pendulum.duration(milliseconds=1),
    max_retries=MAX_RETRIES,
)
def task_that_fails():
    """Print the current ``task_run_count`` from the context, then fail.

    Raises:
        Exception: Always, with the message ``"FAILED"``.
    """
    attempt = prefect.context.get("task_run_count")
    print(attempt)
    raise Exception("FAILED")
# Register the always-failing task on a flow named "retry-count".
with Flow("retry-count") as flow:
    task_that_fails()

# Run the flow; each attempt prints its task_run_count.
flow.run()
Jitesh Khandelwal
03/04/2021, 5:58 PM
`task_run_count` magic. Thanks for the prompt response. 🙂