Tom Han
10/16/2025, 8:38 PM
Marvin
10/16/2025, 8:38 PM
Marvin
10/16/2025, 8:46 PM
name, level (ge_/le_), timestamp (after_/before_), flow_run_id, task_run_id, and message text search via LogFilterTextSearch.
Python example (Prefect 3.x)
import asyncio
from datetime import datetime, timedelta, timezone
from prefect.client.orchestration import get_client
from prefect.client.schemas.filters import (
    LogFilter, LogFilterLevel, LogFilterTimestamp, LogFilterTextSearch,
    LogFilterFlowRunId, LogFilterTaskRunId,
)
from prefect.client.schemas.sorting import LogSort

async def main():
    async with get_client() as client:
        logs = await client.read_logs(
            limit=200,
            offset=0,
            sort=LogSort.TIMESTAMP_DESC,
            # read_logs takes the filter as `log_filter`, not `filter`
            log_filter=LogFilter(
                level=LogFilterLevel(ge_=20),  # INFO and above
                timestamp=LogFilterTimestamp(
                    after_=datetime.now(timezone.utc) - timedelta(days=1)
                ),
                # Text search on the message. Check the LogFilterTextSearch fields in
                # your installed version (it may take a single `query` string rather than `any_`).
                text=LogFilterTextSearch(any_=["error", "failed"]),
                # Optional scoping:
                # flow_run_id=LogFilterFlowRunId(any_=["<flow-run-uuid>"]),
                # task_run_id=LogFilterTaskRunId(is_null_=True),  # e.g., worker/server logs without a task
            ),
        )
        # each row has .message, .timestamp, .level, .flow_run_id, .task_run_id, .name
        for row in logs:
            print(row.timestamp, row.level, row.name, "-", row.message)

asyncio.run(main())
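If you expect more than one page of matches, the `limit`/`offset` pair above can be walked in a loop. Here is a minimal sketch of that pattern, assuming a hypothetical `fetch_page(limit, offset)` coroutine that you supply yourself; `iter_pages` and `fake_fetch` are illustrative names, not part of Prefect, and the demo uses an in-memory list instead of a real server.

```python
import asyncio
from typing import Any, AsyncIterator, Awaitable, Callable, List

# Hypothetical signature: any coroutine that returns one page of rows.
FetchPage = Callable[[int, int], Awaitable[List[Any]]]


async def iter_pages(fetch_page: FetchPage, page_size: int = 200) -> AsyncIterator[Any]:
    """Yield rows page by page until a short (or empty) page signals the end."""
    offset = 0
    while True:
        rows = await fetch_page(page_size, offset)
        for row in rows:
            yield row
        if len(rows) < page_size:
            break  # last page reached
        offset += page_size


async def demo() -> List[int]:
    data = list(range(450))  # stand-in for stored log rows

    async def fake_fetch(limit: int, offset: int) -> List[int]:
        return data[offset : offset + limit]

    return [row async for row in iter_pages(fake_fetch, page_size=200)]


result = asyncio.run(demo())
print(len(result))
```

To use it for real, write a `fetch_page` that wraps the `client.read_logs(...)` call from the example above and passes `limit` and `offset` through. Keep in mind that offset paging over a newest-first sort can shift while new logs are still arriving, so a fixed time window makes the results more stable.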
REST API example (self-hosted)
- URL: http://<your prefect host>:4200/api/logs/filter
POST /api/logs/filter
Content-Type: application/json
{
  "limit": 200,
  "sort": "TIMESTAMP_DESC",
  "logs": {
    "level": { "ge_": 20 },
    "timestamp": { "after_": "2025-01-15T00:00:00Z" },
    "text": { "any_": ["error", "failed"] }
  }
}
Optional scoping (add inside "logs"; JSON itself does not allow comments):
  "flow_run_id": { "any_": ["<flow-run-uuid>"] },
  "task_run_id": { "is_null_": true }
Note: the read-logs route is /logs/filter and the filter goes under the "logs" key; confirm both against the REST reference for your server version.
REST API example (Cloud)
- URL: https://api.prefect.cloud/api/accounts/<account_id>/workspaces/<workspace_id>/logs/filter
- Same JSON body as above.
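If it helps, here is a small standard-library sketch that assembles the same POST for either target without sending it. `build_logs_request` is a hypothetical helper name; the `logs/filter` path and the `logs` body key reflect my understanding of the Prefect 3.x REST reference and are worth confirming there, and the Bearer header assumes you authenticate to Cloud with an API key. The account, workspace, and key values are placeholders.

```python
import json
import urllib.request
from typing import Optional


def build_logs_request(
    body: dict,
    *,
    host: Optional[str] = None,
    account_id: Optional[str] = None,
    workspace_id: Optional[str] = None,
    api_key: Optional[str] = None,
    path: str = "logs/filter",  # assumption: confirm the route in the REST reference
) -> urllib.request.Request:
    """Build (but do not send) a POST request for self-hosted or Cloud."""
    if host is not None:
        base = f"http://{host}:4200/api"
    elif account_id and workspace_id:
        base = (
            "https://api.prefect.cloud/api"
            f"/accounts/{account_id}/workspaces/{workspace_id}"
        )
    else:
        raise ValueError("pass host=... or both account_id=... and workspace_id=...")

    headers = {"Content-Type": "application/json"}
    if api_key:
        headers["Authorization"] = f"Bearer {api_key}"

    return urllib.request.Request(
        f"{base}/{path}",
        data=json.dumps(body).encode("utf-8"),
        headers=headers,
        method="POST",
    )


# Assumption: the filter sits under the "logs" key of the request body.
body = {"limit": 200, "sort": "TIMESTAMP_DESC", "logs": {"level": {"ge_": 20}}}

local = build_logs_request(body, host="localhost")
cloud = build_logs_request(
    body, account_id="acct-123", workspace_id="ws-456", api_key="example-key"
)
print(local.full_url)
print(cloud.full_url)
```

Passing either request to `urllib.request.urlopen` would send it; the sketch stops short of that so you can inspect the URL and body first.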
Docs:
- REST: Read logs
- Python filter models (LogFilter*, operators)
Notes for scale: with SQLite (your current setup), large text searches can be slow under load; your planned move to Postgres will significantly help performance and reliability.
Search all events across your workspace
- Use this if you literally want the global event stream (flow/task/deployment state-change events and custom events), not logs.
CLI (live stream)
prefect event stream
# Options: --format text | json, --output-file <path>, --run-once
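If you capture the stream with `--format json --output-file <path>`, a quick tally by event name is often enough to see what is flowing. The sketch below assumes the file holds one JSON object per line and that each object carries its name under an `event` key; both are assumptions about the output shape, so check a few lines of your own file first. `count_event_names` is a hypothetical helper, and the sample data is made up for the demo.

```python
import json
import tempfile
from collections import Counter
from pathlib import Path


def count_event_names(path) -> Counter:
    """Count events by name in a file with one JSON object per line."""
    counts: Counter = Counter()
    with open(path, encoding="utf-8") as fh:
        for line in fh:
            line = line.strip()
            if not line:
                continue  # skip blank lines
            record = json.loads(line)
            # Assumption: the event name lives under the "event" key.
            counts[record.get("event", "<unknown>")] += 1
    return counts


# Demo with fabricated sample lines written to a temporary file.
with tempfile.TemporaryDirectory() as tmp:
    sample = Path(tmp) / "events.jsonl"
    sample.write_text(
        "\n".join(
            json.dumps({"event": name})
            for name in [
                "prefect.flow-run.Completed",
                "prefect.flow-run.Failed",
                "prefect.flow-run.Completed",
            ]
        )
        + "\n",
        encoding="utf-8",
    )
    counts = count_event_names(sample)

print(counts.most_common())
```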
Python (subscribe with filters)
import asyncio
from prefect.events.clients import get_events_subscriber
from prefect.events.filters import EventFilter, EventNameFilter

async def listen():
    # Only receive events whose name starts with "prefect.flow-run"
    f = EventFilter(event=EventNameFilter(prefix=["prefect.flow-run"]))
    async with get_events_subscriber(filter=f) as sub:
        async for event in sub:
            print(event)

asyncio.run(listen())
REST (historical reads)
- Self-hosted: POST http://<host>:4200/api/events/filter
- Cloud: POST https://api.prefect.cloud/api/accounts/<account_id>/workspaces/<workspace_id>/events/filter
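For the historical read, the request body mirrors the `EventFilter` model used in the Python subscriber. Here is a hedged sketch of building that body with the standard library: the `event.prefix` field comes from the example above, while `occurred.since`/`occurred.until`, `order`, and the top-level `filter`/`limit` keys are my assumptions about the schema and should be checked against the REST reference. `events_filter_body` is a hypothetical helper.

```python
import json
from datetime import datetime, timedelta, timezone
from typing import Iterable, Optional


def events_filter_body(
    prefixes: Iterable[str],
    hours_back: int = 24,
    limit: int = 50,
    now: Optional[datetime] = None,
) -> dict:
    """Build a JSON-serializable body for a historical events read."""
    now = now or datetime.now(timezone.utc)
    since = now - timedelta(hours=hours_back)
    return {
        "limit": limit,
        "filter": {
            # Assumed field names; verify against the EventFilter schema.
            "occurred": {"since": since.isoformat(), "until": now.isoformat()},
            "event": {"prefix": list(prefixes)},
            "order": "DESC",
        },
    }


# A fixed "now" keeps the demo output reproducible.
fixed_now = datetime(2025, 10, 16, 20, 46, tzinfo=timezone.utc)
body = events_filter_body(["prefect.flow-run."], hours_back=24, now=fixed_now)
print(json.dumps(body, indent=2))
```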
Marvin
10/16/2025, 8:46 PM
LogFilterTextSearch for message content.
- If you want to search event activity: use events read (CLI streaming, Python subscriber, or REST).
- Logs are not part of the event stream.
Does this match what you’re looking for? If you can share whether you want log text search or event activity search (or both), I can tailor an example to your exact use case, including filtering by deployment/work queue/worker and time windows.