Tom Shaffner
12/15/2021, 1:56 PM
Anna Geller
Tom Shaffner
12/15/2021, 2:07 PM
Anna Geller
def log_exception(task, old_state, new_state):
    """State handler that logs why a task run failed.

    Args:
        task: The task whose state is changing (not used here).
        old_state: The state the task is leaving (not used here).
        new_state: The state the task is entering.

    Returns:
        ``new_state``, unchanged, so the state transition proceeds as normal.
    """
    if new_state.is_failed():
        logger = prefect.context.get("logger")
        failure = new_state.result
        if isinstance(failure, Exception):
            # NOTE(review): exception_to_string is not defined in this snippet;
            # it is assumed to be a helper provided elsewhere -- confirm.
            logger.error(exception_to_string(failure))
        else:
            # There is no exception object to format, so log the state's
            # message instead of handing a non-exception to the formatter.
            logger.error(new_state.message)
    return new_state
@task(state_handlers=[log_exception])
def divide_numbers(a, b):
    """Return the reciprocal of ``b - a``.

    ``log_exception`` is attached as a state handler. For numeric inputs the
    division raises ``ZeroDivisionError`` when ``a == b``.
    """
    difference = b - a
    return 1 / difference
The logs are stored in the backend, and you can retrieve them with a GraphQL query:
# Select the message, level, info and created fields of the logs nested
# under flow_run.
# NOTE(review): no filter is applied here, so the result is not restricted to
# one particular flow run -- confirm whether a `where` argument is wanted.
query {
flow_run{
logs {
message
level
info
created
}
}
}
Kevin Kho
import traceback
from prefect import task, Flow
from functools import partial, wraps
def custom_task(func=None, **task_init_kwargs):
    """Task decorator that prints the full traceback of a failing function.

    Works both bare (``@custom_task``) and with keyword options
    (``@custom_task(...)``); the options are forwarded to ``task``.

    Args:
        func: The function to wrap, or ``None`` when only options were given.
        **task_init_kwargs: Keyword arguments passed through to ``task``.

    Returns:
        A partial of ``custom_task`` awaiting the function when ``func`` is
        ``None``; otherwise the result of ``task`` applied to the wrapper.
    """
    if func is not None:

        @wraps(func)  # also copies __name__, so no manual assignment is needed
        def safe_func(**kwargs):
            # The wrapper accepts keyword arguments only.
            try:
                return func(**kwargs)
            except Exception as exc:
                full_traceback = traceback.format_exc()
                print(f"Full Traceback: {full_traceback}")
                # "from None" drops the chained exception, so only the
                # exception type travels with the re-raised error.
                raise RuntimeError(type(exc)) from None

        return task(safe_func, **task_init_kwargs)

    # Called with options only: hand back a decorator awaiting the function.
    return partial(custom_task, **task_init_kwargs)
@custom_task
def abc(x):
    """Demo task that always fails, to exercise ``custom_task``.

    Args:
        x: Accepted so the task takes an input; it is never used.

    Raises:
        ValueError: Always.
    """
    raise ValueError()
# Create the flow first, then register the failing demo task inside its context.
flow = Flow("custom-decorator-test")
with flow:
    abc(1)
Tom Shaffner
12/15/2021, 2:20 PM