https://prefect.io logo
Title
m

Michael Smith

03/28/2022, 7:44 PM
Hello, my prefect 2.0 tests are going well...I see that Flow has a timeout_seconds parameter, is there any way to set Task level timeouts as well?
:discourse: 1
k

Kevin Kho

03/28/2022, 7:56 PM
Orion doesn’t have timeouts yet so I don’t think there is a simple way like in Prefect 1
z

Zanie

03/28/2022, 7:57 PM
We don’t support cancellation of tasks at this time, but you can perform logic based on a wait/timeout
future = my_task(arg, arg)
state = future.wait(5)
if not state:
   # Do something based on this timeout
   ...
Here’s the example from the docstring
Wait N sconds for the task to complete

        >>> @flow
        >>> def my_flow():
        >>>     future = my_task()
        >>>     final_state = future.wait(0.1)
        >>>     if final_state:
        >>>         ... # Task done
        >>>     else:
        >>>         ... # Task not done yet
:upvote: 1
m

Michael Smith

03/29/2022, 7:24 AM
I realised that this can be done fairly neatly with sub-graphs, so it relies on the developer knowing the pattern (breaking away from Task), but good function naming standards would make this clearer
from prefect import flow, task
from time import sleep

@task
def do_something():
    print("Do something")
    sleep(20)

@flow(timeout_seconds=10)
def flow2():
    do_something()

@flow(timeout_seconds=60)
def timeout_task():
    do_something()
    flow2()

timeout_task()
I also tried "double-decorating" a single function with both Flow and Task which would be handy - but I'm guessing Flow is doing some introspection as that failed, but this works as you'd expect
from prefect import flow, task
from time import sleep

@flow(timeout_seconds=10)
def combined():

    @task
    def combined_inner():
        print("Do something")
        sleep(20)

    combined_inner()

combined()
k

Kevin Kho

03/29/2022, 1:51 PM
Ah I see that’s a nice suggestion