Hi guys, I'am using `prefect 2.3.1`, `prefect-dask...
# prefect-community
i
Hi guys, I'am using
prefect 2.3.1
,
prefect-dask: 0.2.0
and got several errors running code similar to this pseudo code :
Copy code
@task
def dask_context(item):
    name="myblock-name"
    config = await JSON.load(name)

@flow(task_runner=DaskTaskRunner())
def do_stuff():
   with tags("only_4"):
       result_list = dask_context.map(data_list)
I got this Exception multiple times after it running for a while (sometimes minutes, sometimes hours):
Copy code
Encountered exception during execution:
Traceback (most recent call last):
  File "/usr/local/lib/python3.9/site-packages/httpcore/backends/asyncio.py", line 33, in read
    return await self._stream.receive(max_bytes=max_bytes)
  File "/usr/local/lib/python3.9/site-packages/anyio/_backends/_asyncio.py", line 1265, in receive
    await self._protocol.read_event.wait()
  File "/usr/local/lib/python3.9/asyncio/locks.py", line 226, in wait
    await fut
asyncio.exceptions.CancelledError

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/usr/local/lib/python3.9/site-packages/httpcore/_exceptions.py", line 8, in map_exceptions
    yield
  File "/usr/local/lib/python3.9/site-packages/httpcore/backends/asyncio.py", line 35, in read
    return b""
  File "/usr/local/lib/python3.9/site-packages/anyio/_core/_tasks.py", line 118, in __exit__
    raise TimeoutError
TimeoutError

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/usr/local/lib/python3.9/site-packages/httpx/_transports/default.py", line 60, in map_httpcore_exceptions
    yield
  File "/usr/local/lib/python3.9/site-packages/httpx/_transports/default.py", line 353, in handle_async_request
    resp = await self._pool.handle_async_request(req)
  File "/usr/local/lib/python3.9/site-packages/httpcore/_async/connection_pool.py", line 253, in handle_async_request
    raise exc
  File "/usr/local/lib/python3.9/site-packages/httpcore/_async/connection_pool.py", line 237, in handle_async_request
    response = await connection.handle_async_request(request)
  File "/usr/local/lib/python3.9/site-packages/httpcore/_async/connection.py", line 90, in handle_async_request
    return await self._connection.handle_async_request(request)
  File "/usr/local/lib/python3.9/site-packages/httpcore/_async/http11.py", line 105, in handle_async_request
    raise exc
  File "/usr/local/lib/python3.9/site-packages/httpcore/_async/http11.py", line 84, in handle_async_request
    ) = await self._receive_response_headers(**kwargs)
  File "/usr/local/lib/python3.9/site-packages/httpcore/_async/http11.py", line 148, in _receive_response_headers
    event = await self._receive_event(timeout=timeout)
  File "/usr/local/lib/python3.9/site-packages/httpcore/_async/http11.py", line 177, in _receive_event
    data = await self._network_stream.read(
  File "/usr/local/lib/python3.9/site-packages/httpcore/backends/asyncio.py", line 35, in read
    return b""
  File "/usr/local/lib/python3.9/contextlib.py", line 137, in __exit__
    self.gen.throw(typ, value, traceback)
  File "/usr/local/lib/python3.9/site-packages/httpcore/_exceptions.py", line 12, in map_exceptions
    raise to_exc(exc)
httpcore.ReadTimeout

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "/usr/local/lib/python3.9/site-packages/prefect/engine.py", line 1185, in orchestrate_task_run
    result = await task.fn(*args, **kwargs)
  File "/tmp/tmpq25d58myprefect/lib/dask_context.py", line 14, in _dask_task
    with await get_context(flow_run.parameters[block_name]):
  File "/tmp/tmpq25d58myprefect/./lib/config_context.py", line 40, in get_context
    config = await JSON.load(name)
  File "/usr/local/lib/python3.9/site-packages/prefect/blocks/core.py", line 606, in load
    block_document = await client.read_block_document_by_name(
  File "/usr/local/lib/python3.9/site-packages/prefect/client.py", line 1268, in read_block_document_by_name
    response = await self._client.get(
  File "/usr/local/lib/python3.9/site-packages/httpx/_client.py", line 1751, in get
    return await self.request(
  File "/usr/local/lib/python3.9/site-packages/httpx/_client.py", line 1527, in request
    return await self.send(request, auth=auth, follow_redirects=follow_redirects)
  File "/usr/local/lib/python3.9/site-packages/prefect/client.py", line 257, in send
    await super().send(*args, **kwargs)
  File "/usr/local/lib/python3.9/site-packages/httpx/_client.py", line 1614, in send
    response = await self._send_handling_auth(
  File "/usr/local/lib/python3.9/site-packages/httpx/_client.py", line 1642, in _send_handling_auth
    response = await self._send_handling_redirects(
  File "/usr/local/lib/python3.9/site-packages/httpx/_client.py", line 1679, in _send_handling_redirects
    response = await self._send_single_request(request)
  File "/usr/local/lib/python3.9/site-packages/httpx/_client.py", line 1716, in _send_single_request
    response = await transport.handle_async_request(request)
  File "/usr/local/lib/python3.9/site-packages/httpx/_transports/default.py", line 353, in handle_async_request
    resp = await self._pool.handle_async_request(req)
  File "/usr/local/lib/python3.9/contextlib.py", line 137, in __exit__
    self.gen.throw(typ, value, traceback)
  File "/usr/local/lib/python3.9/site-packages/httpx/_transports/default.py", line 77, in map_httpcore_exceptions
    raise mapped_exc(message) from exc
httpx.ReadTimeout
I started running this code with a tag concurrency limit of 20 then switched to 4. The more threads the exception was raised faster. Currently running on a server of 44 cores and 256 GB.
1
f
I am in the same boat, somehow I don't get any errors with prefect 2.0.4, but for any prefect>2.0.4 I get errors sometime the flow freezes without any errors.
i
I manage to make a sample code that generates the error and tested it in
prefect: 2.3.2
the error is still present
Copy code
import asyncio
import random
import os
from prefect.blocks.system import JSON
from prefect_dask import DaskTaskRunner


@task
async def do_work(idx: int, block_name: str):
    config = await JSON.load(block_name)
    sleep_time = random.uniform(.5, 3.0)
    await asyncio.sleep(sleep_time) # simulating load with random times
    print(f"done with {idx}")
    return idx


@flow(task_runner=DaskTaskRunner())
def concurrent(block_name: str):
    with tags("workers"):
        work_futures = [ do_work.submit(index, block_name) for index in list(range(1,1024)) ]
        work_results = [ item.result() for item in work_futures ]
    print("the end")


if __name__ == "__main__":
    os.system("prefect concurrency-limit create workers 4")
    json_block = JSON(value={"the_answer": 42})
    json_block.save("test-block", overwrite=True)
    concurrent("test-block")
j
This sounds like a 🪲. Are you able to create a bug report issue at prefecthq/prefect ?
i
ok, I will
🙏 1
🙌 1
hi @Jeff Hale, filed my bug report https://github.com/PrefectHQ/prefect/issues/6759
👍 1
🙌 1