Bruno Nirello
05/23/2024, 5:57 PM
Marvin
05/23/2024, 5:57 PM
`@flow(on_failure=[on_flow_failure])`, you can follow the structure provided in similar examples found in the Prefect GitHub issues and documentation.
Here's how you can define an `on_failure` hook in your flow:
### Example Flow with `on_failure` Hook
```python
from prefect import flow, get_run_logger

def on_flow_failure(flow, flow_run, state):
    # Customize this function to handle the flow failure here
    logger = get_run_logger()
    exception_info = state.reason  # Retrieve the exception that caused the failure
    logger.error(f"Flow failed with an error: {exception_info}")

@flow(on_failure=[on_flow_failure])
def my_flow():
    logger = get_run_logger()
    logger.info("Starting my flow...")
    # This will cause a failure
    result = 1 / 0

if __name__ == "__main__":
    my_flow()
```
### Key Points:
1. Define the failure handler: The `on_flow_failure` function handles what should happen on failure (e.g., logging the error).
2. Attach the handler: Use the `@flow(on_failure=[on_flow_failure])` decorator to attach the handler to your flow.
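To see the idea of attaching failure handlers without installing Prefect, here is a toy sketch in plain Python. It is not how Prefect implements hooks; `with_failure_hooks` and `record_failure` are hypothetical names, and the hook signature here is simplified to a function name and an exception.

```python
import functools

def with_failure_hooks(hooks):
    """Toy decorator: call each hook with (function name, exception) if the function raises."""
    def decorator(func):
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            try:
                return func(*args, **kwargs)
            except Exception as exc:
                for hook in hooks:
                    hook(func.__name__, exc)
                raise  # Re-raise so the caller still sees the failure
        return wrapper
    return decorator

failures = []

def record_failure(name, exc):
    # Stand-in for logging or alerting
    failures.append(f"{name} failed: {exc!r}")

@with_failure_hooks([record_failure])
def my_function():
    return 1 / 0

try:
    my_function()
except ZeroDivisionError:
    pass

print(failures)
```

The hooks are passed as a list, so several handlers can be attached to the same function and each one is called in order.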
### Resources:
- Flow Hooks Prefect Documentation
- Example Issue on GitHub about on_failure Hooks
This will ensure that whenever the flow fails, the `on_flow_failure` hook captures and logs the exception details.
Feel free to adjust the `on_flow_failure` function to meet your specific requirements, such as sending an alert or saving the error details to an external system.
Bruno Nirello
05/23/2024, 5:59 PM
Marvin
05/23/2024, 6:00 PM
`result` and `message`, rather than `reason`.
Here’s the corrected version for your flow:
### Example Flow with `on_failure` Hook
```python
from prefect import flow, get_run_logger

def on_flow_failure(flow, flow_run, state):
    # This function will handle the flow failure
    logger = get_run_logger()
    if state.is_failed():
        # state.message holds a description of what caused the failure
        logger.error(f"Flow failed with an error: {state.message}")
    else:
        logger.error("Hook was called with a state that is not a failure.")

@flow(on_failure=[on_flow_failure])
def my_flow():
    logger = get_run_logger()
    logger.info("Starting my flow...")
    # This will cause a failure
    result = 1 / 0

if __name__ == "__main__":
    my_flow()
```
### Key Points:
1. State Inspection: The `on_flow_failure` function calls `state.is_failed()` to confirm the state is a failure. `Failed` in `prefect.states` is a helper function that builds a state, not a class, so it cannot be used with `isinstance`.
2. Logging the Exception: Logs the failure description contained in `state.message`. `state.result` is a method, not an attribute; it can be called with `raise_on_failure=False` to return the exception instead of raising it.
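The state check can also be tried out without running a flow. This is a minimal sketch, assuming the state object exposes `is_failed()` and `message` as Prefect's `State` does; `describe_failure` and the stand-in objects are hypothetical and only imitate those two members.

```python
from types import SimpleNamespace

def describe_failure(state):
    """Build a log message from a state-like object.

    Assumes the object has an is_failed() method and a message attribute.
    """
    if state.is_failed():
        return f"Flow failed with an error: {state.message or 'no message recorded'}"
    return "Hook was called with a state that is not a failure."

# Stand-ins for real states, for illustration only
failed_state = SimpleNamespace(is_failed=lambda: True, message="division by zero")
completed_state = SimpleNamespace(is_failed=lambda: False, message=None)

print(describe_failure(failed_state))
print(describe_failure(completed_state))
```

Keeping the message formatting in a small function like this makes it easy to reuse from a hook and to check separately from the flow itself.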
### Additional Resources:
- Prefect Documentation on States
This should help you capture and log the error details when your flow fails.