Hui Zheng
12/04/2020, 7:50 PMTimeout
state when timeout happens to a task. https://docs.prefect.io/api/latest/engine/state.html#timedout It seems there is only signals.FAIL instead of signals.TIMEOUT.
2. the fow DAG: A > B > C > D > E. Where E is always running with trigger=triggers.always_run
. when timeout happens to task A, B, C, D, the task raises an exception TimeoutError
, which is not a FAIL
signal. and it causes the flow to terminate without running E properly. To overcome that, Currently I have to implement the exception handler in every task, it’s very tedious. Any better way to do it?
Try:
except TimeoutError:
raise signals.FAIL("Task Timeout.")
nicholas
TimeoutError
is an inheritor of the Failed
state (you can see the full state inheritance model here). You could instead use a triggers.all_finished
trigger on that task to ensure it runs no matter what?Hui Zheng
12/04/2020, 8:48 PMDbtShellTask
and ShellTask
the timeout is raised as an exception TimeoutError
instead of a signals.FAIL
. Is it the case for other tasks?nicholas
Hui Zheng
12/04/2020, 10:13 PMraise TimeoutError("Execution timed out.")
, which is not a subclass of signal.FAIL
. The time-out task throws an Unexpected error and terminate the flow run
ERROR - prefect.TaskRunner | Unexpected error: TimeoutError('Execution timed out.')
Hui Zheng
12/04/2020, 10:14 PMtrigger=triggers.always_run
nicholas
Hui Zheng
12/08/2020, 12:22 AMhandle_timeout_exception
, which handles TimeoutError
and raise a FAIL signal. I then decorate all my tasks with it.
def handle_exception(func):
"""
a decorator function that handles common exceptions in the tasks.
"""
@wraps(func)
def decorated(*args, **kwargs):
try:
return func(*args, **kwargs)
except TimeoutError:
raise signals.FAIL(func.__name__ + " Task Timeout.")
return decorated