https://prefect.io logo
h

Hans Lellelid

08/21/2023, 12:09 PM
Hi folks -- I'm curious about some practical limits I need to place on my flows/tasks in order to prevent TimoutError on the server resulting in cancelling the flow run on the agent (due to 500 error). Stack trace snippets in the šŸ§µ
On the server, I'm seeing (snipped to remove db boilerplate):
Copy code
atlas-stack_prefect-server.1.qtd2wad4u7na@kcp010    | Exception in ASGI application
atlas-stack_prefect-server.1.qtd2wad4u7na@kcp010    | Traceback (most recent call last):
atlas-stack_prefect-server.1.qtd2wad4u7na@kcp010    |   File "/opt/app-root/lib64/python3.11/site-packages/uvicorn/protocols/http/httptools_impl.py", line 435, in run_asgi
atlas-stack_prefect-server.1.qtd2wad4u7na@kcp010    |     result = await app(  # type: ignore[func-returns-value]
# SNIP
  File "/opt/app-root/lib64/python3.11/site-packages/prefect/server/api/work_queues.py", line 165, in _record_work_queue_polls
atlas-stack_prefect-server.1.qtd2wad4u7na@kcp010    |     await models.work_queues.update_work_queue(
atlas-stack_prefect-server.1.qtd2wad4u7na@kcp010    |   File "/opt/app-root/lib64/python3.11/site-packages/prefect/server/database/dependencies.py", line 119, in async_wrapper
atlas-stack_prefect-server.1.qtd2wad4u7na@kcp010    |     return await fn(*args, **kwargs)
atlas-stack_prefect-server.1.qtd2wad4u7na@kcp010    |            ^^^^^^^^^^^^^^^^^^^^^^^^^
atlas-stack_prefect-server.1.qtd2wad4u7na@kcp010    |   File "/opt/app-root/lib64/python3.11/site-packages/prefect/server/models/work_queues.py", line 216, in update_work_queue
atlas-stack_prefect-server.1.qtd2wad4u7na@kcp010    |     result = await session.execute(update_stmt)
atlas-stack_prefect-server.1.qtd2wad4u7na@kcp010    |              ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
# SNIP
atlas-stack_prefect-server.1.qtd2wad4u7na@kcp010    |   File "/opt/app-root/lib64/python3.11/site-packages/asyncpg/prepared_stmt.py", line 230, in __do_execute
atlas-stack_prefect-server.1.qtd2wad4u7na@kcp010    |     return await executor(protocol)
atlas-stack_prefect-server.1.qtd2wad4u7na@kcp010    |            ^^^^^^^^^^^^^^^^^^^^^^^^
atlas-stack_prefect-server.1.qtd2wad4u7na@kcp010    |   File "asyncpg/protocol/protocol.pyx", line 201, in bind_execute
atlas-stack_prefect-server.1.qtd2wad4u7na@kcp010    | TimeoutError
On the client:
Copy code
Crash details:
Traceback (most recent call last):
  File "/opt/app-root/lib64/python3.11/site-packages/prefect/engine.py", line 1839, in report_flow_run_crashes
    yield
  File "/usr/lib64/python3.11/contextlib.py", line 716, in __aexit__
    cb_suppress = await cb(*exc_details)
                  ^^^^^^^^^^^^^^^^^^^^^^
  File "/opt/app-root/lib64/python3.11/site-packages/anyio/_backends/_asyncio.py", line 597, in __aexit__
    raise exceptions[0]
  File "/opt/app-root/lib64/python3.11/site-packages/prefect/engine.py", line 1303, in create_task_run_then_submit
    task_run = await create_task_run(
               ^^^^^^^^^^^^^^^^^^^^^^
  File "/opt/app-root/lib64/python3.11/site-packages/prefect/engine.py", line 1348, in create_task_run
    task_run = await flow_run_context.client.create_task_run(
               ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/opt/app-root/lib64/python3.11/site-packages/prefect/client/orchestration.py", line 1872, in create_task_run
    response = await <http://self._client.post|self._client.post>(
               ^^^^^^^^^^^^^^^^^^^^^^^^
  File "/opt/app-root/lib64/python3.11/site-packages/httpx/_client.py", line 1848, in post
    return await self.request(
           ^^^^^^^^^^^^^^^^^^^
  File "/opt/app-root/lib64/python3.11/site-packages/httpx/_client.py", line 1530, in request
    return await self.send(request, auth=auth, follow_redirects=follow_redirects)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/opt/app-root/lib64/python3.11/site-packages/prefect/client/base.py", line 280, in send
    response.raise_for_status()
  File "/opt/app-root/lib64/python3.11/site-packages/prefect/client/base.py", line 138, in raise_for_status
    raise PrefectHTTPStatusError.from_httpx_error(exc) from exc.__cause__
prefect.exceptions.PrefectHTTPStatusError: Server error '500 Internal Server Error' for url '<http://prefect-server:4200/api/task_runs/>'
Response: {'exception_message': 'Internal Server Error'}
For more information check: <https://httpstatuses.com/500>
In this case my workload is: ā€¢ submit a flow that pulls a large file from S3 (7GB) ā—¦ file is split into ~250 10k-line files ā—¦ each of the 250 is submitted as a task -- there are no concurrency limits on this task ā—¦ each task will take some number of minutes to complete, but it seems like it is failing simply queuing up these tasks and not to any processing yet.
My first thought is that I am going to try an approach of limiting how many of these batches can be processed concurrently and see if that fixes it.
This is version:
2.10.13
I'm using a DaskTaskRunner with currently just a single agent for testing. Server is using postgres database.
Server and agents all running in custom Docker containers.
I've updated to latest Prefect version and implemented rate limiting, but still banging up against TimeoutError on the server when creating tasks (same stack trace as above). Thinking there is something specific about our rocky-linux-based environment, as I get timeout errors on server when locally (macbook) it works fine .... So I'm still digging into this to identify root cause.
Ok, the common failure case appears to be DaskTaskRunner (on our infrastructure). Or at least that causes this problem to surface. That was a false alarm; it is less consistent than I had hoped, but the error resurfaced despite not using DaskTaskRunner.
I will start a discourse thread on this, as this is either a bug or some sort of user error in my flow. Cannot get this to work with any sort of concurrency without (immediate) TimeoutError on the server side resulting in cancelled jobs šŸ˜ž