Hello, I am implementing Timeout for one of my flo...
# ask-community
h
Hello, I am implementing a timeout for one of my flows and have a few questions.
1. How do I get the flow to enter the `TimedOut` state when a task times out? https://docs.prefect.io/api/latest/engine/state.html#timedout It seems there is only `signals.FAIL` and no `signals.TIMEOUT`.
2. The flow DAG is A > B > C > D > E, where E always runs with `trigger=triggers.always_run`. When a timeout happens in task A, B, C, or D, the task raises a `TimeoutError` exception, which is not a `FAIL` signal, and it causes the flow to terminate without running E properly. To work around that, I currently have to implement the exception handler in every task, which is very tedious. Is there a better way to do it?
try:
    ...  # task body
except TimeoutError:
    raise signals.FAIL("Task Timeout.")
n
Hi @Hui Zheng - to your first point, we have a ticket open to allow raising different states at the flow run level. I'd appreciate it if you could chime in there with your use case, as that helps us understand the need and priority for that feature. On the second, `TimeoutError` is an inheritor of the `Failed` state (you can see the full state inheritance model here). You could instead use a `triggers.all_finished` trigger on that task to ensure it runs no matter what?
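To make the trigger discussion concrete, here is a toy model of a linear chain A > B > C > D > E in plain Python. It is only a sketch and not Prefect's engine: `State`, `all_successful`, `all_finished`, and `run_chain` are hypothetical stand-ins written for this illustration, and it assumes a timed-out task ends up as an ordinary failed task.

```python
from enum import Enum


class State(Enum):
    # Hypothetical stand-ins for finished task states; not Prefect classes.
    SUCCESS = "Success"
    FAILED = "Failed"
    TRIGGER_FAILED = "TriggerFailed"


def all_successful(upstream):
    """Toy trigger: run only if every upstream task succeeded."""
    return all(state is State.SUCCESS for state in upstream)


def all_finished(upstream):
    """Toy trigger: run once upstream tasks are finished, whatever their outcome."""
    return True


def run_chain(tasks):
    """Run (name, fn, trigger) tuples in order; each task depends on the previous one."""
    states = {}
    upstream = []
    for name, fn, trigger in tasks:
        if not trigger(upstream):
            states[name] = State.TRIGGER_FAILED
        else:
            try:
                fn()
                states[name] = State.SUCCESS
            except Exception:
                # Assumption: any exception, including TimeoutError, marks the task failed.
                states[name] = State.FAILED
        upstream = [states[name]]
    return states


def timed_out():
    raise TimeoutError("Execution timed out.")


def ok():
    pass


chain = [
    ("A", timed_out, all_successful),
    ("B", ok, all_successful),
    ("C", ok, all_successful),
    ("D", ok, all_successful),
    ("E", ok, all_finished),
]
states = run_chain(chain)
```

In this model A fails, B through D are skipped because their trigger fails, and E still runs. That is the behavior expected from an always-run style trigger when the timeout is treated as a normal failure.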
h
I know that in `DbtShellTask` and `ShellTask` the timeout is raised as a `TimeoutError` exception instead of a `signals.FAIL`. Is that the case for other tasks?
n
That's a good question, I'm not as familiar with why those raise the exception instead of a fail signal there, but those might be good examples to point to for functionality you'd prefer they mimic
h
@nicholas I confirmed that when a task times out, it raises an exception with `raise TimeoutError("Execution timed out.")`, which is not a subclass of `signals.FAIL`. The timed-out task throws an unexpected error and terminates the flow run:
ERROR - prefect.TaskRunner | Unexpected error: TimeoutError('Execution timed out.')
None of the downstream tasks run, even those set to always run with `trigger=triggers.always_run`.
n
Thanks @Hui Zheng - I've flagged this for the Core team to look into; I can't immediately determine if this is a bug but I suspect something weird might be going on. We should be able to get back to you on Monday
h
Just want to let you know that I made a solution by creating a decorator, called `handle_timeout_exception`, which handles `TimeoutError` and raises a FAIL signal. I then decorate all my tasks with it.
from functools import wraps

from prefect.engine import signals


def handle_exception(func):
    """
    A decorator that handles common exceptions in tasks.
    """
    @wraps(func)
    def decorated(*args, **kwargs):
        try:
            return func(*args, **kwargs)
        except TimeoutError:
            # Convert the timeout into a FAIL signal so the task ends in a Failed state.
            raise signals.FAIL(func.__name__ + " Task Timeout.")

    return decorated
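For anyone who wants to try the decorator pattern without a Prefect install, here is a minimal standalone sketch. The `FAIL` class below is a hypothetical stand-in for `signals.FAIL`, and `slow_task`, `fast_task`, and `run` are made-up names for illustration; in a real flow the decorator would presumably sit between the task decorator and the function body.

```python
from functools import wraps


class FAIL(Exception):
    """Hypothetical stand-in for prefect's signals.FAIL so this runs without Prefect."""


def handle_exception(func):
    """Convert a TimeoutError raised by func into a FAIL signal."""
    @wraps(func)
    def decorated(*args, **kwargs):
        try:
            return func(*args, **kwargs)
        except TimeoutError:
            raise FAIL(func.__name__ + " Task Timeout.")

    return decorated


@handle_exception
def slow_task():
    # Simulates a task whose execution timed out.
    raise TimeoutError("Execution timed out.")


@handle_exception
def fast_task():
    return "done"


def run(task):
    """Tiny runner for the demo: report how the task ended."""
    try:
        return ("Success", task())
    except FAIL as exc:
        return ("Failed", str(exc))


slow_result = run(slow_task)
fast_result = run(fast_task)
```

Because of `@wraps`, the decorated function keeps its original `__name__`, so the failure message names the task that timed out rather than the wrapper.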