# ask-community
Hello, I'm trying to parallelize task runs in a flow that works fine with the SequentialTaskRunner. I tried both the ConcurrentTaskRunner and the DaskTaskRunner, but with either one I get a lot of 500 errors like this:
Encountered exception during execution:
Traceback (most recent call last):
  File "/usr/local/lib/python3.10/site-packages/prefect/engine.py", line 2103, in orchestrate_task_run
    result = await call.aresult()
  File "/usr/local/lib/python3.10/site-packages/prefect/_internal/concurrency/calls.py", line 327, in aresult
    return await asyncio.wrap_future(self.future)
  File "/usr/local/lib/python3.10/site-packages/prefect/_internal/concurrency/calls.py", line 352, in _run_sync
    result = self.fn(*self.args, **self.kwargs)
  File "/opt/prefect/xxx/yyy/backups/flows.py", line 73, in project_flow
    my_flow(
  File "/usr/local/lib/python3.10/site-packages/prefect/tasks.py", line 689, in __call__
    return enter_task_run_engine(
  File "/usr/local/lib/python3.10/site-packages/prefect/engine.py", line 1392, in enter_task_run_engine
    return submit_autonomous_task_run_to_engine(
  File "/usr/local/lib/python3.10/site-packages/prefect/utilities/asyncutils.py", line 304, in coroutine_wrapper
    return call()
  File "/usr/local/lib/python3.10/site-packages/prefect/_internal/concurrency/calls.py", line 432, in __call__
    return self.result()
  File "/usr/local/lib/python3.10/site-packages/prefect/_internal/concurrency/calls.py", line 318, in result
    return self.future.result(timeout=timeout)
  File "/usr/local/lib/python3.10/site-packages/prefect/_internal/concurrency/calls.py", line 179, in result
    return self.__get_result()
  File "/usr/local/lib/python3.10/concurrent/futures/_base.py", line 403, in __get_result
    raise self._exception
  File "/usr/local/lib/python3.10/site-packages/prefect/_internal/concurrency/calls.py", line 389, in _run_async
    result = await coro
  File "/usr/local/lib/python3.10/site-packages/prefect/task_engine.py", line 67, in submit_autonomous_task_run_to_engine
    future_result_or_state = from_sync.wait_for_call_in_loop_thread(
  File "/usr/local/lib/python3.10/site-packages/prefect/_internal/concurrency/api.py", line 218, in wait_for_call_in_loop_thread
    return call.result()
  File "/usr/local/lib/python3.10/site-packages/prefect/_internal/concurrency/calls.py", line 318, in result
    return self.future.result(timeout=timeout)
  File "/usr/local/lib/python3.10/site-packages/prefect/_internal/concurrency/calls.py", line 179, in result
    return self.__get_result()
  File "/usr/local/lib/python3.10/concurrent/futures/_base.py", line 403, in __get_result
    raise self._exception
  File "/usr/local/lib/python3.10/site-packages/prefect/_internal/concurrency/calls.py", line 389, in _run_async
    result = await coro
  File "/usr/local/lib/python3.10/site-packages/prefect/engine.py", line 1555, in get_task_call_return_value
    return await future._result()
  File "/usr/local/lib/python3.10/site-packages/prefect/futures.py", line 237, in _result
    return await final_state.result(raise_on_failure=raise_on_failure, fetch=True)
  File "/usr/local/lib/python3.10/site-packages/prefect/states.py", line 91, in _get_state_result
    raise await get_state_exception(state)
  File "/usr/local/lib/python3.10/site-packages/prefect/task_runners.py", line 231, in submit
    result = await call()
  File "/usr/local/lib/python3.10/site-packages/prefect/engine.py", line 1806, in begin_task_run
    state = await orchestrate_task_run(
  File "/usr/local/lib/python3.10/site-packages/prefect/engine.py", line 2149, in orchestrate_task_run
    state = await propose_state(client, terminal_state, task_run_id=task_run.id)
  File "/usr/local/lib/python3.10/site-packages/prefect/utilities/engine.py", line 381, in propose_state
    response = await set_state_and_handle_waits(set_state)
  File "/usr/local/lib/python3.10/site-packages/prefect/utilities/engine.py", line 368, in set_state_and_handle_waits
    response = await set_state_func()
  File "/usr/local/lib/python3.10/site-packages/prefect/client/orchestration.py", line 2332, in set_task_run_state
    response = await self._client.post(
  File "/usr/local/lib/python3.10/site-packages/httpx/_client.py", line 1892, in post
    return await self.request(
  File "/usr/local/lib/python3.10/site-packages/httpx/_client.py", line 1574, in request
    return await self.send(request, auth=auth, follow_redirects=follow_redirects)
  File "/usr/local/lib/python3.10/site-packages/prefect/client/base.py", line 358, in send
    response.raise_for_status()
  File "/usr/local/lib/python3.10/site-packages/prefect/client/base.py", line 171, in raise_for_status
    raise PrefectHTTPStatusError.from_httpx_error(exc) from exc.__cause__
prefect.exceptions.PrefectHTTPStatusError: Server error '500 Internal Server Error' for url 'http://<prefect-url>:4200/api/task_runs/73bl686c-c2d5-4efc-bb85-6cfc61517653/set_state'
Response: {'exception_message': 'Internal Server Error'}
For more information check: https://developer.mozilla.org/en-US/docs/Web/HTTP/Status/500
Could the API be failing to handle the worker calls in a timely manner? The error seems to happen randomly, and a 500 status is so broad that I'm having a hard time isolating the problem.
Is there a timeout parameter in the Prefect API that I can modify? It seems that some API calls time out once the flow is deployed (tasks are running on ECR).
Everything runs like a charm locally.
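To make the question more concrete, here is a rough sketch of the kind of workaround I have in mind if the 500s turn out to be transient: retry the failing call with exponential backoff and a bit of jitter. This is plain Python, not Prefect code. `TransientServerError` and `call_with_backoff` are hypothetical names I made up for illustration, and the attempt count and delays are guesses, not recommended values.

```python
import random
import time


class TransientServerError(Exception):
    """Hypothetical stand-in for an HTTP error response from the API."""

    def __init__(self, status_code):
        super().__init__(f"server error {status_code}")
        self.status_code = status_code


def call_with_backoff(fn, max_attempts=5, base_delay=0.5, sleep=time.sleep):
    """Call fn(), retrying on 5xx errors with exponential backoff plus jitter.

    The error is re-raised if it is not a 5xx or once max_attempts is reached.
    """
    for attempt in range(1, max_attempts + 1):
        try:
            return fn()
        except TransientServerError as exc:
            if exc.status_code < 500 or attempt == max_attempts:
                raise
            # Base delay doubles each attempt: 0.5s, 1s, 2s, ...
            delay = base_delay * 2 ** (attempt - 1)
            # Up to 50% random jitter so parallel workers do not retry in lockstep.
            sleep(delay + random.uniform(0, delay / 2))
```

The `sleep` argument is only there so the waiting can be swapped out when testing. I'm not sure retrying is the right fix if the server itself is overloaded by the parallel task runs, which is why I'm asking about the timeout first.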