Ian Andres Etnyre Mercader
09/08/2022, 7:59 PM
Using prefect 2.3.1, prefect-dask: 0.2.0, I got several errors running code similar to this pseudocode:
@task
async def dask_context(item):
    """Load the JSON block named "myblock-name" for one mapped item.

    Declared ``async`` because the body awaits ``JSON.load``; ``await`` is
    only valid inside a coroutine function.
    """
    name = "myblock-name"
    config = await JSON.load(name)
@flow(task_runner=DaskTaskRunner())
def do_stuff():
    """Map ``dask_context`` over ``data_list`` under the "only_4" tag."""
    with tags("only_4"):
        # data_list is not defined in this snippet (pseudocode).
        mapped = dask_context.map(data_list)
I got this exception multiple times after it had been running for a while (sometimes minutes, sometimes hours):
Encountered exception during execution:
Traceback (most recent call last):
File "/usr/local/lib/python3.9/site-packages/httpcore/backends/asyncio.py", line 33, in read
return await self._stream.receive(max_bytes=max_bytes)
File "/usr/local/lib/python3.9/site-packages/anyio/_backends/_asyncio.py", line 1265, in receive
await self._protocol.read_event.wait()
File "/usr/local/lib/python3.9/asyncio/locks.py", line 226, in wait
await fut
asyncio.exceptions.CancelledError
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
File "/usr/local/lib/python3.9/site-packages/httpcore/_exceptions.py", line 8, in map_exceptions
yield
File "/usr/local/lib/python3.9/site-packages/httpcore/backends/asyncio.py", line 35, in read
return b""
File "/usr/local/lib/python3.9/site-packages/anyio/_core/_tasks.py", line 118, in __exit__
raise TimeoutError
TimeoutError
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
File "/usr/local/lib/python3.9/site-packages/httpx/_transports/default.py", line 60, in map_httpcore_exceptions
yield
File "/usr/local/lib/python3.9/site-packages/httpx/_transports/default.py", line 353, in handle_async_request
resp = await self._pool.handle_async_request(req)
File "/usr/local/lib/python3.9/site-packages/httpcore/_async/connection_pool.py", line 253, in handle_async_request
raise exc
File "/usr/local/lib/python3.9/site-packages/httpcore/_async/connection_pool.py", line 237, in handle_async_request
response = await connection.handle_async_request(request)
File "/usr/local/lib/python3.9/site-packages/httpcore/_async/connection.py", line 90, in handle_async_request
return await self._connection.handle_async_request(request)
File "/usr/local/lib/python3.9/site-packages/httpcore/_async/http11.py", line 105, in handle_async_request
raise exc
File "/usr/local/lib/python3.9/site-packages/httpcore/_async/http11.py", line 84, in handle_async_request
) = await self._receive_response_headers(**kwargs)
File "/usr/local/lib/python3.9/site-packages/httpcore/_async/http11.py", line 148, in _receive_response_headers
event = await self._receive_event(timeout=timeout)
File "/usr/local/lib/python3.9/site-packages/httpcore/_async/http11.py", line 177, in _receive_event
data = await self._network_stream.read(
File "/usr/local/lib/python3.9/site-packages/httpcore/backends/asyncio.py", line 35, in read
return b""
File "/usr/local/lib/python3.9/contextlib.py", line 137, in __exit__
self.gen.throw(typ, value, traceback)
File "/usr/local/lib/python3.9/site-packages/httpcore/_exceptions.py", line 12, in map_exceptions
raise to_exc(exc)
httpcore.ReadTimeout
The above exception was the direct cause of the following exception:
Traceback (most recent call last):
File "/usr/local/lib/python3.9/site-packages/prefect/engine.py", line 1185, in orchestrate_task_run
result = await task.fn(*args, **kwargs)
File "/tmp/tmpq25d58myprefect/lib/dask_context.py", line 14, in _dask_task
with await get_context(flow_run.parameters[block_name]):
File "/tmp/tmpq25d58myprefect/./lib/config_context.py", line 40, in get_context
config = await JSON.load(name)
File "/usr/local/lib/python3.9/site-packages/prefect/blocks/core.py", line 606, in load
block_document = await client.read_block_document_by_name(
File "/usr/local/lib/python3.9/site-packages/prefect/client.py", line 1268, in read_block_document_by_name
response = await self._client.get(
File "/usr/local/lib/python3.9/site-packages/httpx/_client.py", line 1751, in get
return await self.request(
File "/usr/local/lib/python3.9/site-packages/httpx/_client.py", line 1527, in request
return await self.send(request, auth=auth, follow_redirects=follow_redirects)
File "/usr/local/lib/python3.9/site-packages/prefect/client.py", line 257, in send
await super().send(*args, **kwargs)
File "/usr/local/lib/python3.9/site-packages/httpx/_client.py", line 1614, in send
response = await self._send_handling_auth(
File "/usr/local/lib/python3.9/site-packages/httpx/_client.py", line 1642, in _send_handling_auth
response = await self._send_handling_redirects(
File "/usr/local/lib/python3.9/site-packages/httpx/_client.py", line 1679, in _send_handling_redirects
response = await self._send_single_request(request)
File "/usr/local/lib/python3.9/site-packages/httpx/_client.py", line 1716, in _send_single_request
response = await transport.handle_async_request(request)
File "/usr/local/lib/python3.9/site-packages/httpx/_transports/default.py", line 353, in handle_async_request
resp = await self._pool.handle_async_request(req)
File "/usr/local/lib/python3.9/contextlib.py", line 137, in __exit__
self.gen.throw(typ, value, traceback)
File "/usr/local/lib/python3.9/site-packages/httpx/_transports/default.py", line 77, in map_httpcore_exceptions
raise mapped_exc(message) from exc
httpx.ReadTimeout
I started running this code with a tag concurrency limit of 20, then switched to 4. The more threads there were, the sooner the exception was raised.
Currently running on a server with 44 cores and 256 GB.
Faheem Khan
09/08/2022, 11:11 PM
Ian Andres Etnyre Mercader
09/08/2022, 11:16 PM
With prefect 2.3.2, the error is still present.
import asyncio
import random
import os
from prefect.blocks.system import JSON
from prefect_dask import DaskTaskRunner
@task
async def do_work(idx: int, block_name: str):
    """Load the named JSON block, sleep for a random interval, and return ``idx``."""
    json_block = await JSON.load(block_name)
    # simulating load with random times
    await asyncio.sleep(random.uniform(.5, 3.0))
    print(f"done with {idx}")
    return idx
@flow(task_runner=DaskTaskRunner())
def concurrent(block_name: str):
    """Submit ``do_work`` for indices 1 through 1023 under the "workers" tag and wait for every result."""
    with tags("workers"):
        pending = [do_work.submit(index, block_name) for index in range(1, 1024)]
        # Block until each submitted task run has finished.
        finished = [future.result() for future in pending]
    print("the end")
if __name__ == "__main__":
    # Ask the Prefect CLI to create a concurrency limit of 4 for the "workers" tag.
    os.system("prefect concurrency-limit create workers 4")
    # Store the JSON block that every task run loads, replacing any existing one.
    JSON(value={"the_answer": 42}).save("test-block", overwrite=True)
    concurrent("test-block")
Jeff Hale
09/08/2022, 11:18 PM
Ian Andres Etnyre Mercader
09/08/2022, 11:18 PM