Kha Nguyen

08/02/2022, 11:21 AM
When I use map to submit about 10000 tasks, the run failed. Is there a limit to how many tasks I can submit?
Encountered exception during execution:
Traceback (most recent call last):
  File "/opt/homebrew/Caskroom/miniforge/base/envs/singularity2/lib/python3.10/site-packages/prefect/engine.py", line 557, in orchestrate_flow_run
    result = await run_sync(flow_call)
  File "/opt/homebrew/Caskroom/miniforge/base/envs/singularity2/lib/python3.10/site-packages/prefect/utilities/asyncutils.py", line 56, in run_sync_in_worker_thread
    return await anyio.to_thread.run_sync(call, cancellable=True)
  File "/opt/homebrew/Caskroom/miniforge/base/envs/singularity2/lib/python3.10/site-packages/anyio/to_thread.py", line 31, in run_sync
    return await get_asynclib().run_sync_in_worker_thread(
  File "/opt/homebrew/Caskroom/miniforge/base/envs/singularity2/lib/python3.10/site-packages/anyio/_backends/_asyncio.py", line 937, in run_sync_in_worker_thread
    return await future
  File "/opt/homebrew/Caskroom/miniforge/base/envs/singularity2/lib/python3.10/site-packages/anyio/_backends/_asyncio.py", line 867, in run
    result = context.run(func, *args)
  File "/Users/kha/Desktop/demo/singularity_flow.py", line 23, in run_forecast
    results = produce_forecast.map(time_series, horizon=unmapped(14))
  File "/opt/homebrew/Caskroom/miniforge/base/envs/singularity2/lib/python3.10/site-packages/prefect/tasks.py", line 627, in map
    return enter_task_run_engine(
  File "/opt/homebrew/Caskroom/miniforge/base/envs/singularity2/lib/python3.10/site-packages/prefect/engine.py", line 686, in enter_task_run_engine
    return run_async_from_worker_thread(begin_run)
  File "/opt/homebrew/Caskroom/miniforge/base/envs/singularity2/lib/python3.10/site-packages/prefect/utilities/asyncutils.py", line 136, in run_async_from_worker_thread
    return anyio.from_thread.run(call)
  File "/opt/homebrew/Caskroom/miniforge/base/envs/singularity2/lib/python3.10/site-packages/anyio/from_thread.py", line 49, in run
    return asynclib.run_async_from_thread(func, *args)
  File "/opt/homebrew/Caskroom/miniforge/base/envs/singularity2/lib/python3.10/site-packages/anyio/_backends/_asyncio.py", line 970, in run_async_from_thread
    return f.result()
  File "/opt/homebrew/Caskroom/miniforge/base/envs/singularity2/lib/python3.10/concurrent/futures/_base.py", line 446, in result
    return self.__get_result()
  File "/opt/homebrew/Caskroom/miniforge/base/envs/singularity2/lib/python3.10/concurrent/futures/_base.py", line 391, in __get_result
    raise self._exception
  File "/opt/homebrew/Caskroom/miniforge/base/envs/singularity2/lib/python3.10/site-packages/prefect/engine.py", line 737, in begin_task_map
    return await gather(*task_runs)
  File "/opt/homebrew/Caskroom/miniforge/base/envs/singularity2/lib/python3.10/site-packages/prefect/utilities/asyncutils.py", line 339, in gather
    async with create_gather_task_group() as tg:
  File "/opt/homebrew/Caskroom/miniforge/base/envs/singularity2/lib/python3.10/site-packages/prefect/utilities/asyncutils.py", line 317, in __aexit__
    retval = await self._task_group.__aexit__(*tb)
  File "/opt/homebrew/Caskroom/miniforge/base/envs/singularity2/lib/python3.10/site-packages/anyio/_backends/_asyncio.py", line 662, in __aexit__
    raise exceptions[0]
  File "/opt/homebrew/Caskroom/miniforge/base/envs/singularity2/lib/python3.10/site-packages/prefect/utilities/asyncutils.py", line 286, in _run_and_store
    self._results[key] = await fn(*args)
  File "/opt/homebrew/Caskroom/miniforge/base/envs/singularity2/lib/python3.10/site-packages/prefect/engine.py", line 783, in create_task_run_then_submit
    task_run = await create_task_run(
  File "/opt/homebrew/Caskroom/miniforge/base/envs/singularity2/lib/python3.10/site-packages/prefect/engine.py", line 823, in create_task_run
    task_run = await flow_run_context.client.create_task_run(
  File "/opt/homebrew/Caskroom/miniforge/base/envs/singularity2/lib/python3.10/site-packages/prefect/client.py", line 1622, in create_task_run
    response = await self._client.post(
  File "/opt/homebrew/Caskroom/miniforge/base/envs/singularity2/lib/python3.10/site-packages/httpx/_client.py", line 1842, in post
    return await self.request(
  File "/opt/homebrew/Caskroom/miniforge/base/envs/singularity2/lib/python3.10/site-packages/httpx/_client.py", line 1527, in request
    return await self.send(request, auth=auth, follow_redirects=follow_redirects)
  File "/opt/homebrew/Caskroom/miniforge/base/envs/singularity2/lib/python3.10/site-packages/prefect/client.py", line 278, in send
    response.raise_for_status()
  File "/opt/homebrew/Caskroom/miniforge/base/envs/singularity2/lib/python3.10/site-packages/prefect/client.py", line 224, in raise_for_status
    raise PrefectHTTPStatusError.from_httpx_error(exc) from exc.__cause__
prefect.exceptions.PrefectHTTPStatusError: Server error '500 Internal Server Error' for url 'http://127.0.0.1:4200/api/task_runs/'
Response: {'exception_message': 'Internal Server Error'}
For more information check: https://httpstatuses.com/500
This seems to be an issue with httpx.

Bianca Hoch

08/02/2022, 2:06 PM
Hi Kha, thanks for reporting. Can you describe your setup in a bit more detail, e.g. what your agents and storage look like?

Kha Nguyen

08/02/2022, 5:29 PM
Hi Bianca, here is a reproducible example
from prefect import flow, get_run_logger, task


@task
def some_task(i: int) -> int:
    logger = get_run_logger()
    logger.info(f"Doubling {i} to {i*2}")
    return i * 2

@task
def generate_numbers(g: int) -> list[int]:
    return list(range(g))

@flow
def demo_flow():
    logger = get_run_logger()

    ls = generate_numbers.submit(100)
    results_future = some_task.map(ls)

    results = [f.wait().result() for f in results_future]

    logger.info(results)

demo_flow()
If you change the argument to generate_numbers from 100 to 10, the flow runs successfully. However, even 100 is enough to break this flow, and in my case the number can go up to 30000.
Using a Python for loop works, though:
results_future = [some_task.submit(i) for i in ls.wait().result()]
So Prefect itself has no problem handling a large number of tasks; the problem seems to be a bug in the httpx configuration used by the map API.
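If the 500 errors come from the local API server being flooded with concurrent task-run creation requests (an assumption; the thread never confirmed the cause), one possible workaround is to map over the inputs in smaller batches. The sketch below is plain Python so it runs without a Prefect server; `chunked`, `map_in_batches`, and the batch size are hypothetical helpers for illustration, not Prefect APIs.

```python
from itertools import islice
from typing import Callable, Iterable, Iterator, List, TypeVar

T = TypeVar("T")
R = TypeVar("R")


def chunked(items: Iterable[T], size: int) -> Iterator[List[T]]:
    """Yield successive lists of at most `size` items (hypothetical helper)."""
    if size < 1:
        raise ValueError("size must be at least 1")
    it = iter(items)
    while True:
        batch = list(islice(it, size))
        if not batch:
            return
        yield batch


def map_in_batches(
    submit_batch: Callable[[List[T]], List[R]],
    items: Iterable[T],
    size: int,
) -> List[R]:
    """Process `items` one batch at a time and collect the results in order.

    `submit_batch` is assumed to submit one batch and block until all of its
    results are available, so at most `size` runs are in flight at once.
    """
    results: List[R] = []
    for batch in chunked(items, size):
        results.extend(submit_batch(batch))
    return results


if __name__ == "__main__":
    # Stand-in for a Prefect batch so the sketch runs without a server.
    print(map_in_batches(lambda batch: [i * 2 for i in batch], range(7), 3))
    # prints [0, 2, 4, 6, 8, 10, 12]
```

Inside demo_flow, submit_batch could wrap the same calls used in the example above, e.g. `lambda batch: [f.wait().result() for f in some_task.map(batch)]`, applied to `ls.wait().result()` with a batch size such as 500 (an arbitrary example value, not a documented limit).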
@Bianca Hoch in this example you can also see that I log a message inside the task, but I cannot see that message anywhere.

Bianca Hoch

08/02/2022, 8:17 PM
Hello Kha, interestingly enough, I was able to run the flow to completion with the argument to generate_numbers set to 100, 200, and even 500.
I was also able to see the log messages.

Kha Nguyen

08/02/2022, 8:18 PM
Yes, I can see the log with a local process runner too
I am using a MacBook M1; I wonder if that affects anything.
Thanks for reproducing the run

Bianca Hoch

08/02/2022, 8:21 PM
Thank you for providing something I could reproduce! For context, I'm using a MacBook (macOS 12.3.1, Apple M1 Pro chip).
If you're still running into the same problem, feel free to open a GitHub issue, as I mentioned in our other exchange.

Kha Nguyen

08/03/2022, 1:55 PM
Hi Bianca, I reran my example from yesterday, and today it ran successfully without any problem. I don’t know what caused the exception in the first place; perhaps it was just a local system issue.

Bianca Hoch

08/03/2022, 2:13 PM
Glad to hear that, Kha!