# best-practices
Tedi Gjoni
Hello guys, I have a question regarding timeouts. If I change the `retries` and `retry_delay_seconds` params in the task declaration, does the timeout change? I'm getting a timeout error, and increasing those parameters made the task fail sooner. Is there any relationship between `timeout_seconds` and the other variables? As I increase `retries`, do I have to increase `timeout_seconds` as well?
These are my current default parameters
Nate
hi @Tedi Gjoni, `timeout_seconds` should only enforce timeouts for the execution of a single task / flow run, which should be entirely independent of `retries` and `retry_delay_seconds`. Do you have an example where that seems not to be the case?
Tedi Gjoni
@Nate correct me if I'm wrong: if `retry_delay_seconds` > `timeout_seconds`, the task will crash/time out?
Also, it looks like I added a `time.sleep()` that picks a random number between 2 and 120. If the function picks a number greater than `timeout_seconds`, it raises a timeout.
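To put a number on that, assume the sleep length is drawn uniformly from the integers 2 to 120 inclusive, as `random.randint(2, 120)` does (the actual call isn't shown in the thread). The share of runs whose sleep alone outlasts a given `timeout_seconds` can then be counted directly. The sketch ignores the time the rest of the task takes, so the real share is somewhat higher. `fraction_exceeding` is a hypothetical helper.

```python
def fraction_exceeding(timeout_seconds: int, low: int = 2, high: int = 120) -> float:
    """Share of integer sleep lengths in [low, high] that are longer than timeout_seconds."""
    values = range(low, high + 1)  # inclusive on both ends, matching random.randint(low, high)
    too_long = sum(1 for v in values if v > timeout_seconds)
    return too_long / len(values)


if __name__ == "__main__":
    for timeout in (30, 60, 120):
        print(timeout, round(fraction_exceeding(timeout), 3))
```

With a 60-second timeout, 60 of the 119 possible sleep values (61 through 120) exceed it, so roughly half the runs would time out on the sleep alone. Only a timeout of at least 120 seconds covers the whole range.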
Nate
hmm, can you show your code?
> if `retry_delay_seconds` > `timeout_seconds` the task will crash/timeout?
`retry_delay_seconds` is how long we will wait between retries, which should have nothing to do with how long a task / flow will be allowed to run before failing with a `TimeoutError` according to `timeout_seconds`.
For example:
```python
from time import sleep

from prefect import flow


@flow(retries=1, retry_delay_seconds=10, timeout_seconds=3)
def sleepy():
    sleep(1e3)


if __name__ == "__main__":
    sleepy()
```
This will time out after 3 seconds, enter `AwaitingRetry`, wait 10 seconds according to `retry_delay_seconds`, retry because `retries=1`, then fail again after 3 seconds and finally enter `Failed`.
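To make that timing concrete, here is a small hypothetical model of the behavior described above. It is not Prefect's engine code. In the model, each attempt is cut off at `timeout_seconds` and the timeout clock resets for every retry. `retry_delay_seconds` is only spent waiting between attempts. The model assumes a single fixed delay and ignores scheduling overhead. `simulate_run` and its state strings are illustrative names.

```python
from typing import List, Optional, Sequence, Tuple


def simulate_run(
    attempt_durations: Sequence[float],
    timeout_seconds: Optional[float],
    retries: int,
    retry_delay_seconds: float,
) -> Tuple[str, float, List[str]]:
    """Hypothetical model of one run with retries and a per-attempt timeout.

    attempt_durations[i] is how long attempt i would take if never interrupted;
    it must contain at least retries + 1 entries.
    Returns (final_state, total_elapsed_seconds, state_history).
    """
    if len(attempt_durations) < retries + 1:
        raise ValueError("need one duration per attempt (retries + 1)")
    history: List[str] = []
    elapsed = 0.0
    for attempt in range(retries + 1):
        history.append("Running")
        duration = attempt_durations[attempt]
        # No timeout (the default) or a fast enough attempt: the run completes.
        if timeout_seconds is None or duration <= timeout_seconds:
            history.append("Completed")
            return "Completed", elapsed + duration, history
        # The attempt is cut off after timeout_seconds; the clock resets per attempt.
        elapsed += timeout_seconds
        if attempt < retries:
            history.append("AwaitingRetry")
            elapsed += retry_delay_seconds  # the delay is spent between attempts only
        else:
            history.append("Failed")
    return "Failed", elapsed, history


final_state, elapsed, history = simulate_run(
    [1000, 1000], timeout_seconds=3, retries=1, retry_delay_seconds=10
)
print(final_state, elapsed, history)
```

Feeding it the `sleepy` example gives `Failed` after 16 seconds of wall time (3 + 10 + 3). That example has two attempts that would each run for 1000 seconds, `timeout_seconds=3`, `retries=1` and `retry_delay_seconds=10`. In general, the worst case in this model is `(retries + 1) * timeout_seconds + retries * retry_delay_seconds`. A longer delay makes the whole run take longer, but it never shortens the time any single attempt is allowed to run.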
Tedi Gjoni
I see. Looks like the issues with my job are:
1. I didn't have a `timeout_seconds` and I was not aware of it.
2. I added a sleep function that can produce a number bigger than `timeout_seconds`.
If I don't pass a `timeout_seconds`, what's the default?
Nate
by default there is no timeout
Tedi Gjoni
Then how is it possible that I get a TIMEOUT error? Pydantic raises this. Also, is there any relationship between the env variables and `timeout_seconds`?
Nate
> is there any relationship between the env variables and the timeout_seconds?
no, those are unrelated server-side settings. If you're not setting a `timeout_seconds` and still getting a `TimeoutError`, I would suspect the code inside your task/flow is raising it, not Prefect. Without seeing the code it's hard to give a useful guess.
Tedi Gjoni
This is my flow. I forgot to mention that I'm using Ray.
I just added `timeout_seconds` since I'm currently running a job.
The full story is that we upgraded the RDS instance that Prefect is connected to (version and throughput), and since then the ETL keeps failing. We didn't increase CPU or RAM for that DB.
Since then we have been getting HTTP 500 errors.
Nate
what is the output of this?
```shell
pip freeze | grep -E 'prefect|ray'
```
Tedi Gjoni
```
prefect==2.14.13
prefect-email==0.2.2
prefect-ray==0.2.5
ray==2.8.1
```
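The same version check can be done from Python with the standard library's `importlib.metadata`, which also works where `grep` is unavailable. This is a minimal sketch. `installed_versions` is a hypothetical helper, and the package names are just the four from this thread.

```python
from importlib.metadata import PackageNotFoundError, version
from typing import Dict, Iterable, Optional


def installed_versions(packages: Iterable[str]) -> Dict[str, Optional[str]]:
    """Map each distribution name to its installed version, or None if it is absent."""
    found: Dict[str, Optional[str]] = {}
    for name in packages:
        try:
            found[name] = version(name)
        except PackageNotFoundError:
            found[name] = None  # not installed in this environment
    return found


if __name__ == "__main__":
    for name, ver in installed_versions(["prefect", "prefect-email", "prefect-ray", "ray"]).items():
        print(f"{name}=={ver}" if ver else f"{name}: not installed")
```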
We reduced `n_jobs` to test whether that solves the issue. Some runs work, some don't. A weird behavior I have noticed is that when this job runs (it's a big one) and I open the Prefect UI, the job crashes.
Nate
is it possible your tasks are OOMing? I'm not the foremost Ray expert 🙂
Tedi Gjoni
I believe the RDS is OOMing (we increased throughput but not CPU or RAM). I know for a fact that it broke as soon as we made the change.
Does Prefect send logs to the database? I see lots of open connections to that DB while this job is running.
Nate
by default, yeah: anything logged via `get_run_logger` gets sent to the API and stored in the DB
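Many parallel tasks that each log heavily can add up to a lot of write traffic, and batching is the usual way to reduce it. The sketch below is a generic, stdlib-only illustration of that idea. It is not Prefect's API log handler, and `BatchingHandler`, `sink` and `batch_size` are hypothetical names.

```python
import logging
from typing import Callable, List


class BatchingHandler(logging.Handler):
    """Hypothetical handler: buffer formatted records and pass them to `sink` in batches.

    `sink` stands in for "one API request / one database write".
    """

    def __init__(self, sink: Callable[[List[str]], None], batch_size: int = 100) -> None:
        super().__init__()
        self.sink = sink
        self.batch_size = batch_size
        self._buffer: List[str] = []

    def emit(self, record: logging.LogRecord) -> None:
        # Called once per record; only a full buffer triggers a sink call.
        self._buffer.append(self.format(record))
        if len(self._buffer) >= self.batch_size:
            self.flush()

    def flush(self) -> None:
        # Send whatever is buffered as a single batch (one sink call per batch).
        with self.lock:
            if self._buffer:
                batch, self._buffer = self._buffer, []
                self.sink(batch)

    def close(self) -> None:
        # Don't lose records that never filled a whole batch.
        self.flush()
        super().close()
```

With `batch_size=50`, 1,000 records become 20 sink calls instead of 1,000. The trade-off is that records sit in memory until a batch fills or `flush()` runs.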
Tedi Gjoni
I see, so each task that is created sends logs to the database. We have a lot of parallel tasks and we generate lots of logs.
Thank you @Nate, I appreciate your help.
Nate
no problem. It does look like `timeout_seconds` is not being correctly enforced with `RayTaskRunner`, so I've opened an issue about that.
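Until that issue is resolved, one hedged workaround is to put a deadline on the slow call yourself, inside the task body. This sketch uses only the standard library. `call_with_timeout` is a hypothetical helper, not a Prefect or Ray API. Note its limitation: Python cannot kill the worker thread, so on timeout the helper only stops waiting, and the overrunning call keeps running in the background.

```python
import concurrent.futures
import time
from typing import Any, Callable


def call_with_timeout(fn: Callable[..., Any], timeout_seconds: float, *args: Any, **kwargs: Any) -> Any:
    """Run fn(*args, **kwargs) in a worker thread and wait at most timeout_seconds.

    Raises concurrent.futures.TimeoutError if the call has not finished in time.
    The worker thread is not stopped; it keeps running until fn returns.
    """
    executor = concurrent.futures.ThreadPoolExecutor(max_workers=1)
    future = executor.submit(fn, *args, **kwargs)
    try:
        return future.result(timeout=timeout_seconds)
    finally:
        # Don't block here on a call that overran; just stop accepting new work.
        executor.shutdown(wait=False)


if __name__ == "__main__":
    print(call_with_timeout(sum, 1.0, [1, 2, 3]))
    try:
        call_with_timeout(time.sleep, 0.1, 0.5)
    except concurrent.futures.TimeoutError:
        print("gave up waiting after 0.1 s")
```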
Tedi Gjoni
Great! Do you know the default timeout of `RayTaskRunner`?
Nate
for all task runners, the default is `None`
Tedi Gjoni
I see, thank you.
Nate
👍
Tedi Gjoni
Hey @Nate, out of curiosity, I'm trying to understand how you guys send logs via the API to the DB. I found a 5s rule in the code. Do you know anything about this?
I will take a look at this later today, but I want to verify whether there is a timeout for the API call that stores logs in the DB.
@Nate This is the error that I'm facing.
Nate
can you paste the whole trace as text here in a snippet? (Cmd + Shift + Enter on macOS)
Tedi Gjoni
```
Traceback (most recent call last):
  File "/home/tgjoni/alpharoc/occam/bin/flows/signal-process.py", line 173, in signal_process_flow
    signal_temporary.load_to_dynamodb(
  File "/home/tgjoni/.conda/envs/armono/lib/python3.10/site-packages/prefect/flows.py", line 1120, in __call__
    return enter_flow_run_engine_from_flow_call(
  File "/home/tgjoni/.conda/envs/armono/lib/python3.10/site-packages/prefect/engine.py", line 291, in enter_flow_run_engine_from_flow_call
    retval = from_sync.wait_for_call_in_loop_thread(
  File "/home/tgjoni/.conda/envs/armono/lib/python3.10/site-packages/prefect/_internal/concurrency/api.py", line 243, in wait_for_call_in_loop_thread
    return call.result()
  File "/home/tgjoni/.conda/envs/armono/lib/python3.10/site-packages/prefect/_internal/concurrency/calls.py", line 284, in result
    return self.future.result(timeout=timeout)
  File "/home/tgjoni/.conda/envs/armono/lib/python3.10/site-packages/prefect/_internal/concurrency/calls.py", line 168, in result
    return self.__get_result()
  File "/home/tgjoni/.conda/envs/armono/lib/python3.10/concurrent/futures/_base.py", line 403, in __get_result
    raise self._exception
  File "/home/tgjoni/.conda/envs/armono/lib/python3.10/site-packages/prefect/_internal/concurrency/calls.py", line 355, in _run_async
    result = await coro
  File "/home/tgjoni/.conda/envs/armono/lib/python3.10/site-packages/prefect/client/utilities.py", line 51, in with_injected_client
    return await fn(*args, **kwargs)
  File "/home/tgjoni/.conda/envs/armono/lib/python3.10/site-packages/prefect/engine.py", line 733, in create_and_begin_subflow_run
    return await terminal_state.result(fetch=True)
  File "/home/tgjoni/.conda/envs/armono/lib/python3.10/site-packages/prefect/states.py", line 91, in _get_state_result
    raise await get_state_exception(state)
TimeoutError

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/home/tgjoni/.conda/envs/armono/lib/python3.10/site-packages/prefect/engine.py", line 849, in orchestrate_flow_run
    result = await flow_call.aresult()
  File "/home/tgjoni/.conda/envs/armono/lib/python3.10/site-packages/prefect/_internal/concurrency/calls.py", line 293, in aresult
    return await asyncio.wrap_future(self.future)
  File "/home/tgjoni/.conda/envs/armono/lib/python3.10/site-packages/prefect/_internal/concurrency/calls.py", line 318, in _run_sync
    result = self.fn(*self.args, **self.kwargs)
  File "/home/tgjoni/alpharoc/occam/bin/flows/signal-process.py", line 211, in signal_process_flow
    flow_state = Failed(message=error)
  File "/home/tgjoni/.conda/envs/armono/lib/python3.10/site-packages/prefect/server/schemas/states.py", line 331, in Failed
    return cls(type=StateType.FAILED, **kwargs)
  File "pydantic/main.py", line 341, in pydantic.main.BaseModel.__init__
pydantic.error_wrappers.ValidationError: 1 validation error for State
message
  str type expected (type=type_error.str)
21:07:22.658 | ERROR   | Flow run 'notorious-kestrel' - Finished in state Failed('Flow run encountered an exception. ValidationError: 1 validation error for State\nmessage\n  str type expected (type=type_error.str)')
Traceback (most recent call last):
  File "/home/tgjoni/alpharoc/occam/bin/flows/signal-process.py", line 173, in signal_process_flow
    signal_temporary.load_to_dynamodb(
  File "/home/tgjoni/.conda/envs/armono/lib/python3.10/site-packages/prefect/flows.py", line 1120, in __call__
    return enter_flow_run_engine_from_flow_call(
  File "/home/tgjoni/.conda/envs/armono/lib/python3.10/site-packages/prefect/engine.py", line 291, in enter_flow_run_engine_from_flow_call
    retval = from_sync.wait_for_call_in_loop_thread(
  File "/home/tgjoni/.conda/envs/armono/lib/python3.10/site-packages/prefect/_internal/concurrency/api.py", line 243, in wait_for_call_in_loop_thread
    return call.result()
  File "/home/tgjoni/.conda/envs/armono/lib/python3.10/site-packages/prefect/_internal/concurrency/calls.py", line 284, in result
    return self.future.result(timeout=timeout)
  File "/home/tgjoni/.conda/envs/armono/lib/python3.10/site-packages/prefect/_internal/concurrency/calls.py", line 168, in result
    return self.__get_result()
  File "/home/tgjoni/.conda/envs/armono/lib/python3.10/concurrent/futures/_base.py", line 403, in __get_result
    raise self._exception
  File "/home/tgjoni/.conda/envs/armono/lib/python3.10/site-packages/prefect/_internal/concurrency/calls.py", line 355, in _run_async
    result = await coro
  File "/home/tgjoni/.conda/envs/armono/lib/python3.10/site-packages/prefect/client/utilities.py", line 51, in with_injected_client
    return await fn(*args, **kwargs)
  File "/home/tgjoni/.conda/envs/armono/lib/python3.10/site-packages/prefect/engine.py", line 733, in create_and_begin_subflow_run
    return await terminal_state.result(fetch=True)
  File "/home/tgjoni/.conda/envs/armono/lib/python3.10/site-packages/prefect/states.py", line 91, in _get_state_result
    raise await get_state_exception(state)
TimeoutError

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/home/tgjoni/alpharoc/occam/bin/flows/signal-process.py", line 221, in <module>
    signal_process_flow(
  File "/home/tgjoni/.conda/envs/armono/lib/python3.10/site-packages/prefect/flows.py", line 1120, in __call__
    return enter_flow_run_engine_from_flow_call(
  File "/home/tgjoni/.conda/envs/armono/lib/python3.10/site-packages/prefect/engine.py", line 291, in enter_flow_run_engine_from_flow_call
    retval = from_sync.wait_for_call_in_loop_thread(
  File "/home/tgjoni/.conda/envs/armono/lib/python3.10/site-packages/prefect/_internal/concurrency/api.py", line 243, in wait_for_call_in_loop_thread
    return call.result()
  File "/home/tgjoni/.conda/envs/armono/lib/python3.10/site-packages/prefect/_internal/concurrency/calls.py", line 284, in result
    return self.future.result(timeout=timeout)
  File "/home/tgjoni/.conda/envs/armono/lib/python3.10/site-packages/prefect/_internal/concurrency/calls.py", line 168, in result
    return self.__get_result()
  File "/home/tgjoni/.conda/envs/armono/lib/python3.10/concurrent/futures/_base.py", line 403, in __get_result
    raise self._exception
  File "/home/tgjoni/.conda/envs/armono/lib/python3.10/site-packages/prefect/_internal/concurrency/calls.py", line 355, in _run_async
    result = await coro
  File "/home/tgjoni/.conda/envs/armono/lib/python3.10/site-packages/prefect/client/utilities.py", line 51, in with_injected_client
    return await fn(*args, **kwargs)
  File "/home/tgjoni/.conda/envs/armono/lib/python3.10/site-packages/prefect/engine.py", line 394, in create_then_begin_flow_run
    return await state.result(fetch=True)
  File "/home/tgjoni/.conda/envs/armono/lib/python3.10/site-packages/prefect/states.py", line 91, in _get_state_result
    raise await get_state_exception(state)
  File "/home/tgjoni/.conda/envs/armono/lib/python3.10/site-packages/prefect/engine.py", line 849, in orchestrate_flow_run
    result = await flow_call.aresult()
  File "/home/tgjoni/.conda/envs/armono/lib/python3.10/site-packages/prefect/_internal/concurrency/calls.py", line 293, in aresult
    return await asyncio.wrap_future(self.future)
  File "/home/tgjoni/.conda/envs/armono/lib/python3.10/site-packages/prefect/_internal/concurrency/calls.py", line 318, in _run_sync
    result = self.fn(*self.args, **self.kwargs)
  File "/home/tgjoni/alpharoc/occam/bin/flows/signal-process.py", line 211, in signal_process_flow
    flow_state = Failed(message=error)
  File "/home/tgjoni/.conda/envs/armono/lib/python3.10/site-packages/prefect/server/schemas/states.py", line 331, in Failed
    return cls(type=StateType.FAILED, **kwargs)
  File "pydantic/main.py", line 341, in pydantic.main.BaseModel.__init__
pydantic.error_wrappers.ValidationError: 1 validation error for State
message
  str type expected (type=type_error.str)
```
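The line "During handling of the above exception, another exception occurred" in this trace matters. It means the `ValidationError` was raised while the flow was handling the subflow's `TimeoutError`. So the timeout is the original failure, and the validation error is a secondary one. Python keeps the earlier exception on `__context__` (or on `__cause__` for `raise ... from ...`). Below is a minimal stdlib sketch that walks that chain the same way the traceback printer does. `exception_chain` is a hypothetical helper.

```python
from typing import List, Optional


def exception_chain(exc: Optional[BaseException]) -> List[str]:
    """List exception type names from the outermost exception back to the original one.

    Follows __cause__ ("The above exception was the direct cause ...") first,
    then __context__ ("During handling of the above exception ..."),
    unless the context was suppressed with `raise ... from None`.
    """
    names: List[str] = []
    seen = set()
    while exc is not None and id(exc) not in seen:
        seen.add(id(exc))  # guard against reference cycles
        names.append(type(exc).__name__)
        if exc.__cause__ is not None:
            exc = exc.__cause__
        elif exc.__suppress_context__:
            exc = None
        else:
            exc = exc.__context__
    return names
```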
Nate
is that the whole trace?
Tedi Gjoni
I edited it.
Nate
interesting, looks like we are raising a timeout:
```
  File "/home/tgjoni/.conda/envs/armono/lib/python3.10/site-packages/prefect/engine.py", line 733, in create_and_begin_subflow_run
    return await terminal_state.result(fetch=True)
  File "/home/tgjoni/.conda/envs/armono/lib/python3.10/site-packages/prefect/states.py", line 91, in _get_state_result
    raise await get_state_exception(state)
TimeoutError
```
but later on, this looks like some client / server mismatch potentially:
```
  File "/home/tgjoni/.conda/envs/armono/lib/python3.10/site-packages/prefect/server/schemas/states.py", line 331, in Failed
    return cls(type=StateType.FAILED, **kwargs)
  File "pydantic/main.py", line 341, in pydantic.main.BaseModel.__init__
pydantic.error_wrappers.ValidationError: 1 validation error for State
message
  str type expected (type=type_error.str)
```
Tedi Gjoni
looks like the pydantic model isn't getting passed a variable
Nate
right, like the server is sending the wrong JSON back to the client schema
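The trace shows the `ValidationError` being raised at line 211 of the user's own flow, `flow_state = Failed(message=error)`, with pydantic reporting `str type expected` for `message`. Whatever `error` holds at that point, it is not a string. If it is the caught exception object (an assumption, since the flow code isn't shown), passing a string instead should satisfy that field. Note that `str()` of a bare `TimeoutError()` is an empty string, so a type-name fallback keeps the message informative. `failure_message` is a hypothetical stdlib-only helper.

```python
def failure_message(exc: BaseException) -> str:
    """Turn an exception into a non-empty string suitable for a state message."""
    text = str(exc)
    name = type(exc).__name__
    # str(TimeoutError()) is "", so fall back to the type name alone.
    return f"{name}: {text}" if text else name
```

Used at that line, this would read something like `Failed(message=failure_message(error))`.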
Tedi Gjoni
I believe my Prefect version is old. Currently the database is running Postgres 16.1.
Nate
is there a reason you can't upgrade to the newest Prefect (2.17.1 iirc)?
Tedi Gjoni
Our infrastructure runs on the current version. We updated Prefect a month ago and it crashed some of our deployments due to changes in the API. We are planning to upgrade.
I can survive for tonight as I'm running the job manually. But I find it interesting that there is a timeout when it sends the logs to the DB.
I will upgrade the server tomorrow. I'll let you know if I hit the same issue.
Nate
ok sounds good, thanks
Tedi Gjoni
Hey @Nate, we upgraded Prefect but we are still facing the same issue, although it now shows a different message:
```
20:30:08.720 | ERROR   | Flow run 'romantic-eel' - Crash detected! Execution was cancelled by the runtime environment.
Traceback (most recent call last):
  File "/home/ubuntu/alpharoc/occam/bin/flows/signal-process.py", line 173, in signal_process_flow
    signal_temporary.load_to_dynamodb(
  File "/home/ubuntu/miniconda3/envs/armono/lib/python3.10/site-packages/prefect/flows.py", line 1228, in __call__
    return enter_flow_run_engine_from_flow_call(
  File "/home/ubuntu/miniconda3/envs/armono/lib/python3.10/site-packages/prefect/engine.py", line 291, in enter_flow_run_engine_from_flow_call
    retval = from_sync.wait_for_call_in_loop_thread(
  File "/home/ubuntu/miniconda3/envs/armono/lib/python3.10/site-packages/prefect/_internal/concurrency/api.py", line 217, in wait_for_call_in_loop_thread
    waiter.wait()
  File "/home/ubuntu/miniconda3/envs/armono/lib/python3.10/site-packages/prefect/_internal/concurrency/waiters.py", line 173, in wait
    self._handle_waiting_callbacks()
  File "/home/ubuntu/miniconda3/envs/armono/lib/python3.10/site-packages/prefect/_internal/concurrency/waiters.py", line 147, in _handle_waiting_callbacks
    callback.run()
  File "/home/ubuntu/miniconda3/envs/armono/lib/python3.10/site-packages/prefect/_internal/concurrency/calls.py", line 282, in run
    coro = self.context.run(self._run_sync)
  File "/home/ubuntu/miniconda3/envs/armono/lib/python3.10/site-packages/prefect/_internal/concurrency/calls.py", line 352, in _run_sync
    result = self.fn(*self.args, **self.kwargs)
  File "/home/ubuntu/alpharoc/occam/src/occam/schedule/flow/signal_temporary.py", line 129, in load_to_dynamodb
    future.wait()
  File "/home/ubuntu/miniconda3/envs/armono/lib/python3.10/site-packages/prefect/futures.py", line 158, in wait
    return from_sync.call_soon_in_loop_thread(wait).result()  # type: ignore
  File "/home/ubuntu/miniconda3/envs/armono/lib/python3.10/site-packages/prefect/_internal/concurrency/calls.py", line 318, in result
    return self.future.result(timeout=timeout)
  File "/home/ubuntu/miniconda3/envs/armono/lib/python3.10/site-packages/prefect/_internal/concurrency/calls.py", line 181, in result
    self._condition.wait(timeout)
  File "/home/ubuntu/miniconda3/envs/armono/lib/python3.10/threading.py", line 320, in wait
    waiter.acquire()
prefect._internal.concurrency.cancellation.CancelledError
```
In the UI we see:
```
Crash detected! Execution was cancelled by the runtime environment.
```
and
```
Crash detected! Request to <http://127.0.0.1:4200/api/task_runs/> failed: PoolTimeout:
```
@Nate We are running in parallel, yet this function is raising the issue: `result()`
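`PoolTimeout` is the error httpx raises when a request cannot get a connection from the client's connection pool in time, and Prefect's API client is built on httpx. The toy model below shows the mechanism. Many parallel task runs hit the same API at once, and each request may be slow, for example because the database behind the API is overloaded. Requests then queue for a limited number of slots, and the ones that wait too long fail. The model uses only the standard library, and `ConnectionPoolModel`, `PoolTimeoutModel`, `max_connections` and `pool_timeout` are hypothetical names, not httpx or Prefect APIs.

```python
import threading
from contextlib import contextmanager
from typing import Iterator


class PoolTimeoutModel(Exception):
    """Raised when no connection slot frees up in time (stand-in for a pool timeout)."""


class ConnectionPoolModel:
    """Toy connection pool: at most `max_connections` requests in flight at once."""

    def __init__(self, max_connections: int, pool_timeout: float) -> None:
        self._slots = threading.BoundedSemaphore(max_connections)
        self._pool_timeout = pool_timeout

    @contextmanager
    def connection(self) -> Iterator[None]:
        # Wait up to pool_timeout seconds for a free slot, then give up.
        if not self._slots.acquire(timeout=self._pool_timeout):
            raise PoolTimeoutModel(f"no free connection within {self._pool_timeout}s")
        try:
            yield
        finally:
            self._slots.release()  # hand the slot back for the next waiting request
```

In this model, both a larger pool and fewer concurrent callers (smaller `n_jobs`) reduce the chance of hitting the timeout.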