Kha Nguyen
08/02/2022, 11:21 AMEncountered exception during execution:
Traceback (most recent call last):
File "/opt/homebrew/Caskroom/miniforge/base/envs/singularity2/lib/python3.10/site-packages/prefect/engine.py", line 557, in orchestrate_flow_run
result = await run_sync(flow_call)
File "/opt/homebrew/Caskroom/miniforge/base/envs/singularity2/lib/python3.10/site-packages/prefect/utilities/asyncutils.py", line 56, in run_sync_in_worker_thread
return await anyio.to_thread.run_sync(call, cancellable=True)
File "/opt/homebrew/Caskroom/miniforge/base/envs/singularity2/lib/python3.10/site-packages/anyio/to_thread.py", line 31, in run_sync
return await get_asynclib().run_sync_in_worker_thread(
File "/opt/homebrew/Caskroom/miniforge/base/envs/singularity2/lib/python3.10/site-packages/anyio/_backends/_asyncio.py", line 937, in run_sync_in_worker_thread
return await future
File "/opt/homebrew/Caskroom/miniforge/base/envs/singularity2/lib/python3.10/site-packages/anyio/_backends/_asyncio.py", line 867, in run
result = context.run(func, *args)
File "/Users/kha/Desktop/demo/singularity_flow.py", line 23, in run_forecast
results = produce_forecast.map(time_series, horizon=unmapped(14))
File "/opt/homebrew/Caskroom/miniforge/base/envs/singularity2/lib/python3.10/site-packages/prefect/tasks.py", line 627, in map
return enter_task_run_engine(
File "/opt/homebrew/Caskroom/miniforge/base/envs/singularity2/lib/python3.10/site-packages/prefect/engine.py", line 686, in enter_task_run_engine
return run_async_from_worker_thread(begin_run)
File "/opt/homebrew/Caskroom/miniforge/base/envs/singularity2/lib/python3.10/site-packages/prefect/utilities/asyncutils.py", line 136, in run_async_from_worker_thread
return anyio.from_thread.run(call)
File "/opt/homebrew/Caskroom/miniforge/base/envs/singularity2/lib/python3.10/site-packages/anyio/from_thread.py", line 49, in run
return asynclib.run_async_from_thread(func, *args)
File "/opt/homebrew/Caskroom/miniforge/base/envs/singularity2/lib/python3.10/site-packages/anyio/_backends/_asyncio.py", line 970, in run_async_from_thread
return f.result()
File "/opt/homebrew/Caskroom/miniforge/base/envs/singularity2/lib/python3.10/concurrent/futures/_base.py", line 446, in result
return self.__get_result()
File "/opt/homebrew/Caskroom/miniforge/base/envs/singularity2/lib/python3.10/concurrent/futures/_base.py", line 391, in __get_result
raise self._exception
File "/opt/homebrew/Caskroom/miniforge/base/envs/singularity2/lib/python3.10/site-packages/prefect/engine.py", line 737, in begin_task_map
return await gather(*task_runs)
File "/opt/homebrew/Caskroom/miniforge/base/envs/singularity2/lib/python3.10/site-packages/prefect/utilities/asyncutils.py", line 339, in gather
async with create_gather_task_group() as tg:
File "/opt/homebrew/Caskroom/miniforge/base/envs/singularity2/lib/python3.10/site-packages/prefect/utilities/asyncutils.py", line 317, in __aexit__
retval = await self._task_group.__aexit__(*tb)
File "/opt/homebrew/Caskroom/miniforge/base/envs/singularity2/lib/python3.10/site-packages/anyio/_backends/_asyncio.py", line 662, in __aexit__
raise exceptions[0]
File "/opt/homebrew/Caskroom/miniforge/base/envs/singularity2/lib/python3.10/site-packages/prefect/utilities/asyncutils.py", line 286, in _run_and_store
self._results[key] = await fn(*args)
File "/opt/homebrew/Caskroom/miniforge/base/envs/singularity2/lib/python3.10/site-packages/prefect/engine.py", line 783, in create_task_run_then_submit
task_run = await create_task_run(
File "/opt/homebrew/Caskroom/miniforge/base/envs/singularity2/lib/python3.10/site-packages/prefect/engine.py", line 823, in create_task_run
task_run = await flow_run_context.client.create_task_run(
File "/opt/homebrew/Caskroom/miniforge/base/envs/singularity2/lib/python3.10/site-packages/prefect/client.py", line 1622, in create_task_run
response = await <http://self._client.post|self._client.post>(
File "/opt/homebrew/Caskroom/miniforge/base/envs/singularity2/lib/python3.10/site-packages/httpx/_client.py", line 1842, in post
return await self.request(
File "/opt/homebrew/Caskroom/miniforge/base/envs/singularity2/lib/python3.10/site-packages/httpx/_client.py", line 1527, in request
return await self.send(request, auth=auth, follow_redirects=follow_redirects)
File "/opt/homebrew/Caskroom/miniforge/base/envs/singularity2/lib/python3.10/site-packages/prefect/client.py", line 278, in send
response.raise_for_status()
File "/opt/homebrew/Caskroom/miniforge/base/envs/singularity2/lib/python3.10/site-packages/prefect/client.py", line 224, in raise_for_status
raise PrefectHTTPStatusError.from_httpx_error(exc) from exc.__cause__
prefect.exceptions.PrefectHTTPStatusError: Server error '500 Internal Server Error' for url '<http://127.0.0.1:4200/api/task_runs/>'
Response: {'exception_message': 'Internal Server Error'}
For more information check: <https://httpstatuses.com/500>
Bianca Hoch
08/02/2022, 2:06 PMKha Nguyen
08/02/2022, 5:29 PMfrom prefect import flow, get_run_logger, task
@task
def some_task(i: int) -> int:
logger = get_run_logger()
<http://logger.info|logger.info>(f"Doubling {i} to {i*2}")
return i * 2
@task
def generate_numbers(g: int) -> list[int]:
return list(range(g))
@flow
def demo_flow():
logger = get_run_logger()
ls = generate_numbers.submit(100)
results_future = some_task.map(ls)
results = [f.wait().result() for f in results_future]
<http://logger.info|logger.info>(results)
demo_flow()
You can replace the argument to generate_numbers
from 100 to 10, and the flow can run successfully. However, even 100 is enough to break this flow. In my case, the number can go up to 30000.results_future = [some_task.submit(i) for i in ls.wait().result()]
So I can say that Prefect has no problem handling a large number of tasks, but the httpx config when using the map API has a bug.Bianca Hoch
08/02/2022, 8:17 PMgenerate_numbers
to 100, 200, and even 500Kha Nguyen
08/02/2022, 8:18 PMBianca Hoch
08/02/2022, 8:21 PMKha Nguyen
08/03/2022, 1:55 PMBianca Hoch
08/03/2022, 2:13 PM