<@ULVA73B9P> what is the best way to pass task / f...
# ask-marvin
c
@Marvin what is the best way to pass task / flow error messages to the on_failure function?
m
thought for 230.0 seconds
Great question. In Prefect 3.x, the best practice is to use state-change hooks and extract the exception from the provided `state` inside your `on_failure` hook.
Key points:
- Register hooks via the `on_failure=[...]` parameter on `@flow` and `@task`.
- Prefect calls your hook with the run objects and the final `State`.
- Use `prefect.states.get_state_exception(state)` (and optionally `format_exception`) to access the actual exception and traceback.
- Hooks fire when the run enters a Failed state (after retries are exhausted). If you catch exceptions in your code, re-raise them if you want the hook to run.
Example: flow failure hook
```python
from prefect import flow
from prefect.states import get_state_exception, format_exception

async def flow_failed(flow, flow_run, state):
    # Inside an async hook the lookup has to be awaited
    exc = await get_state_exception(state)          # the actual exception
    tb = format_exception(exc, exc.__traceback__)  # full traceback string
    # Send to Slack / log / alert
    print(f"[FLOW FAILED] {flow_run.name} ({flow_run.id})\n{tb}")

@flow(on_failure=[flow_failed])
def my_flow():
    raise RuntimeError("Something went wrong in the flow!")

if __name__ == "__main__":
    my_flow()
```
Example: task failure hook
```python
from prefect import task, flow
from prefect.states import get_state_exception, format_exception

def task_failed(task, task_run, state):
    exc = get_state_exception(state)
    msg = format_exception(exc, exc.__traceback__)
    print(f"[TASK FAILED] {task_run.name} ({task_run.id})\n{msg}")

@task(on_failure=[task_failed])
def boom():
    raise ValueError("Boom from task")

@flow
def parent():
    boom()

if __name__ == "__main__":
    parent()
```
Notes:
- You cannot pass custom arguments into hooks; Prefect defines the hook signature. If you need config (e.g., Slack webhook), use environment variables, closures, or partials to bind config when you define the hook.
- If you also want to handle infrastructure crashes, add `on_crashed=[...]`. Crashes are distinct from failures.
- You can also read useful context from `flow_run` / `task_run` (ids, names, parameters, tags) to enrich alerts.
Docs:
- State change hooks in Prefect 3.x
If you share how you’re routing the error (Slack, PagerDuty, etc.), I can tailor a hook snippet for that.
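The closure/partial idea from the notes might look like the sketch below. It is a minimal, Prefect-free illustration: `notify_failure`, `webhook_url`, the example URL, and the `SimpleNamespace` stand-ins for the run objects are all hypothetical. Only the three-argument hook shape `(flow, flow_run, state)` comes from the answer above.

```python
from functools import partial
from types import SimpleNamespace

sent = []  # collects (url, message) pairs instead of making real HTTP calls

def notify_failure(flow, flow_run, state, *, webhook_url):
    """Hook body with one extra keyword-only config argument (hypothetical)."""
    message = f"[FLOW FAILED] {flow.name} | run={flow_run.name} | {state.message}"
    sent.append((webhook_url, message))

# Bind the config once; the result takes only (flow, flow_run, state).
flow_failed = partial(notify_failure, webhook_url="https://hooks.example.invalid/T000")

# Stand-ins for the objects a hook would receive (assumed attribute names).
flow = SimpleNamespace(name="my_flow")
flow_run = SimpleNamespace(name="my_flow-run-1")
state = SimpleNamespace(message="RuntimeError: Something went wrong")

flow_failed(flow, flow_run, state)
print(sent[0][1])
```

In a real flow, the bound callable would be what goes into `on_failure=[...]`, so the config never has to travel through the hook signature.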
thought for 65.7 seconds
Short answer: you don’t pass the error into `on_failure`; Prefect passes the run’s final State to your hook. Inside the hook, extract the exception from the `state` and format it.
Best practice (Prefect 3.x):
- Flow hook signature: `(flow, flow_run, state)`
- Task hook signature: `(task, task_run, state)`
- Use `prefect.states.get_state_exception(state)` to get the exception, and `prefect.states.format_exception(...)` if you want a full traceback string.
- Hooks fire only after retries are exhausted (i.e., when the run truly enters Failed).
Flow example
```python
from prefect import flow
from prefect.states import get_state_exception, format_exception

def on_flow_failure(flow, flow_run, state):
    exc = get_state_exception(state)
    tb_str = format_exception(exc, exc.__traceback__)
    print(f"[FLOW FAILED] {flow_run.name} ({flow_run.id})\n{tb_str}")

@flow(on_failure=[on_flow_failure])
def my_flow():
    raise RuntimeError("Something went wrong in the flow")
```
Task example
```python
from prefect import flow, task
from prefect.states import get_state_exception, format_exception

def on_task_failure(task, task_run, state):
    exc = get_state_exception(state)
    tb_str = format_exception(exc, exc.__traceback__)
    print(f"[TASK FAILED] {task_run.name} ({task_run.id})\n{tb_str}")

@task(on_failure=[on_task_failure])
def boom():
    raise ValueError("Boom from task")

@flow
def parent():
    boom()
```
Tips
- Don’t swallow exceptions inside your task/flow if you want `on_failure` to run; re-raise after any custom handling.
- For infra crashes, add `on_crashed=[...]` as well.
- Need to route errors to Slack/PagerDuty/etc.? Put credentials in env vars and use them inside the hook.
Docs: State change hooks in Prefect 3.x
If you share where you want to send the messages (Slack, email, SIEM), I can tailor the hook snippet.
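The first tip (handle, then re-raise) can be sketched in plain Python. `load_rows` and `handled` are hypothetical names. In a real flow the function would carry a `@task` or `@flow` decorator, and the re-raised exception is what lets the run end up Failed so the hook can fire.

```python
handled = []  # records what the custom handling saw (stand-in for logging)

def load_rows(path: str) -> list:
    """Hypothetical task body: do custom handling, then re-raise."""
    try:
        raise FileNotFoundError(f"missing input: {path}")
    except FileNotFoundError as exc:
        handled.append(str(exc))  # custom handling (log, cleanup, metrics)
        raise                     # bare raise keeps the original traceback

try:
    load_rows("orders.csv")
except FileNotFoundError as exc:
    # The caller (the orchestrator, in a real run) still sees the exception.
    print(f"propagated: {exc}")
```

Dropping the bare `raise` would make `load_rows` return `None` normally, which is the "swallowed exception" case the tip warns about.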
c
@Marvin what is the best way to pass specific validation messages to the above?
m
thought for 59.2 seconds
Great follow-up. The simplest, most reliable way to get specific validation messages into your `on_failure` hooks is to raise a clear, purpose-built exception from your task/flow when validation fails. Your hook then reads that exception from the `state` object.
Two common patterns:
1) Plain exception with a clear message
- Raise with the exact message you want surfaced.
- In the hook, call `get_state_exception(state)` and `format_exception(...)`.
Example
```python
from prefect import task, flow
from prefect.states import get_state_exception, format_exception

def on_task_failure(task, task_run, state):
    exc = get_state_exception(state)
    print(f"[TASK FAILED] {task_run.name}: {str(exc)}")

@task(on_failure=[on_task_failure])
def validate(data: dict):
    if "customer_id" not in data:
        raise ValueError("Validation failed: 'customer_id' is required.")

@flow
def pipeline():
    validate({})
```
2) Custom exception to carry structured validation details
- Create a custom exception with attributes (e.g., `errors` dict or list).
- Raise this when validation fails.
- In the hook, detect your exception type and access the structured data to format a rich message.
Example
```python
import json
from typing import Any, Dict, List
from prefect import task, flow
from prefect.states import get_state_exception

class DataValidationError(Exception):
    def __init__(self, message: str, errors: List[Dict[str, Any]]):
        super().__init__(message)
        self.errors = errors

def on_task_failure(task, task_run, state):
    exc = get_state_exception(state)
    if isinstance(exc, DataValidationError):
        details = json.dumps(exc.errors, indent=2)
        print(f"[TASK FAILED] {task_run.name}\n{exc}\nDetails:\n{details}")
    else:
        print(f"[TASK FAILED] {task_run.name}\n{exc}")

@task(on_failure=[on_task_failure])
def validate_order(order: dict):
    errors = []
    if "id" not in order:
        errors.append({"field": "id", "error": "missing"})
    if errors:
        raise DataValidationError("Order failed validation", errors)

@flow
def pipeline():
    validate_order({})
```
Notes and tips:
- Hooks receive `(task, task_run, state)` for tasks and `(flow, flow_run, state)` for flows. They cannot accept custom args; use closures/partials/env vars for config.
- Let any exception that should trigger the `on_failure` hook propagate (i.e., don’t swallow it). Hooks run after retries are exhausted.
- If you rely on Prefect’s built-in parameter validation (`validate_parameters=True` on flows), the hook will receive Prefect’s errors (e.g., `ParameterTypeError`). If you need customized messages, add your own validation and raise your custom exception before proceeding.
- For full tracebacks, use:
```python
from prefect.states import get_state_exception, format_exception

# Inside a hook: look the exception up once, then format it
exc = get_state_exception(state)
tb = format_exception(exc, exc.__traceback__)
```
Docs:
- State change hooks in Prefect 3.x
- Prefect exceptions API
If you share whether you’re validating flow parameters vs. task payloads, and where you want to send these messages (Slack, email, etc.), I can tailor a drop-in hook with the right formatting.
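To make the structured-details pattern more concrete, here is a small Prefect-free sketch that turns an `errors` list like the one in the example into a compact alert message. `format_validation_errors` is a hypothetical helper, and the `field`/`error` keys simply mirror the dictionaries used above.

```python
from typing import Any, Dict, List

def format_validation_errors(title: str, errors: List[Dict[str, Any]]) -> str:
    """Render one bullet per error, e.g. '- id: missing' (hypothetical helper)."""
    lines = [title]
    for err in errors:
        field = err.get("field", "<unknown field>")
        reason = err.get("error", "<no detail>")
        lines.append(f"- {field}: {reason}")
    return "\n".join(lines)

errors = [
    {"field": "id", "error": "missing"},
    {"field": "total", "error": "must be >= 0"},
]
print(format_validation_errors("Order failed validation", errors))
```

A hook could call a helper like this on `exc.errors` instead of `json.dumps`, which tends to read better in chat tools than indented JSON.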
c
@Marvin What could be a potential cause of (non-prefect) slack alerts not being sent when the failure occurs?
m
thought for 76.5 seconds
A few common culprits when custom (non-Prefect) Slack alerts don’t fire on failures:
Prefect lifecycle and hook coverage
- The run crashed, not failed: If the process is OOM-killed, segfaults, or the container dies, the run enters Crashed, not Failed. `on_failure` will not run. Add `on_crashed=[...]` as well, or use a workspace automation.
- Retries are delaying your alert: `on_failure` fires only after the final retry fails. If you expect an alert on the first attempt, you’ll need custom logic or zero retries.
- The failure state isn’t what you think: Cancellations (`Abort`, `Cancel`) or manual stops won’t trigger `on_failure`. Use `on_cancellation` for those cases.
Hook implementation pitfalls
- Hook never runs because the exception was swallowed: If you catch and don’t re-raise errors in your task/flow, Prefect will mark the run as Completed and the hook won’t fire.
- Wrong hook placement: You added a task hook, but you’re only looking at a flow failure (or vice versa). Make sure the hook is on the run type where you want the alert.
- Async “fire-and-forget”: If your hook creates an async task or thread to send Slack without awaiting/joining, the process may exit before the request completes.
Environment/runtime issues
- Missing credentials/config at runtime: The Slack webhook URL or token is available locally but not in the worker/pod environment. Verify env vars/secrets are mounted where the flow runs.
- Network egress blocked: Kubernetes/EC2/security groups/proxies can block outbound HTTPS to Slack.
- Certificate/proxy issues: Corporate proxies or custom certs can cause TLS failures that you don’t surface.
Slack-side problems
- Payload too large or malformed: Long tracebacks can exceed Slack’s limits or block formats. Slack may reject with 400/invalid_payload. Trim or chunk large messages.
- Rate limits: Bursts can hit Slack rate limits (429). Add small backoff and retry on 429/5xx.
- Scope/permission mismatches (for SDK/bot-token approaches): Ensure the bot has chat:write and is in the channel.
Quick checklist
- Confirm the hook fires at all: add a log/print in the hook.
- Add `on_crashed=[...]` alongside `on_failure`.
- Ensure you re-raise exceptions you want to alert on.
- Verify env vars and outbound network in the actual worker environment.
- Log Slack HTTP response status/body in the hook; retry on transient errors.
- Trim long tracebacks before sending.
A robust hook snippet with retries and truncation
```python
import json
import os
import time

import requests
from prefect.states import get_state_exception, format_exception

WEBHOOK = os.environ.get("SLACK_WEBHOOK_URL")

def safe_post_to_slack(text: str, retries: int = 3, timeout: float = 5.0):
    if not WEBHOOK:
        print("SLACK_WEBHOOK_URL not set; skipping Slack alert")
        return
    # Trim to keep payload small
    max_len = 3000
    if len(text) > max_len:
        text = text[:max_len] + "\n…(truncated)"
    payload = {"text": text}
    backoff = 1.0
    for _ in range(retries):
        try:
            r = requests.post(
                WEBHOOK,
                data=json.dumps(payload),
                headers={"Content-Type": "application/json"},
                timeout=timeout,
            )
            if r.status_code == 200:
                return
            # Retry on common transient statuses
            if r.status_code in (429, 500, 502, 503, 504):
                time.sleep(backoff)
                backoff *= 2
                continue
            print(f"Slack rejected payload: {r.status_code} {r.text}")
            return
        except Exception as e:
            print(f"Slack post error: {e}")
            time.sleep(backoff)
            backoff *= 2

def on_flow_failure(flow, flow_run, state):
    exc = get_state_exception(state)
    tb = format_exception(exc, exc.__traceback__)
    safe_post_to_slack(f"[FLOW FAILED] {flow_run.name} ({flow_run.id})\n{tb}")

def on_flow_crashed(flow, flow_run, state):
    # No python exception if the process died; provide context
    safe_post_to_slack(f"[FLOW CRASHED] {flow_run.name} ({flow_run.id})")

# Usage:
# @flow(on_failure=[on_flow_failure], on_crashed=[on_flow_crashed])
# def my_flow(): ...
```
Docs for hook behavior:
- State change hooks in Prefect 3.x
If you share how/where your flow runs (K8s, ECS, Docker, local) and how you’re sending Slack (webhook vs bot), I can help pinpoint which of these is most likely and tailor the hook accordingly.
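One refinement to the "trim long tracebacks" advice: the snippet above keeps the first 3000 characters, but in a Python traceback the exception type and message sit at the very end. A hedged alternative is to keep the tail instead. `truncate_tail` and its 3000-character default are illustrative assumptions, not Slack or Prefect APIs, and the traceback here is built with the standard library only.

```python
import traceback

def truncate_tail(text: str, max_len: int = 3000) -> str:
    """Keep the last part of `text`, marking at the top that it was cut."""
    if len(text) <= max_len:
        return text
    marker = "(truncated)…\n"
    keep = max(max_len - len(marker), 0)  # characters left for the tail
    return marker + text[len(text) - keep:]

try:
    raise ValueError("Boom from task")
except ValueError as exc:
    tb = "".join(traceback.format_exception(type(exc), exc, exc.__traceback__))

short = truncate_tail(tb, max_len=60)
print(short)
```

The final line of the traceback (`ValueError: Boom from task`) survives the cut, which is usually the part an on-call reader needs first.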
c
@Marvin How do I pass the name of the flow through to this error message?
m
thought for 60.5 seconds
In Prefect 3.x you don’t need to “pass” it: your flow failure hook already receives it.
Flow failure hook
- Hook signature: `(flow, flow_run, state)`
- Use `flow.name` (the flow’s code-defined name) or `flow_run.name` (the specific run name, e.g., with templating)
Example
```python
def on_flow_failure(flow, flow_run, state):
    flow_name = flow.name          # e.g., "my_flow"
    run_name = flow_run.name       # e.g., "my_flow-2025-08-26-..."
    print(f"[FLOW FAILED] {flow_name} | run={run_name}")
```
Task failure hook
- Hook signature: `(task, task_run, state)` (no `flow` arg)
- You can still include the flow info:
  - Quick: include `task_run.flow_run_id`
  - Full flow name: look it up via the client
Async example to fetch the flow name in a task hook
```python
from prefect.client.orchestration import get_client

async def on_task_failure(task, task_run, state):
    async with get_client() as client:
        fr = await client.read_flow_run(task_run.flow_run_id)
        flow = await client.read_flow(fr.flow_id)
        flow_name = flow.name
    print(f"[TASK FAILED] flow={flow_name} task={task.name} run={fr.name}")
```
Alternative anywhere (including inside tasks/flows, not just hooks)
- Use the run context:
```python
from prefect.context import get_run_context

ctx = get_run_context()
flow_name = ctx.flow.name
flow_run_name = ctx.flow_run.name
```
Docs: State change hooks in Prefect 3.x
If you share your current hook snippet, I can drop the exact line(s) to add.
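If both flow and task hooks should produce the same style of message, one option is a shared headline builder. The sketch below is Prefect-free: `failure_headline` is a hypothetical helper, and the `SimpleNamespace` objects are stand-ins that carry only the attributes mentioned above (`name`, and `flow_run_id` on the task run).

```python
from types import SimpleNamespace

def failure_headline(kind: str, obj, run) -> str:
    """Build a '[FLOW FAILED] name | run=...' style headline (hypothetical helper)."""
    headline = f"[{kind.upper()} FAILED] {obj.name} | run={run.name}"
    # Only the task-run stand-in below carries flow_run_id; add it when present.
    flow_run_id = getattr(run, "flow_run_id", None)
    if flow_run_id is not None:
        headline += f" | flow_run_id={flow_run_id}"
    return headline

# Stand-ins for what a flow hook and a task hook would receive.
flow = SimpleNamespace(name="my_flow")
flow_run = SimpleNamespace(name="my_flow-run-1")
task = SimpleNamespace(name="boom")
task_run = SimpleNamespace(name="boom-0", flow_run_id="abc123")

print(failure_headline("flow", flow, flow_run))
print(failure_headline("task", task, task_run))
```

A flow hook would call it as `failure_headline("flow", flow, flow_run)` and a task hook as `failure_headline("task", task, task_run)`, so the flow name or flow run id appears without any extra arguments being passed into the hook.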