Is there a way to always run a "cleanup" task, regardless of pipeline status, without setting allow_failure on all subsequent tasks?
Morten Hauge
11/07/2023, 10:03 PM
Answering my own question here to say that the state change hooks concept appears to be exactly what I need!
Kevin Grismore
11/07/2023, 10:03 PM
🙂
Morten Hauge
11/07/2023, 11:03 PM
@Kevin Grismore on_failure hooks should always trigger, even when the Failed state is returned manually, right?
I have a task that returns Failed(message="something") when a certain condition is triggered. In the Prefect UI this is shown as an ERROR log, but my hook is not run. However, it does seem to run when the Prefect worker encounters an unexpected exception. I've registered the same hook for on_failure, on_completion, and on_crashed to cover all bases.
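The idea of registering one cleanup hook for every terminal state can be sketched in plain Python. This is a toy model only, not Prefect's implementation or API: StateType, run_with_hooks, and the two-argument hook signature are hypothetical names made up for illustration. It assumes a runner that maps a normal return to COMPLETED, a manually returned state to itself, and a raised exception to FAILED.

```python
from enum import Enum
from typing import Callable, Dict, List, Tuple


class StateType(Enum):
    COMPLETED = "completed"
    FAILED = "failed"
    CRASHED = "crashed"  # kept for completeness; this toy runner never produces it


# Hypothetical hook signature for this sketch: (run name, terminal state).
Hook = Callable[[str, StateType], None]


def run_with_hooks(
    name: str,
    fn: Callable[[], object],
    hooks: Dict[StateType, List[Hook]],
) -> StateType:
    """Run fn, derive a terminal state, then call the hooks registered for it."""
    try:
        result = fn()
        # A returned state is used as-is, like returning Failed(...) in the thread.
        state = result if isinstance(result, StateType) else StateType.COMPLETED
    except Exception:
        # In this sketch an unexpected exception also ends in FAILED.
        state = StateType.FAILED
    for hook in hooks.get(state, []):
        hook(name, state)
    return state


cleaned: List[Tuple[str, str]] = []


def cleanup(name: str, state: StateType) -> None:
    # Record which run was cleaned up and in which state it ended.
    cleaned.append((name, state.value))


# Register the same cleanup hook for every terminal state.
hooks: Dict[StateType, List[Hook]] = {state: [cleanup] for state in StateType}


def succeeds() -> int:
    return 42


def returns_failed() -> StateType:
    return StateType.FAILED


def raises() -> None:
    raise RuntimeError("boom")


for fn in (succeeds, returns_failed, raises):
    run_with_hooks(fn.__name__, fn, hooks)

print(cleaned)
```

In this model the cleanup runs for all three outcomes, because hook dispatch depends only on the terminal state and not on how that state was reached. Whether a real worker behaves the same way depends on the Prefect version and environment it runs in.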
Kevin Grismore
11/07/2023, 11:17 PM
Just trying locally, this runs my hook.
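from prefect import flow, task
from prefect.states import Failed

def hook(task, task_run, state):
    # Task state hooks receive the task, the task run, and the new state.
    print("flow failed")

@flow
def my_flow():
    always_fail()

@task(on_failure=[hook])
def always_fail():
    # Returning a Failed state explicitly marks this task run as failed.
    return Failed(message="something")

if __name__ == "__main__":
    my_flow()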
Morten Hauge
11/08/2023, 8:12 AM
Yeah, weird, I also did the same test and observed the same thing, but it worked differently on my remote worker. Most likely an issue on my end then.