WarioRD1
07/01/2025, 11:03 AMMarvin
07/01/2025, 11:03 AMMarvin
07/01/2025, 11:05 AMpython
from prefect import flow
from prefect.client.schemas.objects import Flow, FlowRun, State
def failure_handler(flow: Flow, flow_run: FlowRun, state: State):
# Access flow parameters
flow_params = flow_run.parameters
# Access error information
error_message = state.message
error_type = state.type
# Access flow metadata
flow_name = flow.name
flow_run_id = flow_run.id
print(f"Flow '{flow_name}' failed with parameters: {flow_params}")
print(f"Error: {error_message}")
print(f"Flow run ID: {flow_run_id}")
@flow(on_failure=[failure_handler])
def my_flow(x: int, y: str):
raise ValueError("Something went wrong")
# Example usage
my_flow(x=1, y="test")
The failure handler receives three important objects:
1. `flow`: The Flow object containing flow metadata
- `flow.name`: Name of the flow
- `flow.version`: Flow version
- Other flow-level attributes
2. `flow_run`: The FlowRun object containing runtime information
- `flow_run.id`: Unique identifier for this run
- `flow_run.parameters`: Dictionary of parameters passed to the flow
- `flow_run.start_time`: When the flow started
- `flow_run.expected_start_time`: Scheduled start time
3. `state`: The State object containing failure details
- `state.message`: Error message
- `state.type`: State type (e.g., "FAILED")
- `state.timestamp`: When the failure occurred
You can also create an async failure handler:
python
from prefect import flow
async def async_failure_handler(flow: Flow, flow_run: FlowRun, state: State):
# Access same parameters as sync handler
params = flow_run.parameters
error = state.message
# Perform async operations
await notify_slack(f"Flow failed with params {params}, error: {error}")
@flow(on_failure=[async_failure_handler])
def my_flow(x: int, y: str):
raise ValueError("Something went wrong")
The handler will be called automatically when the flow fails, and you'll have access to all the flow's runtime information to handle the failure appropriately.