<@ULVA73B9P> how can I stream prefect v3 logs usin...
# ask-marvin
d
@Marvin how can I stream prefect v3 logs using websockets?
m
thought for 131.3 seconds
Great question. In Prefect 3.x, the recommended way to stream logs over WebSockets is to use the built-in logs subscriber, which handles the WS connection, auth, filtering, and reconnection for you. Here’s a quick start. - Prereqs - Set your API context: - Prefect Cloud: run
prefect cloud login
or set
PREFECT_API_KEY
and
PREFECT_API_URL
- Prefect Server (OSS): set
PREFECT_API_URL
(e.g. `http://127.0.0.1:4200/api`); if you’ve enabled auth, set
PREFECT_API_AUTH_STRING
(e.g.
admin:password
) - Stream all logs (workspace-wide)
Copy code
import asyncio
from prefect.logging.clients import get_logs_subscriber

async def main():
    async with get_logs_subscriber() as sub:
        async for log in sub:
            # log is a Prefect Log model; print what you need
            print(f"[{log.level_name}] {log.timestamp} {log.message}")

asyncio.run(main())
- Stream logs for a specific flow run (or task run) with filters
Copy code
import asyncio
from prefect.logging.clients import get_logs_subscriber
from prefect.server.schemas.filters import LogFilter, LogFilterFlowRunId, LogFilterTaskRunId, LogFilterLevel

FLOW_RUN_ID = "00000000-0000-0000-0000-000000000000"   # replace me

log_filter = LogFilter(
    flow_run_id=LogFilterFlowRunId(any_=[FLOW_RUN_ID]),
    # optionally filter by level, e.g. only INFO and above
    level=LogFilterLevel(any_=["INFO", "WARNING", "ERROR", "CRITICAL"]),
    # or by a task run id:
    # task_run_id=LogFilterTaskRunId(any_=[TASK_RUN_ID]),
)

async def main():
    async with get_logs_subscriber(filter=log_filter) as sub:
        async for log in sub:
            print(f"[{log.level_name}] {log.timestamp} {log.message}")

asyncio.run(main())
Notes - The subscriber uses WebSockets under the hood and supports automatic reconnection (
reconnection_attempts
). - For Cloud, your API key is used to authenticate the WS connection; for Server,
PREFECT_API_AUTH_STRING
enables basic auth when configured. - If you really need the raw WS URL, Prefect includes a helper to convert your API URL to the logs-out WebSocket endpoint:
Copy code
from prefect.logging.clients import logs_out_socket_from_api_url
ws_url = logs_out_socket_from_api_url("<https://api.prefect.cloud/api>")  # or your server API URL
print(ws_url)
However, using
get_logs_subscriber()
is preferred so you don’t have to implement the WS protocol, filtering payloads, or auth yourself. Docs - Logging clients API: prefect.logging.clients - Security/auth settings (Server): Security settings Would you like an example that targets Prefect Cloud vs. OSS specifically, or do you need to connect via a raw WebSocket client instead of the Prefect helper?
d
@Marvin can you show me an example doing with with a local Prefect Server plus a Django app?
m
thought for 58.6 seconds
Absolutely — here’s a minimal, end-to-end example that streams Prefect 3 logs from a local Prefect Server into a Django app over WebSockets using Django Channels. It uses Prefect’s
get_logs_subscriber()
under the hood, so you don’t have to manage raw WS auth/protocol yourself. High-level flow - Prefect Server runs locally and exposes the API at something like http://127.0.0.1:4200/api - Your Django ASGI app (with Channels) opens a Prefect logs subscription via
get_logs_subscriber()
- The Django WebSocket consumer forwards each Prefect log to the connected browser clients 1) Start Prefect Server locally - In a terminal:
Copy code
prefect server start
- Default API is typically http://127.0.0.1:4200/api Note: If you turn on server auth, set
PREFECT_API_AUTH_STRING=admin:password
in your Django environment. 2) Configure Django + Channels Install deps
Copy code
pip install "prefect>=3" django channels
# optional if you want a production WS layer
# pip install channels-redis
settings.py (key bits)
Copy code
INSTALLED_APPS = [
    # ...
    "channels",
    "app",  # your Django app that will hold the consumer below
]

ASGI_APPLICATION = "myproj.asgi.application"

# If you need a channel layer (e.g., multiple workers), configure Redis:
# CHANNEL_LAYERS = {
#     "default": {
#         "BACKEND": "channels_redis.core.RedisChannelLayer",
#         "CONFIG": {"hosts": [("127.0.0.1", 6379)]},
#     }
# }

# Make sure your Django process has Prefect API context
# e.g. via environment variables or .env:
# PREFECT_API_URL=<http://127.0.0.1:4200/api>
# If server auth is enabled:
# PREFECT_API_AUTH_STRING=admin:password
asgi.py
Copy code
import os
from django.core.asgi import get_asgi_application
from channels.routing import ProtocolTypeRouter, URLRouter
from channels.auth import AuthMiddlewareStack
from app.routing import websocket_urlpatterns

os.environ.setdefault("DJANGO_SETTINGS_MODULE", "myproj.settings")

django_asgi_app = get_asgi_application()

application = ProtocolTypeRouter({
    "http": django_asgi_app,
    "websocket": AuthMiddlewareStack(
        URLRouter(websocket_urlpatterns)
    ),
})
app/routing.py
Copy code
from django.urls import re_path
from .consumers import PrefectLogsConsumer

websocket_urlpatterns = [
    re_path(r"^ws/prefect/logs$", PrefectLogsConsumer.as_asgi()),
]
3) WebSocket consumer: subscribe to Prefect logs and forward to client app/consumers.py ``` import asyncio import urllib.parse from channels.generic.websocket import AsyncJsonWebsocketConsumer from prefect.logging.clients import get_logs_subscriber from prefect.server.schemas.filters import ( LogFilter, LogFilterFlowRunId, LogFilterTaskRunId, LogFilterLevel, ) class PrefectLogsConsumer(AsyncJsonWebsocketConsumer): async def connect(self): await self.accept() # Kick off background task to stream Prefect logs self._stream_task = asyncio.create_task(self.stream_logs()) async def disconnect(self, code): # Clean up background task task = getattr(self, "_stream_task", None) if task: task.cancel() try: await task except asyncio.CancelledError: pass async def stream_logs(self): # Parse optional query params (e.g., ?flow_run_id=...&level=INFO&level=ERROR) qs = self.scope.get("query_string", b"").decode() params = urllib.parse.parse_qs(qs) flow_run_id = params.get("flow_run_id", [None])[0] task_run_id = params.get("task_run_id", [None])[0] levels = params.get("level", None) # e.g., ["INFO","ERROR"] log_filter = None if flow_run_id or task_run_id or levels: filter_kwargs = {} if flow_run_id: filter_kwargs["flow_run_id"] = LogFilterFlowRunId(any_=[flow_run_id]) if task_run_id: filter_kwargs["task_run_id"] = LogFilterTaskRunId(any_=[task_run_id]) if levels:
filter_kwargs["level"] = LogFilterLevel(any_=levels) log_filter = LogFilter(**filter_kwargs) try: async with get_logs_subscriber( filter=log_filter, reconnection_attempts=100, # auto-reconnect ) as sub: async for log in sub: # Safely extract fields that are commonly present on Prefect Log objects payload = { "timestamp": getattr(log, "timestamp", None), "level": getattr(log, "level_name", getattr(log, "level", None)), "message": getattr(log, "message", ""), "name": getattr(log, "name", None), "flow_run_id": str(getattr(log, "flow_run_id", "") or ""), "task_run_id": str(getattr(log, "task_run_id", "") or ""), } # Convert timestamp to ISO if it’s a datetime ts = payload["timestamp"] if hasattr(ts, "isoformat"): payload["timestamp"] = ts.isoformat() await self.send_json(payload) except asyncio.CancelledError: # expected on disconnect pass except Exception as exc: await self.send_json({"error": str(exc)})
Copy code
4) Run the Django app
- Use an ASGI server:
uvicorn myproj.asgi:application --reload
Copy code
5) Connect from the browser
- Example: filter to a single flow run and show only INFO+ERROR
const ws = new WebSocket("ws://localhost:8000/ws/prefect/logs?flow_run_id=&lt;your-flow-run-id&gt;&amp;level=INFO&amp;level=ERROR"); ws.onmessage = (evt) => { const data = JSON.parse(evt.data); console.log(
[${data.level}] ${data.timestamp} ${data.message}
); }; ws.onclose = () => console.log("WS closed"); ws.onerror = (e) => console.error("WS error", e);
Copy code
Notes and tips
- Prefect API context:
  - For local server, set `PREFECT_API_URL=<http://127.0.0.1:4200/api>` in the Django process environment.
  - If you enable server auth, also set `PREFECT_API_AUTH_STRING=admin:password`.
- Filtering:
  - Use query params to pass `flow_run_id`, `task_run_id`, and repeated `level` parameters to your consumer. The consumer maps those into a `LogFilter`.
- Reconnection:
  - `get_logs_subscriber` handles reconnects. You can tune `reconnection_attempts`.
- If you prefer raw WS endpoints:
  - You can derive the Prefect logs WS URL with:
from prefect.logging.clients import logs_out_socket_from_api_url ws_url = logs_out_socket_from_api_url("http://127.0.0.1:4200/api") print(ws_url) ``` - But using
get_logs_subscriber()
is recommended — it handles auth/filters/protocol for you. References - Logging clients API: prefect.logging.clients - Security/auth for Prefect Server: Security settings - Django Channels: Django Channels docs Want me to adapt this to: - Server-Sent Events (SSE) instead of WebSockets, or - A specific deployment filter (e.g., only logs from a named deployment), or - Prefect Cloud instead of a local server?