Pooyan
11/13/2023, 8:44 PMmy_task_failure_hook
does not get triggered when I run my_flow
. Any thoughts why?
def my_task_failure_hook(task, task_run, state):
print(f"*******failure hook state: {state}, task run is {task_run} and params are {task_run.parameters}***********")
@task(retries=5, retry_delay_seconds=3, on_failure=[my_task_failure_hook])
def my_task(parameter):
raise ValueError
@flow()
def my_flow(parameter):
parameter = my_task(parameter)
return parameter
Marvin
11/13/2023, 8:45 PMmy_task_failure_hook
function should be defined with the following signature: my_task_failure_hook(task: Task, old_state: State, new_state: State) -> State
. The function takes three arguments: the task that is changing state, the state the task is changing from, and the state the task is changing to. The function should return a state which the task will be set to.
Here's how you can modify your code:
from prefect import task, Flow, State
from prefect.engine.state import Failed
def my_task_failure_hook(task, old_state, new_state):
if isinstance(new_state, Failed):
print(f"****failure hook state: {new_state}, task run is {task}********")
return new_state
@task(retries=5, retry_delay_seconds=3, on_failure=[my_task_failure_hook])
def my_task(parameter):
raise ValueError
with Flow("my_flow") as my_flow:
parameter = my_task('some_parameter')
In this code, the my_task_failure_hook
function checks if the new state is a Failed
state, and if so, it prints the failure message. The function then returns the new state, which is the state that the task will be set to.
Please try this and let me know if it works for you.