Devin Flake
09/24/2025, 5:26 PM
Marvin
09/24/2025, 5:26 PM
Marvin
09/24/2025, 5:34 PM
PREFECT_API_URL and is at /logs/out
- You authenticate with an Authorization header:
  - Prefect Cloud: Authorization: Bearer <PREFECT_API_KEY>
  - Prefect Server (OSS):
    - If auth is disabled: no header
    - If PREFECT_API_AUTH_STRING is set: Authorization: <that string> (no Bearer)
How to form the WebSocket URL
- Take your PREFECT_API_URL and swap http(s) -> ws(s), then append /logs/out
- Cloud example:
  - PREFECT_API_URL=https://api.prefect.cloud/api/accounts/<account_id>/workspaces/<workspace_id>
  - WebSocket: wss://api.prefect.cloud/api/accounts/<account_id>/workspaces/<workspace_id>/logs/out
- Server example:
  - PREFECT_API_URL=http://127.0.0.1:4200/api
  - WebSocket: ws://127.0.0.1:4200/api/logs/out
If you want Prefect to compute this for you:
from prefect.logging.clients import logs_out_socket_from_api_url
print(logs_out_socket_from_api_url("https://api.prefect.cloud/api/accounts/<acct>/workspaces/<ws>"))
# -> wss://api.prefect.cloud/api/accounts/<acct>/workspaces/<ws>/logs/out
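If Prefect is not importable where you are testing (a bare debugging host, for instance), the same rewrite can be sketched with the standard library alone. This only illustrates the swap described above and is not Prefect's implementation; the name logs_socket_url is made up for this example.

```python
from urllib.parse import urlsplit, urlunsplit


def logs_socket_url(api_url: str) -> str:
    """Hypothetical helper: map a PREFECT_API_URL to its /logs/out socket URL."""
    parts = urlsplit(api_url)
    # http -> ws, https -> wss; anything else is rejected rather than guessed at
    scheme = {"http": "ws", "https": "wss"}.get(parts.scheme)
    if scheme is None:
        raise ValueError(f"unsupported scheme in API URL: {parts.scheme!r}")
    # Drop a trailing slash so we never produce ".../api//logs/out"
    path = parts.path.rstrip("/") + "/logs/out"
    return urlunsplit((scheme, parts.netloc, path, "", ""))


print(logs_socket_url("http://127.0.0.1:4200/api"))
```

Rejecting unknown schemes keeps a typo such as "htp://" from silently turning into a URL that can never connect.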
Connect with wscat
- Prefect Cloud
wscat -c "wss://api.prefect.cloud/api/accounts/<account_id>/workspaces/<workspace_id>/logs/out" \
  -H "Authorization: Bearer <PREFECT_API_KEY>"
- Prefect Server (no auth)
wscat -c "ws://127.0.0.1:4200/api/logs/out"
- Prefect Server (with PREFECT_API_AUTH_STRING)
wscat -c "ws://127.0.0.1:4200/api/logs/out" \
  -H "Authorization: <your PREFECT_API_AUTH_STRING>"
Connect with Python (websockets)
import asyncio
import json
import os

import websockets

# Fail fast if PREFECT_API_URL is unset, e.g. https://api.prefect.cloud/api/accounts/<acct>/workspaces/<ws>
API_URL = os.environ["PREFECT_API_URL"]
WS_URL = API_URL.replace("https://", "wss://").replace("http://", "ws://").rstrip("/") + "/logs/out"

headers = {}
api_key = os.environ.get("PREFECT_API_KEY")
auth_string = os.environ.get("PREFECT_API_AUTH_STRING")
# Cloud: Bearer token; Server with auth string: raw string; Server without auth: no header
if api_key:
    headers["Authorization"] = f"Bearer {api_key}"
elif auth_string:
    headers["Authorization"] = auth_string

async def main():
    # extra_headers is the keyword of the legacy websockets client;
    # the newer asyncio client (websockets 14+) calls it additional_headers
    async with websockets.connect(WS_URL, extra_headers=headers) as ws:
        # If you want to filter, you can send a JSON filter message first (see note below)
        # await ws.send(json.dumps({"filter": { ... }}))
        async for msg in ws:
            # Messages are JSON-encoded logs
            print(msg)  # or: print(json.loads(msg))

asyncio.run(main())
About filtering
- The server supports filtering logs (e.g., by flow_run_id, task_run_id, level).
- The exact filter shape for the WebSocket matches the Prefect LogFilter models. If you want to avoid guessing the JSON, the simplest path is to use Prefect’s subscriber client, which handles the handshake for you:
import asyncio
from prefect.logging.clients import get_logs_subscriber

async def watch():
    # Optionally pass a LogFilter to get_logs_subscriber(); the subscriber
    # opens its socket when entered as an async context manager
    async with get_logs_subscriber() as sub:
        async for log in sub:
            print(log)

asyncio.run(watch())
- If you want to send a manual filter over the socket, send a JSON message immediately after connecting that describes your filter. For example, to only see logs for one flow run and INFO+:
{
  "filter": {
    "flow_runs": {
      "id": {
        "any_": [
          "<flow_run_id>"
        ]
      }
    },
    "logs": {
      "level": {
        "ge": 20
      }
    }
  }
}
Note: the exact filter schema mirrors the REST filters for logs. If you want, I can generate a precise JSON for your use case.
What raw messages look like
- Frames are JSON-encoded log records. A typical payload includes:
{
  "id": "9f5b7a43-7b2d-4f33-9a1a-2f30d9c8f6d3",
  "timestamp": "2025-01-02T15:04:05.123456+00:00",
  "level": 20,
  "name": "prefect.flow_runs",
  "message": "Flow run started",
  "flow_run_id": "b6c8a2b8-9d4c-4cb1-97c1-3a0f6e5b83a4",
  "task_run_id": null
}
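To work with such frames as typed objects rather than raw strings, a small parser along these lines may help. It is a sketch that assumes only the fields shown in the sample payload; LogRecord and parse_log_frame are illustrative names, and real frames may carry additional keys, which this version simply ignores.

```python
import json
import logging
from dataclasses import dataclass
from datetime import datetime
from typing import Optional


@dataclass
class LogRecord:
    """Illustrative container for the fields in the sample payload."""
    id: str
    timestamp: datetime
    level: int
    name: str
    message: str
    flow_run_id: Optional[str]
    task_run_id: Optional[str]


def parse_log_frame(frame: str) -> LogRecord:
    """Decode one JSON frame, ignoring any keys not listed above."""
    data = json.loads(frame)
    return LogRecord(
        id=data["id"],
        timestamp=datetime.fromisoformat(data["timestamp"]),
        level=data["level"],
        name=data["name"],
        message=data["message"],
        flow_run_id=data.get("flow_run_id"),
        task_run_id=data.get("task_run_id"),
    )


sample = json.dumps({
    "id": "9f5b7a43-7b2d-4f33-9a1a-2f30d9c8f6d3",
    "timestamp": "2025-01-02T15:04:05.123456+00:00",
    "level": 20,
    "name": "prefect.flow_runs",
    "message": "Flow run started",
    "flow_run_id": "b6c8a2b8-9d4c-4cb1-97c1-3a0f6e5b83a4",
    "task_run_id": None,
})
record = parse_log_frame(sample)
print(logging.getLevelName(record.level), record.message)
```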
Helpful docs
Marvin
09/24/2025, 5:34 PM
Devin Flake
09/24/2025, 7:14 PM
  File "/home/dflake/adobe/git_repos/ARTE/arte/prefect_ws_log_test.py", line 9, in main
    async with websockets.connect(WS_URL) as ws:
  File "/home/dflake/anaconda3/envs/arte/lib/python3.11/site-packages/websockets/legacy/client.py", line 635, in __aenter__
    return await self
           ^^^^^^^^^^
  File "/home/dflake/anaconda3/envs/arte/lib/python3.11/site-packages/websockets/legacy/client.py", line 656, in __await_impl__
    await protocol.handshake(
  File "/home/dflake/anaconda3/envs/arte/lib/python3.11/site-packages/websockets/legacy/client.py", line 322, in handshake
    raise InvalidStatusCode(status_code, response_headers)
websockets.legacy.exceptions.InvalidStatusCode: server rejected WebSocket connection: HTTP 403
Devin Flake
09/24/2025, 7:23 PM
Prefect is giving me the following error:
  File "/home/dflake/adobe/git_repos/ARTE/arte/prefect_ws_log_test.py", line 9, in main
    async with websockets.connect(WS_URL) as ws:
  File "/home/dflake/anaconda3/envs/arte/lib/python3.11/site-packages/websockets/legacy/client.py", line 635, in __aenter__
    return await self
           ^^^^^^^^^^
  File "/home/dflake/anaconda3/envs/arte/lib/python3.11/site-packages/websockets/legacy/client.py", line 656, in __await_impl__
    await protocol.handshake(
  File "/home/dflake/anaconda3/envs/arte/lib/python3.11/site-packages/websockets/legacy/client.py", line 322, in handshake
    raise InvalidStatusCode(status_code, response_headers)
websockets.legacy.exceptions.InvalidStatusCode: server rejected WebSocket connection: HTTP 403
Marvin
09/24/2025, 7:23 PM
Marvin
09/24/2025, 7:24 PM
wscat -c "ws://127.0.0.1:4200/api/logs/out"
2) Set an Origin header if the server requires it
Some deployments will reject sockets missing an Origin or with a mismatched Origin. Try adding an Origin matching your server URL:
- wscat
wscat -c "ws://127.0.0.1:4200/api/logs/out" -H "Origin: http://127.0.0.1:4200"
- Python websockets
import asyncio
import websockets

WS_URL = "ws://127.0.0.1:4200/api/logs/out"

async def main():
    async with websockets.connect(WS_URL, origin="http://127.0.0.1:4200") as ws:
        async for msg in ws:
            print(msg)

asyncio.run(main())
3) Reverse proxy not forwarding WebSockets
If you’re going through a proxy (Nginx, Traefik, ELB/ALB), it must allow WebSocket upgrades. For Nginx, for example:
location / {
    proxy_pass http://prefect-server:4200;
    proxy_http_version 1.1;
    proxy_set_header Upgrade $http_upgrade;
    proxy_set_header Connection "upgrade";
    proxy_set_header Host $host;
}
4) Scheme mismatch (wss vs ws)
- If your server is plain HTTP on 4200, use ws:// not wss://.
- Only use wss:// if you’ve terminated TLS on the same host you’re connecting to.
5) No auth headers are needed (if you truly have no auth)
- Ensure you’re not setting PREFECT_API_AUTH_STRING accidentally.
- If you are using an auth string, then add it as:
  - Authorization: <PREFECT_API_AUTH_STRING>
Minimal working example (no auth, OSS Server)
import asyncio
import websockets

WS_URL = "ws://127.0.0.1:4200/api/logs/out"

async def main():
    async with websockets.connect(WS_URL, origin="http://127.0.0.1:4200") as ws:
        async for msg in ws:
            print(msg)

asyncio.run(main())
If it still 403s, can you share:
- The exact WS_URL you’re using
- Whether you’re connecting directly to the Server or through a proxy
- Your Prefect version and server logs around the time of the 403 (the server often logs why it rejected the socket)
References
- Logging clients (subscriber helpers): prefect.logging.clients
- Subscriptions utilities: prefect.server.utilities.subscriptions
- Security settings (for Server auth/headers): Security settings
Devin Flake
09/24/2025, 7:29 PM
"auth_string": null
Marvin
09/24/2025, 7:29 PM
Marvin
09/24/2025, 7:29 PM
ws://127.0.0.1:4200/api/logs/out
If you hit ws://127.0.0.1:4200/logs/out (without /api), you’ll be rejected by the UI app.
2) Add an Origin header
Some deployments require a matching Origin. Try:
- wscat
wscat -c "ws://127.0.0.1:4200/api/logs/out" -H "Origin: http://127.0.0.1:4200"
- Python
import asyncio
import websockets

WS_URL = "ws://127.0.0.1:4200/api/logs/out"

async def main():
    async with websockets.connect(WS_URL, origin="http://127.0.0.1:4200") as ws:
        async for msg in ws:
            print(msg)

asyncio.run(main())
3) If you’re behind a reverse proxy (Nginx/Traefik/ALB)
Ensure upgrade headers are forwarded. For Nginx:
location / {
    proxy_pass http://prefect-server:4200;
    proxy_http_version 1.1;
    proxy_set_header Upgrade $http_upgrade;
    proxy_set_header Connection "upgrade";
    proxy_set_header Host $host;
}
4) Scheme must match your deployment
- Plain HTTP on port 4200 → use ws://
- Only use wss:// if TLS is actually terminated at that host/port
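The path and scheme checks above are easy to get wrong by hand, so a small preflight function could compare the socket URL against the API URL before connecting. This is a sketch using only the standard library; check_ws_url is a hypothetical helper, and it catches only the mismatches described here (scheme, host, and the /api/logs/out path), not Origin or proxy problems.

```python
from typing import List
from urllib.parse import urlsplit


def check_ws_url(api_url: str, ws_url: str) -> List[str]:
    """Return human-readable problems; an empty list means no obvious mismatch."""
    problems = []
    api, ws = urlsplit(api_url), urlsplit(ws_url)

    # Plain HTTP must pair with ws://, HTTPS with wss://
    expected_scheme = {"http": "ws", "https": "wss"}.get(api.scheme)
    if ws.scheme != expected_scheme:
        problems.append(f"scheme is {ws.scheme!r}, expected {expected_scheme!r}")

    if ws.netloc != api.netloc:
        problems.append(f"host is {ws.netloc!r}, expected {api.netloc!r}")

    # The socket lives under the API prefix, e.g. /api/logs/out rather than /logs/out
    expected_path = api.path.rstrip("/") + "/logs/out"
    if ws.path.rstrip("/") != expected_path:
        problems.append(f"path is {ws.path!r}, expected {expected_path!r}")

    return problems


for problem in check_ws_url("http://127.0.0.1:4200/api", "wss://127.0.0.1:4200/logs/out"):
    print(problem)
```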
Sanity checks
- Confirm your API URL (should end with /api):
from prefect.settings import get_current_settings
print(get_current_settings().PREFECT_API_URL)
# Expect: http://127.0.0.1:4200/api (or your own host, ending in /api)
- Let Prefect compute the WS URL for you (to avoid typos):
from prefect.logging.clients import logs_out_socket_from_api_url
print(logs_out_socket_from_api_url("http://127.0.0.1:4200/api"))
# -> ws://127.0.0.1:4200/api/logs/out
- Control test using Prefect’s subscriber (no auth needed if server is open):
import asyncio
from prefect.logging.clients import get_logs_subscriber

async def main():
    # The subscriber opens its socket when entered as an async context manager
    async with get_logs_subscriber() as sub:
        async for log in sub:
            print(log)

asyncio.run(main())
If this works but your manual socket still returns 403, the cause is most likely path-, origin-, or proxy-related.
If the 403 persists, can you share:
- The exact WS_URL you’re using
- Whether you’re connecting directly or via a proxy/load balancer
- A snippet of server logs around the 403 (they often log the reason)
Refs
- prefect.logging.clients
- prefect.server.utilities.subscriptions
- Security settings
Devin Flake
09/24/2025, 7:46 PM
Marvin
09/24/2025, 7:46 PM
Marvin
09/24/2025, 7:48 PM
curl -s -X POST "http://127.0.0.1:4200/api/logs/filter" \
  -H "Content-Type: application/json" \
  -d '{
    "filter": {
      "flow_runs": { "id": { "any_": ["<FLOW_RUN_ID>"] } }
    },
    "limit": 1000
  }'
Example: Python (httpx)
import httpx

API = "http://127.0.0.1:4200/api"
payload = {
    "filter": {
        "flow_runs": {"id": {"any_": ["<FLOW_RUN_ID>"]}}
    },
    "limit": 100
}
r = httpx.post(f"{API}/logs/filter", json=payload, timeout=30)
r.raise_for_status()
for log in r.json():
    print(log)
Docs: Read logs (REST)
2) Download logs for a specific flow run
- There’s also a flow-run–scoped “download logs” endpoint if you want all logs for one run in bulk. See Flow Runs API in the docs (look for “download logs”): Flow Runs API
What the records look like
- The REST response returns the same log objects you’d see over the WebSocket, e.g.:
{
  "id": "9f5b7a43-7b2d-4f33-9a1a-2f30d9c8f6d3",
  "timestamp": "2025-01-02T15:04:05.123456+00:00",
  "level": 20,
  "name": "prefect.flow_runs",
  "message": "Flow run started",
  "flow_run_id": "b6c8a2b8-9d4c-4cb1-97c1-3a0f6e5b83a4",
  "task_run_id": null
}
Notes
- Pagination: use the limit parameter; some versions also support offset.
- Filtering: you can filter by flow_run_id, task_run_id, level, timestamp, etc., via the FilterSet body. The docs page above shows the full filter schema.
- Auth: For your OSS Server with "auth_string": null, no Authorization header is needed. For Cloud, add Authorization: Bearer <PREFECT_API_KEY>.
Devin Flake
09/24/2025, 8:27 PM
Marvin
09/24/2025, 8:27 PM
Marvin
09/24/2025, 8:28 PM
import time
import httpx
from datetime import datetime, timezone

API = "http://127.0.0.1:4200/api"
last_ts = datetime.now(timezone.utc).isoformat()

while True:
    payload = {
        "filter": {
            # Optional: scope to a flow run to reduce noise
            # "flow_runs": {"id": {"any_": ["<FLOW_RUN_ID>"]}},
            "logs": {
                "timestamp": {"ge": last_ts}
            }
        },
        "sort": "TIMESTAMP_ASC",
        "limit": 1000
    }
    r = httpx.post(f"{API}/logs/filter", json=payload, timeout=30)
    r.raise_for_status()
    logs = r.json()
    if logs:
        for log in logs:
            print(log)  # raw JSON records
        # advance the cursor to the last seen timestamp
        last_ts = logs[-1]["timestamp"]
    time.sleep(1)  # adjust to your needs
Example curl (single page)
curl -s -X POST "http://127.0.0.1:4200/api/logs/filter" \
  -H "Content-Type: application/json" \
  -d '{
    "filter": {
      "logs": { "timestamp": { "ge": "2025-01-02T15:00:00Z" } }
    },
    "sort": "TIMESTAMP_ASC",
    "limit": 1000
  }'
Docs:
- Read logs (REST)
Option B: Bypass the proxy for WebSockets
- WebSockets require the HTTP Upgrade; there’s no “straight WS” without it. If the proxy can’t support upgrades, you’ll need to avoid it:
  - Connect directly to the Prefect Server host/port (e.g., ws://HOST:4200/api/logs/out)
  - Or use an SSH tunnel:
    - ssh -L 14200:127.0.0.1:4200 user@server
    - Then connect to ws://127.0.0.1:14200/api/logs/out
Quick clarification: did you mean “without WebSockets” (HTTP-only), or “bypass the proxy to use WebSockets directly”? If HTTP-only works for you, I can tailor the polling filter (flow_run_id/task_run_id/level) to your needs. If you want to use WS directly, I can help with a tunnel or a minimal proxy rule for just /api/logs/out.
Devin Flake
09/24/2025, 8:34 PM
Marvin
09/24/2025, 8:34 PM
Marvin
09/24/2025, 8:35 PM
import time
import httpx
import json
from datetime import datetime, timezone

API = "http://127.0.0.1:4200/api"  # your Prefect Server API base
POLL_SECONDS = 1
LIMIT = 1000
# Optional: restrict to a specific flow run to reduce noise
FLOW_RUN_ID = None  # e.g., "b6c8a2b8-9d4c-4cb1-97c1-3a0f6e5b83a4"
# Start the cursor at "now" so we only see new logs
cursor_ts = datetime.now(timezone.utc).isoformat()
# Keep a small rolling set of seen IDs to de-dupe when multiple logs share the same timestamp
seen_ids = set()
SEEN_MAX = 5000

def build_payload(cursor):
    filter_obj = {
        "logs": {"timestamp": {"ge": cursor}},
        # Optional: only INFO and above
        # "logs": {"timestamp": {"ge": cursor}, "level": {"ge": 20}},
    }
    if FLOW_RUN_ID:
        filter_obj["flow_runs"] = {"id": {"any_": [FLOW_RUN_ID]}}
    payload = {
        "filter": filter_obj,
        # Ask for ascending timestamps so we can advance the cursor safely
        "sort": "TIMESTAMP_ASC",
        "limit": LIMIT,
    }
    return payload

with httpx.Client(timeout=30) as client:
    while True:
        payload = build_payload(cursor_ts)
        r = client.post(f"{API}/logs/filter", json=payload)
        r.raise_for_status()
        logs = r.json()
        if logs:
            for log in logs:
                # De-duplicate by ID (helps when multiple logs share the same timestamp)
                if log["id"] in seen_ids:
                    continue
                print(json.dumps(log))  # raw JSON record
                seen_ids.add(log["id"])
                if len(seen_ids) > SEEN_MAX:
                    # prevent unbounded growth
                    seen_ids = set(list(seen_ids)[-SEEN_MAX:])
            # Advance the cursor to the last timestamp we received
            cursor_ts = logs[-1]["timestamp"]
        time.sleep(POLL_SECONDS)
Minimal curl example (single page)
curl -s -X POST "http://127.0.0.1:4200/api/logs/filter" \
  -H "Content-Type: application/json" \
  -d '{
    "filter": {
      "logs": { "timestamp": { "ge": "2025-01-02T15:00:00Z" } }
    },
    "sort": "TIMESTAMP_ASC",
    "limit": 1000
  }'
Notes and tips
- To scope to a single run, add:
  - "flow_runs": { "id": { "any_": ["<FLOW_RUN_ID>"] } }
- To filter by level:
  - "logs": { "level": { "ge": 20 } } (20=INFO, 30=WARNING, 40=ERROR)
- If you get a 422 from the API about sorting, remove the sort or adjust according to the server’s LogSort enum.
- For large volumes, reduce POLL_SECONDS and tune LIMIT.
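The numeric levels in the tips above are the standard Python logging levels, so a threshold can be looked up by name instead of hard-coding the number. A minimal sketch using only the standard library; level_threshold is an illustrative name, not a Prefect function.

```python
import logging


def level_threshold(name: str) -> int:
    """Translate a level name such as "warning" into its numeric value."""
    value = logging.getLevelName(name.upper())
    # For unknown names, getLevelName returns a string like "Level FOO"
    if not isinstance(value, int):
        raise ValueError(f"unknown log level: {name!r}")
    return value


for name in ("info", "warning", "error"):
    print(name, level_threshold(name))
```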
Docs
- Read logs (REST)
If you share your exact filtering needs (flow/task run IDs, minimum level), I can tailor the payload for you.
Devin Flake
09/24/2025, 9:03 PM
Traceback (most recent call last):
  File "/home/dflake/adobe/git_repos/ARTE/arte/prefect_http_log_test.py", line 40, in <module>
    r.raise_for_status()
  File "/home/dflake/anaconda3/envs/arte/lib/python3.11/site-packages/httpx/_models.py", line 829, in raise_for_status
    raise HTTPStatusError(message, request=request, response=self)
httpx.HTTPStatusError: Redirect response '301 Moved Permanently' for url 'http://workflows.arte.adobe.net/api/logs/filter'
Redirect location: 'https://workflows.arte.adobe.net/api/logs/filter'
For more information check: https://developer.mozilla.org/en-US/docs/Web/HTTP/Status/301
Marvin
09/24/2025, 9:03 PM
Marvin
09/24/2025, 9:04 PM
import time
import httpx
import json
from datetime import datetime, timezone

API = "https://workflows.arte.adobe.net/api"  # use https
POLL_SECONDS = 1
LIMIT = 1000
# If you have a corporate/self-signed CA, point to it:
# VERIFY = "/path/to/ca-bundle.pem"
VERIFY = True  # or False temporarily (not recommended for prod)
cursor_ts = datetime.now(timezone.utc).isoformat()
seen_ids = set()
SEEN_MAX = 5000

def build_payload(cursor):
    return {
        "filter": {
            "logs": {"timestamp": {"ge": cursor}},
            # Example: restrict to a flow run or set min level
            # "flow_runs": {"id": {"any_": ["<FLOW_RUN_ID>"]}},
            # "logs": {"timestamp": {"ge": cursor}, "level": {"ge": 20}}
        },
        "sort": "TIMESTAMP_ASC",
        "limit": LIMIT,
    }

with httpx.Client(timeout=30, verify=VERIFY) as client:
    while True:
        r = client.post(f"{API}/logs/filter", json=build_payload(cursor_ts))
        # If you still see a redirect, print r.status_code and r.headers to debug
        r.raise_for_status()
        logs = r.json()
        if logs:
            for log in logs:
                if log["id"] in seen_ids:
                    continue
                print(json.dumps(log))
                seen_ids.add(log["id"])
                if len(seen_ids) > SEEN_MAX:
                    seen_ids = set(list(seen_ids)[-SEEN_MAX:])
            cursor_ts = logs[-1]["timestamp"]
        time.sleep(POLL_SECONDS)
Quick curl check
curl -s -X POST "https://workflows.arte.adobe.net/api/logs/filter" \
  -H "Content-Type: application/json" \
  -d '{"filter":{"logs":{"timestamp":{"ge":"2025-01-02T15:00:00Z"}}},"sort":"TIMESTAMP_ASC","limit":1000}'
Notes
- Avoid follow_redirects=True for POST here; a 301/302 may change POST to GET. It’s safer to target HTTPS directly.
- If you hit TLS errors:
- Provide your CA bundle path via httpx.Client(verify="/path/to/ca.pem"), or
- Temporarily set verify=False (not recommended for production).
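The redirect caveat above comes from general HTTP semantics rather than anything Prefect-specific: clients commonly rewrite POST to GET after 301 and 302 (and always after 303), while 307 and 308 require the method to be kept. The sketch below encodes that rule so a script can decide whether following a redirect is safe; post_survives_redirect is a hypothetical helper, not part of httpx.

```python
# Redirect codes after which a client may (301, 302) or will (303) switch POST to GET
METHOD_MAY_CHANGE = {301, 302, 303}
# Redirect codes that require the original method and body to be reused
METHOD_PRESERVED = {307, 308}


def post_survives_redirect(status_code: int) -> bool:
    """Return True if a POST is guaranteed to stay a POST after this redirect."""
    if status_code in METHOD_PRESERVED:
        return True
    if status_code in METHOD_MAY_CHANGE:
        return False
    raise ValueError(f"{status_code} is not a method-relevant redirect status")


print(post_survives_redirect(301))  # the status from the traceback above
```

Since the server in this thread answers with 301, pointing the client at the https:// URL directly remains the simpler fix.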
If you share your exact filtering needs (flow/task run IDs or min level), I can tailor the payload.
Devin Flake
09/24/2025, 9:10 PM
Traceback (most recent call last):
  File "/home/dflake/adobe/git_repos/ARTE/arte/prefect_http_log_test.py", line 39, in <module>
    r.raise_for_status()
  File "/home/dflake/anaconda3/envs/arte/lib/python3.11/site-packages/httpx/_models.py", line 829, in raise_for_status
    raise HTTPStatusError(message, request=request, response=self)
httpx.HTTPStatusError: Client error '422 Unprocessable Entity' for url 'https://workflows.arte.adobe.net/api/logs/filter'
For more information check: https://developer.mozilla.org/en-US/docs/Web/HTTP/Status/422
Marvin
09/24/2025, 9:10 PM
Marvin
09/24/2025, 9:12 PM
limit, offset, sort
- A logs object containing a LogFilter
- Filter fields use suffixed operators, e.g. after_/before_, ge_/le_, and any_
Correct HTTP “tail” example (HTTPS)
import time
import httpx
import json
from datetime import datetime, timezone

API = "https://workflows.arte.adobe.net/api"
POLL_SECONDS = 1
LIMIT = 1000
# Optional: scope to a flow run
FLOW_RUN_ID = None  # e.g., "b6c8a2b8-9d4c-4cb1-97c1-3a0f6e5b83a4"
# If using a custom CA bundle, set verify to its path; otherwise keep True
VERIFY = True
cursor_ts = datetime.now(timezone.utc).isoformat()
seen_ids = set()
SEEN_MAX = 5000

def build_payload(cursor):
    lf = {
        "timestamp": {"after_": cursor},  # use after_/before_
        # "level": {"ge_": 20},  # optional: INFO and above
    }
    if FLOW_RUN_ID:
        lf["flow_run_id"] = {"any_": [FLOW_RUN_ID]}  # use any_ for IDs
    return {
        "logs": lf,
        "limit": LIMIT,
        # sort is optional; TIMESTAMP_ASC is typical, remove it if your server complains
        "sort": "TIMESTAMP_ASC",
    }

with httpx.Client(timeout=30, verify=VERIFY) as client:
    while True:
        r = client.post(f"{API}/logs/filter", json=build_payload(cursor_ts))
        r.raise_for_status()
        logs = r.json()
        if logs:
            for log in logs:
                if log["id"] in seen_ids:
                    continue
                print(json.dumps(log))
                seen_ids.add(log["id"])
                if len(seen_ids) > SEEN_MAX:
                    seen_ids = set(list(seen_ids)[-SEEN_MAX:])
            cursor_ts = logs[-1]["timestamp"]
        time.sleep(POLL_SECONDS)
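One caveat when de-duplicating by ID in a polling loop: a plain Python set is unordered, so it cannot reliably evict its oldest entries when trimmed. A bounded, insertion-ordered cache is one way around that. The class below is a sketch with a made-up name (RecentIds) and is not part of Prefect or httpx.

```python
from collections import OrderedDict


class RecentIds:
    """Hypothetical bounded cache that remembers the most recently added IDs."""

    def __init__(self, max_size: int = 5000) -> None:
        self.max_size = max_size
        self._ids: "OrderedDict[str, None]" = OrderedDict()

    def add_if_new(self, log_id: str) -> bool:
        """Return True the first time an ID is seen, False for repeats."""
        if log_id in self._ids:
            return False
        self._ids[log_id] = None
        # Evict in insertion order so the oldest IDs are dropped first
        while len(self._ids) > self.max_size:
            self._ids.popitem(last=False)
        return True


recent = RecentIds(max_size=2)
results = [recent.add_if_new(i) for i in ["a", "b", "a", "c", "a"]]
print(results)
```

In a polling loop, the membership test and the add would collapse into a single `if recent.add_if_new(log["id"]):` guard around the print.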
Single-page curl with the correct filter structure
curl -s -X POST "https://workflows.arte.adobe.net/api/logs/filter" \
  -H "Content-Type: application/json" \
  -d '{
    "logs": {
      "timestamp": { "after_": "2025-01-02T15:00:00Z" },
      "level": { "ge_": 20 },
      "flow_run_id": { "any_": ["<FLOW_RUN_ID>"] }
    },
    "sort": "TIMESTAMP_ASC",
    "limit": 1000
  }'
Key differences from your previous payload
- No outer filter object; use logs for LogFilter
- Use operator-suffixed fields:
  - Timestamp: {"after_": "...", "before_": "..."} (not ge/le)
  - Level: {"ge_": 20} (not ge)
  - IDs: {"any_": ["id1","id2"]}
- sort is typically "TIMESTAMP_ASC"; if you still see 422, try omitting sort.
If you share the exact filter you need (e.g., flow_run_id and minimum level), I can provide a narrowed payload tailored to your case.