Wilhelm Su
11/21/2021, 8:54 PM
Kevin Kho
@task. I think this might work for you.
import traceback
from prefect import task, Flow
from functools import partial, wraps
def custom_task(func=None, **task_init_kwargs):
    """Turn ``func`` into a Prefect task whose failures hide the stack trace.

    Works both bare (``@custom_task``) and with options
    (``@custom_task(name="x")``). In the second form ``func`` is ``None`` and
    a partial of this decorator, holding the options, is returned so it can
    be applied to the function afterwards.

    Args:
        func: The function to wrap, or ``None`` when only options are given.
        **task_init_kwargs: Passed through unchanged to ``task``.

    Returns:
        ``task(safe_func, **task_init_kwargs)``, or a ``functools.partial`` of
        ``custom_task`` when ``func`` is ``None``.
    """
    if func is None:
        return partial(custom_task, **task_init_kwargs)

    @wraps(func)
    def safe_func(**kwargs):
        # The wrapper accepts keyword arguments only.
        try:
            return func(**kwargs)
        except Exception as e:
            # Print the full traceback ourselves, then raise a replacement
            # error that carries only the original exception's type.
            print(f"Full Traceback: {traceback.format_exc()}")
            # from None is necessary to not log the stacktrace
            raise RuntimeError(type(e)) from None

    safe_func.__name__ = func.__name__
    return task(safe_func, **task_init_kwargs)
@custom_task
def abc(x):
    """Demo task that always raises ``ValueError`` to exercise the error path."""
    raise ValueError()
    return x  # unreachable while the raise above is in place
# Build a flow whose only task is abc, called with x=1.
with Flow("custom-decorator-test") as flow:
    abc(1)
Wilhelm Su
11/21/2021, 8:57 PM
Kevin Kho
Kevin Kho
Anna Geller
import prefect
from prefect import task, Flow
from prefect.tasks.notifications import SlackTask
from typing import cast
def post_to_slack_on_failure(task, old_state, new_state):
    """State handler that posts a Slack message when the new state is failed.

    Args:
        task: The task the handler is attached to (not used here).
        old_state: The state being left (not used here).
        new_state: The state being entered; this function reads
            ``is_failed()``, ``result`` and ``message`` from it.

    Returns:
        ``new_state``, unchanged, whether or not a message was posted.
    """
    if not new_state.is_failed():
        return new_state

    if isinstance(new_state.result, Exception):
        # Show the exception's repr inside a code fence.
        value = "```{}```".format(repr(new_state.result))
    else:
        # No exception object available; fall back to the state's message.
        value = cast(str, new_state.message)

    msg = (
        f"The task `{prefect.context.task_name}` failed "
        f"in a flow run {prefect.context.flow_run_id} "
        f"with an exception {value}"
    )
    SlackTask(message=msg).run()
    return new_state
@task(state_handlers=[post_to_slack_on_failure])
def divide_numbers(a, b):
    """Return ``1 / (b - a)``.

    With numeric arguments where ``a == b`` the denominator is zero and
    ``ZeroDivisionError`` is raised, which is how this example produces a
    failure.
    """
    return 1 / (b - a)
# divide_numbers(1, 1) makes the denominator zero, so the task is expected to
# fail when the flow runs.
with Flow(name="state-inspection-handler") as flow:
    result = divide_numbers(1, 1)

if __name__ == "__main__":
    flow.run()
Anna Geller
Wilhelm Su
11/21/2021, 11:56 PM