Erik

10/13/2022, 6:38 AM
Hi Prefect community. I am trying to handle a known timeout issue, but I can’t seem to catch it using try/except. Here is a toy case that shows the problem:
from prefect import flow, task
from time import sleep

@task
def sleep_task():
    sleep(5)
    return 'sleep_task complete'
    
@flow(timeout_seconds=1)
def timeout_flow():
    print(sleep_task())
    return 'timeout_flow complete'
    
@flow
def main_flow():
    try: 
        print(timeout_flow())
    except Exception as e:
        print('error caught:: ', repr(e))
The try/except block does not catch the error; instead, the framework raises a TimeoutError exception. How can I catch Prefect timeouts?

Mason Menges

10/13/2022, 3:58 PM
Hey @Erik, I think this would be difficult to achieve. The timeout error isn't necessarily raised by the flow code, which is where the try/except block is looking for errors. You'd essentially need to re-raise the timeout error, but due to the nature of tasks/flows you'd still likely end up with a failed flow. Generally speaking, retries can be configured at the flow level to handle these situations: https://docs.prefect.io/concepts/flows/#flow-settings. That said, what problem are you trying to solve by accomplishing this?

Erik

10/13/2022, 4:27 PM
I see. I don’t know if this is technically feasible, but having some way to catch a timeout and complete the rest of a pipeline would be useful. I have a large pipeline with one unreliable subflow. I would like to write the pipeline with error-handling logic around it, but right now that’s impossible, so my whole pipeline is at risk. Should I suggest this on GitHub?

Mason Menges

10/13/2022, 4:29 PM
Final flow run states are determined by the flow's return values. You could actually set up the flow like this to exclude the unreliable flow from the final state determination:
from prefect import flow, task
from time import sleep

@task
def sleep_task():
    sleep(5)
    return 'sleep_task complete'
    
@flow(timeout_seconds=1)
def timeout_flow():
    print(sleep_task())
    return 'timeout_flow complete'

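@task
def some_task():
    # Placeholder task; the original snippet called some_task() without defining it
    return 'some_task complete'

@flow
def other_flow():
    # Return the task result so main_flow has a value to return
    return some_task()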
    
@flow
def main_flow():
    unreliable = timeout_flow()
    reliable = other_flow()

    return reliable

Erik

10/13/2022, 4:58 PM
Oh… interesting, so the main flow would continue even if that one line raises an exception. I’ll try it. On a slightly different track: I haven’t used the submit/result logic, but would that method return the exception to the flow code, where it could then be handled in a try/except block?
I tried your code, but the main_flow still failed.

Mason Menges

10/13/2022, 5:43 PM
Hmmm, I think that's technically because the subflow is failing due to a timeout from the decorator rather than from the function itself. For your use case, is the "unreliable" flow typically failing from a timeout or from some other exception?

Erik

10/13/2022, 8:45 PM
from timeout specifically
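
One possible workaround, given that the failure comes from the decorator's timeout rather than from the function body: enforce the time limit inside your own code, where a try/except block can see it. The sketch below is plain Python using only the standard library, not a Prefect API. The names `run_with_timeout`, `unreliable_step`, and `pipeline` are hypothetical, and whether this behaves well inside a Prefect flow run is an assumption that would need testing.

```python
from concurrent.futures import ThreadPoolExecutor
from concurrent.futures import TimeoutError as FutureTimeout
from time import sleep


def unreliable_step():
    # Hypothetical stand-in for the slow body of the unreliable subflow
    sleep(0.5)
    return 'unreliable_step complete'


def run_with_timeout(fn, timeout_seconds):
    """Run fn in a worker thread; return (result, None) or (None, error)."""
    pool = ThreadPoolExecutor(max_workers=1)
    future = pool.submit(fn)
    try:
        # Any non-timeout exception raised by fn propagates to the caller
        return future.result(timeout=timeout_seconds), None
    except FutureTimeout as exc:
        # The timeout surfaces here as an ordinary exception object
        return None, exc
    finally:
        # Do not block on the worker; a Python thread cannot be killed,
        # so fn keeps running in the background until it finishes
        pool.shutdown(wait=False)


def pipeline():
    result, error = run_with_timeout(unreliable_step, timeout_seconds=0.1)
    if error is not None:
        print('timeout caught, continuing without the unreliable step')
    # The rest of the pipeline runs whether or not the step timed out
    return 'pipeline complete'


if __name__ == '__main__':
    print(pipeline())
```

The trade-off is that the timed-out work is abandoned rather than cancelled, so this only suits steps that are safe to leave running in the background.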