WarioRD1
07/01/2025, 11:03 AM
Marvin
07/01/2025, 11:03 AM
Marvin
07/01/2025, 11:05 AM
```python
from prefect import Flow, flow
from prefect.client.schemas.objects import FlowRun, State


def failure_handler(flow: Flow, flow_run: FlowRun, state: State):
    # Access flow parameters
    flow_params = flow_run.parameters

    # Access error information
    error_message = state.message
    error_type = state.type

    # Access flow metadata
    flow_name = flow.name
    flow_run_id = flow_run.id

    print(f"Flow '{flow_name}' failed with parameters: {flow_params}")
    print(f"Error ({error_type}): {error_message}")
    print(f"Flow run ID: {flow_run_id}")


@flow(on_failure=[failure_handler])
def my_flow(x: int, y: str):
    raise ValueError("Something went wrong")


# Example usage (the ValueError still propagates after the hook runs)
if __name__ == "__main__":
    my_flow(x=1, y="test")
```
The failure handler receives three objects:
1. `flow`: The Flow object containing flow metadata
   - `flow.name`: Name of the flow
   - `flow.version`: Flow version
   - Other flow-level attributes
2. `flow_run`: The FlowRun object containing runtime information
   - `flow_run.id`: Unique identifier for this run
   - `flow_run.parameters`: Dictionary of parameters passed to the flow
   - `flow_run.start_time`: When the flow run started
   - `flow_run.expected_start_time`: Scheduled start time
3. `state`: The State object containing failure details
   - `state.message`: Error message
   - `state.type`: State type (e.g., `StateType.FAILED`)
   - `state.timestamp`: When the failure occurred
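As a rough sketch of how those attributes might be gathered in one place, the helper below builds a plain dictionary from the three hook arguments. `build_failure_report` is a hypothetical name, and the `SimpleNamespace` stand-ins only mimic the attributes listed above so the snippet runs without Prefect installed; in a real hook, Prefect passes the actual objects.

```python
from types import SimpleNamespace


def build_failure_report(flow, flow_run, state) -> dict:
    """Collect hook attributes into a plain dict (hypothetical helper)."""
    return {
        "flow_name": flow.name,
        "flow_run_id": str(flow_run.id),
        "parameters": dict(flow_run.parameters),
        # Works for an enum with a .value as well as for a plain string
        "state_type": str(getattr(state.type, "value", state.type)),
        "message": state.message,
    }


# Stand-in objects (assumptions for illustration, not Prefect classes)
fake_flow = SimpleNamespace(name="my_flow")
fake_run = SimpleNamespace(id="1234", parameters={"x": 1, "y": "test"})
fake_state = SimpleNamespace(type="FAILED", message="Something went wrong")

report = build_failure_report(fake_flow, fake_run, fake_state)
print(report)
```

Inside a handler you could call `build_failure_report(flow, flow_run, state)` and forward the result to whatever logging or alerting you already use.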
You can also create an async failure handler:
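```python
from prefect import Flow, flow
from prefect.client.schemas.objects import FlowRun, State


async def notify_slack(message: str) -> None:
    # Hypothetical placeholder; replace with your real notification call
    print(message)


async def async_failure_handler(flow: Flow, flow_run: FlowRun, state: State):
    # Access the same objects as in the sync handler
    params = flow_run.parameters
    error = state.message

    # Perform async operations
    await notify_slack(f"Flow failed with params {params}, error: {error}")


@flow(on_failure=[async_failure_handler])
def my_flow(x: int, y: str):
    raise ValueError("Something went wrong")
```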
The handler will be called automatically when the flow fails, and you'll have access to all the flow's runtime information to handle the failure appropriately.
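If a handler needs extra configuration, one possible approach is a small factory that returns a function with the `(flow, flow_run, state)` signature. This is only a sketch: `make_failure_handler`, `channel`, and `sent` are hypothetical names, the list stands in for a real notification client, and the `SimpleNamespace` objects are stand-ins so the snippet runs without Prefect.

```python
import asyncio
from types import SimpleNamespace


def make_failure_handler(channel: str, sent: list):
    """Return an async hook bound to a channel (hypothetical helper)."""

    async def handler(flow, flow_run, state):
        # A real handler would await a notification client here
        sent.append(f"[{channel}] {flow.name} run {flow_run.id}: {state.message}")

    return handler


sent_messages: list = []
handler = make_failure_handler("#alerts", sent_messages)

# Call the hook directly with stand-in objects (not Prefect classes)
asyncio.run(
    handler(
        SimpleNamespace(name="my_flow"),
        SimpleNamespace(id="1234"),
        SimpleNamespace(message="Something went wrong"),
    )
)
print(sent_messages)
```

With Prefect, the returned function would presumably be registered the same way as the handlers above, for example `@flow(on_failure=[make_failure_handler("#alerts", sent_messages)])`.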