Adrien Besnard
09/08/2025, 3:38 PM
Marvin
09/08/2025, 3:38 PM
Marvin
09/08/2025, 3:46 PM
[...] allow_failure.
Here are the supported patterns:
1) Attach data to the exception and consume it downstream with allow_failure
- Raise a custom exception that includes your payload.
- In the flow, wrap the upstream future with allow_failure when passing it to a downstream task, so the downstream task still runs if the upstream failed. The failed input arrives as the raised exception, so the payload can be read from it directly.

from prefect import flow, task, allow_failure


class EnrichedError(RuntimeError):
    def __init__(self, message, payload: dict):
        super().__init__(message)
        self.payload = payload


@task
def produce():
    # compute partial info
    partial = {"rows_processed": 123, "file": "s3://bucket/key"}
    # fail with payload
    raise EnrichedError("Validation failed", payload=partial)


@task
def handle(upstream):
    # With allow_failure, a failed upstream is delivered as its exception object
    if isinstance(upstream, EnrichedError):
        return {"handled": True, "payload": upstream.payload}
    # If it didn't fail, `upstream` is the normal result
    return None


@flow
def main():
    # submit() returns a future, so the failure is not raised inside the flow body
    future = produce.submit()
    return handle(allow_failure(future))


if __name__ == "__main__":
    main()

Useful helpers:
- prefect.states.get_state_exception(state) returns the exception on a failed state.
- prefect.states.get_state_result(state, raise_on_failure=False) returns the state’s result without raising (on failure, you’ll typically get the exception object).
Docs:
- prefect.states API
- allow_failure annotation
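One practical caveat when a payload rides on an exception: if the exception is ever copied or pickled (both use the same `__reduce__` protocol; whether that happens in your setup is an assumption here), a custom `__init__` with extra required arguments cannot be rebuilt by default. The sketch below is plain Python with no Prefect calls. `NaiveError` is a hypothetical name with the same shape as the exception above, and the `__reduce__` override is one common workaround, not something this thread prescribes.

```python
import copy


class NaiveError(RuntimeError):
    """Message plus a required payload, with default reduce behaviour."""

    def __init__(self, message, payload: dict):
        super().__init__(message)
        self.payload = payload


class EnrichedError(RuntimeError):
    """Variant that says how to rebuild itself with both arguments."""

    def __init__(self, message, payload: dict):
        super().__init__(message)
        self.payload = payload

    def __reduce__(self):
        # The default rebuild re-calls __init__ with self.args only
        # (just the message), so the payload is passed explicitly here.
        return (type(self), (self.args[0], self.payload))


payload = {"rows_processed": 123, "file": "s3://bucket/key"}

# deepcopy goes through __reduce__, like pickle does
restored = copy.deepcopy(EnrichedError("Validation failed", payload))
print(restored.payload)

naive_error = None
try:
    copy.deepcopy(NaiveError("Validation failed", payload))
except TypeError as err:
    # The naive class cannot be rebuilt from the message alone.
    naive_error = err
print(f"naive copy failed: {naive_error}")
```

The same override also keeps `pickle.dumps` / `pickle.loads` round-trips working for classes defined in an importable module.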
2) Publish data for observability on failure (artifacts)
If you only need to “return” data for humans (UI/notifications) and not for downstream tasks, create an artifact in an except block or via a task on_failure hook:

from prefect import task
from prefect.artifacts import create_table_artifact
from prefect.states import get_state_exception


def report_failure(task, task_run, state):
    # Task state hooks are called with (task, task_run, state)
    exc = get_state_exception(state)
    create_table_artifact(
        key="validation-summary",  # keys allow lowercase letters, numbers, and dashes
        table=[{"metric": "rows_processed", "value": 123}],
        description=f"Task failed: {exc}",
    )


@task(on_failure=[report_failure])
def produce():
    # ... compute
    raise ValueError("Boom")

Docs: Artifacts API
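The except-block variant mentioned above is not shown in the thread, so here is a minimal, Prefect-free sketch of its shape: publish a summary, then re-raise so the failure is not swallowed. `run_with_failure_report` and `publish` are hypothetical names; in a real task, `publish` would stand in for a call such as `create_table_artifact(table=...)`.

```python
from typing import Any, Callable


def run_with_failure_report(
    work: Callable[[], Any],
    publish: Callable[[list], None],
) -> Any:
    """Run `work`; if it raises, publish a summary table and re-raise."""
    try:
        return work()
    except Exception as exc:
        # Publish first, then re-raise so the run still ends up failed.
        publish([{"metric": "error", "value": repr(exc)}])
        raise


published: list = []


def boom() -> None:
    raise ValueError("Boom")


try:
    run_with_failure_report(boom, published.append)
except ValueError:
    pass  # the original exception still reaches the caller

print(published)
```

The bare `raise` matters: without it the function would return None and the failure would look like a success to whatever runs it.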
3) About “returning a Failed state” directly
Returning prefect.states.Failed(...) from inside your task/flow won’t set the run to failed; it will just be treated as your function’s return value. To actually fail the run, raise an exception. If you need to craft a failed State object programmatically, there is prefect.states.exception_to_failed_state(...), but typical user code should just raise.
Clarifying question: Do you want this data for downstream automation (another task needs to read it), or mainly for observability in the UI/notifications? I can tailor the example either way.
Nate
09/08/2025, 3:59 PM
[...] data with any State subclass
Marvin
09/08/2025, 4:00 PM
Marvin
09/08/2025, 4:00 PM
[...] data field, but in Prefect 3.x you don’t set the run’s state by returning a State object from your task/flow. The engine decides the run’s state: it creates a Completed state from a return value, and a Failed state from an exception. If you “return a Failed(...)” state, Prefect will just treat that as your function’s return value and still mark the run Completed.
Simplest supported way to “fail with data”
- Put your arbitrary data on the exception you raise.
- If a downstream task needs it, wrap the upstream future with allow_failure when calling the downstream task and read your data from the exception it receives.
Minimal example:

from prefect import flow, task, allow_failure


class FailWithData(Exception):
    def __init__(self, payload):
        super().__init__("failing with payload")
        self.payload = payload


@task
def producer():
    # Attach any arbitrary data you want to carry with the failure
    raise FailWithData({"rows": 123, "file": "s3://bucket/key"})


@task
def consumer(up):
    # A failed input wrapped in allow_failure arrives as the exception object
    if isinstance(up, FailWithData):
        return up.payload  # -> {"rows": 123, "file": ...}
    return None


@flow
def main():
    # submit() returns a future, so the failure is not raised inside the flow body
    return consumer(allow_failure(producer.submit()))


if __name__ == "__main__":
    print(main())

Why this is the supported path in 3.x
- The engine sets states from your function outcome: return -> Completed, raise -> Failed.
- A Failed state’s data will hold the exception (or a pointer to it if persisted). By putting your payload on the exception, you effectively attach arbitrary data to the Failed state.
- Downstream tasks can still run on a failed input wrapped in allow_failure and read the payload from the exception.
If you only need the “data” for observability (not for downstream code), it can be even simpler to publish an artifact in an except block or via a task on_failure hook.
Docs
- States API: prefect.states
- Artifacts API: prefect.artifacts
If you share your exact use case (downstream logic vs. just UI/notifications), I can tailor the shortest possible pattern.
Adrien Besnard
09/08/2025, 4:00 PM
Nate
09/08/2025, 4:21 PM
"""
Example: Returning data with a Failed state.
Both approaches work:
return State(type=StateType.FAILED, data=model_names)
return Failed(data=model_names)
"""
from prefect import flow, task
from prefect.client.schemas.objects import StateType
from prefect.states import Failed, State


@task
def task_with_failed_state():
    model_names = ["model_v1", "model_v2", "model_v3"]
    return Failed(data=model_names, message="Failed but here's the data")


@task
def task_using_state_constructor():
    model_names = ["model_a", "model_b", "model_c"]
    return State(
        type=StateType.FAILED,
        data=model_names,
        message="Processing failed but here's what we got",
    )


@flow
def main():
    """Verify both approaches work."""
    # Using Failed() helper
    state1 = task_with_failed_state(return_state=True)
    assert state1.type == StateType.FAILED
    assert state1.data.result == ["model_v1", "model_v2", "model_v3"]
    assert state1.message == "Failed but here's the data"

    # Using State constructor
    state2 = task_using_state_constructor(return_state=True)
    assert state2.type == StateType.FAILED
    assert state2.data.result == ["model_a", "model_b", "model_c"]
    assert state2.message == "Processing failed but here's what we got"


if __name__ == "__main__":
    main()
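
As a mental model only, the outcome-to-state mapping discussed in this thread can be sketched without Prefect. `ToyState` and `run_toy` are hypothetical names, and this is not Prefect's engine. The pass-through branch for a returned state mirrors what the assertions in the example above check; the other two branches follow the "return -> Completed, raise -> Failed" rule stated earlier.

```python
from dataclasses import dataclass
from typing import Any, Callable


@dataclass
class ToyState:
    type: str  # "COMPLETED" or "FAILED"
    data: Any = None
    message: str = ""


def run_toy(fn: Callable[[], Any]) -> ToyState:
    """Map a function's outcome to a ToyState."""
    try:
        result = fn()
    except Exception as exc:
        # Raised exception: failed state carrying the exception as data
        return ToyState("FAILED", data=exc, message=str(exc))
    if isinstance(result, ToyState):
        # Returned state object: used as the run's state unchanged
        return result
    # Any other return value: completed state carrying the value
    return ToyState("COMPLETED", data=result)


def returns_failed_state() -> ToyState:
    return ToyState("FAILED", data=["model_v1"], message="Failed but here's the data")


def raises() -> None:
    raise ValueError("Boom")


print(run_toy(lambda: 42).type)
print(run_toy(returns_failed_state).data)
print(run_toy(raises).message)
```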