# ask-community
d
Hi again, I have a flow that produces the following error message:
assert not self._eof, 'feed_data after feed_eof'
AssertionError: feed_data after feed_eof
The error seems to originate from asyncio rather than from my own code. In a nutshell, the task that causes this accesses an API endpoint, extracts the relevant data, and stores it in a Redis database. So nothing too fancy. Also interesting is that the flow seems to complete successfully (judging by its status). There is an associated ticket (https://github.com/PrefectHQ/prefect/issues/6335), but it doesn’t match my problem perfectly, although the symptom seems to be the same. I also tried setting PYTHONUTF8=1, and it didn’t fix my issue. BTW: I am on a Mac. Any thoughts or advice on how to get rid of this error message?
Seems it is related to log_prints=True.
a
I did some digging into this error today and did not resolve it. 😂 However, I made a little progress understanding the issue. Another user included code in that linked issue that reliably reproduces the error. The only way I got rid of the error with that code was to rewrite it to use async function calls (awaits). So it appears that the issue is some kind of timing error that probably occurs with various uses of streams when users call flows or tasks as normal, synchronous Python functions instead of async coroutines. Anyway, you might try rewriting your flow (and tasks) to use awaits, but that may not be worth the time investment. And one day we will track down the root cause(s).
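For reference, the assertion itself lives in asyncio's StreamReader and can be triggered without Prefect at all. The following is only a minimal standard-library sketch of the symptom, assuming the reader has already been marked as finished when more data arrives. It is not a reproduction of what Prefect does internally, and the function name feed_after_eof is made up for illustration.

```python
import asyncio


async def feed_after_eof() -> str:
    """Feed data into a StreamReader that has already received EOF."""
    # Create the reader inside a running event loop.
    reader = asyncio.StreamReader()
    reader.feed_data(b"first chunk\n")
    reader.feed_eof()  # the stream is now considered closed
    try:
        # Data arriving after EOF trips the assertion seen in the traceback.
        reader.feed_data(b"late chunk\n")
    except AssertionError as exc:
        return str(exc)
    return "no error"


result = asyncio.run(feed_after_eof())
print(result)
```

If the message printed here matches the one in the logs, that at least supports the idea that the error is about ordering (data delivered after the stream was closed) rather than about the content of the data.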
d
For completeness, here is the stack trace. I had the impression that not printing stuff helped, but I think this happens more or less randomly (I could be wrong). So it happened again later:
06:28:00.294 | ERROR   | asyncio - Exception in callback SubprocessStreamProtocol.pipe_data_received(1, b"{'type': 'm...mplete': 0}\n")
handle: <Handle SubprocessStreamProtocol.pipe_data_received(1, b"{'type': 'm...mplete': 0}\n")>
Traceback (most recent call last):
  File "/Users/david/.pyenv/versions/3.9.14/lib/python3.9/asyncio/events.py", line 80, in _run
    self._context.run(self._callback, *self._args)
  File "/Users/david/.pyenv/versions/3.9.14/lib/python3.9/asyncio/subprocess.py", line 73, in pipe_data_received
    reader.feed_data(data)
  File "/Users/david/.pyenv/versions/3.9.14/lib/python3.9/asyncio/streams.py", line 472, in feed_data
    assert not self._eof, 'feed_data after feed_eof'
AssertionError: feed_data after feed_eof
06:28:00.841 | ERROR   | prefect.flow_runs.runner - An error occurred while monitoring flow run 'e9eb9676-fb3f-4787-91a5-46d2eb56d9b7'. The flow run will not be marked as failed, but an issue may have occurred.
Traceback (most recent call last):
  File "/Users/david/Library/Caches/pypoetry/virtualenvs/tir-VjyBWUYD-py3.9/lib/python3.9/site-packages/prefect/runner/runner.py", line 843, in _submit_run_and_capture_errors
    status_code = await self._run_process(
  File "/Users/david/Library/Caches/pypoetry/virtualenvs/tir-VjyBWUYD-py3.9/lib/python3.9/site-packages/prefect/runner/runner.py", line 510, in _run_process
    process = await run_process(
  File "/Users/david/Library/Caches/pypoetry/virtualenvs/tir-VjyBWUYD-py3.9/lib/python3.9/site-packages/prefect/utilities/processutils.py", line 273, in run_process
    await consume_process_output(
  File "/Users/david/Library/Caches/pypoetry/virtualenvs/tir-VjyBWUYD-py3.9/lib/python3.9/site-packages/prefect/utilities/processutils.py", line 293, in consume_process_output
    tg.start_soon(
  File "/Users/david/Library/Caches/pypoetry/virtualenvs/tir-VjyBWUYD-py3.9/lib/python3.9/site-packages/anyio/_backends/_asyncio.py", line 597, in __aexit__
    raise exceptions[0]
  File "/Users/david/Library/Caches/pypoetry/virtualenvs/tir-VjyBWUYD-py3.9/lib/python3.9/site-packages/prefect/utilities/processutils.py", line 305, in stream_text
    async for item in source:
  File "/Users/david/Library/Caches/pypoetry/virtualenvs/tir-VjyBWUYD-py3.9/lib/python3.9/site-packages/anyio/abc/_streams.py", line 34, in __anext__
    return await self.receive()
  File "/Users/david/Library/Caches/pypoetry/virtualenvs/tir-VjyBWUYD-py3.9/lib/python3.9/site-packages/anyio/streams/text.py", line 46, in receive
    decoded = self._decoder.decode(chunk)
  File "/Users/david/.pyenv/versions/3.9.14/lib/python3.9/codecs.py", line 322, in decode
    (result, consumed) = self._buffer_decode(data, self.errors, final)
UnicodeDecodeError: 'utf-8' codec can't decode byte 0x98 in position 16073: invalid start byte
a
Thanks, that’s helpful! I think this error (the feed_data one) can happen any time a stream closes. In this case, Prefect is running your flow in a subprocess and capturing the standard out/error streams from that process, which fails at some point. It’s a little unclear why, but seemingly because the process emitted bytes in an encoding other than UTF-8. So the two errors may not be related. The UTF-8 one could simply be that your flow run prints a byte string containing non-UTF-8 bytes, maybe something the flow scraped from a website, per your description of the code. Then, when we’re monitoring the run in the subprocess and decoding the output stream, we hit these bytes, can’t decode them as UTF-8, and you get the UTF-8 error.
d
I’m wondering if the cause is the CPU architecture here. The issue occurred when I tested on Apple silicon. After I migrated the deployment to an x86 VM, I haven’t been able to reproduce the issue so far.