Cullen Baker
08/25/2025, 6:40 PM
Marvin
08/25/2025, 6:40 PM
Marvin
08/25/2025, 6:44 PM
… `state` inside your `on_failure` hook.
Key points:
- Register hooks via the `on_failure=[...]` parameter on `@flow` and `@task`.
- Prefect calls each hook with the flow (or task) object, the run object, and the run's final `State`.
- Use `prefect.states.get_state_exception(state)` (and optionally `format_exception`) to access the actual exception and traceback.
- Hooks fire when the run enters a Failed state, i.e. after retries are exhausted. If you catch exceptions in your code, re-raise them if you want the hook to run.
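The re-raise rule in the last bullet is plain Python, so it can be sketched without Prefect. `load_rows` and `record_error` are hypothetical names; inside a real task, the bare `raise` is what lets the run still end in a Failed state so `on_failure` can fire.

```python
from typing import List


def record_error(errors: List[str], exc: BaseException) -> None:
    """Hypothetical custom handling: remember the error for later reporting."""
    errors.append(f"{type(exc).__name__}: {exc}")


def load_rows(raw: str, errors: List[str]) -> List[int]:
    """Parse comma-separated integers; handle a bad value locally, then re-raise it."""
    try:
        return [int(part) for part in raw.split(",")]
    except ValueError as exc:
        record_error(errors, exc)  # custom handling first...
        raise  # ...then re-raise so the run still ends Failed and on_failure can fire
```

If you replace `raise` with `return []`, the caller sees a normal return and the run would be marked Completed instead.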
Example: flow failure hook
```
from prefect import flow
from prefect.states import get_state_exception, format_exception


async def flow_failed(flow, flow_run, state):
    exc = get_state_exception(state)  # the actual exception
    tb = format_exception(exc, exc.__traceback__)  # full traceback string
    # Send to Slack / log / alert
    print(f"[FLOW FAILED] {flow_run.name} ({flow_run.id})\n{tb}")


@flow(on_failure=[flow_failed])
def my_flow():
    raise RuntimeError("Something went wrong in the flow!")


if __name__ == "__main__":
    my_flow()
```
Example: task failure hook
```
from prefect import task, flow
from prefect.states import get_state_exception, format_exception


def task_failed(task, task_run, state):
    exc = get_state_exception(state)
    msg = format_exception(exc, exc.__traceback__)
    print(f"[TASK FAILED] {task_run.name} ({task_run.id})\n{msg}")


@task(on_failure=[task_failed])
def boom():
    raise ValueError("Boom from task")


@flow
def parent():
    boom()


if __name__ == "__main__":
    parent()
```
Notes:
- You cannot pass custom arguments into hooks; Prefect defines the hook signature. If you need config (e.g., a Slack webhook URL), use environment variables, closures, or partials to bind it when you define the hook.
- If you also want to handle infrastructure crashes, add `on_crashed=[...]`. Crashes are distinct from failures.
- You can also read useful context from `flow_run` / `task_run` (IDs, names, tags, and for flow runs, parameters) to enrich alerts.
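A minimal sketch of the closure and partial options from the first note. `notify`, `make_hook`, and the webhook URL are hypothetical, and `SimpleNamespace` objects stand in for the flow, flow run, and state Prefect would pass; the point is only that the callable you register ends up with the `(flow, flow_run, state)` shape while the config rides along.

```python
import functools
from types import SimpleNamespace


def notify(flow, flow_run, state, *, webhook_url: str, sent: list) -> None:
    """Hook body with extra config; Prefect itself only passes the first three args."""
    message = f"[FLOW FAILED] {flow.name} / {flow_run.name}: {state.message}"
    # A real hook would POST `message` to `webhook_url`; here we just record it.
    sent.append((webhook_url, message))


def make_hook(webhook_url: str, sent: list):
    """Closure variant: returns a function with exactly (flow, flow_run, state)."""
    def hook(flow, flow_run, state):
        notify(flow, flow_run, state, webhook_url=webhook_url, sent=sent)
    return hook


sent: list = []
partial_hook = functools.partial(notify, webhook_url="https://example.invalid/hook", sent=sent)
closure_hook = make_hook("https://example.invalid/hook", sent)

# Stand-ins for Prefect's objects; in a real flow you would register
# @flow(on_failure=[partial_hook]) or @flow(on_failure=[closure_hook]).
fake_flow = SimpleNamespace(name="my-flow")
fake_run = SimpleNamespace(name="crimson-otter")
fake_state = SimpleNamespace(message="RuntimeError('Something went wrong')")
partial_hook(fake_flow, fake_run, fake_state)
closure_hook(fake_flow, fake_run, fake_state)
```

The closure gives the registered function a real `__name__`, which can make hook-related log lines easier to read; the partial is shorter. Both produce the same message here.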
Docs:
- State change hooks in Prefect 3.x
If you share how you’re routing the error (Slack, PagerDuty, etc.), I can tailor a hook snippet for that.
Marvin
08/25/2025, 6:45 PM
Marvin
08/25/2025, 6:47 PM
… `state` and format it.
Best practice (Prefect 3.x):
- Flow hook signature: `(flow, flow_run, state)`
- Task hook signature: `(task, task_run, state)`
- Use `prefect.states.get_state_exception(state)` to get the exception, and `prefect.states.format_exception(...)` if you want a full traceback string.
- Hooks fire only after retries are exhausted (i.e., when the run truly enters Failed).
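To see what a full traceback string contains, or as a fallback outside Prefect, the standard library's `traceback.format_exception` produces a comparable traceback string. This is a stdlib sketch, not Prefect's `format_exception` implementation; `traceback_text` and `explode` are illustrative names.

```python
import traceback


def traceback_text(exc: BaseException) -> str:
    """Full traceback if one is attached, else a one-line 'Type: message' summary."""
    if exc.__traceback__ is None:
        # e.g. an exception object that was created but never raised
        return f"{type(exc).__name__}: {exc}"
    return "".join(traceback.format_exception(type(exc), exc, exc.__traceback__))


def explode():
    raise RuntimeError("Something went wrong in the flow")


try:
    explode()
except RuntimeError as err:
    caught = err  # keep a reference; `err` is cleared when the except block ends

print(traceback_text(caught))
```

Handling the `__traceback__ is None` case keeps the hook from producing an empty or misleading string when no traceback is available.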
Flow example
```
from prefect import flow
from prefect.states import get_state_exception, format_exception


def on_flow_failure(flow, flow_run, state):
    exc = get_state_exception(state)
    tb_str = format_exception(exc, exc.__traceback__)
    print(f"[FLOW FAILED] {flow_run.name} ({flow_run.id})\n{tb_str}")


@flow(on_failure=[on_flow_failure])
def my_flow():
    raise RuntimeError("Something went wrong in the flow")
```
Task example
```
from prefect import flow, task
from prefect.states import get_state_exception, format_exception


def on_task_failure(task, task_run, state):
    exc = get_state_exception(state)
    tb_str = format_exception(exc, exc.__traceback__)
    print(f"[TASK FAILED] {task_run.name} ({task_run.id})\n{tb_str}")


@task(on_failure=[on_task_failure])
def boom():
    raise ValueError("Boom from task")


@flow
def parent():
    boom()
```
Tips
- Don’t swallow exceptions inside your task/flow if you want `on_failure` to run; re-raise after any custom handling.
- For infrastructure crashes, add `on_crashed=[...]` as well.
- Need to route errors to Slack/PagerDuty/etc.? Put credentials in env vars and read them inside the hook.
Docs: State change hooks in Prefect 3.x
If you share where you want to send the messages (Slack, email, SIEM), I can tailor the hook snippet.
Cullen Baker
08/25/2025, 7:19 PM
Marvin
08/25/2025, 7:19 PM
Marvin
08/25/2025, 7:20 PM
… `on_failure` hooks is to raise a clear, purpose-built exception from your task/flow when validation fails. Your hook then reads that exception from the `state` object.
Two common patterns:
1) Plain exception with a clear message
- Raise with the exact message you want surfaced.
- In the hook, call `get_state_exception(state)` (and `format_exception(...)` if you also want the traceback).
Example
```
from prefect import task, flow
from prefect.states import get_state_exception


def on_task_failure(task, task_run, state):
    exc = get_state_exception(state)
    print(f"[TASK FAILED] {task_run.name}: {exc}")


@task(on_failure=[on_task_failure])
def validate(data: dict):
    if "customer_id" not in data:
        raise ValueError("Validation failed: 'customer_id' is required.")


@flow
def pipeline():
    validate({})
```
2) Custom exception to carry structured validation details
- Create a custom exception with attributes (e.g., an `errors` dict or list).
- Raise it when validation fails.
- In the hook, detect your exception type and access the structured data to format a rich message.
Example
```
import json
from typing import Any, Dict, List, Optional

from prefect import task, flow
from prefect.states import get_state_exception


class DataValidationError(Exception):
    # `errors` has a default so the exception stays picklable:
    # unpickling re-creates it with only the message, then restores attributes.
    def __init__(self, message: str, errors: Optional[List[Dict[str, Any]]] = None):
        super().__init__(message)
        self.errors = errors or []


def on_task_failure(task, task_run, state):
    exc = get_state_exception(state)
    if isinstance(exc, DataValidationError):
        details = json.dumps(exc.errors, indent=2)
        print(f"[TASK FAILED] {task_run.name}\n{exc}\nDetails:\n{details}")
    else:
        print(f"[TASK FAILED] {task_run.name}\n{exc}")


@task(on_failure=[on_task_failure])
def validate_order(order: dict):
    errors = []
    if "id" not in order:
        errors.append({"field": "id", "error": "missing"})
    if errors:
        raise DataValidationError("Order failed validation", errors)


@flow
def pipeline():
    validate_order({})
```
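Pattern 2 can be extended so validation reports every problem at once and the hook gets a compact, chat-friendly message instead of raw JSON. This is a self-contained sketch: `REQUIRED_FIELDS`, the validation rules, and `render` are hypothetical. In a real flow, `validate_order` would carry the `@task(on_failure=[...])` decorator and `render(exc)` would run inside the hook on the exception from `get_state_exception(state)`.

```python
from typing import Any, Dict, List, Optional


class DataValidationError(Exception):
    """Carries structured field errors alongside a human-readable message."""

    def __init__(self, message: str, errors: Optional[List[Dict[str, Any]]] = None):
        super().__init__(message)
        self.errors = errors or []


REQUIRED_FIELDS = ("id", "customer_id", "amount")  # hypothetical schema


def validate_order(order: Dict[str, Any]) -> Dict[str, Any]:
    """Collect every problem in one pass instead of stopping at the first."""
    errors = [{"field": f, "error": "missing"} for f in REQUIRED_FIELDS if f not in order]
    amount = order.get("amount")
    if amount is not None and not isinstance(amount, (int, float)):
        errors.append({"field": "amount", "error": f"expected number, got {type(amount).__name__}"})
    if errors:
        raise DataValidationError(f"Order failed validation ({len(errors)} problem(s))", errors)
    return order


def render(exc: BaseException) -> str:
    """What a hook might send: one bullet per field error, or the plain message."""
    if isinstance(exc, DataValidationError):
        lines = [str(exc)] + [f"- {e['field']}: {e['error']}" for e in exc.errors]
        return "\n".join(lines)
    return f"{type(exc).__name__}: {exc}"
```

Falling back to `Type: message` for any other exception keeps one hook usable for both validation failures and unexpected errors.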
Notes and tips:
- Hooks receive `(task, task_run, state)` for tasks and `(flow, flow_run, state)` for flows. They cannot accept custom args; use closures/partials/env vars for config.
- Raise (and don’t swallow) the exceptions you want to trigger the `on_failure` hook. Hooks run after retries are exhausted.
- If you rely on Prefect’s built-in parameter validation (`validate_parameters=True` on flows), the hook will receive Prefect’s errors (e.g., `ParameterTypeError`). If you need customized messages, add your own validation and raise your custom exception before proceeding.
- For full tracebacks, use:
```
from prefect.states import get_state_exception, format_exception

exc = get_state_exception(state)
tb = format_exception(exc, exc.__traceback__)
```
Docs:
- State change hooks in Prefect 3.x
- Prefect exceptions API
If you share whether you’re validating flow parameters or task payloads, and where you want to send these messages (Slack, email, etc.), I can tailor a drop-in hook with the right formatting.
Cullen Baker
08/26/2025, 7:26 PM
Cullen Baker
08/26/2025, 7:27 PM
Marvin
08/26/2025, 7:27 PM
Marvin
08/26/2025, 7:28 PM
… `on_failure` will not run. Add `on_crashed=[...]` as well, or use a workspace automation.
- Retries are delaying your alert: `on_failure` fires only after the final retry fails. If you expect an alert on the first attempt, you’ll need custom logic or zero retries.
- The failure state isn’t what you think: cancellations (`Abort`, `Cancel`) or manual stops won’t trigger `on_failure`. Use `on_cancellation` for those cases.
Hook implementation pitfalls
- Hook never runs because the exception was swallowed: If you catch and don’t re-raise errors in your task/flow, Prefect will mark the run as Completed and the hook won’t fire.
- Wrong hook placement: You added a task hook, but you’re only looking at a flow failure (or vice versa). Make sure the hook is on the run type where you want the alert.
- Async “fire-and-forget”: If your hook creates an async task or thread to send Slack without awaiting/joining, the process may exit before the request completes.
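The fire-and-forget pitfall can be sketched with a thread. `send_alert` stands in for a real HTTP call (hypothetical, no network). The simplest fix is to call the sender directly in the hook; if you do use a thread (for example, to cap the total wait), join it before returning. In an async hook, the equivalent fix is to `await` the request rather than scheduling it with `asyncio.create_task` and returning.

```python
import threading
import time
from typing import List


def send_alert(text: str, outbox: List[str]) -> None:
    """Stand-in for a slow HTTP POST to Slack (hypothetical; no network here)."""
    time.sleep(0.05)
    outbox.append(text)


def hook_fire_and_forget(text: str, outbox: List[str]) -> None:
    # Anti-pattern: returns immediately. Daemon threads are stopped abruptly
    # when the interpreter shuts down, so this alert may never be sent.
    threading.Thread(target=send_alert, args=(text, outbox), daemon=True).start()


def hook_wait(text: str, outbox: List[str], timeout: float = 5.0) -> bool:
    # Safer: if you use a thread, join it (with an upper bound) before the
    # hook returns, so the process can't exit while the send is in flight.
    worker = threading.Thread(target=send_alert, args=(text, outbox), daemon=True)
    worker.start()
    worker.join(timeout)
    return not worker.is_alive()  # False: still in flight when the timeout hit
```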
Environment/runtime issues
- Missing credentials/config at runtime: The Slack webhook URL or token is available locally but not in the worker/pod environment. Verify env vars/secrets are mounted where the flow runs.
- Network egress blocked: Kubernetes/EC2/security groups/proxies can block outbound HTTPS to Slack.
- Certificate/proxy issues: Corporate proxies or custom certs can cause TLS failures that you don’t surface.
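One way to catch missing credentials is to check for them at the start of the flow body, which runs in the worker/pod environment rather than on your machine, so the problem shows up in that run's logs. `missing_settings` and `require_settings` are hypothetical helpers, and `SLACK_WEBHOOK_URL` is the variable name the snippet in this message uses. A config check cannot detect blocked egress or TLS problems, only absent settings; whether to raise or just log a warning is a judgment call.

```python
import os
from typing import Iterable, List, Mapping, Optional


def missing_settings(names: Iterable[str], env: Optional[Mapping[str, str]] = None) -> List[str]:
    """Return the names that are unset or empty in `env` (defaults to os.environ)."""
    env = os.environ if env is None else env
    return [name for name in names if not env.get(name)]


def require_settings(names: Iterable[str], env: Optional[Mapping[str, str]] = None) -> None:
    """Fail fast with one clear message listing everything that's missing."""
    missing = missing_settings(names, env)
    if missing:
        raise RuntimeError(f"Missing required settings in this environment: {', '.join(missing)}")
```

Usage inside a flow body would be a single `require_settings(["SLACK_WEBHOOK_URL"])` call before any real work.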
Slack-side problems
- Payload too large or malformed: Long tracebacks can exceed Slack’s limits or block formats. Slack may reject with 400/invalid_payload. Trim or chunk large messages.
- Rate limits: Bursts can hit Slack rate limits (429). Add a small backoff and retry on 429/5xx.
- Scope/permission mismatches (for SDK/bot-token approaches): Ensure the bot has the `chat:write` scope and has been added to the channel.
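For the "trim or chunk" advice, a line-aware chunker keeps traceback frames intact instead of cutting mid-line. `chunk_text` is a hypothetical helper, and the 3000-character default simply mirrors the truncation cap in the snippet in this message; it is not a documented Slack limit. Sending each chunk as its own message also means more requests, so it interacts with the rate-limit point above.

```python
from typing import List


def chunk_text(text: str, limit: int = 3000) -> List[str]:
    """Split text into pieces of at most `limit` chars, preferring line breaks."""
    chunks: List[str] = []
    current = ""
    for line in text.splitlines(keepends=True):
        # A single line longer than the limit gets hard-split.
        while len(line) > limit:
            if current:
                chunks.append(current)
                current = ""
            chunks.append(line[:limit])
            line = line[limit:]
        if len(current) + len(line) > limit:
            chunks.append(current)  # current is full; start a new chunk
            current = line
        else:
            current += line
    if current:
        chunks.append(current)
    return chunks
```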
Quick checklist
- Confirm the hook fires at all: add a log/print in the hook.
- Add `on_crashed=[...]` alongside `on_failure`.
- Ensure you re-raise exceptions you want to alert on.
- Verify env vars and outbound network access in the actual worker environment.
- Log the Slack HTTP response status/body in the hook; retry on transient errors.
- Trim long tracebacks before sending.
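For the first checklist item, a small wrapper can log when a hook starts, finishes, or raises, without editing each hook. `traced_hook` is a hypothetical helper and only wraps synchronous hooks; `functools.wraps` keeps the original function name so log lines stay readable.

```python
import functools
from typing import Callable


def traced_hook(fn: Callable, log: Callable[[str], None] = print) -> Callable:
    """Wrap a sync state hook so you can see that it ran and whether it raised."""
    @functools.wraps(fn)
    def wrapper(*args, **kwargs):
        log(f"hook {fn.__name__} started")
        try:
            result = fn(*args, **kwargs)
        except Exception as exc:
            log(f"hook {fn.__name__} raised {type(exc).__name__}: {exc}")
            raise  # keep the original error visible to the caller
        log(f"hook {fn.__name__} finished")
        return result
    return wrapper
```

Usage would look like `@flow(on_failure=[traced_hook(on_flow_failure)])`; a "started" line with no "finished" line points at a problem inside the hook itself.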
A robust hook snippet with retries and truncation
```
import json
import os
import time

import requests
from prefect import flow
from prefect.states import get_state_exception, format_exception

WEBHOOK = os.environ.get("SLACK_WEBHOOK_URL")


def safe_post_to_slack(text: str, retries: int = 3, timeout: float = 5.0):
    if not WEBHOOK:
        print("SLACK_WEBHOOK_URL not set; skipping Slack alert")
        return
    # Trim to keep payload small
    max_len = 3000
    if len(text) > max_len:
        text = text[:max_len] + "\n…(truncated)"
    payload = {"text": text}
    backoff = 1.0
    for attempt in range(1, retries + 1):
        try:
            r = requests.post(
                WEBHOOK,
                data=json.dumps(payload),
                headers={"Content-Type": "application/json"},
                timeout=timeout,
            )
            if r.status_code == 200:
                return
            if r.status_code not in (429, 500, 502, 503, 504):
                # Non-transient rejection (e.g. 400 invalid_payload): don't retry
                print(f"Slack rejected payload: {r.status_code} {r.text}")
                return
            print(f"Slack returned {r.status_code} (attempt {attempt}/{retries})")
        except requests.RequestException as e:
            print(f"Slack post error (attempt {attempt}/{retries}): {e}")
        if attempt < retries:
            time.sleep(backoff)  # exponential backoff between attempts
            backoff *= 2
    print(f"Giving up on Slack alert after {retries} attempts")


def on_flow_failure(flow, flow_run, state):
    exc = get_state_exception(state)
    tb = format_exception(exc, exc.__traceback__)
    safe_post_to_slack(f"[FLOW FAILED] {flow_run.name} ({flow_run.id})\n{tb}")


def on_flow_crashed(flow, flow_run, state):
    # No Python exception if the process died; send the run context and state message
    safe_post_to_slack(f"[FLOW CRASHED] {flow_run.name} ({flow_run.id})\n{state.message or ''}")


@flow(on_failure=[on_flow_failure], on_crashed=[on_flow_crashed])
def my_flow():
    raise RuntimeError("Something went wrong in the flow")
```
Marvin
08/26/2025, 7:28 PM
Cullen Baker
08/26/2025, 7:38 PM
Cullen Baker
08/26/2025, 7:38 PM
Marvin
08/26/2025, 7:38 PM
Marvin
08/26/2025, 7:39 PM
… `(flow, flow_run, state)`
- Use `flow.name` (the flow’s name as defined in code) or `flow_run.name` (the name of this specific run, auto-generated unless you set one via `flow_run_name` templating)
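Example
```
def on_flow_failure(flow, flow_run, state):
    flow_name = flow.name  # e.g., "my-flow" (defaults to the function name, underscores -> hyphens)
    run_name = flow_run.name  # e.g., an auto-generated "crimson-otter", or your flow_run_name template
    print(f"[FLOW FAILED] {flow_name} | run={run_name}")
```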
Task failure hook
- Hook signature: `(task, task_run, state)` (no `flow` arg)
- You can still include the flow info:
  - Quick: include `task_run.flow_run_id`
  - Full flow name: look it up via the client
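A small, Prefect-free sketch of the "quick" option: build the alert from the task and task-run fields, and add the flow name only when you have looked it up. `task_failure_message` is hypothetical, `SimpleNamespace` objects stand in for Prefect's task and task-run objects, and a real `flow_run_id` is a UUID rather than the short string used here.

```python
from types import SimpleNamespace
from typing import Any, Optional


def task_failure_message(task: Any, task_run: Any, flow_name: Optional[str] = None) -> str:
    """Build a one-line alert from what a task hook receives (plus an optional flow name)."""
    parts = [f"[TASK FAILED] task={task.name}", f"run={task_run.name}"]
    if flow_name:
        parts.append(f"flow={flow_name}")
    # flow_run_id may be None, e.g. for a task called outside any flow
    parts.append(f"flow_run_id={task_run.flow_run_id or 'n/a'}")
    return " ".join(parts)


# Stand-ins for the objects Prefect passes to a task hook
demo_task = SimpleNamespace(name="boom")
demo_run = SimpleNamespace(name="boom-a1b", flow_run_id="0b6f2c1e")
print(task_failure_message(demo_task, demo_run))
```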
Async example to fetch the flow name in a task hook
```
from prefect.client.orchestration import get_client


async def on_task_failure(task, task_run, state):
    if task_run.flow_run_id is None:
        # The task ran outside a flow run; there is no flow to look up
        print(f"[TASK FAILED] task={task.name}")
        return
    async with get_client() as client:
        fr = await client.read_flow_run(task_run.flow_run_id)
        flow = await client.read_flow(fr.flow_id)
    print(f"[TASK FAILED] flow={flow.name} task={task.name} run={fr.name}")
```
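Alternative anywhere (including inside tasks/flows, not just hooks)
- Use `prefect.runtime`, which exposes the current flow run's details (inside a task, `get_run_context()` returns a task run context, which has no `.flow`):
```
from prefect import runtime

flow_name = runtime.flow_run.flow_name
flow_run_name = runtime.flow_run.name
```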
Docs: State change hooks in Prefect 3.x
If you share your current hook snippet, I can drop the exact line(s) to add.