Ouail Bendidi

04/06/2023, 4:08 PM
Hey all. When using `task.map` on a large sequence of inputs (with a tag concurrency limit of 20), Prefect often crashes while calling `set_state` on tasks, with a status code 500 (traceback in the thread ⬇️). Do you have any recommendations on how to work around this? Can I define retries for Prefect's internal API calls? Is it something I'm doing wrong?
File "/home/obendidi/.pyenv/versions/3.11.2/lib/python3.11/asyncio/runners.py", line 190, in run
    return runner.run(main)
           ^^^^^^^^^^^^^^^^
  File "/home/obendidi/.pyenv/versions/3.11.2/lib/python3.11/asyncio/runners.py", line 118, in run
    return self._loop.run_until_complete(task)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/obendidi/.pyenv/versions/3.11.2/lib/python3.11/asyncio/base_events.py", line 653, in run_until_complete
    return future.result()
           ^^^^^^^^^^^^^^^
  File "home/user/myproject/.venv/lib/python3.11/site-packages/prefect/_internal/concurrency/api.py", line 109, in wait_for_call_in_loop_thread
    return call.result()
           ^^^^^^^^^^^^^
  File "home/user/myproject/.venv/lib/python3.11/site-packages/prefect/_internal/concurrency/calls.py", line 173, in result
    return self.future.result(timeout=timeout)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/obendidi/.pyenv/versions/3.11.2/lib/python3.11/concurrent/futures/_base.py", line 449, in result
    return self.__get_result()
           ^^^^^^^^^^^^^^^^^^^
  File "/home/obendidi/.pyenv/versions/3.11.2/lib/python3.11/concurrent/futures/_base.py", line 401, in __get_result
    raise self._exception
  File "home/user/myproject/.venv/lib/python3.11/site-packages/prefect/_internal/concurrency/calls.py", line 218, in _run_async
    result = await coro
             ^^^^^^^^^^
  File "home/user/myproject/.venv/lib/python3.11/site-packages/prefect/client/utilities.py", line 40, in with_injected_client
    return await fn(*args, **kwargs)
           ^^^^^^^^^^^^^^^^^^^^^^^^^
  File "home/user/myproject/.venv/lib/python3.11/site-packages/prefect/engine.py", line 259, in create_then_begin_flow_run
    return await state.result(fetch=True)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "home/user/myproject/.venv/lib/python3.11/site-packages/prefect/states.py", line 91, in _get_state_result
    raise await get_state_exception(state)
  File "home/user/myproject/.venv/lib/python3.11/site-packages/prefect/engine.py", line 674, in orchestrate_flow_run
    result = await flow_call.aresult()
             ^^^^^^^^^^^^^^^^^^^^^^^^^
  File "home/user/myproject/.venv/lib/python3.11/site-packages/prefect/_internal/concurrency/calls.py", line 181, in aresult
    return await asyncio.wrap_future(self.future)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "home/user/myproject/.venv/lib/python3.11/site-packages/prefect/_internal/concurrency/calls.py", line 218, in _run_async
    result = await coro
             ^^^^^^^^^^
  File "home/user/myproject/flows/sync_kili_labels_flow.py", line 70, in sync_kili_labels_flow
    async with anyio.create_task_group() as tg:
  File "home/user/myproject/.venv/lib/python3.11/site-packages/anyio/_backends/_asyncio.py", line 662, in __aexit__
    raise exceptions[0]
  File "home/user/myproject/.venv/lib/python3.11/site-packages/prefect/_internal/concurrency/api.py", line 109, in wait_for_call_in_loop_thread
    return call.result()
           ^^^^^^^^^^^^^
  File "home/user/myproject/.venv/lib/python3.11/site-packages/prefect/_internal/concurrency/calls.py", line 173, in result
    return self.future.result(timeout=timeout)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/obendidi/.pyenv/versions/3.11.2/lib/python3.11/concurrent/futures/_base.py", line 449, in result
    return self.__get_result()
           ^^^^^^^^^^^^^^^^^^^
  File "/home/obendidi/.pyenv/versions/3.11.2/lib/python3.11/concurrent/futures/_base.py", line 401, in __get_result
    raise self._exception
  File "home/user/myproject/.venv/lib/python3.11/site-packages/prefect/_internal/concurrency/calls.py", line 218, in _run_async
    result = await coro
             ^^^^^^^^^^
  File "home/user/myproject/.venv/lib/python3.11/site-packages/prefect/client/utilities.py", line 40, in with_injected_client
    return await fn(*args, **kwargs)
           ^^^^^^^^^^^^^^^^^^^^^^^^^
  File "home/user/myproject/.venv/lib/python3.11/site-packages/prefect/engine.py", line 571, in create_and_begin_subflow_run
    return await terminal_state.result(fetch=True)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "home/user/myproject/.venv/lib/python3.11/site-packages/prefect/states.py", line 91, in _get_state_result
    raise await get_state_exception(state)
  File "home/user/myproject/.venv/lib/python3.11/site-packages/prefect/engine.py", line 674, in orchestrate_flow_run
    result = await flow_call.aresult()
             ^^^^^^^^^^^^^^^^^^^^^^^^^
  File "home/user/myproject/.venv/lib/python3.11/site-packages/prefect/_internal/concurrency/calls.py", line 181, in aresult
    return await asyncio.wrap_future(self.future)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "home/user/myproject/.venv/lib/python3.11/site-packages/prefect/_internal/concurrency/calls.py", line 218, in _run_async
    result = await coro
             ^^^^^^^^^^
  File "home/user/myproject/flows/my_flow.py", line 51, in my_flow
    data_to_store = [
                    ^
  File "home/user/myproject/flows/my_flow.py", line 52, in <listcomp>
    item for sublist in data_to_store_futures for item in sublist.result()
                                                          ^^^^^^^^^^^^^^^^
  File "home/user/myproject/.venv/lib/python3.11/site-packages/prefect/futures.py", line 231, in result
    ).result()
      ^^^^^^^^
  File "home/user/myproject/.venv/lib/python3.11/site-packages/prefect/_internal/concurrency/calls.py", line 173, in result
    return self.future.result(timeout=timeout)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/obendidi/.pyenv/versions/3.11.2/lib/python3.11/concurrent/futures/_base.py", line 456, in result
    return self.__get_result()
           ^^^^^^^^^^^^^^^^^^^
  File "/home/obendidi/.pyenv/versions/3.11.2/lib/python3.11/concurrent/futures/_base.py", line 401, in __get_result
    raise self._exception
  File "home/user/myproject/.venv/lib/python3.11/site-packages/prefect/_internal/concurrency/calls.py", line 218, in _run_async
    result = await coro
             ^^^^^^^^^^
  File "home/user/myproject/.venv/lib/python3.11/site-packages/prefect/futures.py", line 240, in _result
    return await final_state.result(raise_on_failure=raise_on_failure, fetch=True)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "home/user/myproject/.venv/lib/python3.11/site-packages/prefect/states.py", line 91, in _get_state_result
    raise await get_state_exception(state)
  File "home/user/myproject/.venv/lib/python3.11/site-packages/prefect/task_runners.py", line 295, in _run_and_store_result
    self._results[key] = await call()
                         ^^^^^^^^^^^^
  File "home/user/myproject/.venv/lib/python3.11/site-packages/prefect/engine.py", line 1369, in begin_task_run
    state = await orchestrate_task_run(
            ^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "home/user/myproject/.venv/lib/python3.11/site-packages/prefect/engine.py", line 1494, in orchestrate_task_run
    state = await propose_state(
            ^^^^^^^^^^^^^^^^^^^^
  File "home/user/myproject/.venv/lib/python3.11/site-packages/prefect/engine.py", line 1930, in propose_state
    response = await set_state_and_handle_waits(set_state)
               ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "home/user/myproject/.venv/lib/python3.11/site-packages/prefect/engine.py", line 1917, in set_state_and_handle_waits
    response = await set_state_func()
               ^^^^^^^^^^^^^^^^^^^^^^
  File "home/user/myproject/.venv/lib/python3.11/site-packages/prefect/client/orchestration.py", line 1894, in set_task_run_state
    response = await self._client.post(
               ^^^^^^^^^^^^^^^^^^^^^^^^
  File "home/user/myproject/.venv/lib/python3.11/site-packages/httpx/_client.py", line 1848, in post
    return await self.request(
           ^^^^^^^^^^^^^^^^^^^
  File "home/user/myproject/.venv/lib/python3.11/site-packages/httpx/_client.py", line 1533, in request
    return await self.send(request, auth=auth, follow_redirects=follow_redirects)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "home/user/myproject/.venv/lib/python3.11/site-packages/prefect/client/base.py", line 269, in send
    response.raise_for_status()
  File "home/user/myproject/.venv/lib/python3.11/site-packages/prefect/client/base.py", line 132, in raise_for_status
    raise PrefectHTTPStatusError.from_httpx_error(exc) from exc.__cause__
prefect.exceptions.PrefectHTTPStatusError: Server error '500 Internal Server Error' for url 'https://myprefect/api/task_runs/4a58fd29-8576-4a98-8445-4906a2f1aaea/set_state'
Response: {'exception_message': 'Internal Server Error'}
For more information check: https://httpstatuses.com/500
17:55:28.870 | ERROR   | Flow run 'ebony-starfish' - Finished in state Failed("Flow run encountered an exception. PrefectHTTPStatusError: Server error '500 Internal Server Error' for url 'https://myprefect/api/task_runs/4a58fd29-8576-4a98-8445-4906a2f1aaea/set_state'\nResponse: {'exception_message': 'Internal Server Error'}\nFor more information check: https://httpstatuses.com/500\n")
Traceback (most recent call last):
Zanie

04/06/2023, 4:17 PM
We’ll need the server logs to determine the cause of the 500
We’re also adding the ability to configure custom retry status codes in the next version (today), so you could retry on 500. We don’t retry 500s by default because the request may not be idempotent.
🙌 1
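
For readers following the thread, here is a minimal sketch of what "retry on extra status codes" means in general. It is not Prefect's client code: `call_with_retries`, `send`, and all parameters are hypothetical names chosen for illustration. Prefect 2.x documents a `PREFECT_CLIENT_RETRY_EXTRA_CODES` setting for this purpose; check the release notes of your installed version before relying on it.

```python
import time
from typing import Callable, FrozenSet, Tuple


def call_with_retries(
    send: Callable[[], Tuple[int, str]],
    retry_codes: FrozenSet[int] = frozenset({500}),
    max_attempts: int = 3,
    base_delay: float = 0.5,
    sleep: Callable[[float], None] = time.sleep,
) -> Tuple[int, str]:
    """Call `send` until it returns a status outside `retry_codes`.

    `send` is a hypothetical zero-argument callable returning
    (status_code, body). Delays grow exponentially: base_delay, 2x, 4x, ...
    """
    if max_attempts < 1:
        raise ValueError("max_attempts must be at least 1")

    for attempt in range(1, max_attempts + 1):
        status, body = send()
        # Return on success, on a non-retryable status, or when attempts run out.
        if status not in retry_codes or attempt == max_attempts:
            return status, body
        # Back off before the next attempt.
        sleep(base_delay * 2 ** (attempt - 1))

    # Unreachable: the loop always returns on its final iteration.
    raise AssertionError("retry loop exited without returning")
```

The idempotency caveat applies here too: if the server actually processed a request before answering 500, replaying it could apply the same state change twice, so retrying 500s is a trade-off and not a free fix.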