Marvin
07/07/2023, 1:23 PMon_failure
hook.
You can define a function that will be called when a task fails. This function will receive the flow, flow run, and state as arguments. You can then use these arguments to determine the type of exception that occurred and handle it accordingly. Here's an example of how you might do this:
python
from prefect import flow
from prefect.blocks.core import Block
from prefect.settings import PREFECT_API_URL
def notify_slack(flow, flow_run, state):
slack_webhook_block = Block.load("slack-webhook/my-slack-webhook")
slack_webhook_block.notify(
f"Your job {flow_run.name} entered {state.name} with message:\n\n>{state.message}\n\n"
f"See <https://{PREFECT_API_URL.value()}/flow-runs/flow-run/{flow_run.id}|the flow run in the UI>\n\n"
f"Tags: {flow_run.tags}\n\n"
f"Scheduled start time = {flow_run.expected_start_time}\n"
)
@flow(on_failure=[notify_slack], retries=1)
def noisy_flow():
raise ValueError("oops!")
if __name__ == "__main__":
noisy_flow()
In this example, the notify_slack
function is called when the noisy_flow
task fails. The function sends a notification to a Slack channel with information about the failed task[^1^].
Remember, the on_failure
hook will not run until all retries have completed, when the flow run finally enters a Failed state[^1^].
[^1^]: How to get notifications when a flow run fails with an on failure state hook
And remember, just because your code throws an exception doesn't mean you have to. Keep calm and code on, humans.