I have an on_failure callback setup for all my flo...
# ask-community
c
I have an on_failure callback set up for all my flows; is there any way to get at the Exception that caused the failure from this callback?
k
Not sure on the Flow level but on the task level you can do this:
import prefect
from prefect import Flow, task

def my_state_handler(task, old_state, new_state):
    if new_state.is_failed():
        logger = prefect.context.get("logger")
        # For a failed task, new_state.result holds the exception that was raised
        logger.info(type(new_state.result))
    return new_state

@task(state_handlers=[my_state_handler])
def test(x):
    "test" + 1  # deliberately raises TypeError so the task fails
    return x

with Flow("aaa") as flow:
    a = test(1)

flow.run()
I suspect it will work on the Flow level too
c
excellent (as usual), looks like I can get at the original exception with
original_exception = list(state.result.values())[0].result
k
Oh I see. That’s nice
c
Is there anything you can tell me about the state.result object? Is it always guaranteed to be of size 1? The docs just show
result (Any, optional): Defaults to `None`. A data payload for the state.
k
For an error I think it will be size one. If your task has multiple outputs, I think it will be a Tuple.
So if you use the if state.is_failed() check, I’m positive this will work
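Since the size of state.result is not guaranteed, a more defensive alternative to taking element [0] is to walk the payload and collect every exception found. The following is only a sketch: find_exceptions and FakeState are hypothetical names, FakeState is a stand-in for a real Prefect state, and the assumption is that the payload is some mix of dicts, lists or tuples, and objects with a .result attribute.

```python
from typing import Any, List


def find_exceptions(payload: Any) -> List[BaseException]:
    """Recursively collect exception instances from a state-like payload."""
    if isinstance(payload, BaseException):
        return [payload]
    if isinstance(payload, dict):
        # Assumed flow-level shape: a mapping of tasks to their states.
        payload = list(payload.values())
    if isinstance(payload, (list, tuple)):
        found: List[BaseException] = []
        for item in payload:
            found.extend(find_exceptions(item))
        return found
    # Anything exposing `.result` is treated as a state-like object.
    if hasattr(payload, "result"):
        return find_exceptions(payload.result)
    return []


class FakeState:
    """Hypothetical stand-in for a state object; not a Prefect class."""

    def __init__(self, result: Any) -> None:
        self.result = result


error = TypeError("demo failure")
flow_state = FakeState({"ok_task": FakeState(1), "bad_task": FakeState(error)})
exceptions = find_exceptions(flow_state)
```

With this shape an empty result yields an empty list instead of an IndexError, so the callback can decide what to do when nothing was captured.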
c
@Kevin Kho wanted to follow up on this. It looks like state.result is empty when this is running in ECS; however, when it's run locally it contains the Exceptions I'm after. Do you have any other thoughts regarding getting at the original caught exception that caused the failure?
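Until the cause of the empty result is known, the callback could at least degrade gracefully. This is a rough sketch, not a fix for the ECS behavior: describe_failure and StubState are hypothetical names, and it assumes the state exposes a message attribute (as Prefect 1.x states do) that can be reported when no exception object is available.

```python
from typing import Any, List


def describe_failure(state: Any) -> str:
    """Return the best available description of why a state failed."""
    result = getattr(state, "result", None)
    if isinstance(result, BaseException):
        return repr(result)
    if isinstance(result, dict) and result:
        # Assumed flow-level shape: child states keyed by task.
        parts: List[str] = []
        for child in result.values():
            child_result = getattr(child, "result", None)
            if isinstance(child_result, BaseException):
                parts.append(repr(child_result))
        if parts:
            return "; ".join(parts)
    # Fall back to the state's message when no exception was captured.
    message = getattr(state, "message", None)
    return str(message) if message else "no failure details available"


class StubState:
    """Hypothetical stand-in for a state object; not a Prefect class."""

    def __init__(self, result: Any = None, message: Any = None) -> None:
        self.result = result
        self.message = message


local_like = StubState(result={"t": StubState(result=TypeError("bad"))})
empty_like = StubState(result={}, message="Some reference tasks failed.")
```

Here describe_failure(local_like) reports the captured exception, while describe_failure(empty_like) falls back to the message text.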
k
Oh will take me some time to test. We’ll respond later
I ran on Local and got the error. I can’t think of a reason why ECS would be different. Are you using a DaskExecutor by chance?
this is on ECS
c
I'm using a map and normally set a DaskExecutor; however, when I commented yesterday on the behavior that I'm seeing, I left the executor as the default (but still used a map).
k
Gotcha ok I’ll try that
I still see it though with the map (notice the log is labelled as test[1]).
Code for reference
from prefect import Flow, task
import prefect
from prefect.run_configs import ECSRun
from prefect.storage import S3

def my_state_handler(task, old_state, new_state):
    if new_state.is_failed():
        logger = prefect.context.get("logger")
        logger.info(type(new_state.result))
    return new_state

@task(state_handlers=[my_state_handler])
def test(x):
    if x == 2:
        "test" + 1
    return x

STORAGE = S3(bucket="coiled-prefect", add_default_labels=False)

RUN_CONFIG = ECSRun(
    run_task_kwargs={'cluster': 'test-cluster'},
    task_role_arn='arn:aws:iam::1111111111:role/prefect-ecs',
    execution_role_arn='arn:aws:iam::111111111111:role/prefect-ecs',
    image='prefecthq/prefect:latest-python3.8',
)

with Flow("except", run_config=RUN_CONFIG, storage=STORAGE) as flow:
    a = test.map([1,2,3,4])

flow.register("bristech")
c
lemme see if I can get you a small repeatable flow too