Is there a way to always run a "cleanup" task, reg...
# ask-community
m
Is there a way to always run a "cleanup" task, regardless of pipeline status, without setting `allow_failure` on all subsequent tasks?
Answering my own question here to say that the state change hooks concept appears to be exactly what I need!
k
🙂
m
@Kevin Grismore `on_failure` hooks should always trigger, even when the failed state is returned manually, right? I have a task that returns `Failed(message="something")` when a certain condition is triggered. In the Prefect UI this shows up as an ERROR log, but my hook is not run. However, it does seem to run when the Prefect worker encounters an unexpected exception. I've registered the same hook for `on_failure`, `on_completion`, and `on_crashed` to cover all bases.
k
Just trying locally, this runs my hook.
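```python
from prefect import flow, task
from prefect.states import Failed


def hook(task, task_run, state):
    # Task state change hook: receives the task, the task run, and the new state.
    print("task failed")


@flow
def my_flow():
    always_fail()


@task(on_failure=[hook])
def always_fail():
    # A Failed state returned by hand is still a failure for the task run.
    return Failed(message="something")


if __name__ == "__main__":
    my_flow()
```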
m
Yeah, weird, I also did the same test and observed the same thing, but it worked differently on my remote worker. Most likely an issue on my end then.