Hui Zheng

12/04/2020, 7:50 PM
Hello, I am implementing a timeout for one of my flows and have a few questions.
1. How do I get the flow to enter the `TimedOut` state when a task times out? https://docs.prefect.io/api/latest/engine/state.html#timedout It seems there is only `signals.FAIL`, not a `signals.TIMEOUT`.
2. The flow DAG is A > B > C > D > E, where E always runs with `trigger=triggers.always_run`. When a timeout happens in task A, B, C, or D, the task raises a `TimeoutError` exception, which is not a `FAIL` signal, and that causes the flow to terminate without running E properly. To work around that, I currently have to implement an exception handler in every task, which is very tedious. Is there a better way to do it?
    try:
        ...  # task body goes here
    except TimeoutError:
        raise signals.FAIL("Task Timeout.")

nicholas

12/04/2020, 8:39 PM
Hi @Hui Zheng - to your first point, we have a ticket open to allow raising different states at the flow run level - I'd appreciate it if you could chime in there with your use case, as that helps us understand the need and priority for that feature. On the second, `TimeoutError` is an inheritor of the `Failed` state (you can see the full state inheritance model here). You could instead use a `triggers.all_finished` trigger on that task to ensure it runs no matter what?

Hui Zheng

12/04/2020, 8:48 PM
I know that in `DbtShellTask` and `ShellTask` the timeout is raised as a `TimeoutError` exception instead of a `signals.FAIL`. Is that the case for other tasks?

nicholas

12/04/2020, 9:32 PM
That's a good question, I'm not as familiar with why those raise the exception instead of a fail signal there, but those might be good examples to point to for functionality you'd prefer they mimic

Hui Zheng

12/04/2020, 10:13 PM
@nicholas I confirmed that when a task times out, it raises an exception, `raise TimeoutError("Execution timed out.")`, which is not a subclass of `signals.FAIL`. The timed-out task throws an unexpected error and terminates the flow run:
ERROR - prefect.TaskRunner | Unexpected error: TimeoutError('Execution timed out.')
None of the downstream tasks run, even when they are set to always run with `trigger=triggers.always_run`.
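
The distinction described above can be illustrated with the standard library alone: Python's built-in `TimeoutError` is an `OSError` subclass and is unrelated to any signal class a library defines. The sketch below is only a toy. `FailSignal` is a hypothetical stand-in for `signals.FAIL`, and `classify` is an invented helper. Neither reflects how Prefect's task runner is actually implemented.

```python
# FailSignal is a hypothetical stand-in for a library-defined signal such as
# signals.FAIL. It is NOT Prefect's actual class.
class FailSignal(Exception):
    pass


def classify(exc: BaseException) -> str:
    """Toy handler: recognise the signal class, treat anything else as unexpected."""
    if isinstance(exc, FailSignal):
        return "signal"
    return "unexpected error"


# The built-in TimeoutError does not inherit from FailSignal,
# so the toy handler cannot recognise it as a signal.
timeout_kind = classify(TimeoutError("Execution timed out."))
signal_kind = classify(FailSignal("Task Timeout."))

print(timeout_kind)
print(signal_kind)
```

Under this assumption, a handler that only recognises its own signal hierarchy has no way to treat a bare `TimeoutError` as a failure signal unless the exception is translated first.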

nicholas

12/04/2020, 11:48 PM
Thanks @Hui Zheng - I've flagged this for the Core team to look into; I can't immediately determine if this is a bug but I suspect something weird might be going on. We should be able to get back to you on Monday

Hui Zheng

12/08/2020, 12:22 AM
Just want to let you know that I made a solution by creating a decorator, called `handle_exception`, which handles `TimeoutError` and raises a FAIL signal. I then decorate all my tasks with it.
from functools import wraps

from prefect.engine import signals


def handle_exception(func):
    """
    A decorator function that handles common exceptions in the tasks.
    """
    @wraps(func)
    def decorated(*args, **kwargs):
        try:
            return func(*args, **kwargs)
        except TimeoutError:
            # Convert the built-in TimeoutError into a Prefect FAIL signal.
            raise signals.FAIL(func.__name__ + " Task Timeout.")

    return decorated
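
A minimal usage sketch of the decorator pattern above, written so it runs without Prefect installed. `FailSignal` is a hypothetical stand-in for `signals.FAIL`, and `slow_task` and `fast_task` are invented example functions. None of these names come from the thread.

```python
from functools import wraps


class FailSignal(Exception):
    """Hypothetical stand-in for signals.FAIL so the sketch runs without Prefect."""


def handle_exception(func):
    """Same shape as the decorator above, but raising the stand-in signal."""
    @wraps(func)
    def decorated(*args, **kwargs):
        try:
            return func(*args, **kwargs)
        except TimeoutError:
            raise FailSignal(func.__name__ + " Task Timeout.")

    return decorated


@handle_exception
def slow_task():
    # Hypothetical task body that simulates a timeout.
    raise TimeoutError("Execution timed out.")


@handle_exception
def fast_task():
    # Hypothetical task body that finishes normally.
    return "done"


# The timeout surfaces as the stand-in signal instead of a bare TimeoutError.
try:
    slow_task()
except FailSignal as exc:
    message = str(exc)

# A task that does not time out is unaffected by the decorator.
result = fast_task()

print(message)
print(result)
```

Because of `wraps`, the decorated function keeps its original `__name__`, which is why the message names the task and not `decorated`.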