Hui Zheng
12/04/2020, 7:50 PMTimeout state when timeout happens to a task. https://docs.prefect.io/api/latest/engine/state.html#timedout It seems there is only signals.FAIL instead of signals.TIMEOUT.
2. the fow DAG: A > B > C > D > E. Where E is always running with trigger=triggers.always_run. when timeout happens to task A, B, C, D, the task raises an exception TimeoutError , which is not a FAIL signal. and it causes the flow to terminate without running E properly. To overcome that, Currently I have to implement the exception handler in every task, it’s very tedious. Any better way to do it?
Try:
except TimeoutError:
raise signals.FAIL("Task Timeout.")nicholas
TimeoutError is an inheritor of the Failed state (you can see the full state inheritance model here). You could instead use a triggers.all_finished trigger on that task to ensure it runs no matter what?Hui Zheng
12/04/2020, 8:48 PMDbtShellTask and ShellTask the timeout is raised as an exception TimeoutError instead of a signals.FAIL. Is it the case for other tasks?nicholas
Hui Zheng
12/04/2020, 10:13 PMraise TimeoutError("Execution timed out.") , which is not a subclass of signal.FAIL . The time-out task throws an Unexpected error and terminate the flow run
ERROR - prefect.TaskRunner | Unexpected error: TimeoutError('Execution timed out.')Hui Zheng
12/04/2020, 10:14 PMtrigger=triggers.always_runnicholas
Hui Zheng
12/08/2020, 12:22 AMhandle_timeout_exception, which handles TimeoutError and raise a FAIL signal. I then decorate all my tasks with it.
def handle_exception(func):
"""
a decorator function that handles common exceptions in the tasks.
"""
@wraps(func)
def decorated(*args, **kwargs):
try:
return func(*args, **kwargs)
except TimeoutError:
raise signals.FAIL(func.__name__ + " Task Timeout.")
return decorated