Lester Cayabyab
07/18/2025, 6:07 PM
Marvin
07/18/2025, 6:07 PM
Marvin
07/18/2025, 6:08 PM
on_failure handler. Here's a complete example:
python
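from prefect import flow


@flow
def my_flow():
    # Simulate a failure by raising an exception
    raise ValueError("Something went wrong!")


# Define what happens when the flow fails. The hook is registered on the
# flow object, outside the flow body (if your Prefect version lacks this
# decorator, pass on_failure=[alert_on_failure] to @flow instead).
@my_flow.on_failure
def alert_on_failure(flow, flow_run, state):
    print(f"Flow {flow.name} failed! Error: {state.message}")


if __name__ == "__main__":
    my_flow()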
You can trigger the on_failure handler in several ways:
1. Raise any unhandled exception (e.g., ValueError, RuntimeError, Exception)
2. Task failures that propagate to the flow level
3. System-level issues that crash the flow (note that a run ending in a Crashed state fires on_crashed hooks rather than on_failure hooks)
Here's a more realistic example showing different failure scenarios:
python
from prefect import flow, task


@task
def risky_task():
    raise ValueError("Task failed!")


@flow
def my_flow(some_condition: bool = False):
    # Any of these will trigger the on_failure handler
    # (the first one reached ends the run):
    # Option 1: Explicit failure check
    if some_condition:
        raise Exception("Business logic failure")
    # Option 2: Unhandled task exception
    result = risky_task()
    # Option 3: Direct exception in flow
    raise RuntimeError("Flow-level error")


# Registered on the flow object, outside the flow body
@my_flow.on_failure
def alert_on_failure(flow, flow_run, state):
    print(f"Flow {flow.name} failed!")
    print(f"Error message: {state.message}")
    # You could send notifications, cleanup resources, etc.
Key points to remember:
1. Any unhandled exception will trigger the on_failure handler
2. The handler receives the flow object, flow run, and state information
3. You can use the handler to perform cleanup, send notifications, or log information
4. If you want to ensure task failures trigger the flow's on_failure handler, make sure exceptions can propagate (don't catch them, and don't suppress them with return_state=True or .result(raise_on_failure=False))
There's no specific exception you must use - any unhandled exception will work. Choose the exception type that best describes the error condition you're handling (e.g., ValueError for invalid input, RuntimeError for operational failures, etc.).
Remember that if you want to prevent a task failure from triggering the flow's on_failure handler, you can catch the exception, call the task with return_state=True, or call .result(raise_on_failure=False) on its future or state. However, this means you'll need to handle the failure logic yourself.
Lester Cayabyab
07/18/2025, 6:14 PM
@flow(log_prints=True, on_failure=[handle_failure])  # type: ignore[misc]
def build(
    build_client_name: str,
    build_heartbeat_monitor_url: str,
    build_scripts: list[dict[str, Any]],
) -> None:
?
Marvin
07/18/2025, 6:15 PM
Marvin
07/18/2025, 6:15 PM
@flow decorator accepts on_failure as a list of handler functions, which is exactly what you're doing with on_failure=[handle_failure]. This is actually an alternative (and equally valid) way to define failure handlers compared to the on_failure decorator approach I showed earlier.
Your setup has these components:
1. log_prints=True - This will automatically log any print statements in your flow
2. on_failure=[handle_failure] - This defines a list of handlers to call when the flow fails
3. The type ignore comment is fine if you need it for your type checker
Just make sure your handle_failure function has the correct signature. It should look something like this:
python
def handle_failure(flow, flow_run, state):
    # Your failure handling logic here
    print(f"Flow {flow.name} failed with message: {state.message}")
This approach is particularly useful when:
1. You want to reuse the same failure handler across multiple flows
2. You want to attach multiple failure handlers (since you can pass a list)
3. You prefer to define your handlers separately from the flow definition
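The reuse case can be sketched without running Prefect at all. The snippet below is a minimal sketch that assumes only the (flow, flow_run, state) hook signature described above; make_failure_handler, the notify callable, and the SimpleNamespace stand-ins are hypothetical names used for illustration and are not part of Prefect.

```python
from types import SimpleNamespace
from typing import Callable


def make_failure_handler(notify: Callable[[str], None]):
    """Build a hook with the (flow, flow_run, state) signature.

    `notify` is any callable that accepts a message string, for
    example a function that posts to a chat channel (hypothetical).
    """

    def handle_failure(flow, flow_run, state):
        notify(f"Flow {flow.name} failed with message: {state.message}")

    return handle_failure


# Exercise the handler with stand-in objects instead of a real flow run.
sent = []
handler = make_failure_handler(sent.append)
handler(
    SimpleNamespace(name="build"),
    SimpleNamespace(name="example-run"),
    SimpleNamespace(message="Build failed!"),
)
print(sent[0])
```

The same handler object could then be listed in on_failure=[...] for several flows, and the stand-in call doubles as a quick unit test of the message format.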
Here's a complete example showing how your setup would work:
python
from typing import Any

from prefect import flow


def handle_failure(flow, flow_run, state):
    print(f"Flow {flow.name} failed!")
    print(f"Error message: {state.message}")
    # Add your failure handling logic here


@flow(log_prints=True, on_failure=[handle_failure])  # type: ignore[misc]
def build(
    build_client_name: str,
    build_heartbeat_monitor_url: str,
    build_scripts: list[dict[str, Any]],
) -> None:
    print(f"Starting build for client: {build_client_name}")
    # Your build logic here
    something_goes_wrong = False  # placeholder: replace with your own failure check
    if something_goes_wrong:
        raise ValueError("Build failed!")
Both this approach and the on_failure decorator approach achieve the same result - it's mainly a matter of coding style and organization preference. Your setup is perfectly valid and follows Prefect's best practices!
Lester Cayabyab
07/18/2025, 6:17 PM
Marvin
07/18/2025, 6:18 PM
Marvin
07/18/2025, 6:18 PM
on_failure handler:
1. Option 1: Pass return_state=True when calling the task and check the task state (Recommended):
python
from prefect import flow, task


def handle_failure(flow, flow_run, state):
    print(f"Flow {flow.name} failed with message: {state.message}")


@task
def risky_task():
    raise RuntimeError("Task failed!")


@flow(on_failure=[handle_failure])
def my_flow():
    # return_state=True is a call-time argument: the task returns its
    # state instead of raising its exception here
    state = risky_task(return_state=True)
    # Check if the task failed and raise an exception if it did
    if state.is_failed():
        raise RuntimeError(f"Task failed: {state.message}")
2. Option 2: Submit the task and call .result(raise_on_failure=True) on its future (Also recommended):
python
from prefect import flow, task


def handle_failure(flow, flow_run, state):
    print(f"Flow {flow.name} failed with message: {state.message}")


@task
def risky_task():
    raise RuntimeError("Task failed!")


@flow(on_failure=[handle_failure])
def my_flow():
    # raise_on_failure is an argument of .result(), not a task option;
    # with True the task's exception is re-raised inside the flow
    future = risky_task.submit()
    result = future.result(raise_on_failure=True)
3. Option 3: Use try/except to explicitly handle and re-raise:
python
from prefect import flow, task


def handle_failure(flow, flow_run, state):
    print(f"Flow {flow.name} failed with message: {state.message}")


@task
def risky_task():
    raise RuntimeError("Task failed!")


@flow(on_failure=[handle_failure])
def my_flow():
    try:
        result = risky_task()
    except Exception as e:
        # You can do additional handling here if needed;
        # "from e" keeps the original traceback attached
        raise RuntimeError(f"Flow failed due to task error: {e}") from e
4. Option 4: Call the task directly and let its exception propagate (@task has no raise_on_failure argument):
python
from prefect import flow, task


def handle_failure(flow, flow_run, state):
    print(f"Flow {flow.name} failed with message: {state.message}")


@task
def risky_task():
    raise RuntimeError("Task failed!")


@flow(on_failure=[handle_failure])
def my_flow():
    # A direct call raises the task's exception inside the flow
    result = risky_task()
A likely reason your current setup isn't working is that the task's failure is being captured as a failed state instead of being raised inside the flow function. A task that is called directly raises its exception in the flow, but a task that is run with .submit() or called with return_state=True hands back a future or state, which lets the flow continue running even if the task failed.
Key points to remember:
- In Prefect 3.x, a captured task failure (a future or state that you never resolve) doesn't automatically cause a flow failure
- You need to surface task failures using one of the methods above
- The flow's on_failure handler only triggers when the flow itself fails
- Calling .result() with raise_on_failure=True or checking task states are the most common solutions
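The distinction between a captured failure and a raised one can be illustrated with a toy model in plain Python. This is only a sketch of the idea, not Prefect's implementation: ToyState, run_captured, and run_flow are hypothetical stand-ins invented for this example.

```python
from dataclasses import dataclass
from typing import Callable


@dataclass
class ToyState:
    """Stand-in for a run state; not a Prefect class."""

    failed: bool
    message: str = ""


def run_captured(fn: Callable[[], object]) -> ToyState:
    """Run fn and capture any exception as a failed ToyState."""
    try:
        fn()
    except Exception as exc:
        return ToyState(failed=True, message=str(exc))
    return ToyState(failed=False)


def run_flow(fn: Callable[[], object], on_failure: list) -> ToyState:
    """Run fn as a 'flow' and call the hooks only if fn itself raises."""
    state = run_captured(fn)
    if state.failed:
        for hook in on_failure:
            hook(state)
    return state


def risky_task():
    raise RuntimeError("Task failed!")


def flow_that_swallows():
    # The failure is captured in a state and never re-raised.
    run_captured(risky_task)


def flow_that_raises():
    # The captured state is inspected and turned back into an exception.
    state = run_captured(risky_task)
    if state.failed:
        raise RuntimeError(f"Task failed: {state.message}")


calls = []
swallowed = run_flow(flow_that_swallows, [calls.append])
raised = run_flow(flow_that_raises, [calls.append])
print(swallowed.failed, raised.failed, len(calls))
```

Only the second toy flow reaches its hook, because only there does the failure leave the flow function as an exception.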
Choose the approach that best fits your needs:
- Use .result(raise_on_failure=True) if you want a task failure to automatically fail the flow
- Use return_state=True if you want more control over how you handle task failures
- Use try/except if you need custom failure handling logic
Lester Cayabyab
07/18/2025, 6:32 PM
Marvin
07/18/2025, 6:32 PM
Marvin
07/18/2025, 6:33 PM
python
from prefect import task, flow


def task_failure_handler(task, task_run, state):
    # The task run object may not expose a `parameters` attribute (see the
    # correction later in this thread), so read it defensively
    params = getattr(task_run, "parameters", None)
    print(f"Task failed with parameters: {params}")
    print(f"Error message: {state.message}")


@task(on_failure=[task_failure_handler])
def risky_task(param1: str, param2: int):
    raise RuntimeError("Task failed!")


@flow
def my_flow():
    risky_task("test", 42)
2. Using Flow-level failure handler to access task parameters:
python
from prefect import task, flow


def flow_failure_handler(flow, flow_run, state):
    # Defensive check: state_details is not guaranteed to carry the failed
    # task run, so this branch may never execute
    failed_task_run = getattr(state.state_details, "task_run", None)
    if failed_task_run is not None:
        params = getattr(failed_task_run, "parameters", None)
        print(f"Failed task parameters: {params}")
    print(f"Error message: {state.message}")


@task  # no raise_on_failure argument here; a direct call already raises
def risky_task(param1: str, param2: int):
    raise RuntimeError("Task failed!")


@flow(on_failure=[flow_failure_handler])
def my_flow():
    risky_task("test", 42)
3. Using runtime context within the task:
python
from prefect import task, flow
from prefect.runtime import task_run


@task
def risky_task(param1: str, param2: int):
    try:
        raise RuntimeError("Something went wrong!")
    except Exception as e:
        # Access parameters directly within the task
        current_params = task_run.parameters
        print(f"Failed with parameters: {current_params}")
        raise  # Re-raise the exception to mark the task as failed


@flow
def my_flow():
    risky_task("test", 42)
4. More detailed failure handling with parameter logging:
python
import logging
from typing import Any

from prefect import task, flow

logger = logging.getLogger(__name__)


def detailed_failure_handler(task, task_run, state):
    # Read parameters defensively: the task run object may not expose
    # a `parameters` attribute (see the correction later in this thread)
    params = getattr(task_run, "parameters", None)
    task_name = task.name
    error_msg = state.message
    logger.error(
        f"Task '{task_name}' failed!\n"
        f"Parameters used: {params}\n"
        f"Error message: {error_msg}"
    )


@task(on_failure=[detailed_failure_handler])
def risky_task(param1: str, param2: int, param3: dict[str, Any]):
    raise RuntimeError("Task failed!")


@flow
def my_flow():
    params = {
        "param1": "test",
        "param2": 42,
        "param3": {"key": "value"},
    }
    risky_task(**params)
Key points to remember:
- Task-level and flow-level failure handlers both receive the run object and the final state
- Do not assume task_run.parameters exists on the object passed to a handler; read it defensively
- You can access parameters within the task using prefect.runtime.task_run.parameters
- Let the task's exception propagate (call the task directly, or call .result() on its future) if you want task failures to reach the flow
Best practices:
1. Always log the parameters along with the error message for better debugging
2. Consider what sensitive information might be in your parameters before logging
3. Use structured logging when possible
4. Consider adding parameter validation before task execution
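Points 2 and 3 can be combined in a small helper. The following is a minimal sketch in plain Python, with no Prefect calls; redact, format_failure, and the SENSITIVE_FRAGMENTS list are hypothetical names chosen for this example, and the list of sensitive key fragments is an assumption you would adapt to your own parameters.

```python
import json
from typing import Any

# Hypothetical list of key fragments treated as sensitive.
SENSITIVE_FRAGMENTS = ("password", "token", "secret", "api_key")


def _is_sensitive(key: Any) -> bool:
    lowered = str(key).lower()
    return any(fragment in lowered for fragment in SENSITIVE_FRAGMENTS)


def redact(params: Any) -> Any:
    """Return a copy of params with sensitive-looking values masked."""
    if isinstance(params, dict):
        return {
            key: "***" if _is_sensitive(key) else redact(value)
            for key, value in params.items()
        }
    if isinstance(params, (list, tuple)):
        return [redact(item) for item in params]
    return params


def format_failure(task_name: str, params: dict, error: str) -> str:
    """Build one structured (JSON) log line for a failed task."""
    return json.dumps(
        {"task": task_name, "parameters": redact(params), "error": error},
        sort_keys=True,
    )


line = format_failure(
    "risky_task",
    {"param1": "test", "api_token": "abc123", "param3": {"password": "x"}},
    "Task failed!",
)
print(line)
```

A failure handler could pass the resulting line to its logger, so that the parameters are recorded in a machine-readable form without exposing secrets.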
The failure handlers give you a lot of flexibility in how you handle and log task failures, and having access to the parameters can be crucial for debugging and error reporting.
Lester Cayabyab
07/18/2025, 6:35 PM
Marvin
07/18/2025, 6:35 PM
Marvin
07/18/2025, 6:35 PM
task_run.parameters attribute. Let me show you the correct way to access task parameters in failure handlers in Prefect 3.x:
python
from prefect import task, flow


def task_failure_handler(task, task_run, state):
    # Look for the raw inputs on the state details. The `inputs` attribute
    # is not confirmed to exist, so read it defensively
    inputs = getattr(state.state_details, "inputs", None) or {}
    print(f"Task failed with inputs: {inputs}")
    print(f"Error message: {state.message}")


@task(on_failure=[task_failure_handler])
def risky_task(param1: str, param2: int):
    raise RuntimeError("Task failed!")


@flow
def my_flow():
    risky_task("test", 42)
For more detailed access to task run information during failure, you can also use:
python
from prefect import task, flow


def detailed_failure_handler(task, task_run, state):
    # Access state details defensively: `inputs` and `exception` are not
    # confirmed attributes, so missing values are simply skipped
    inputs = getattr(state.state_details, "inputs", None)
    if inputs is not None:
        print(f"Failed inputs: {inputs}")
    # Access the exception that caused the failure, if it is recorded
    exception = getattr(state.state_details, "exception", None)
    if exception is not None:
        print(f"Exception: {exception}")
    # Access the error message
    print(f"Error message: {state.message}")


@task(on_failure=[detailed_failure_handler])
def risky_task(param1: str, param2: int):
    raise RuntimeError(f"Failed processing param1={param1}")


@flow
def my_flow():
    risky_task("test", 42)
The key points about accessing parameters in Prefect 3.x:
1. The handlers above look for parameters on state.state_details.inputs in the failure handler; confirm that this attribute exists in your installed version before relying on it
2. When present, the state details contain the input values that were passed to the task
3. The error message is available through state.message on the state object
This correction replaces my earlier task_run.parameters suggestion. Thank you for bringing this to my attention, and I apologize for any confusion my earlier response may have caused.
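If neither attribute turns out to be available in a given version, the call arguments can still be captured in plain Python before any hook runs. The following is a minimal sketch that uses only the standard library; log_params_on_failure is a hypothetical helper name, and stacking it between @task and the function is an assumption that is not tested here.

```python
import functools
import inspect


def log_params_on_failure(fn):
    """Print the bound arguments of fn if it raises, then re-raise."""
    signature = inspect.signature(fn)

    @functools.wraps(fn)
    def wrapper(*args, **kwargs):
        # Bind first so positional and keyword arguments are reported by name.
        bound = signature.bind(*args, **kwargs)
        bound.apply_defaults()
        try:
            return fn(*args, **kwargs)
        except Exception as exc:
            print(f"{fn.__name__} failed with parameters: {dict(bound.arguments)}")
            print(f"Error message: {exc}")
            raise

    return wrapper


@log_params_on_failure
def risky_task(param1: str, param2: int = 0):
    raise RuntimeError(f"Failed processing param1={param1}")


try:
    risky_task("test", param2=42)
except RuntimeError as exc:
    caught = exc
```

Because the wrapper re-raises the original exception, the failure still propagates exactly as it would without the decorator.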