# ask-marvin
@Marvin How do I search through all logs emitted from all kinds of events?
thought for 497.2 seconds
Great question. Quick clarification first: in Prefect 3.x, “logs” and “events” are two separate systems.
- Logs = messages emitted by flows, tasks, workers, etc. (level, message, timestamp, flow/task run IDs)
- Events = state changes and custom events in the event stream (e.g., flow-run state transitions)
- Log messages are not events; there is no “prefect.log.write” event.

So “search all logs” and “search all events” use different APIs. Below are both paths so you can pick whichever you meant.

Search all logs across your workspace
- Best done via the API or the Python client; there is no `logs` CLI command, and the UI is run-centric.
- Filters you can use: logger `name`, `level` (ge_/le_), `timestamp` (after_/before_), `flow_run_id`, `task_run_id`, and message text search via `LogFilterTextSearch`.

Python example (Prefect 3.x)
```python
import asyncio
from datetime import datetime, timedelta, timezone

from prefect.client.orchestration import get_client
from prefect.client.schemas.filters import (
    LogFilter, LogFilterLevel, LogFilterTimestamp, LogFilterTextSearch,
    LogFilterFlowRunId, LogFilterTaskRunId
)
from prefect.client.schemas.sorting import LogSort

async def main():
    async with get_client() as client:
        logs = await client.read_logs(
            limit=200,
            offset=0,
            sort=LogSort.TIMESTAMP_DESC,
            log_filter=LogFilter(
                level=LogFilterLevel(ge_=20),  # INFO and above
                timestamp=LogFilterTimestamp(
                    after_=datetime.now(timezone.utc) - timedelta(days=1)
                ),
                text=LogFilterTextSearch(any_=["error", "failed"]),  # simple substring match
                # Optional scoping:
                # flow_run_id=LogFilterFlowRunId(any_=["<flow-run-uuid>"]),
                # task_run_id=LogFilterTaskRunId(is_null_=True),  # e.g., worker/server logs without a task
            ),
        )
        for row in logs:  # each row has .message, .timestamp, .level, .flow_run_id, .task_run_id, .name
            print(row.timestamp, row.level, row.name, "-", row.message)

asyncio.run(main())
```
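If you truly need to walk every matching log rather than just the first page, you can page through results with limit/offset. Here is a minimal sketch, assuming `read_logs` accepts `log_filter`, `limit`, `offset`, and `sort` as in the example above, and that you reuse the same `LogFilter`; the helper name is hypothetical, not a built-in:

```python
from prefect.client.schemas.sorting import LogSort

# Hypothetical helper (not a built-in): pages through every matching log with
# limit/offset. Assumes `client` is an open client from get_client() and
# `log_filter` is a LogFilter like the one built above.
async def read_all_logs(client, log_filter, page_size: int = 200):
    offset = 0
    while True:
        page = await client.read_logs(
            log_filter=log_filter,
            limit=page_size,
            offset=offset,
            sort=LogSort.TIMESTAMP_ASC,  # ascending, so new logs land after pages already read
        )
        if not page:
            break
        for row in page:
            yield row
        offset += page_size

# usage inside an async context:
#   async for row in read_all_logs(client, my_filter):
#       print(row.timestamp, row.level, row.message)
```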
REST API example (self-hosted)
- URL: http://<your prefect host>:4200/api/logs/read
```
POST /api/logs/read
Content-Type: application/json

{
  "limit": 200,
  "sort": "TIMESTAMP_DESC",
  "filter": {
    "level": { "ge_": 20 },
    "timestamp": { "after_": "2025-01-15T00:00:00Z" },
    "text": { "any_": ["error", "failed"] }
    /* Optionally:
    "flow_run_id": { "any_": ["<flow-run-uuid>"] },
    "task_run_id": { "is_null_": true }
    */
  }
}
```
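If you'd rather hit that endpoint from a script, here is a minimal sketch using httpx. The base URL, the use of the PREFECT_API_URL/PREFECT_API_KEY environment variables, and the assumption that the endpoint returns a JSON array of log records are all illustrative, not the official client:

```python
# Minimal sketch (not the official client): POST the JSON body above to the
# logs read endpoint with httpx. Base URL and auth handling are illustrative.
import os

import httpx

# e.g. "http://127.0.0.1:4200/api" for a local server, or your Cloud workspace URL
api_url = os.environ.get("PREFECT_API_URL", "http://127.0.0.1:4200/api")
api_key = os.environ.get("PREFECT_API_KEY")  # only needed for Prefect Cloud

body = {
    "limit": 200,
    "sort": "TIMESTAMP_DESC",
    "filter": {
        "level": {"ge_": 20},
        "timestamp": {"after_": "2025-01-15T00:00:00Z"},
        "text": {"any_": ["error", "failed"]},
    },
}

headers = {"Authorization": f"Bearer {api_key}"} if api_key else {}
resp = httpx.post(f"{api_url}/logs/read", json=body, headers=headers, timeout=30)
resp.raise_for_status()
for log in resp.json():  # assumes a JSON array of log records
    print(log["timestamp"], log["level"], log["message"])
```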
REST API example (Cloud)
- URL: https://api.prefect.cloud/api/accounts/<account_id>/workspaces/<workspace_id>/logs/read
- Same JSON body as above.

Docs:
- REST: Read logs
- Python filter models (LogFilter*, operators)

Notes for scale: with SQLite (your current setup), large text searches can be slow under load; your planned move to Postgres will significantly help performance and reliability.

Search all events across your workspace
- Use this if you literally want the global event stream (flow/task/deployment state-change events and custom events), not logs.

CLI (live stream)
```bash
prefect event stream
# Options: --format text | json, --output-file <path>, --run-once
```
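For example, combining those options, something like `prefect event stream --format json --output-file events.json` should capture the live stream to a file you can search or post-process later (the filename here is just an example).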
Python (subscribe with filters)
```python
import asyncio
from prefect.events.clients import get_events_subscriber
from prefect.events.filters import EventFilter, EventNameFilter

async def listen():
    # Only subscribe to flow-run events (e.g., prefect.flow-run.Completed)
    f = EventFilter(event=EventNameFilter(prefix=["prefect.flow-run"]))
    async with get_events_subscriber(filter=f) as sub:
        async for event in sub:  # yields events as they occur
            print(event)

asyncio.run(listen())
```
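Note that the subscriber above is a live stream: it yields events as they occur. To search events that have already happened, use the REST read endpoint below.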
REST (historical reads)
- Self-hosted: POST http://<host>:4200/api/events/read
- Cloud: POST https://api.prefect.cloud/api/accounts/<account_id>/workspaces/<workspace_id>/events/read

```
POST /api/events/read
Content-Type: application/json

{
  "limit": 100,
  "filter": {
    "event": { "prefix": ["prefect.flow-run"] },
    "occurred": { "after_": "2025-01-15T00:00:00Z" }
  }
}
```

Docs:
- Events overview
- CLI: prefect event
- REST: Read events

Quick recap
- If you want to search log messages across everything: use logs read (Python client or REST) with filters; include `LogFilterTextSearch` for message content.
- If you want to search event activity: use events read (CLI streaming, Python subscriber, or REST).
- Logs are not part of the event stream.

Does this match what you’re looking for? If you can share whether you want log text search or event activity search (or both), I can tailor an example to your exact use case, including filtering by deployment/work queue/worker and time windows.