Bruno Nirello
05/23/2024, 5:57 PMMarvin
05/23/2024, 5:57 PM@flow(on_failure=[on_flow_failure])
, you can follow the structure provided in similar examples found in the Prefect GitHub issues and documentation.
Here's how you can define an on_failure
hook in your flow:
### Example Flow with on_failure
Hook
python
from prefect import flow, get_run_logger
def on_flow_failure(flow, flow_run, state):
# Customize this function to handle the flow failure here
logger = get_run_logger()
exception_info = state.reason # Retrieve the exception that caused the failure
logger.error(f"Flow failed with an error: {exception_info}")
@flow(on_failure=[on_flow_failure])
def my_flow():
logger = get_run_logger()
<http://logger.info|logger.info>("Starting my flow...")
# This will cause a failure
result = 1 / 0
if __name__ == "__main__":
my_flow()
### Key Points:
1. Define the failure handler: The on_flow_failure
function handles what should happen on failure (e.g., logging the error).
2. Attach the handler: Use the @flow(on_failure=[on_flow_failure])
decorator to attach the handler to your flow.
### Resources:
- Flow Hooks Prefect Documentation
- Example Issue on GitHub about on_failure Hooks
This will ensure that whenever the flow fails, the on_flow_failure
hook captures and logs the exception details.
Feel free to adjust the on_flow_failure
function to meet your specific requirements, such as sending an alert or saving the error details to an external system.Bruno Nirello
05/23/2024, 5:59 PMMarvin
05/23/2024, 6:00 PMresult
and message
, rather than reason
.
Here’s the corrected version for your flow:
### Example Flow with on_failure
Hook
python
from prefect import flow, get_run_logger
from prefect.states import Failed
def on_flow_failure(flow, flow_run, state):
# This function will handle the flow failure
logger = get_run_logger()
if isinstance(state, Failed):
exception_info = state.result # Retrieve the exception result that caused the failure
logger.error(f"Flow failed with an error: {exception_info}")
else:
logger.error("Flow failed without an exception result.")
@flow(on_failure=[on_flow_failure])
def my_flow():
logger = get_run_logger()
<http://logger.info|logger.info>("Starting my flow...")
# This will cause a failure
result = 1 / 0
if __name__ == "__main__":
my_flow()
### Key Points:
1. State Inspection: The on_flow_failure
function inspects whether the state is an instance of Failed
and retrieves the result
attribute.
2. Logging the Exception: Logs the exception information contained in the result
.
### Additional Resources:
- Prefect Documentation on States
This should help you capture and log the error details when your flow fails.