Hi all, I’m trying to set up a Prefect log subscriber to demo real-time log streaming from my Prefect server to a separate client.
This is the simple implementation I did for the client:
"""Stream ALL Prefect logs - no filtering"""

import asyncio
import logging
import os

from prefect.logging.clients import get_logs_subscriber
def _level_label(log):
    """Return the display label for a log's level, truncated to 5 characters.

    Uses ``log.level_name`` when the attribute exists (falling back to
    ``"INFO"`` if it is empty). When the attribute is absent, the label is
    derived from an integer ``log.level`` via ``logging.getLevelName``.
    """
    # NOTE(review): Prefect's Log schema appears to expose an integer `level`
    # rather than `level_name` - confirm. Reading `log.level_name` directly
    # would raise AttributeError on the first record if it does not exist.
    absent = object()
    label = getattr(log, "level_name", absent)
    if label is absent:
        numeric_level = getattr(log, "level", None)
        label = (
            logging.getLevelName(numeric_level)
            if isinstance(numeric_level, int)
            else None
        )
    return (label or "INFO")[:5]


async def stream_all_logs():
    """Stream every log from the Prefect server and print one line per log.

    Sets ``PREFECT_API_URL`` and ``PREFECT_API_AUTH_STRING`` in ``os.environ``,
    opens a logs subscriber with no filter, and prints each received log as
    ``[HH:MM:SS.mmm] LEVEL | message`` with the message cut to 100 characters.
    A running total is printed after every 20 logs. Any exception raised by
    the subscriber while iterating propagates to the caller.
    """
    # The URL must be a single-line literal: a line break inside the quotes
    # is a syntax error.
    # NOTE(review): the path prefix (/prefect/server/api) suggests the server
    # sits behind a reverse proxy - presumably it must forward WebSocket
    # upgrade requests for the subscriber to stay connected; verify.
    os.environ["PREFECT_API_URL"] = "http://10.0.0.81/prefect/server/api"
    os.environ["PREFECT_API_AUTH_STRING"] = "admin:your-secure-password"

    print("Connecting to Prefect logs...")
    print("Streaming ALL logs (no filter)...\n")

    # Pass NO filter at all - just get everything
    async with get_logs_subscriber() as subscriber:
        print("Connected! Waiting for logs...\n")
        count = 0
        async for log in subscriber:
            count += 1

            # Simple output: millisecond timestamp, 5-char level, short message
            ts = log.timestamp.strftime("%H:%M:%S.%f")[:-3]
            level = _level_label(log)
            msg = log.message[:100]
            print(f"[{ts}] {level:5} | {msg}")

            # Periodic progress marker so long sessions show the running total
            if count % 20 == 0:
                print(f"\n {count} logs...\n")
# Script entry point. The guard must test the module's `__name__`; a bare
# `name` is undefined and raises NameError.
if __name__ == "__main__":
    try:
        asyncio.run(stream_all_logs())
    except KeyboardInterrupt:
        # Ctrl+C is the expected way to stop streaming; exit without a traceback.
        print("\n Stopped")
And I keep getting this error:
_ Connecting to Prefect logs... _ Streaming ALL logs (no filter)...
__ Connected! Waiting for logs...
Traceback (most recent call last):
  File "/code/constellation/test_prefect_streaming.py", line 35, in <module>
    asyncio.run(stream_all_logs())
  File "/usr/local/lib/python3.10/asyncio/runners.py", line 44, in run
    return loop.run_until_complete(main)
  File "/usr/local/lib/python3.10/asyncio/base_events.py", line 649, in run_until_complete
    return future.result()
  File "/code/constellation/test_prefect_streaming.py", line 19, in stream_all_logs
    async for log in subscriber:
  File "/usr/local/lib/python3.10/site-packages/prefect/logging/clients.py", line 288, in __anext__
    message = orjson.loads(await self._websocket.recv())
  File "/usr/local/lib/python3.10/site-packages/websockets/asyncio/connection.py", line 322, in recv
    raise self.protocol.close_exc from self.recv_exc
websockets.exceptions.ConnectionClosedError: no close frame received or sent
Any ideas what I’m doing wrong?