Joao Moniz
01/15/2024, 9:23 AMtime_out
being set as a flow parameter, so the default value can be changed during a custom flow run for example?
e.g:
@flow(timeout_seconds=timeout_seconds)
def my_flow(
timeout_seconds: int = 600,
):
Nate
01/15/2024, 6:14 PMtimeout_seconds
on a flow run basis (you can do this with some other flow settings now, like result_storage_key and flow_run_name for example)
instead, one thing that definitely would work is something like this, which you could make a util
import anyio
from prefect import flow
from prefect.states import Completed
@flow
async def dynamic_timeout(timeout: int):
try:
with anyio.fail_after(timeout):
await anyio.sleep(timeout + 1)
except TimeoutError:
return Completed(message=f"Timeout after {timeout} seconds", name="TimedOut")
if __name__ == "__main__":
import asyncio
asyncio.run(dynamic_timeout(timeout=3))
also if my_flow
above happens to be a subflow (not the main entrypoint of the deployment), you could use with_options
to set the timeout based on some inputJoao Moniz
01/15/2024, 8:11 PM