Kyle Flanagan

12/23/2020, 7:52 PM
New to prefect -- seems very cool. Would like to be able to store the exception when a task fails due to an unexpected exception. Unfortunately, the results are not getting stored when using LocalResult. Here's a MWE.
import os
os.environ['PREFECT__FLOWS__CHECKPOINTING'] = 'true'

from prefect import task, Flow
from prefect.engine.results import LocalResult

@task(result=LocalResult(dir='/tmp/prefect-res'))
def my_task():
    print("in task")
    raise RuntimeError("oh no!")

with Flow('MyFlow') as flow:
    my_task()

# Run the flow after the context block has finished building it
state = flow.run()
print("Location is: >", state.result[flow.get_tasks()[0]]._result.location, "<")
Result is:
Location is: > None <
If I return some value from my_task() the result gets stored, but exceptions do not.

Chris White

12/23/2020, 7:56 PM
Hi Kyle and welcome! Checkpointing only occurs when the task succeeds; unexpected exceptions can get tricky to checkpoint (e.g., all boto3 and GCP exceptions are not serializable and therefore cannot be checkpointed), and in practice I haven’t seen a use case where the actual exception object was needed above its string representation in the logs. Could you elaborate on why you want the actual exception object saved?
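The serialization concern above can be checked directly. The following is a minimal sketch using only the standard library's `pickle`; `LockHoldingError` is a hypothetical stand-in for an SDK exception that carries an unpicklable attribute, not a real boto3 or GCP class.

```python
import pickle
import threading


class LockHoldingError(Exception):
    """Hypothetical exception that keeps a reference to an unpicklable object."""

    def __init__(self, message):
        super().__init__(message)
        self.lock = threading.Lock()  # thread locks cannot be pickled


def is_picklable(obj):
    """Return True if obj survives pickle.dumps, False otherwise."""
    try:
        pickle.dumps(obj)
        return True
    except Exception:
        return False


plain = RuntimeError("oh no!")
awkward = LockHoldingError("connection pool exhausted")

print("plain exception picklable:", is_picklable(plain))
print("lock-holding exception picklable:", is_picklable(awkward))
```

A check like this could be run against the exception types a legacy task is known to raise before deciding whether to store the object itself or only its string form.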

Kyle Flanagan

12/23/2020, 8:02 PM
We have legacy task-type objects that raise exceptions upon failure. E.g. GET a URL and raise a ConnectionTimeout if it's inaccessible. If no exception, the task succeeded. So my idea would be to use prefect as the control for these legacy tasks instead of e.g. scripts and cron. Currently the tasks themselves handle serializing their results to a database, but ideally I would trash all of that and rely on prefect to handle storing the results.
Perhaps I could have a bare except and return the string representation of the Exception. But at that point, how do I ensure that Prefect understands that's a failure vs a success? I guess I could re-raise an exception.
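The catch, record, and re-raise idea can be sketched without any Prefect code. This is only an illustration: `run_and_record`, `flaky_fetch`, and the JSON file layout are hypothetical names chosen here, and it catches `Exception` rather than using a bare `except` so that `KeyboardInterrupt` and similar still propagate untouched.

```python
import json
import tempfile
from pathlib import Path


def run_and_record(legacy_callable, record_path):
    """Run legacy_callable; on failure, persist the exception text and re-raise."""
    try:
        return legacy_callable()
    except Exception as exc:
        record = {"type": type(exc).__name__, "message": str(exc)}
        Path(record_path).write_text(json.dumps(record))
        raise  # re-raise so the caller still sees a failure


def flaky_fetch():
    """Hypothetical legacy task that always fails."""
    raise ConnectionError("host unreachable")


record_file = Path(tempfile.mkdtemp()) / "failure.json"
try:
    run_and_record(flaky_fetch, record_file)
except ConnectionError:
    print("still a failure; recorded:", record_file.read_text())
```

Because the original exception is re-raised, whatever is orchestrating the call still sees a failed run, while the string representation is available to an external reader.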

Chris White

12/23/2020, 8:06 PM
All of that makes sense, but why do you need the exception stored? As long as you have a log / state history for the runs wouldn’t you still have the same amount of information at your disposal? I have some ideas for how you might achieve this but want to make sure I get the full idea first

Kyle Flanagan

12/23/2020, 8:07 PM
Sure, but then our status dashboard would pull from DB for successes and from logs for failures to show results.
I just started w/ prefect today, so there's a greater than 0 chance that I'm ignorant of a better way to handle this w/ prefect.

Chris White

12/23/2020, 8:08 PM
Gotcha, an external service is relying on the data, makes sense; I suggest adding a state handler to your tasks that require this functionality:
def my_exception_state_handler(task, old_state, new_state):
    if new_state.is_failed():
        new_state._result = new_state._result.write(new_state.result) # looks weird, but will work
        return new_state

@task(state_handlers=[my_exception_state_handler])
def my_task():
    ...
hit enter too early, 1 sec haha
State handlers are called on every task run state change, so you can capture the failure and store it in this layer.
If you end up using result location templating as well, here is where all the formatting kwargs are created and passed, in case you want to replicate that: https://github.com/PrefectHQ/prefect/blob/master/src/prefect/engine/task_runner.py#L885-L891

Kyle Flanagan

12/23/2020, 8:14 PM
Tried incorporating your code. But got a not implemented error.
Not implemented on the base Result class - if you are seeing this error you might be trying to use features that require choosing a Result subclass; see <https://docs.prefect.io/core/concepts/results.html>

Chris White

12/23/2020, 8:15 PM
I see I see; as long as you are always attaching results to your tasks you can do this instead (note I’m accessing the result from the task instead of the state now):
def my_exception_state_handler(task, old_state, new_state):
    if new_state.is_failed():
        new_state._result = task.result.write(new_state.result) # looks weird, but will work
        return new_state
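A possible variation on the handler above, for exceptions that cannot be serialized, is to fall back to the string form. This is a sketch under assumptions rather than a tested Prefect recipe: it assumes a serialization problem surfaces as an exception raised by `task.result.write(...)`, and `storing_failure_handler` is a name chosen here. It uses only the attributes already shown in this thread (`is_failed()`, `result`, `_result`, `task.result.write`) and still requires a result to be attached to the task.

```python
def storing_failure_handler(task, old_state, new_state):
    """State handler: persist the exception of a failed run via the task's result."""
    if new_state.is_failed():
        try:
            # Same call as the handler above: write the exception object itself
            new_state._result = task.result.write(new_state.result)
        except Exception:
            # Assumption: an unserializable exception makes write() raise;
            # store its repr instead, which is a plain string
            new_state._result = task.result.write(repr(new_state.result))
    # Always hand the state back so non-failed transitions pass through unchanged
    return new_state
```

It would be attached the same way, via `@task(result=..., state_handlers=[storing_failure_handler])`.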

Kyle Flanagan

12/23/2020, 8:15 PM
Nice!
You're quick on the draw. That stores the results and still registers it as a failure.

Chris White

12/23/2020, 8:17 PM
haha thank you, glad I could help!

Kyle Flanagan

12/23/2020, 8:18 PM
Appreciate it! Merry Christmas and Happy Holidays to you and the team!

Chris White

12/23/2020, 8:19 PM
😄 thanks, Merry Christmas and Happy holidays to you as well!