Hello, my Prefect 2.0 tests are going well. I see that Flow has a timeout_seconds parameter; is there any way to set task-level timeouts as well?
Kevin Kho
03/28/2022, 7:56 PM
Orion doesn’t have task timeouts yet, so I don’t think there is a simple way like in Prefect 1.
Zanie
03/28/2022, 7:57 PM
We don’t support cancellation of tasks at this time, but you can perform logic based on a wait/timeout
future = my_task(arg, arg)
state = future.wait(5)
if not state:
    # Do something based on this timeout
    ...
Zanie
03/28/2022, 7:58 PM
Here’s the example from the docstring
Wait N seconds for the task to complete
>>> @flow
>>> def my_flow():
>>>     future = my_task()
>>>     final_state = future.wait(0.1)
>>>     if final_state:
>>>         ... # Task done
>>>     else:
>>>         ... # Task not done yet
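For anyone who wants to try the wait-then-branch idea without a Prefect install, here is a rough stand-in using only the standard library. It is not Prefect's API: `slow_task` and `run_with_deadline` are made-up names, and `concurrent.futures` is swapped in for Prefect futures. It shows the same caveat as above: hitting the deadline only tells you the work is late, it does not cancel it.

```python
import time
from concurrent.futures import ThreadPoolExecutor, TimeoutError


def slow_task(seconds):
    """Hypothetical stand-in for a task: sleeps, then returns a value."""
    time.sleep(seconds)
    return "done"


def run_with_deadline(seconds, deadline):
    """Run slow_task and report whether it finished within `deadline` seconds.

    Returns (finished_in_time, result_or_None).
    """
    with ThreadPoolExecutor(max_workers=1) as pool:
        future = pool.submit(slow_task, seconds)
        try:
            # Analogous to future.wait(deadline) in the snippet above
            return True, future.result(timeout=deadline)
        except TimeoutError:
            # The work is NOT cancelled here: the thread keeps running and
            # the `with` block still waits for it to finish on exit.
            return False, None


if __name__ == "__main__":
    print(run_with_deadline(0.01, 1.0))   # finishes well inside the deadline
    print(run_with_deadline(0.3, 0.05))   # deadline passes first
```

The second call still takes about 0.3 seconds to return, because the late work runs to completion in the background before the pool shuts down.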
Michael Smith
03/29/2022, 7:24 AM
I realised that this can be done fairly neatly with subflows (sub-graphs): wrap the task in a flow that has its own timeout_seconds. It relies on the developer knowing the pattern (breaking away from Task), but good function naming standards would make this clearer.
from prefect import flow, task
from time import sleep

@task
def do_something():
    print("Do something")
    sleep(20)

# Subflow whose only job is to put a 10-second timeout around the task
@flow(timeout_seconds=10)
def flow2():
    do_something()

# Parent flow with its own, longer timeout
@flow(timeout_seconds=60)
def timeout_task():
    do_something()
    flow2()

timeout_task()
Michael Smith
03/29/2022, 7:30 AM
I also tried "double-decorating" a single function with both @flow and @task, which would be handy, but that failed; I'm guessing Flow is doing some introspection. Defining the task inside the flow works as you'd expect, though:
from prefect import flow, task
from time import sleep

@flow(timeout_seconds=10)
def combined():
    # Task defined inside the flow, so the flow's timeout covers it
    @task
    def combined_inner():
        print("Do something")
        sleep(20)

    combined_inner()

combined()