# ask-marvin
a
@Marvin How to return some data when I return Failed state?
m
thought for 472.7 seconds
Short answer: in Prefect 3.x you don’t “return a Failed state” from a task/flow. To fail, you raise an exception. If you want to attach usable data to that failure and use it downstream, put the data on the exception and pass the failed upstream to the downstream task with `allow_failure`. Here are the supported patterns:

1) Attach data to the exception and consume it downstream with `allow_failure`
- Raise a custom exception that includes your payload.
- When calling the downstream task, wrap the upstream future in `allow_failure(...)` so the downstream task still runs after the upstream fails. Its input then resolves to the upstream’s result, which for a failed run is the exception that was raised, so you can read your payload straight off it.
```python
from prefect import flow, task, allow_failure


class EnrichedError(RuntimeError):
    # payload defaults to None so the exception can still be unpickled
    # if it is ever reloaded from persisted results
    def __init__(self, message, payload=None):
        super().__init__(message)
        self.payload = payload


@task
def produce():
    # compute partial info
    partial = {"rows_processed": 123, "file": "s3://bucket/key"}
    # fail with payload
    raise EnrichedError("Validation failed", payload=partial)


@task
def handle(upstream):
    # With allow_failure, a failed upstream resolves to the exception it raised;
    # a successful upstream resolves to its normal return value
    if isinstance(upstream, EnrichedError):
        return {"handled": True, "payload": upstream.payload}
    return upstream


@flow
def main():
    # .submit() returns a future, so produce's failure doesn't raise inside the flow
    s = produce.submit()
    # allow_failure lets handle run even though produce failed
    return handle(allow_failure(s))


if __name__ == "__main__":
    main()
```
Useful helpers:
- `prefect.states.get_state_exception(state)` returns the exception on a failed state.
- `prefect.states.get_state_result(state, raise_on_failure=False)` returns the state’s result without raising (on failure, you’ll typically get the exception object).

Docs:
- prefect.states API
- allow_failure annotation

2) Publish data for observability on failure (artifacts)
If you only need to “return” data for humans (UI/notifications) and not for downstream tasks, create an artifact in an `except` block or via a task `on_failure` hook:
```python
from prefect import task
from prefect.artifacts import create_table_artifact
from prefect.states import get_state_exception


def report_failure(task_obj, task_run, state):
    # Task state hooks are called with (task, task_run, state)
    exc = get_state_exception(state)
    create_table_artifact(
        key="validation-summary",  # artifact keys: lowercase letters, numbers, dashes
        table=[{"metric": "rows_processed", "value": 123}],
        description=f"Task failed: {exc}",
    )


@task(on_failure=[report_failure])
def produce():
    # ... compute
    raise ValueError("Boom")
```
Docs: Artifacts API

3) About “returning a Failed state” directly
Returning `prefect.states.Failed(...)` from inside your task/flow won’t set the run to failed; it will just be treated as your function’s return value. To actually fail the run, raise an exception. If you need to craft a failed State object programmatically, there is `prefect.states.exception_to_failed_state(...)`, but typical user code should just raise.

Clarifying question: Do you want this data for downstream automation (another task needs to read it), or mainly for observability in the UI/notifications? I can tailor the example either way.
n
@Marvin seems overcomplicated, you should be able to return arbitrary `data` with any `State` subclass
m
thought for 43.9 seconds
You’re right that a Prefect State has a `data` field, but in Prefect 3.x you don’t set the run’s state by returning a `State` object from your task/flow. The engine decides the run’s state: it creates a Completed state from a return value, and a Failed state from an exception. If you “return a Failed(...)” state, Prefect will just treat that as your function’s return value and still mark the run Completed.

Simplest supported way to “fail with data”:
- Put your arbitrary data on the exception you raise.
- If a downstream task needs it, pass the upstream future wrapped in `allow_failure`; the downstream input then resolves to the raised exception, so your data comes along with it.

Minimal example:
```python
from prefect import flow, task, allow_failure


class FailWithData(Exception):
    def __init__(self, payload=None):
        super().__init__("failing with payload")
        self.payload = payload


@task
def producer():
    # Attach any arbitrary data you want to carry with the failure
    raise FailWithData({"rows": 123, "file": "s3://bucket/key"})


@task
def consumer(up):
    # allow_failure passes the raised exception through as the input value
    if isinstance(up, FailWithData):
        return up.payload  # -> {"rows": 123, "file": ...}
    return None


@flow
def main():
    # submit() so the failure becomes a future instead of raising in the flow
    return consumer(allow_failure(producer.submit()))


if __name__ == "__main__":
    print(main())
```
Why this is the supported path in 3.x
- The engine sets states from your function outcome: return -> Completed, raise -> Failed.
- A Failed state’s `data` will hold the exception (or a pointer to it if persisted). By putting your payload on the exception, you effectively attach arbitrary data to the Failed state.
- Downstream tasks can receive the failed upstream via `allow_failure`, which resolves it to the raised exception. Inside the flow you can also get the State itself with `return_state=True` and read the exception with `get_state_exception`.

If you only need the “data” for observability (not for downstream code), it can be even simpler to publish an artifact in an `except` block or via a task `on_failure` hook.

Docs
- States API: prefect.states
- Artifacts API: prefect.artifacts

If you share your exact use case (downstream logic vs. just UI/notifications), I can tailor the shortest possible pattern.
a
@Nate So I can do a `return State(type=StateType.FAILED, data=model_names)` (where `model_names` is a list of str, for example)? Within a task, I mean?
n
yep
```python
"""
Example: Returning data with a Failed state.

Both approaches work:
  return State(type=StateType.FAILED, data=model_names)
  return Failed(data=model_names)
"""
from prefect import flow, task
from prefect.client.schemas.objects import StateType
from prefect.states import Failed, State


@task
def task_with_failed_state():
    model_names = ["model_v1", "model_v2", "model_v3"]
    return Failed(data=model_names, message="Failed but here's the data")


@task
def task_using_state_constructor():
    model_names = ["model_a", "model_b", "model_c"]
    return State(
        type=StateType.FAILED,
        data=model_names,
        message="Processing failed but here's what we got",
    )


@flow
def main():
    """Verify both approaches work."""

    # Using Failed() helper
    state1 = task_with_failed_state(return_state=True)
    assert state1.type == StateType.FAILED
    assert state1.data.result == ["model_v1", "model_v2", "model_v3"]
    assert state1.message == "Failed but here's the data"

    # Using State constructor
    state2 = task_using_state_constructor(return_state=True)
    assert state2.type == StateType.FAILED
    assert state2.data.result == ["model_a", "model_b", "model_c"]
    assert state2.message == "Processing failed but here's what we got"


if __name__ == "__main__":
    main()
```