Devin Flake
09/16/2025, 4:59 PMMarvin
09/16/2025, 4:59 PMMarvin
09/16/2025, 5:01 PMprefect cloud login
or set PREFECT_API_KEY
and PREFECT_API_URL
- Prefect Server (OSS): set PREFECT_API_URL
(e.g. `http://127.0.0.1:4200/api`); if you’ve enabled auth, set PREFECT_API_AUTH_STRING
(e.g. admin:password
)
- Stream all logs (workspace-wide)
import asyncio
from prefect.logging.clients import get_logs_subscriber
async def main():
async with get_logs_subscriber() as sub:
async for log in sub:
# log is a Prefect Log model; print what you need
print(f"[{log.level_name}] {log.timestamp} {log.message}")
asyncio.run(main())
- Stream logs for a specific flow run (or task run) with filters
import asyncio
from prefect.logging.clients import get_logs_subscriber
from prefect.server.schemas.filters import LogFilter, LogFilterFlowRunId, LogFilterTaskRunId, LogFilterLevel
FLOW_RUN_ID = "00000000-0000-0000-0000-000000000000" # replace me
log_filter = LogFilter(
flow_run_id=LogFilterFlowRunId(any_=[FLOW_RUN_ID]),
# optionally filter by level, e.g. only INFO and above
level=LogFilterLevel(any_=["INFO", "WARNING", "ERROR", "CRITICAL"]),
# or by a task run id:
# task_run_id=LogFilterTaskRunId(any_=[TASK_RUN_ID]),
)
async def main():
async with get_logs_subscriber(filter=log_filter) as sub:
async for log in sub:
print(f"[{log.level_name}] {log.timestamp} {log.message}")
asyncio.run(main())
Notes
- The subscriber uses WebSockets under the hood and supports automatic reconnection (reconnection_attempts
).
- For Cloud, your API key is used to authenticate the WS connection; for Server, PREFECT_API_AUTH_STRING
enables basic auth when configured.
- If you really need the raw WS URL, Prefect includes a helper to convert your API URL to the logs-out WebSocket endpoint:
from prefect.logging.clients import logs_out_socket_from_api_url
ws_url = logs_out_socket_from_api_url("<https://api.prefect.cloud/api>") # or your server API URL
print(ws_url)
However, using get_logs_subscriber()
is preferred so you don’t have to implement the WS protocol, filtering payloads, or auth yourself.
Docs
- Logging clients API: prefect.logging.clients
- Security/auth settings (Server): Security settings
Would you like an example that targets Prefect Cloud vs. OSS specifically, or do you need to connect via a raw WebSocket client instead of the Prefect helper?Devin Flake
09/16/2025, 5:06 PMMarvin
09/16/2025, 5:06 PMMarvin
09/16/2025, 5:07 PMget_logs_subscriber()
under the hood, so you don’t have to manage raw WS auth/protocol yourself.
High-level flow
- Prefect Server runs locally and exposes the API at something like http://127.0.0.1:4200/api
- Your Django ASGI app (with Channels) opens a Prefect logs subscription via get_logs_subscriber()
- The Django WebSocket consumer forwards each Prefect log to the connected browser clients
1) Start Prefect Server locally
- In a terminal:
prefect server start
- Default API is typically http://127.0.0.1:4200/api
Note: If you turn on server auth, set PREFECT_API_AUTH_STRING=admin:password
in your Django environment.
2) Configure Django + Channels
Install deps
pip install "prefect>=3" django channels
# optional if you want a production WS layer
# pip install channels-redis
settings.py (key bits)
INSTALLED_APPS = [
# ...
"channels",
"app", # your Django app that will hold the consumer below
]
ASGI_APPLICATION = "myproj.asgi.application"
# If you need a channel layer (e.g., multiple workers), configure Redis:
# CHANNEL_LAYERS = {
# "default": {
# "BACKEND": "channels_redis.core.RedisChannelLayer",
# "CONFIG": {"hosts": [("127.0.0.1", 6379)]},
# }
# }
# Make sure your Django process has Prefect API context
# e.g. via environment variables or .env:
# PREFECT_API_URL=<http://127.0.0.1:4200/api>
# If server auth is enabled:
# PREFECT_API_AUTH_STRING=admin:password
asgi.py
import os
from django.core.asgi import get_asgi_application
from channels.routing import ProtocolTypeRouter, URLRouter
from channels.auth import AuthMiddlewareStack
from app.routing import websocket_urlpatterns
os.environ.setdefault("DJANGO_SETTINGS_MODULE", "myproj.settings")
django_asgi_app = get_asgi_application()
application = ProtocolTypeRouter({
"http": django_asgi_app,
"websocket": AuthMiddlewareStack(
URLRouter(websocket_urlpatterns)
),
})
app/routing.py
from django.urls import re_path
from .consumers import PrefectLogsConsumer
websocket_urlpatterns = [
re_path(r"^ws/prefect/logs$", PrefectLogsConsumer.as_asgi()),
]
3) WebSocket consumer: subscribe to Prefect logs and forward to client
app/consumers.py
```
import asyncio
import urllib.parse
from channels.generic.websocket import AsyncJsonWebsocketConsumer
from prefect.logging.clients import get_logs_subscriber
from prefect.server.schemas.filters import (
LogFilter,
LogFilterFlowRunId,
LogFilterTaskRunId,
LogFilterLevel,
)
class PrefectLogsConsumer(AsyncJsonWebsocketConsumer):
async def connect(self):
await self.accept()
# Kick off background task to stream Prefect logs
self._stream_task = asyncio.create_task(self.stream_logs())
async def disconnect(self, code):
# Clean up background task
task = getattr(self, "_stream_task", None)
if task:
task.cancel()
try:
await task
except asyncio.CancelledError:
pass
async def stream_logs(self):
# Parse optional query params (e.g., ?flow_run_id=...&level=INFO&level=ERROR)
qs = self.scope.get("query_string", b"").decode()
params = urllib.parse.parse_qs(qs)
flow_run_id = params.get("flow_run_id", [None])[0]
task_run_id = params.get("task_run_id", [None])[0]
levels = params.get("level", None) # e.g., ["INFO","ERROR"]
log_filter = None
if flow_run_id or task_run_id or levels:
filter_kwargs = {}
if flow_run_id:
filter_kwargs["flow_run_id"] = LogFilterFlowRunId(any_=[flow_run_id])
if task_run_id:
filter_kwargs["task_run_id"] = LogFilterTaskRunId(any_=[task_run_id])
if levels:Marvin
09/16/2025, 5:07 PM4) Run the Django app
- Use an ASGI server:
uvicorn myproj.asgi:application --reload
5) Connect from the browser
- Example: filter to a single flow run and show only INFO+ERROR
const ws = new WebSocket("ws://localhost:8000/ws/prefect/logs?flow_run_id=<your-flow-run-id>&level=INFO&level=ERROR");
ws.onmessage = (evt) => {
const data = JSON.parse(evt.data);
console.log([${data.level}] ${data.timestamp} ${data.message}
);
};
ws.onclose = () => console.log("WS closed");
ws.onerror = (e) => console.error("WS error", e);
Notes and tips
- Prefect API context:
- For local server, set `PREFECT_API_URL=<http://127.0.0.1:4200/api>` in the Django process environment.
- If you enable server auth, also set `PREFECT_API_AUTH_STRING=admin:password`.
- Filtering:
- Use query params to pass `flow_run_id`, `task_run_id`, and repeated `level` parameters to your consumer. The consumer maps those into a `LogFilter`.
- Reconnection:
- `get_logs_subscriber` handles reconnects. You can tune `reconnection_attempts`.
- If you prefer raw WS endpoints:
- You can derive the Prefect logs WS URL with:
from prefect.logging.clients import logs_out_socket_from_api_url
ws_url = logs_out_socket_from_api_url("http://127.0.0.1:4200/api")
print(ws_url)
```
- But using get_logs_subscriber()
is recommended — it handles auth/filters/protocol for you.
References
- Logging clients API: prefect.logging.clients
- Security/auth for Prefect Server: Security settings
- Django Channels: Django Channels docs
Want me to adapt this to:
- Server-Sent Events (SSE) instead of WebSockets, or
- A specific deployment filter (e.g., only logs from a named deployment), or
- Prefect Cloud instead of a local server?