Kyle Flanagan
12/23/2020, 7:52 PMimport os
os.environ['PREFECT__FLOWS__CHECKPOINtING'] = 'true'
from prefect import task, Flow
from prefect.engine.results import LocalResult
@task(result=LocalResult(dir='/tmp/prefect-res'))
def my_task():
print("in task")
raise RuntimeError("oh no!")
with Flow('MyFlow') as flow:
my_task()
state = flow.run()
print("Location is: >", state.result[flow.get_tasks()[0]]._result.location, "<")
Result is:
Location is: > None <
If I return
some value from my_task()
the result gets stored, but exceptions do not.Chris White
Kyle Flanagan
12/23/2020, 8:02 PMChris White
Kyle Flanagan
12/23/2020, 8:07 PMChris White
def my_exception_state_handler(task, old_state, new_state):
if new_state.is_failed():
new_state._result = new_state._result.write(new_state.result) # looks weird, but will work
return new_state
@task(state_handlers=[my_exception_state_handler])
def my_task():
...
Kyle Flanagan
12/23/2020, 8:14 PMNot implemented on the base Result class - if you are seeing this error you might be trying to use features that require choosing a Result subclass; see <https://docs.prefect.io/core/concepts/results.html>
Chris White
def my_exception_state_handler(task, old_state, new_state):
if new_state.is_failed():
new_state._result = task.result.write(new_state.result) # looks weird, but will work
return new_state
Kyle Flanagan
12/23/2020, 8:15 PMChris White
Kyle Flanagan
12/23/2020, 8:18 PMChris White