Ouail Bendidi
04/06/2023, 4:08 PMtask.map
on a large sequence of inputs (while defining a tag concurrency limit of 20), prefect seems to crash a lot while trying set_state
of tasks with a status code 500 (traceback in the thread ⬇️ )
Do you have any recommandations on how to work around this ? can define retries for prefect internal API ? is it something I'm doing wrong ?File "/home/obendidi/.pyenv/versions/3.11.2/lib/python3.11/asyncio/runners.py", line 190, in run
return runner.run(main)
^^^^^^^^^^^^^^^^
File "/home/obendidi/.pyenv/versions/3.11.2/lib/python3.11/asyncio/runners.py", line 118, in run
return self._loop.run_until_complete(task)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/home/obendidi/.pyenv/versions/3.11.2/lib/python3.11/asyncio/base_events.py", line 653, in run_until_complete
return future.result()
^^^^^^^^^^^^^^^
File "home/user/myproject/.venv/lib/python3.11/site-packages/prefect/_internal/concurrency/api.py", line 109, in wait_for_call_in_loop_thread
return call.result()
^^^^^^^^^^^^^
File "home/user/myproject/.venv/lib/python3.11/site-packages/prefect/_internal/concurrency/calls.py", line 173, in result
return self.future.result(timeout=timeout)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/home/obendidi/.pyenv/versions/3.11.2/lib/python3.11/concurrent/futures/_base.py", line 449, in result
return self.__get_result()
^^^^^^^^^^^^^^^^^^^
File "/home/obendidi/.pyenv/versions/3.11.2/lib/python3.11/concurrent/futures/_base.py", line 401, in __get_result
raise self._exception
File "home/user/myproject/.venv/lib/python3.11/site-packages/prefect/_internal/concurrency/calls.py", line 218, in _run_async
result = await coro
^^^^^^^^^^
File "home/user/myproject/.venv/lib/python3.11/site-packages/prefect/client/utilities.py", line 40, in with_injected_client
return await fn(*args, **kwargs)
^^^^^^^^^^^^^^^^^^^^^^^^^
File "home/user/myproject/.venv/lib/python3.11/site-packages/prefect/engine.py", line 259, in create_then_begin_flow_run
return await state.result(fetch=True)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "home/user/myproject/.venv/lib/python3.11/site-packages/prefect/states.py", line 91, in _get_state_result
raise await get_state_exception(state)
File "home/user/myproject/.venv/lib/python3.11/site-packages/prefect/engine.py", line 674, in orchestrate_flow_run
result = await flow_call.aresult()
^^^^^^^^^^^^^^^^^^^^^^^^^
File "home/user/myproject/.venv/lib/python3.11/site-packages/prefect/_internal/concurrency/calls.py", line 181, in aresult
return await asyncio.wrap_future(self.future)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "home/user/myproject/.venv/lib/python3.11/site-packages/prefect/_internal/concurrency/calls.py", line 218, in _run_async
result = await coro
^^^^^^^^^^
File "home/user/myproject/flows/sync_kili_labels_flow.py", line 70, in sync_kili_labels_flow
async with anyio.create_task_group() as tg:
File "home/user/myproject/.venv/lib/python3.11/site-packages/anyio/_backends/_asyncio.py", line 662, in __aexit__
raise exceptions[0]
File "home/user/myproject/.venv/lib/python3.11/site-packages/prefect/_internal/concurrency/api.py", line 109, in wait_for_call_in_loop_thread
return call.result()
^^^^^^^^^^^^^
File "home/user/myproject/.venv/lib/python3.11/site-packages/prefect/_internal/concurrency/calls.py", line 173, in result
return self.future.result(timeout=timeout)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/home/obendidi/.pyenv/versions/3.11.2/lib/python3.11/concurrent/futures/_base.py", line 449, in result
return self.__get_result()
^^^^^^^^^^^^^^^^^^^
File "/home/obendidi/.pyenv/versions/3.11.2/lib/python3.11/concurrent/futures/_base.py", line 401, in __get_result
raise self._exception
File "home/user/myproject/.venv/lib/python3.11/site-packages/prefect/_internal/concurrency/calls.py", line 218, in _run_async
result = await coro
^^^^^^^^^^
File "home/user/myproject/.venv/lib/python3.11/site-packages/prefect/client/utilities.py", line 40, in with_injected_client
return await fn(*args, **kwargs)
^^^^^^^^^^^^^^^^^^^^^^^^^
File "home/user/myproject/.venv/lib/python3.11/site-packages/prefect/engine.py", line 571, in create_and_begin_subflow_run
return await terminal_state.result(fetch=True)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "home/user/myproject/.venv/lib/python3.11/site-packages/prefect/states.py", line 91, in _get_state_result
raise await get_state_exception(state)
File "home/user/myproject/.venv/lib/python3.11/site-packages/prefect/engine.py", line 674, in orchestrate_flow_run
result = await flow_call.aresult()
^^^^^^^^^^^^^^^^^^^^^^^^^
File "home/user/myproject/.venv/lib/python3.11/site-packages/prefect/_internal/concurrency/calls.py", line 181, in aresult
return await asyncio.wrap_future(self.future)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "home/user/myproject/.venv/lib/python3.11/site-packages/prefect/_internal/concurrency/calls.py", line 218, in _run_async
result = await coro
^^^^^^^^^^
File "home/user/myproject/flows/my_flow.py", line 51, in my_flow
data_to_store = [
^
File "home/user/myproject/flows/my_flow.py", line 52, in <listcomp>
item for sublist in data_to_store_futures for item in sublist.result()
^^^^^^^^^^^^^^^^
File "home/user/myproject/.venv/lib/python3.11/site-packages/prefect/futures.py", line 231, in result
).result()
^^^^^^^^
File "home/user/myproject/.venv/lib/python3.11/site-packages/prefect/_internal/concurrency/calls.py", line 173, in result
return self.future.result(timeout=timeout)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/home/obendidi/.pyenv/versions/3.11.2/lib/python3.11/concurrent/futures/_base.py", line 456, in result
return self.__get_result()
^^^^^^^^^^^^^^^^^^^
File "/home/obendidi/.pyenv/versions/3.11.2/lib/python3.11/concurrent/futures/_base.py", line 401, in __get_result
raise self._exception
File "home/user/myproject/.venv/lib/python3.11/site-packages/prefect/_internal/concurrency/calls.py", line 218, in _run_async
result = await coro
^^^^^^^^^^
File "home/user/myproject/.venv/lib/python3.11/site-packages/prefect/futures.py", line 240, in _result
return await final_state.result(raise_on_failure=raise_on_failure, fetch=True)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "home/user/myproject/.venv/lib/python3.11/site-packages/prefect/states.py", line 91, in _get_state_result
raise await get_state_exception(state)
File "home/user/myproject/.venv/lib/python3.11/site-packages/prefect/task_runners.py", line 295, in _run_and_store_result
self._results[key] = await call()
^^^^^^^^^^^^
File "home/user/myproject/.venv/lib/python3.11/site-packages/prefect/engine.py", line 1369, in begin_task_run
state = await orchestrate_task_run(
^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "home/user/myproject/.venv/lib/python3.11/site-packages/prefect/engine.py", line 1494, in orchestrate_task_run
state = await propose_state(
^^^^^^^^^^^^^^^^^^^^
File "home/user/myproject/.venv/lib/python3.11/site-packages/prefect/engine.py", line 1930, in propose_state
response = await set_state_and_handle_waits(set_state)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "home/user/myproject/.venv/lib/python3.11/site-packages/prefect/engine.py", line 1917, in set_state_and_handle_waits
response = await set_state_func()
^^^^^^^^^^^^^^^^^^^^^^
File "home/user/myproject/.venv/lib/python3.11/site-packages/prefect/client/orchestration.py", line 1894, in set_task_run_state
response = await <http://self._client.post|self._client.post>(
^^^^^^^^^^^^^^^^^^^^^^^^
File "home/user/myproject/.venv/lib/python3.11/site-packages/httpx/_client.py", line 1848, in post
return await self.request(
^^^^^^^^^^^^^^^^^^^
File "home/user/myproject/.venv/lib/python3.11/site-packages/httpx/_client.py", line 1533, in request
return await self.send(request, auth=auth, follow_redirects=follow_redirects)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "home/user/myproject/.venv/lib/python3.11/site-packages/prefect/client/base.py", line 269, in send
response.raise_for_status()
File "home/user/myproject/.venv/lib/python3.11/site-packages/prefect/client/base.py", line 132, in raise_for_status
raise PrefectHTTPStatusError.from_httpx_error(exc) from exc.__cause__
prefect.exceptions.PrefectHTTPStatusError: Server error '500 Internal Server Error' for url '<https://myprefect/api/task_runs/4a58fd29-8576-4a98-8445-4906a2f1aaea/set_state>'
Response: {'exception_message': 'Internal Server Error'}
For more information check: <https://httpstatuses.com/500>
17:55:28.870 | ERROR | Flow run 'ebony-starfish' - Finished in state Failed("Flow run encountered an exception. PrefectHTTPStatusError: Server error '500 Internal Server Error' for url '<https://myprefect/api/task_runs/4a58fd29-8576-4a98-8445-4906a2f1aaea/set_state>'\nResponse: {'exception_message': 'Internal Server Error'}\nFor more information check: <https://httpstatuses.com/500>\n")
Traceback (most recent call last):
Zanie
04/06/2023, 4:17 PM