
Moe

08/23/2023, 1:18 PM
@Marvin I am seeing these error messages in the terminal when running my flow with subflows. However, there are no errors in the UI logs and the flow completes successfully. What could be the reason?
| ERROR   | GlobalEventLoopThread | prefect._internal.concurrency - Service 'EventsWorker' failed to process item (Event(occurred=DateTime(2023, 8, 23, 13, 15, 6, 960254, tzinfo=Timezone('+00:00')), event='prefect.task-run.Running', resource=Resource(__root__={'prefect.resource.id': 'prefect.task-run.b353993f-c297-4745-941f-0ed69a89942c', 'prefect.resource.name': 'save_to_stg-0', 'prefect.state-message': '', 'prefect.state-name': 'Running', 'prefect.state-timestamp': '2023-08-23T13:15:06.960254+00:00', 'prefect.state-type': 'RUNNING'}), related=[RelatedResource(__root__={'prefect.resource.id': 'prefect.flow-run.5ef22b18-2b29-43d2-8acd-fd0a21a2b580', 'prefect.resource.role': 'flow-run', 'prefect.resource.name': 'electric-chowchow'}), RelatedResource(__root__={'prefect.resource.id': 'prefect.flow.a10e3692-66e9-436d-9abc-0a1e0a92dd75', 'prefect.resource.role': 'flow', 'prefect.resource.name': 'wd-okta-main-flow'})], payload={'intended': {'from': 'PENDING', 'to': 'RUNNING'}, 'initial_state': {'type': 'PENDING', 'name': 'Pending', 'message': ''}, 'validated_state': {'type': 'RUNNING', 'name': 'Running', 'message': ''}}, id=UUID('18b7d97d-f796-4e95-93fc-612b2cf0ec1b'), follows=UUID('8f3fb9d2-eb1c-49e2-a092-8c97203af4a5')), <_contextvars.Context object at 0x000002573D64D080>)
Traceback (most recent call last):
  File "C:\Users\maung\AppData\Roaming\Python\Python310\site-packages\prefect\_internal\concurrency\services.py", line 150, in _main_loop
    await self._handle(item)
  File "C:\Users\maung\AppData\Roaming\Python\Python310\site-packages\prefect\events\worker.py", line 41, in _handle    
    await self._client.emit(event)
  File "C:\Users\maung\AppData\Roaming\Python\Python310\site-packages\prefect\events\clients.py", line 22, in emit      
    return await self._emit(event)
  File "C:\Users\maung\AppData\Roaming\Python\Python310\site-packages\prefect\events\clients.py", line 172, in _emit    
    assert self._websocket
AssertionError

Marvin

08/23/2023, 1:18 PM
Oh, the joys of debugging. It seems like you're having an issue with Prefect's event system. The error message suggests that the event system is trying to emit an event, but the websocket connection it's trying to use isn't available. It's a classic case of trying to talk when there's nobody to listen.

This could be due to a number of reasons. It might be a temporary network issue, or it could be a bug in the Prefect code itself. It's also possible that there's some sort of configuration issue with your setup.

The fact that your flow completes successfully and there's no error in the UI logs suggests that this might not be a critical issue. It could be that the event system is trying to emit some sort of optional event that isn't strictly necessary for the execution of your flow. However, I'd recommend reaching out to the Prefect community or support for further assistance. They might be able to provide more insight into what's going on.

Moe

08/23/2023, 1:19 PM
@Jake Kaplan Do you have any insights?

Jake Kaplan

08/23/2023, 1:41 PM
Hi, Prefect events are sent over websockets. It won't impact your flow runs at all if they can't be sent, but it looks like your network configuration is likely not allowing a websocket connection.
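One way to check the websocket theory is to try opening a websocket from the same machine, outside of Prefect. The sketch below is only an illustration: the helper names `events_ws_url` and `can_open_websocket` are hypothetical, the `/events/in` path is an assumption about where events are sent, and the check relies on the third-party `websockets` package being installed.

```python
import asyncio
from urllib.parse import urlsplit, urlunsplit


def events_ws_url(api_url: str) -> str:
    """Map an http(s) API URL to a ws(s) URL.

    The "/events/in" suffix is an assumption, not confirmed in this thread.
    """
    parts = urlsplit(api_url.rstrip("/"))
    scheme = {"http": "ws", "https": "wss"}[parts.scheme]
    return urlunsplit((scheme, parts.netloc, parts.path + "/events/in", "", ""))


async def can_open_websocket(url: str, timeout: float = 5.0):
    """Try to open and immediately close a websocket; return (ok, error_text)."""
    import websockets  # third-party; imported lazily so the URL helper works without it

    try:
        async with websockets.connect(url, open_timeout=timeout):
            return True, ""
    except Exception as exc:  # proxy blocks, TLS errors, timeouts, rejected handshakes
        return False, repr(exc)


# Example (run manually, substituting your own API URL):
#   url = events_ws_url("http://127.0.0.1:4200/api")
#   print(asyncio.run(can_open_websocket(url)))
```

A timeout or connection error here would point toward a proxy or firewall blocking websocket traffic. A handshake rejected with an HTTP status (for example because no credentials were sent) would instead suggest the network path itself is open.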

Moe

08/23/2023, 2:06 PM
@Jake Kaplan The message above was from running my flow locally. When I ran it as a deployment on EC2, I see a different error in the UI logs, even though the final state is 'Completed':
Finished in state Completed('All states completed.')
09:40:26 AM
prefect.flow_runs
An error occurred while monitoring flow run 'af6a2646-1e53-4781-80ed-f7061588feab'. The flow run will not be marked as failed, but an issue may have occurred.
Traceback (most recent call last):
  File "C:\Users\rsam.DENTALCORP\AppData\Roaming\Python\Python310\site-packages\prefect\workers\base.py", line 834, in _submit_run_and_capture_errors
    result = await self.run(
  File "C:\Users\rsam.DENTALCORP\AppData\Roaming\Python\Python310\site-packages\prefect\workers\process.py", line 177, in run
    process = await run_process(
  File "C:\Users\rsam.DENTALCORP\AppData\Roaming\Python\Python310\site-packages\prefect\utilities\processutils.py", line 273, in run_process
    await consume_process_output(
  File "C:\Users\rsam.DENTALCORP\AppData\Roaming\Python\Python310\site-packages\prefect\utilities\processutils.py", line 287, in consume_process_output
    async with anyio.create_task_group() as tg:
  File "C:\Users\rsam.DENTALCORP\AppData\Roaming\Python\Python310\site-packages\anyio\_backends\_asyncio.py", line 597, in __aexit__
    raise exceptions[0]
  File "C:\Users\rsam.DENTALCORP\AppData\Roaming\Python\Python310\site-packages\prefect\utilities\processutils.py", line 305, in stream_text
    async for item in source:
  File "C:\Users\rsam.DENTALCORP\AppData\Roaming\Python\Python310\site-packages\anyio\abc\_streams.py", line 34, in __anext__
    return await self.receive()
  File "C:\Users\rsam.DENTALCORP\AppData\Roaming\Python\Python310\site-packages\anyio\streams\text.py", line 46, in receive
    decoded = self._decoder.decode(chunk)
  File "C:\Program Files\Python310\lib\codecs.py", line 322, in decode
    (result, consumed) = self._buffer_decode(data, self.errors, final)
UnicodeDecodeError: 'utf-8' codec can't decode byte 0xe9 in position 5394: invalid continuation byte
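For context on this traceback: byte `0xe9` is "é" in Windows-1252/Latin-1, so the flow's subprocess most likely printed non-UTF-8 text that the worker then tried to decode as UTF-8. The sketch below reproduces the same error message with a made-up byte string and shows one tolerant way to decode such output; `decode_output` is a hypothetical helper, not part of Prefect, and cp1252 as the source encoding is an assumption.

```python
# Hypothetical subprocess output encoded as cp1252 (assumed), not UTF-8.
raw = b"r\xe9sum\xe9 ready\n"


def strict_utf8_error(chunk: bytes) -> str:
    """Return the error text from a strict UTF-8 decode, or '' if it succeeds."""
    try:
        chunk.decode("utf-8")
    except UnicodeDecodeError as exc:
        return str(exc)
    return ""


def decode_output(chunk: bytes) -> str:
    """Decode as UTF-8 first, then fall back to cp1252 (assumed legacy encoding)."""
    try:
        return chunk.decode("utf-8")
    except UnicodeDecodeError:
        # errors="replace" ensures the fallback itself can never raise.
        return chunk.decode("cp1252", errors="replace")


print(strict_utf8_error(raw))
print(decode_output(raw))
```

Since the decoding happens inside the worker, a more practical workaround may be to make the child process emit UTF-8 in the first place, for example by setting the standard Python environment variables `PYTHONUTF8=1` or `PYTHONIOENCODING=utf-8` where the worker runs. Whether that resolves this particular case is not confirmed in the thread.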