# ask-marvin
d
@Marvin show me how to manually open a WebSocket connection to the get_logs_subscriber endpoint so I can see what the raw log messages look like
m
thought for 481.6 seconds
Great question. Here's how to connect directly to the logs WebSocket so you can inspect the raw frames.

Assumptions
- Prefect 3.x
- The logs WebSocket endpoint is derived from your `PREFECT_API_URL` and lives at `/logs/out`
- You authenticate with an Authorization header:
  - Prefect Cloud: `Authorization: Bearer <PREFECT_API_KEY>`
  - Prefect Server (OSS):
    - If auth is disabled: no header
    - If `PREFECT_API_AUTH_STRING` is set: `Authorization: <that string>` (no `Bearer`)

How to form the WebSocket URL
- Take your `PREFECT_API_URL`, swap http(s) -> ws(s), then append `/logs/out`
- Cloud example:
  - `PREFECT_API_URL=https://api.prefect.cloud/api/accounts/<account_id>/workspaces/<workspace_id>`
  - WebSocket: `wss://api.prefect.cloud/api/accounts/<account_id>/workspaces/<workspace_id>/logs/out`
- Server example:
  - `PREFECT_API_URL=http://127.0.0.1:4200/api`
  - WebSocket: `ws://127.0.0.1:4200/api/logs/out`
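You can check the URL rule above without installing Prefect. Below is a minimal stdlib-only sketch. `to_logs_ws_url` is a hypothetical helper name, and it assumes the API URL uses `http` or `https`:

```python
from urllib.parse import urlsplit, urlunsplit


def to_logs_ws_url(api_url: str) -> str:
    """Hypothetical helper: swap http(s) -> ws(s) and append /logs/out."""
    parts = urlsplit(api_url)
    scheme = {"http": "ws", "https": "wss"}.get(parts.scheme)
    if scheme is None:
        raise ValueError(f"expected an http(s) URL, got {api_url!r}")
    # Strip any trailing slash so we never produce '//logs/out'
    path = parts.path.rstrip("/") + "/logs/out"
    return urlunsplit((scheme, parts.netloc, path, "", ""))


print(to_logs_ws_url("http://127.0.0.1:4200/api"))
print(to_logs_ws_url("https://api.prefect.cloud/api/accounts/abc/workspaces/xyz/"))
```

If Prefect is installed, you can cross-check the result against `logs_out_socket_from_api_url`.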
If you want Prefect to compute this for you:
```python
from prefect.logging.clients import logs_out_socket_from_api_url
print(logs_out_socket_from_api_url("https://api.prefect.cloud/api/accounts/<acct>/workspaces/<ws>"))
# -> wss://api.prefect.cloud/api/accounts/<acct>/workspaces/<ws>/logs/out
```
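Connect with wscat
Prefect's own clients request the `prefect` WebSocket subprotocol, and the server checks for it, so pass `-s prefect`. After connecting, type an auth message first and then a filter message. This is the same sequence Prefect's subscriber sends:
- `{"type": "auth", "token": null}` (on Cloud, use your API key as `token`)
- `{"type": "filter", "filter": {}}` (an empty filter streams every log)

- Prefect Cloud
```bash
wscat -c "wss://api.prefect.cloud/api/accounts/<account_id>/workspaces/<workspace_id>/logs/out" \
  -s prefect \
  -H "Authorization: Bearer <PREFECT_API_KEY>"
```
- Prefect Server (no auth)
```bash
wscat -c "ws://127.0.0.1:4200/api/logs/out" -s prefect
```
- Prefect Server (with PREFECT_API_AUTH_STRING)
```bash
wscat -c "ws://127.0.0.1:4200/api/logs/out" \
  -s prefect \
  -H "Authorization: <your PREFECT_API_AUTH_STRING>"
```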
Connect with Python (websockets)
```python
import asyncio
import json
import os

import websockets

API_URL = os.environ["PREFECT_API_URL"]  # e.g. https://api.prefect.cloud/api/accounts/<acct>/workspaces/<ws>
WS_URL = API_URL.replace("https://", "wss://").replace("http://", "ws://").rstrip("/") + "/logs/out"

headers = {}
api_key = os.environ.get("PREFECT_API_KEY")
auth_string = os.environ.get("PREFECT_API_AUTH_STRING")

# Cloud: Bearer token; Server with auth string: raw string; Server without auth: no header
if api_key:
    headers["Authorization"] = f"Bearer {api_key}"
elif auth_string:
    headers["Authorization"] = auth_string


async def main():
    # The server expects the "prefect" subprotocol; without it the handshake is rejected (HTTP 403).
    # extra_headers= is the websockets < 14 (legacy client) spelling; 14+ renamed it additional_headers=.
    async with websockets.connect(WS_URL, subprotocols=["prefect"], extra_headers=headers) as ws:
        # 1) Auth message first, the way Prefect's own subscriber does it
        await ws.send(json.dumps({"type": "auth", "token": api_key}))
        print(await ws.recv())  # expect {"type": "auth_success"}

        # 2) Then a filter message; {} streams every log (see the filter notes below)
        await ws.send(json.dumps({"type": "filter", "filter": {}}))

        async for msg in ws:
            print(msg)  # raw JSON frame; or: print(json.loads(msg))


asyncio.run(main())
```
About filtering
- The server supports filtering logs (e.g., by flow run ID, task run ID, or level).
- The filter you send over the WebSocket is a Prefect `LogFilter`. If you want to avoid hand-writing the JSON, the simplest path is Prefect's subscriber client, which handles the handshake for you:
```python
import asyncio

from prefect.logging.clients import get_logs_subscriber


async def watch():
    # Entering the context opens the socket; optionally pass filter=LogFilter(...)
    async with get_logs_subscriber() as sub:
        async for log in sub:
            print(log)


asyncio.run(watch())
```
- To send a manual filter over the socket, send it as the first message after the auth exchange. For example, to see only INFO-and-above logs for one flow run:
```json
{
  "type": "filter",
  "filter": {
    "flow_run_id": { "any_": ["<flow_run_id>"] },
    "level": { "ge_": 20 }
  }
}
```
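If you'd rather build the filter frame in code than by hand, here is a minimal sketch. It assumes the frame is `{"type": "filter", "filter": <LogFilter>}`, which is the message Prefect's own subscriber sends after auth. It also assumes that LogFilter uses the operator-suffixed fields discussed in the REST answers further down (`flow_run_id.any_`, `level.ge_`). `build_filter_message` is a hypothetical helper, not a Prefect API.

```python
import json
from typing import Optional, Sequence


def build_filter_message(
    flow_run_ids: Optional[Sequence[str]] = None,
    min_level: Optional[int] = None,
) -> str:
    """Hypothetical helper: JSON text for the 'filter' frame sent after auth."""
    log_filter: dict = {}
    if flow_run_ids:
        log_filter["flow_run_id"] = {"any_": list(flow_run_ids)}
    if min_level is not None:
        log_filter["level"] = {"ge_": min_level}  # 20 = INFO, 30 = WARNING, 40 = ERROR
    # An empty LogFilter ({}) applies no filtering: every log is streamed
    return json.dumps({"type": "filter", "filter": log_filter})


print(build_filter_message(["b6c8a2b8-9d4c-4cb1-97c1-3a0f6e5b83a4"], min_level=20))
```

Send the result with `await ws.send(build_filter_message(...))` once the `auth_success` reply has arrived.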
Note: the filter schema mirrors the REST `LogFilter` for logs (fields such as `flow_run_id`, `task_run_id`, `level`, `timestamp`). If you want, I can generate a precise JSON for your use case.

What raw messages look like
- After the `{"type": "auth_success"}` reply, each frame carries one JSON-encoded log record. As far as Prefect's subscriber client is concerned, the record is wrapped as `{"type": "log", "log": {...}}`. A typical record includes:
```json
{
  "id": "9f5b7a43-7b2d-4f33-9a1a-2f30d9c8f6d3",
  "timestamp": "2025-01-02T15:04:05.123456+00:00",
  "level": 20,
  "name": "prefect.flow_runs",
  "message": "Flow run started",
  "flow_run_id": "b6c8a2b8-9d4c-4cb1-97c1-3a0f6e5b83a4",
  "task_run_id": null
}
```
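To turn raw frames into readable lines, a small helper can unwrap each frame and format the record. This is a minimal sketch with three assumptions:
- Frames wrap records as `{"type": "log", "log": {...}}`, which is how Prefect's subscriber appears to read them.
- Other typed frames (like `auth_success`) carry no record.
- A frame without a `type` key is a bare record, like the example above.

`unwrap_log` and `format_log` are hypothetical names.

```python
import json
import logging
from typing import Optional


def unwrap_log(frame: str) -> Optional[dict]:
    """Hypothetical helper: return the log record carried by one frame, or None."""
    data = json.loads(frame)
    if data.get("type") == "log":
        return data["log"]  # assumed wrapper: {"type": "log", "log": {...}}
    if "type" in data:
        return None  # control frames such as {"type": "auth_success"}
    return data  # bare record


def format_log(record: dict) -> str:
    # logging.getLevelName maps 20 -> "INFO", 40 -> "ERROR", and so on
    level = logging.getLevelName(record["level"])
    return f"{record['timestamp']} {level:<8} {record['name']}: {record['message']}"


frame = json.dumps({"type": "log", "log": {
    "timestamp": "2025-01-02T15:04:05.123456+00:00",
    "level": 20,
    "name": "prefect.flow_runs",
    "message": "Flow run started",
}})
print(format_log(unwrap_log(frame)))
```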
Helpful docs
- WebSocket subscriber utilities: `prefect.server.utilities.subscriptions`
- Logging clients (subscriber helpers): `prefect.logging.clients`
- Server logs API (REST; useful for filter shapes): Server logs API
- Security settings (auth headers on Server): Security settings

Want me to tailor a ready-to-run command for your workspace (Cloud or Server), including a filter for a specific flow/task run?
d
Prefect is giving me the following error:
```
File "/home/dflake/adobe/git_repos/ARTE/arte/prefect_ws_log_test.py", line 9, in main
    async with websockets.connect(WS_URL) as ws:
  File "/home/dflake/anaconda3/envs/arte/lib/python3.11/site-packages/websockets/legacy/client.py", line 635, in __aenter__
    return await self
           ^^^^^^^^^^
  File "/home/dflake/anaconda3/envs/arte/lib/python3.11/site-packages/websockets/legacy/client.py", line 656, in __await_impl__
    await protocol.handshake(
  File "/home/dflake/anaconda3/envs/arte/lib/python3.11/site-packages/websockets/legacy/client.py", line 322, in handshake
    raise InvalidStatusCode(status_code, response_headers)
websockets.legacy.exceptions.InvalidStatusCode: server rejected WebSocket connection: HTTP 403
```
@Marvin I don't have auth configured in my Prefect 3 Server, but I'm getting the same HTTP 403 error shown above.
m
thought for 80.1 seconds
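A 403 on the WebSocket handshake means the server (or something in front of it) rejected the request before upgrading it. Since you're on Prefect 3 Server without auth, here are the most likely causes and quick fixes to try.

Check first: the `prefect` subprotocol. Your traceback shows `websockets.connect(WS_URL)` with no subprotocol. Prefect's server appears to accept only sockets that request the `prefect` subprotocol. Rejecting a socket before accepting it reaches the client as exactly this HTTP 403. Pass `subprotocols=["prefect"]` in Python or `-s prefect` with wscat. Then send an auth message (`{"type": "auth", "token": null}`) and a filter message (`{"type": "filter", "filter": {}}`).

1) Make sure you're hitting the API path (with /api)
- Prefect Server hosts the API under `/api`. The logs socket should be `ws://127.0.0.1:4200/api/logs/out`.
- If you connect to `ws://127.0.0.1:4200/logs/out` (missing `/api`), you'll hit the UI app and get rejected.

Quick test with wscat:
```bash
wscat -c "ws://127.0.0.1:4200/api/logs/out" -s prefect
```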
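2) Set an Origin header if the server requires it
Some deployments reject sockets with a missing or mismatched Origin. Try adding an Origin that matches your server URL:
- wscat
```bash
wscat -c "ws://127.0.0.1:4200/api/logs/out" -s prefect -H "Origin: http://127.0.0.1:4200"
```
- Python websockets
```python
import asyncio
import json

import websockets

WS_URL = "ws://127.0.0.1:4200/api/logs/out"


async def main():
    # "prefect" is the subprotocol Prefect's own clients request
    async with websockets.connect(
        WS_URL, origin="http://127.0.0.1:4200", subprotocols=["prefect"]
    ) as ws:
        # The server waits for auth, then a filter, before it streams anything
        await ws.send(json.dumps({"type": "auth", "token": None}))
        await ws.send(json.dumps({"type": "filter", "filter": {}}))
        async for msg in ws:
            print(msg)


asyncio.run(main())
```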
3) Reverse proxy not forwarding WebSockets
If you're going through a proxy (Nginx, Traefik, ELB/ALB), it must allow WebSocket upgrades. For Nginx, for example:
```nginx
location / {
  proxy_pass http://prefect-server:4200;
  proxy_http_version 1.1;
  proxy_set_header Upgrade $http_upgrade;
  proxy_set_header Connection "upgrade";
  proxy_set_header Host $host;
}
```
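4) Scheme mismatch (wss vs ws)
- If your server is plain HTTP on 4200, use `ws://`, not `wss://`.
- Only use `wss://` if TLS is terminated on the same host and port you're connecting to.

5) No auth headers are needed (if you truly have no auth)
- Make sure you're not setting `PREFECT_API_AUTH_STRING` accidentally.
- If you do use an auth string, add it as `Authorization: <PREFECT_API_AUTH_STRING>`.

Minimal working example (no auth, OSS Server)
```python
import asyncio
import json

import websockets

WS_URL = "ws://127.0.0.1:4200/api/logs/out"


async def main():
    async with websockets.connect(
        WS_URL, origin="http://127.0.0.1:4200", subprotocols=["prefect"]
    ) as ws:
        # Handshake: auth first (an OSS server without auth doesn't check the token), then the filter
        await ws.send(json.dumps({"type": "auth", "token": None}))
        print(await ws.recv())  # expect {"type": "auth_success"}
        await ws.send(json.dumps({"type": "filter", "filter": {}}))  # {} = all logs
        async for msg in ws:
            print(msg)


asyncio.run(main())
```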
If it still 403s, can you share:
- The exact WS_URL you're using
- Whether you're connecting directly to the Server or through a proxy
- Your Prefect version and the server logs around the time of the 403 (the server often logs why it rejected the socket)

References
- Logging clients (subscriber helpers): `prefect.logging.clients`
- Subscriptions utilities: `prefect.server.utilities.subscriptions`
- Security settings (for Server auth/headers): Security settings
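To tell a server-side rejection from a proxy problem, you can send one bare upgrade request and look only at the status code:
- 101 means the upgrade went through.
- 403 means whoever answered refused the socket.
- Anything else (for example a 200 with an HTML page) suggests the request never reached the API's WebSocket handler.

Below is a stdlib-only sketch. `upgrade_headers` and `probe_upgrade` are hypothetical names. Sending the `prefect` subprotocol is an assumption based on what Prefect's own clients request.

```python
import base64
import http.client
import os
from urllib.parse import urlsplit


def upgrade_headers(subprotocol: str = "prefect") -> dict:
    """Headers for a bare WebSocket opening handshake (RFC 6455, section 4.1)."""
    return {
        "Upgrade": "websocket",
        "Connection": "Upgrade",
        # 16 random bytes, base64-encoded, as the RFC requires
        "Sec-WebSocket-Key": base64.b64encode(os.urandom(16)).decode("ascii"),
        "Sec-WebSocket-Version": "13",
        "Sec-WebSocket-Protocol": subprotocol,
    }


def probe_upgrade(ws_url: str, timeout: float = 10.0) -> int:
    """Send one upgrade request and return the HTTP status code (101 = upgraded)."""
    parts = urlsplit(ws_url)
    conn_cls = (
        http.client.HTTPSConnection if parts.scheme == "wss" else http.client.HTTPConnection
    )
    conn = conn_cls(parts.hostname, parts.port, timeout=timeout)
    try:
        # http.client fills in the Host header from hostname/port
        conn.request("GET", parts.path or "/", headers=upgrade_headers())
        return conn.getresponse().status
    finally:
        conn.close()
```

Usage: `print(probe_upgrade("ws://127.0.0.1:4200/api/logs/out"))`. Run it once against the server directly and once through the proxy to narrow down which hop refuses the upgrade.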
d
@Marvin in Prefect, my `"auth_string": null`
m
thought for 35.9 seconds
Got it. Since your `auth_string` is `null`, Prefect Server isn't expecting any auth header. A 403 on the WS handshake is usually one of these:
- Missing `prefect` subprotocol (Prefect's clients request it, and the server appears to reject sockets that don't)
- Wrong path (missing `/api`)
- Missing or mismatched Origin header
- Reverse proxy not allowing WebSocket upgrades
- Scheme mismatch (ws vs wss)

Quick fixes to try
1) Verify the exact WS URL. It must include `/api`:
```
ws://127.0.0.1:4200/api/logs/out
```
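If you hit `ws://127.0.0.1:4200/logs/out` (without `/api`), you'll be rejected by the UI app.

2) Add an Origin header
Some deployments require a matching Origin. Try:
- wscat
```bash
wscat -c "ws://127.0.0.1:4200/api/logs/out" -s prefect -H "Origin: http://127.0.0.1:4200"
```
- Python
```python
import asyncio
import json

import websockets

WS_URL = "ws://127.0.0.1:4200/api/logs/out"


async def main():
    # "prefect" is the subprotocol Prefect's own clients request
    async with websockets.connect(
        WS_URL, origin="http://127.0.0.1:4200", subprotocols=["prefect"]
    ) as ws:
        # The server waits for auth, then a filter, before it streams anything
        await ws.send(json.dumps({"type": "auth", "token": None}))
        await ws.send(json.dumps({"type": "filter", "filter": {}}))
        async for msg in ws:
            print(msg)


asyncio.run(main())
```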
3) If you're behind a reverse proxy (Nginx/Traefik/ALB)
Ensure the upgrade headers are forwarded. For Nginx:
```nginx
location / {
  proxy_pass http://prefect-server:4200;
  proxy_http_version 1.1;
  proxy_set_header Upgrade $http_upgrade;
  proxy_set_header Connection "upgrade";
  proxy_set_header Host $host;
}
```
4) The scheme must match your deployment
- Plain HTTP on port 4200 -> use `ws://`
- Only use `wss://` if TLS is actually terminated at that host/port

Sanity checks
- Confirm your API URL (it should end with `/api`):
```python
from prefect.settings import PREFECT_API_URL

print(PREFECT_API_URL.value())
# Expect: http://127.0.0.1:4200/api (or https://<your-host>/api)
```
- Let Prefect compute the WS URL for you (to avoid typos):
```python
from prefect.logging.clients import logs_out_socket_from_api_url

print(logs_out_socket_from_api_url("http://127.0.0.1:4200/api"))
# -> ws://127.0.0.1:4200/api/logs/out
```
- Control test using Prefect's subscriber (no auth needed if the server is open):
```python
import asyncio

from prefect.logging.clients import get_logs_subscriber


async def main():
    # Entering the context opens the socket and performs the handshake
    async with get_logs_subscriber() as sub:
        async for log in sub:
            print(log)


asyncio.run(main())
```
If this works but your manual socket still 403s, the difference lies in how you open the socket: subprotocol, path, origin, or proxy. If the 403 persists, can you share:
- The exact `WS_URL` you're using
- Whether you're connecting directly or via a proxy/load balancer
- A snippet of server logs around the 403 (they often log the reason)

Refs
- prefect.logging.clients
- prefect.server.utilities.subscriptions
- Security settings
d
@Marvin can I use http instead of websockets to get raw Prefect logs?
m
thought for 129.9 seconds
Yes. You can read "raw" Prefect logs over HTTP via the REST API. This works well for historical queries or polling; for true real-time streaming you'll still want the WebSocket.

Two useful HTTP options

1) Read logs with filters (REST)
- Endpoint: read_logs on the Server API
- Typical Server path: `POST /api/logs/filter`
- Body: a FilterSet (e.g., filter by flow_run_id, task_run_id, etc.)
- Returns: JSON array of log records

Example: curl (Prefect Server, no auth)
```bash
curl -s -X POST "http://127.0.0.1:4200/api/logs/filter" \
  -H "Content-Type: application/json" \
  -d '{
    "filter": {
      "flow_runs": { "id": { "any_": ["<FLOW_RUN_ID>"] } }
    },
    "limit": 1000
  }'
```
Example: Python (httpx)
```python
import httpx

API = "http://127.0.0.1:4200/api"
payload = {
  "filter": {
    "flow_runs": { "id": { "any_": ["<FLOW_RUN_ID>"] } }
  },
  "limit": 100
}

r = httpx.post(f"{API}/logs/filter", json=payload, timeout=30)
r.raise_for_status()
for log in r.json():
    print(log)
```
Docs: Read logs (REST)

2) Download logs for a specific flow run
- There's also a flow-run-scoped "download logs" endpoint if you want all logs for one run in bulk. See the Flow Runs API in the docs (look for "download logs"): Flow Runs API

What the records look like
- The REST response returns the same log records you'd see inside the WebSocket frames, e.g.:
```json
{
  "id": "9f5b7a43-7b2d-4f33-9a1a-2f30d9c8f6d3",
  "timestamp": "2025-01-02T15:04:05.123456+00:00",
  "level": 20,
  "name": "prefect.flow_runs",
  "message": "Flow run started",
  "flow_run_id": "b6c8a2b8-9d4c-4cb1-97c1-3a0f6e5b83a4",
  "task_run_id": null
}
```
Notes
- Pagination: use the `limit` parameter together with `offset`.
- Filtering: you can filter by flow_run_id, task_run_id, level, timestamp, etc., via the request body. The docs page above shows the full filter schema.
- Auth: for your OSS Server with `"auth_string": null`, no Authorization header is needed. For Cloud, add `Authorization: Bearer <PREFECT_API_KEY>`.
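For historical reads (as opposed to tailing), `limit` plus `offset` lets you page through a run's logs. This is a minimal sketch under two assumptions:
- The body uses the Prefect 3 shape worked out later in this thread: a top-level `logs` LogFilter plus `limit`/`offset`/`sort`.
- The server caps `limit` at `PREFECT_API_DEFAULT_LIMIT`, which defaults to 200.

`iter_flow_run_logs` is a hypothetical helper. It takes the `post` function as a parameter, so it works with any HTTP client whose response has `raise_for_status()` and `json()`.

```python
from typing import Callable, Iterator, Optional


def iter_flow_run_logs(
    post: Callable,  # e.g. httpx.Client(...).post
    api: str,
    flow_run_id: str,
    page_size: int = 200,
    min_level: Optional[int] = None,
) -> Iterator[dict]:
    """Hypothetical helper: page through every log of one flow run, oldest first."""
    log_filter: dict = {"flow_run_id": {"any_": [flow_run_id]}}
    if min_level is not None:
        log_filter["level"] = {"ge_": min_level}
    offset = 0
    while True:
        payload = {
            "logs": log_filter,
            "sort": "TIMESTAMP_ASC",
            "limit": page_size,
            "offset": offset,
        }
        r = post(f"{api}/logs/filter", json=payload)
        r.raise_for_status()
        page = r.json()
        yield from page
        if len(page) < page_size:  # a short page means there is nothing left
            return
        offset += len(page)
```

With httpx, pass `client.post` from an `httpx.Client(timeout=30)` and iterate: `for log in iter_flow_run_logs(client.post, "http://127.0.0.1:4200/api", "<FLOW_RUN_ID>"): print(log)`.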
d
@Marvin my reverse proxy doesn't allow WebSocket upgrades. How can I do it with just straight WebSockets?
m
thought for 43.1 seconds
Got it. If your reverse proxy won't allow WebSocket upgrades, you have two practical options:

Option A: Don't use WebSockets; poll logs over HTTP
- Use the REST endpoint to read logs and "tail" them by timestamp.
- Endpoint (Prefect Server): `POST http://YOUR_SERVER:4200/api/logs/filter`
- Poll with a timestamp filter and sort ascending to simulate streaming.

Minimal "tail" loop (HTTP only)
```python
import time
from datetime import datetime, timezone

import httpx

API = "http://127.0.0.1:4200/api"
last_ts = datetime.now(timezone.utc).isoformat()

while True:
    payload = {
      "filter": {
        # Optional: scope to a flow run to reduce noise
        # "flow_runs": {"id": {"any_": ["<FLOW_RUN_ID>"]}},
        "logs": {
          "timestamp": {"ge": last_ts}
        }
      },
      "sort": "TIMESTAMP_ASC",
      "limit": 1000
    }

    r = httpx.post(f"{API}/logs/filter", json=payload, timeout=30)
    r.raise_for_status()
    logs = r.json()

    if logs:
        for log in logs:
            print(log)  # raw JSON records
        # advance the cursor to the last seen timestamp
        last_ts = logs[-1]["timestamp"]

    time.sleep(1)  # adjust to your needs
```
Example curl (single page)
```bash
curl -s -X POST "http://127.0.0.1:4200/api/logs/filter" \
  -H "Content-Type: application/json" \
  -d '{
    "filter": {
      "logs": { "timestamp": { "ge": "2025-01-02T15:00:00Z" } }
    },
    "sort": "TIMESTAMP_ASC",
    "limit": 1000
  }'
```
Docs:
- Read logs (REST): Read logs (REST)

Option B: Bypass the proxy for WebSockets
- WebSockets require the HTTP Upgrade; there's no "straight WS" without it. If the proxy can't support upgrades, you'll need to avoid it:
  - Connect directly to the Prefect Server host/port (e.g., `ws://HOST:4200/api/logs/out`)
  - Or use an SSH tunnel: `ssh -L 14200:127.0.0.1:4200 user@server`, then connect to `ws://127.0.0.1:14200/api/logs/out`

Quick clarification: did you mean "without WebSockets" (HTTP-only), or "bypass the proxy to use WebSockets directly"? If HTTP-only works for you, I can tailor the polling filter (flow_run_id/task_run_id/level) to your needs. If you want to use WS directly, I can help with a tunnel or a minimal proxy rule for just `/api/logs/out`.
d
@Marvin Yes, HTTP-only works, please provide a polling example
m
thought for 48.4 seconds
Absolutely. Here's a simple, reliable HTTP-only "tail" for Prefect 3 logs that polls the REST endpoint and advances a timestamp cursor.

Python (httpx): tail all logs (optionally scoped to a flow run)
```python
import json
import time
from collections import deque
from datetime import datetime, timezone

import httpx

API = "http://127.0.0.1:4200/api"  # your Prefect Server API base
POLL_SECONDS = 1
LIMIT = 1000

# Optional: restrict to a specific flow run to reduce noise
FLOW_RUN_ID = None  # e.g., "b6c8a2b8-9d4c-4cb1-97c1-3a0f6e5b83a4"

# Start the cursor at "now" so we only see new logs
cursor_ts = datetime.now(timezone.utc).isoformat()

# Remember recently printed IDs to de-dupe logs that share the cursor's timestamp.
# Sets are unordered, so a deque tracks insertion order for evicting the oldest IDs.
seen_ids = set()
seen_order = deque()
SEEN_MAX = 5000

def build_payload(cursor):
    filter_obj = {
        "logs": {"timestamp": {"ge": cursor}},
        # Optional: only INFO and above
        # "logs": {"timestamp": {"ge": cursor}, "level": {"ge": 20}},
    }
    if FLOW_RUN_ID:
        filter_obj["flow_runs"] = {"id": {"any_": [FLOW_RUN_ID]}}

    payload = {
        "filter": filter_obj,
        # Ask for ascending timestamps so we can advance the cursor safely
        "sort": "TIMESTAMP_ASC",
        "limit": LIMIT,
    }
    return payload

with httpx.Client(timeout=30) as client:
    while True:
        payload = build_payload(cursor_ts)
        r = client.post(f"{API}/logs/filter", json=payload)
        r.raise_for_status()
        logs = r.json()

        if logs:
            for log in logs:
                # De-duplicate by ID (helps when multiple logs share the same timestamp)
                if log["id"] in seen_ids:
                    continue
                print(json.dumps(log))  # raw JSON record
                seen_ids.add(log["id"])
                seen_order.append(log["id"])
                if len(seen_order) > SEEN_MAX:
                    # evict the oldest ID to keep memory bounded
                    seen_ids.discard(seen_order.popleft())

            # Advance the cursor to the last timestamp we received
            cursor_ts = logs[-1]["timestamp"]

        time.sleep(POLL_SECONDS)
```
Minimal curl example (single page)
```bash
curl -s -X POST "http://127.0.0.1:4200/api/logs/filter" \
  -H "Content-Type: application/json" \
  -d '{
    "filter": {
      "logs": { "timestamp": { "ge": "2025-01-02T15:00:00Z" } }
    },
    "sort": "TIMESTAMP_ASC",
    "limit": 1000
  }'
```
Notes and tips
- To scope to a single run, add: `"flow_runs": { "id": { "any_": ["<FLOW_RUN_ID>"] } }`
- To filter by level: `"logs": { "level": { "ge": 20 } }` (20=INFO, 30=WARNING, 40=ERROR)
- If you get a 422 from the API about sorting, remove the sort or adjust it to match the server's LogSort enum.
- For large volumes, reduce `POLL_SECONDS` and tune `LIMIT`.

Docs
- Read logs (REST): Read logs (REST)

If you share your exact filtering needs (flow/task run IDs, minimum level), I can tailor the payload for you.
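The timestamp filter appears to be inclusive (Prefect 3's `after_` is documented as "at or after"). As a result, every poll re-returns the records that share the last timestamp. Instead of remembering thousands of IDs, you only need the IDs at the newest timestamp.

Below is a minimal sketch of that bookkeeping as a pure function. `advance_cursor` is a hypothetical name. It assumes each page is sorted by timestamp ascending and that timestamps from one server compare consistently as strings.

```python
from typing import Iterable, List, Set, Tuple


def advance_cursor(
    logs: Iterable[dict], cursor: str, seen_at_cursor: Set[str]
) -> Tuple[List[dict], str, Set[str]]:
    """Hypothetical helper: drop re-sent logs and move the timestamp cursor forward.

    Assumes `logs` is sorted by timestamp ascending and that the server's
    timestamp filter is inclusive, so logs stamped exactly at `cursor` come
    back on every poll until the cursor moves past them.
    """
    seen = set(seen_at_cursor)  # copy so the caller's set is not mutated
    fresh = []
    for log in logs:
        ts = log["timestamp"]
        if ts == cursor and log["id"] in seen:
            continue  # already handled on an earlier poll
        if ts != cursor:
            # Moved past the old cursor: only IDs at the newest timestamp matter now
            cursor, seen = ts, set()
        seen.add(log["id"])
        fresh.append(log)
    return fresh, cursor, seen
```

To use it in the loop, call `fresh, cursor_ts, seen = advance_cursor(r.json(), cursor_ts, seen)` and then print `fresh`. This replaces the `seen_ids` bookkeeping. Memory stays bounded by the number of logs that share a single timestamp.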
d
@Marvin getting this error message:
```
Traceback (most recent call last):
  File "/home/dflake/adobe/git_repos/ARTE/arte/prefect_http_log_test.py", line 40, in <module>
    r.raise_for_status()
  File "/home/dflake/anaconda3/envs/arte/lib/python3.11/site-packages/httpx/_models.py", line 829, in raise_for_status
    raise HTTPStatusError(message, request=request, response=self)
httpx.HTTPStatusError: Redirect response '301 Moved Permanently' for url 'http://workflows.arte.adobe.net/api/logs/filter'
Redirect location: 'https://workflows.arte.adobe.net/api/logs/filter'
For more information check: https://developer.mozilla.org/en-US/docs/Web/HTTP/Status/301
```
m
thought for 54.5 seconds
Your server is redirecting HTTP to HTTPS (301). Since you're POSTing, don't rely on redirects; go straight to the HTTPS API URL. Do this:
- Use the HTTPS base URL
- Optionally configure TLS verification if you're using a custom/internal CA

Updated Python tail example
```python
import json
import time
from collections import deque
from datetime import datetime, timezone

import httpx

API = "https://workflows.arte.adobe.net/api"  # use https
POLL_SECONDS = 1
LIMIT = 1000

# If you have a corporate/self-signed CA, point to it:
# VERIFY = "/path/to/ca-bundle.pem"
VERIFY = True  # or False temporarily (not recommended for prod)

cursor_ts = datetime.now(timezone.utc).isoformat()
seen_ids = set()
seen_order = deque()  # insertion order, so the oldest IDs can be evicted
SEEN_MAX = 5000

def build_payload(cursor):
    return {
        "filter": {
            "logs": {"timestamp": {"ge": cursor}},
            # Example: restrict to a flow run or set min level
            # "flow_runs": {"id": {"any_": ["<FLOW_RUN_ID>"]}},
            # "logs": {"timestamp": {"ge": cursor}, "level": {"ge": 20}}
        },
        "sort": "TIMESTAMP_ASC",
        "limit": LIMIT,
    }

with httpx.Client(timeout=30, verify=VERIFY) as client:
    while True:
        r = client.post(f"{API}/logs/filter", json=build_payload(cursor_ts))
        # If you still see a redirect, print r.status_code and r.headers to debug
        r.raise_for_status()
        logs = r.json()

        if logs:
            for log in logs:
                if log["id"] in seen_ids:
                    continue
                print(json.dumps(log))
                seen_ids.add(log["id"])
                seen_order.append(log["id"])
                if len(seen_order) > SEEN_MAX:
                    seen_ids.discard(seen_order.popleft())
            cursor_ts = logs[-1]["timestamp"]

        time.sleep(POLL_SECONDS)
```
Quick curl check
```bash
curl -s -X POST "https://workflows.arte.adobe.net/api/logs/filter" \
  -H "Content-Type: application/json" \
  -d '{"filter":{"logs":{"timestamp":{"ge":"2025-01-02T15:00:00Z"}}},"sort":"TIMESTAMP_ASC","limit":1000}'
```
Notes
- Avoid `follow_redirects=True` for POST here; a 301/302 may turn the POST into a GET. It's safer to target HTTPS directly.
- If you hit TLS errors:
  - Provide your CA bundle path via `httpx.Client(verify="/path/to/ca.pem")`, or
  - Temporarily set `verify=False` (not recommended for production).

If you share your exact filtering needs (flow/task run IDs or min level), I can tailor the payload.
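Following the redirect is risky here because HTTP clients may change the method on some redirects. The sketch below mirrors the rule httpx applies, which is the same one requests and browsers use. `method_after_redirect` is a hypothetical name, not part of httpx. With your server's 301, a followed POST would be re-sent as a GET without its JSON body. A POST-only route like `/logs/filter` would most likely answer that with a 405.

```python
def method_after_redirect(method: str, status: int) -> str:
    """Sketch of the method rewrite httpx (like requests and browsers) applies on redirect."""
    if status in (302, 303) and method != "HEAD":
        return "GET"  # 302/303: the follow-up request becomes a GET
    if status == 301 and method == "POST":
        return "GET"  # 301: historical browser behavior turns POST into GET
    return method  # 307/308 (and everything else) keep the original method


for status in (301, 302, 307, 308):
    print(status, method_after_redirect("POST", status))
```

This is why a server that redirects with 307 or 308 would preserve the POST, while your 301 does not.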
d
@Marvin new error after changing to HTTPS:
```
Traceback (most recent call last):
  File "/home/dflake/adobe/git_repos/ARTE/arte/prefect_http_log_test.py", line 39, in <module>
    r.raise_for_status()
  File "/home/dflake/anaconda3/envs/arte/lib/python3.11/site-packages/httpx/_models.py", line 829, in raise_for_status
    raise HTTPStatusError(message, request=request, response=self)
httpx.HTTPStatusError: Client error '422 Unprocessable Entity' for url 'https://workflows.arte.adobe.net/api/logs/filter'
For more information check: https://developer.mozilla.org/en-US/docs/Web/HTTP/Status/422
```
m
thought for 119.3 seconds
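A 422 here is almost certainly caused by the request body. The logs REST endpoint in Prefect 3 expects:
- Top-level keys like `limit`, `offset`, `sort`
- A `logs` object containing a LogFilter
- Filter fields with suffixed operators, e.g. `after_`/`before_`, `ge_`/`le_`, and `any_`
- A `limit` no larger than the server's `PREFECT_API_DEFAULT_LIMIT` (200 by default). Your previous script asked for 1000, which by itself is enough to trigger a 422.

The response body (`r.text`) names the field that failed validation, which is the fastest way to confirm.

Correct HTTP "tail" example (HTTPS)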
```python
import json
import time
from collections import deque
from datetime import datetime, timezone

import httpx

API = "https://workflows.arte.adobe.net/api"
POLL_SECONDS = 1
LIMIT = 200  # must not exceed the server's PREFECT_API_DEFAULT_LIMIT (200 by default)

# Optional: scope to a flow run
FLOW_RUN_ID = None  # e.g., "b6c8a2b8-9d4c-4cb1-97c1-3a0f6e5b83a4"

# If using a custom CA bundle, set verify to its path; otherwise keep True
VERIFY = True

cursor_ts = datetime.now(timezone.utc).isoformat()
seen_ids = set()
seen_order = deque()  # insertion order, so the oldest IDs can be evicted
SEEN_MAX = 5000

def build_payload(cursor):
    lf = {
        "timestamp": {"after_": cursor},   # after_ is inclusive ("at or after")
        # "level": {"ge_": 20},            # optional: INFO and above
    }
    if FLOW_RUN_ID:
        lf["flow_run_id"] = {"any_": [FLOW_RUN_ID]}  # use any_ for IDs

    return {
        "logs": lf,
        "limit": LIMIT,
        # sort is optional; TIMESTAMP_ASC is typical, remove it if your server complains
        "sort": "TIMESTAMP_ASC",
    }

with httpx.Client(timeout=30, verify=VERIFY) as client:
    while True:
        r = client.post(f"{API}/logs/filter", json=build_payload(cursor_ts))
        if r.status_code == 422:
            # The response body says which field failed validation
            print(r.text)
        r.raise_for_status()
        logs = r.json()

        if logs:
            for log in logs:
                if log["id"] in seen_ids:
                    continue
                print(json.dumps(log))
                seen_ids.add(log["id"])
                seen_order.append(log["id"])
                if len(seen_order) > SEEN_MAX:
                    seen_ids.discard(seen_order.popleft())
            cursor_ts = logs[-1]["timestamp"]

        time.sleep(POLL_SECONDS)
```
Single-page curl with the correct filter structure
```bash
curl -s -X POST "https://workflows.arte.adobe.net/api/logs/filter" \
  -H "Content-Type: application/json" \
  -d '{
    "logs": {
      "timestamp": { "after_": "2025-01-02T15:00:00Z" },
      "level": { "ge_": 20 },
      "flow_run_id": { "any_": ["<FLOW_RUN_ID>"] }
    },
    "sort": "TIMESTAMP_ASC",
    "limit": 200
  }'
```
Key differences from your previous payload
- No outer `filter` object; use `logs` for the LogFilter
- Use operator-suffixed fields:
  - Timestamp: `{"after_": "...", "before_": "..."}` (not `ge`/`le`)
  - Level: `{"ge_": 20}` (not `ge`)
  - IDs: `{"any_": ["id1", "id2"]}`
- Keep `limit` at or below 200 (the default `PREFECT_API_DEFAULT_LIMIT`), not 1000
- `sort` is typically `"TIMESTAMP_ASC"`; if you still see a 422, try omitting `sort`.

If you share the exact filter you need (e.g., flow_run_id and minimum level), I can provide a narrowed payload tailored to your case.
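Operator names and the `limit` range are easy to get wrong, and you can catch both before sending. Below is a minimal pre-flight check. It assumes the operator sets, sort values and the 200 default in its constants match your server version, so verify them against Prefect's filter schemas. `check_logs_payload` and the constants are hypothetical names, not Prefect APIs. The authoritative answer is still the 422 response body.

```python
from typing import List

# Operators each LogFilter field accepts, as we understand Prefect 3's schema.
# Assumption: verify against prefect.server.schemas.filters for your version.
LOG_FILTER_OPS = {
    "timestamp": {"before_", "after_"},
    "level": {"ge_", "le_"},
    "flow_run_id": {"any_"},
    "task_run_id": {"any_", "is_null_"},
}
TOP_LEVEL_KEYS = {"logs", "limit", "offset", "sort"}
LOG_SORTS = {"TIMESTAMP_ASC", "TIMESTAMP_DESC"}
DEFAULT_MAX_LIMIT = 200  # default value of PREFECT_API_DEFAULT_LIMIT


def check_logs_payload(payload: dict, max_limit: int = DEFAULT_MAX_LIMIT) -> List[str]:
    """Hypothetical pre-flight check: return a list of problems (empty means it looks OK)."""
    problems = []
    for key in sorted(payload.keys() - TOP_LEVEL_KEYS):
        problems.append(f"unexpected top-level key {key!r} (likely ignored by the server)")
    limit = payload.get("limit")
    if limit is not None and not 0 <= limit <= max_limit:
        problems.append(f"limit {limit} is outside 0..{max_limit}")
    sort = payload.get("sort")
    if sort is not None and sort not in LOG_SORTS:
        problems.append(f"unknown sort {sort!r}")
    for field, ops in (payload.get("logs") or {}).items():
        allowed = LOG_FILTER_OPS.get(field)
        if allowed is None:
            problems.append(f"unknown LogFilter field {field!r}")
        elif not isinstance(ops, dict):
            problems.append(f"{field}: expected an operator object, got {ops!r}")
        else:
            for op in sorted(ops.keys() - allowed):
                problems.append(f"{field}: unknown operator {op!r}, expected one of {sorted(allowed)}")
    return problems
```

Running it on the payload from the previous script flags the `filter` wrapper, the `ge` operator and the 1000 limit in one pass.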