Devin Flake
09/17/2025, 7:59 PMMarvin
09/17/2025, 7:59 PMNate
09/17/2025, 8:04 PM» ipython
#[1]
from prefect.logging.clients import get_logs_subscriber
#[2]
async with get_logs_subscriber() as s:
async for log in s:
print(log)
Nate
09/17/2025, 8:04 PMDevin Flake
09/17/2025, 8:05 PMNate
09/17/2025, 8:05 PMMarvin
09/17/2025, 8:06 PMPrefectLogsSubscriber
. It uses Prefect’s filter schema to scope logs to a single flow_run_id
and works against your Server API URL.
Note: This assumes you’re on Prefect 3.x and have PREFECT_API_URL
set (e.g., `http://127.0.0.1:4200/api`). You can also pass the API URL explicitly, shown below.
# stream_flow_run_logs.py
import asyncio
import os
import sys
from uuid import UUID
from prefect.logging.clients import PrefectLogsSubscriber, get_logs_subscriber
from prefect.server.schemas.filters import LogFilter, LogFilterFlowRunId, LogFilterLevel
async def main() -> None:
    """Stream the logs of one flow run to stdout until interrupted.

    Command line: ``stream_flow_run_logs.py <FLOW_RUN_ID> [API_URL]``.

    When ``API_URL`` is given, the subscriber is pointed at that URL
    explicitly. Otherwise ``get_logs_subscriber()`` resolves the API URL from
    Prefect settings (``PREFECT_API_URL``).

    Raises:
        SystemExit: if no flow run id was supplied.
        ValueError: if the flow run id is not a valid UUID.
    """
    if len(sys.argv) < 2:
        print("Usage: python stream_flow_run_logs.py <FLOW_RUN_ID> [API_URL]")
        sys.exit(1)
    flow_run_id = UUID(sys.argv[1])

    # Optional: pass the API URL explicitly (e.g., http://127.0.0.1:4200/api),
    # otherwise get_logs_subscriber() will use PREFECT_API_URL from settings.
    explicit_api_url = sys.argv[2] if len(sys.argv) > 2 else None
    api_url = explicit_api_url or os.getenv("PREFECT_API_URL")

    # Build a filter to scope to one flow run (this will include its task logs too)
    log_filter = LogFilter(
        flow_run_id=LogFilterFlowRunId(any_=[flow_run_id]),
        # Optional: set a minimum level, e.g., INFO (= 20)
        level=LogFilterLevel(ge_=0),
    )

    if explicit_api_url:
        # Honor the command-line URL. Previously it was only printed, and the
        # subscriber still connected to whatever the settings pointed at.
        subscriber = PrefectLogsSubscriber(api_url=explicit_api_url, filter=log_filter)
    else:
        # Settings-based factory (reads PREFECT_API_URL).
        subscriber = get_logs_subscriber(filter=log_filter)

    print(f"Subscribing to logs for flow run: {flow_run_id}")
    if api_url:
        print(f"Using Prefect API URL: {api_url}")

    async with subscriber as sub:
        async for log in sub:
            # Read fields defensively so a missing attribute does not end the stream.
            ts = getattr(log, "timestamp", None)
            lvl = getattr(log, "level", None)
            name = getattr(log, "name", "")
            msg = getattr(log, "message", "")
            print(f"{ts} [{lvl}] {name}: {msg}")


if __name__ == "__main__":
    # Ctrl-C under asyncio.run() cancels the main task and KeyboardInterrupt is
    # raised from asyncio.run() itself, so it has to be caught here rather
    # than inside the coroutine.
    try:
        asyncio.run(main())
    except KeyboardInterrupt:
        print("\nStopped.")
How to run
- Ensure your Server is running and PREFECT_API_URL
is set (e.g., `export PREFECT_API_URL=http://127.0.0.1:4200/api`)
- Run:
- python stream_flow_run_logs.py <FLOW_RUN_ID>
- Or specify the API URL explicitly as the second arg:
- python stream_flow_run_logs.py <FLOW_RUN_ID> http://127.0.0.1:4200/api
Docs
- Prefect logging clients (includes PrefectLogsSubscriber
): Prefect 3 API: logging clients
Want me to tweak this to filter by task run, or to include a small backlog of logs before live streaming?Nate
09/17/2025, 8:07 PMtemporary_settings
ctx managerMarvin
09/17/2025, 8:07 PMMarvin
09/17/2025, 8:07 PMtemporary_settings
so you don’t have to export env vars:
# stream_flow_run_logs.py
import asyncio
import sys
from uuid import UUID
from prefect.logging.clients import get_logs_subscriber
from prefect.server.schemas.filters import LogFilter, LogFilterFlowRunId, LogFilterLevel
from prefect.settings import temporary_settings, PREFECT_API_URL
async def stream(flow_run_id: UUID, api_url: str | None = None) -> None:
    """Print every log line of a single flow run as it is received.

    Args:
        flow_run_id: The flow run whose logs (task logs included) are streamed.
        api_url: Optional Prefect API URL. When given, ``PREFECT_API_URL`` is
            overridden via ``temporary_settings`` for the duration of the
            stream; when omitted, the current settings are used unchanged.
    """
    # Scope the subscription to this one flow run.
    only_this_run = LogFilter(
        flow_run_id=LogFilterFlowRunId(any_=[flow_run_id]),
        level=LogFilterLevel(ge_=0),  # 0 = all levels; set to 20 for INFO+
    )

    async def _consume() -> None:
        # The subscriber is created here so it picks up whichever
        # PREFECT_API_URL is active at the moment of the call.
        async with get_logs_subscriber(filter=only_this_run) as feed:
            async for entry in feed:
                logger_name = getattr(entry, "name", "")
                print(f"{entry.timestamp} [{entry.level}] {logger_name}: {entry.message}")

    if not api_url:
        await _consume()
        return

    # Set PREFECT_API_URL just for this session.
    with temporary_settings({PREFECT_API_URL: api_url}):
        await _consume()
async def main() -> None:
    """Parse ``<FLOW_RUN_ID> [API_URL]`` from the command line and stream logs.

    Raises:
        SystemExit: if no flow run id was supplied.
        ValueError: if the flow run id is not a valid UUID.
    """
    if len(sys.argv) < 2:
        print("Usage: python stream_flow_run_logs.py <FLOW_RUN_ID> [API_URL]")
        raise SystemExit(1)
    flow_run_id = UUID(sys.argv[1])
    api_url = sys.argv[2] if len(sys.argv) > 2 else None

    print(f"Subscribing to logs for flow run: {flow_run_id}")
    if api_url:
        print(f"Using Prefect API URL (temporary): {api_url}")
    await stream(flow_run_id, api_url)


if __name__ == "__main__":
    # Ctrl-C under asyncio.run() cancels the main task and KeyboardInterrupt is
    # raised from asyncio.run() itself, so it has to be caught here rather
    # than inside the coroutine.
    try:
        asyncio.run(main())
    except KeyboardInterrupt:
        print("\nStopped.")
How to run
- Example (Prefect Server): python stream_flow_run_logs.py <FLOW_RUN_ID> http://127.0.0.1:4200/api
Docs
- Logging clients (includes PrefectLogsSubscriber): Prefect 3 API: logging clientsNate
09/17/2025, 8:09 PMmain()
in the temp settings ctx (at least somewhere above starting the consumer)Devin Flake
09/17/2025, 8:10 PMNate
09/17/2025, 8:13 PM#[1]
from prefect.settings import temporary_settings, PREFECT_API_URL
#[2]
# Show the API URL in effect before the override and inside it.
print("outside: ", PREFECT_API_URL.value())
with temporary_settings({PREFECT_API_URL: "foo"}):
    # Inside the block the overridden value is what .value() reports.
    print("inside: ", PREFECT_API_URL.value())
outside: http://127.0.0.1:4200/api
inside: foo
im just saying to start the logs consumer (ie `async with get_logs_subscriber() as s:`) from inside the `temporary_settings`
ctx so that you use the api url that you wantDevin Flake
09/17/2025, 9:08 PM
from uuid import UUID
from prefect.logging.clients import get_logs_subscriber
from prefect.server.schemas.filters import LogFilter, LogFilterFlowRunId, LogFilterLevel
from prefect.settings import temporary_settings, PREFECT_API_URL
from arte_tools.config import PREFECT_API_URL as ARTE_PREFECT_API_URL
# Demo excerpt of the call site: `run_id` comes from code not shown here, and
# in a real module this call must come after the definition — TODO confirm.
asyncio.run(stream_flow_run_logs(UUID(run_id)))
async def stream_flow_run_logs(flow_run_id: UUID):
    """Print the logs of one flow run, read from the ARTE Prefect API.

    ``PREFECT_API_URL`` is overridden with ``ARTE_PREFECT_API_URL`` for the
    whole time the subscriber is created and consumed, so the subscription
    targets that API rather than the ambient setting.

    Args:
        flow_run_id: The flow run whose logs are streamed.
    """
    # Filter to a single flow run (includes its task logs)
    log_filter = LogFilter(
        flow_run_id=LogFilterFlowRunId(any_=[flow_run_id]),
        level=LogFilterLevel(ge_=0),  # 0 = all levels; set to 20 for INFO+
    )

    async def run():
        # Reads PREFECT_API_URL from settings at call time, so this must be
        # awaited while the temporary override below is still active.
        subscriber = get_logs_subscriber(filter=log_filter)
        print('subscriber:', subscriber)
        async with subscriber as sub:
            async for log in sub:
                name = getattr(log, "name", "")
                print(f"{log.timestamp} [{log.level}] {name}: {log.message}")

    # Print the effective value, not the setting object, so the before/after
    # lines actually show whether the override took effect.
    print('prefect api url:', PREFECT_API_URL.value())
    with temporary_settings({PREFECT_API_URL: ARTE_PREFECT_API_URL}):
        print('prefect api url:', PREFECT_API_URL.value())
        try:
            print('before run()')
            # Consume inside the override so the subscriber uses the ARTE URL.
            await run()
            print('after run()')
        except KeyboardInterrupt:
            # NOTE(review): under asyncio.run(), Ctrl-C normally cancels this
            # task and KeyboardInterrupt is raised from asyncio.run() itself,
            # so this handler may never fire; consider catching it around the
            # asyncio.run() call instead.
            print("Log streaming interrupted by user.")
Nate
09/17/2025, 9:15 PMARTE_PREFECT_API_URL
a constant str
?Devin Flake
09/17/2025, 9:16 PMNate
09/17/2025, 9:16 PMDevin Flake
09/17/2025, 9:16 PMNate
09/17/2025, 9:20 PMasync def
?
asyncio.run(stream_flow_run_logs(UUID(run_id)))
async def stream_flow_run_logs(flow_run_id: UUID):
also I'd double check for the flow run that you're filtering for here
log_filter = LogFilter(
flow_run_id=LogFilterFlowRunId(any_=[flow_run_id]),
level=LogFilterLevel(ge_=0), # 0 = all levels; set to 20 for INFO+
)
Nate
09/17/2025, 9:20 PMDevin Flake
09/17/2025, 9:22 PMasync def
for demonstration purposes - thanks, I'll keep checking