Akash N
07/10/2025, 5:14 AMMarvin
07/10/2025, 5:14 AMMarvin
07/10/2025, 5:15 AMpython
from prefect import flow, task
from prefect.states import Failed
@task
def my_task():
# Your task logic here
raise Exception("Task failed!")
def fail_flow_on_task_failure(flow, flow_state, state):
if state.is_failed():
return Failed(message=f"Flow failed because task {state.name} failed")
return flow_state
@flow(on_completion=[fail_flow_on_task_failure])
def my_flow():
my_task()
if __name__ == "__main__":
my_flow()
This solution uses a state change hook that will be called whenever a task completes (successfully or not). If any task fails, the hook will mark the entire flow as failed.
Alternatively, you can use exception handling within your flow:
python
from prefect import flow, task
from prefect.states import Failed
@task
def my_task():
# Your task logic here
pass
@flow
def my_flow():
try:
my_task()
except Exception as e:
raise Exception(f"Flow failed due to task failure: {str(e)}")
if __name__ == "__main__":
my_flow()
Both approaches will ensure your flow is marked as failed if any task fails. The first approach using state change hooks is generally preferred as it's more declarative and handles all tasks automatically without requiring explicit try/except blocks.
For more details on states and state change hooks, you can refer to the States documentation and State Change Hooks guide.