<@ULVA73B9P> can you show me an example of a scrip...
# ask-marvin
d
@Marvin can you show me an example of a script to connect to a Prefect Server version 3 using the PrefectLogsSubscriber to stream the logs of a flow run?
m
thought for 387.5 seconds
n
Copy code
» ipython

#[1]
from prefect.logging.clients import get_logs_subscriber

#[2]
async with get_logs_subscriber() as s:
    async for log in s:
        print(log)
i wouldnt use the class directly, the util will get the right one based on your api url
d
oh nice - is there a way to set the api url?
n
like specifically for that subscriber?
m
Absolutely — here’s a minimal, working example for Prefect 3.x Server that streams logs for a specific flow run using
PrefectLogsSubscriber
. It uses Prefect’s filter schema to scope logs to a single
flow_run_id
and works against your Server API URL. Note: This assumes you’re on Prefect 3.x and have
PREFECT_API_URL
set (e.g., `http://127.0.0.1:4200/api`). You can also pass the API URL explicitly, shown below.
Copy code
# stream_flow_run_logs.py
import asyncio
import os
import sys
from uuid import UUID

from prefect.logging.clients import PrefectLogsSubscriber, get_logs_subscriber
from prefect.server.schemas.filters import LogFilter, LogFilterFlowRunId, LogFilterLevel


async def main() -> None:
    if len(sys.argv) < 2:
        print("Usage: python stream_flow_run_logs.py <FLOW_RUN_ID> [API_URL]")
        sys.exit(1)

    flow_run_id = UUID(sys.argv[1])

    # Optional: pass API URL explicitly (e.g., <http://127.0.0.1:4200/api>),
    # otherwise get_logs_subscriber() will use PREFECT_API_URL from your environment.
    api_url = sys.argv[2] if len(sys.argv) > 2 else os.getenv("PREFECT_API_URL")

    # Build a filter to scope to one flow run (this will include its task logs too)
    log_filter = LogFilter(
        flow_run_id=LogFilterFlowRunId(any_=[flow_run_id]),
        # Optional: set a minimum level, e.g., INFO (= 20)
        level=LogFilterLevel(ge_=0),
    )

    # Option A: Use settings-based factory (reads PREFECT_API_URL)
    subscriber = get_logs_subscriber(filter=log_filter)

    # Option B: Explicitly target a Server API URL (uncomment to use)
    # subscriber = PrefectLogsSubscriber(api_url=api_url, filter=log_filter)

    print(f"Subscribing to logs for flow run: {flow_run_id}")
    if api_url:
        print(f"Using Prefect API URL: {api_url}")

    try:
        async with subscriber as sub:
            async for log in sub:
                # Safely access fields; structure is a Prefect Log model
                ts = getattr(log, "timestamp", None)
                lvl = getattr(log, "level", None)
                name = getattr(log, "name", "")
                msg = getattr(log, "message", "")
                print(f"{ts} [{lvl}] {name}: {msg}")
    except KeyboardInterrupt:
        print("\nStopped.")


if __name__ == "__main__":
    asyncio.run(main())
How to run - Ensure your Server is running and
PREFECT_API_URL
is set (e.g., `export PREFECT_API_URL=http://127.0.0.1:4200/api`) - Run: -
python stream_flow_run_logs.py <FLOW_RUN_ID>
- Or specify the API URL explicitly as the second arg: -
python stream_flow_run_logs.py <FLOW_RUN_ID> <http://127.0.0.1:4200/api>
Docs - Prefect logging clients (includes
PrefectLogsSubscriber
): Prefect 3 API: logging clients Want me to tweak this to filter by task run, or to include a small backlog of logs before live streaming?
n
@Marvin not bad marvin but u can set the api url w the
temporary_settings
ctx manager
m
thought for 32.2 seconds
Great call — here’s the same example updated to set the API URL with
temporary_settings
so you don’t have to export env vars:
Copy code
# stream_flow_run_logs.py
import asyncio
import sys
from uuid import UUID

from prefect.logging.clients import get_logs_subscriber
from prefect.server.schemas.filters import LogFilter, LogFilterFlowRunId, LogFilterLevel
from prefect.settings import temporary_settings, PREFECT_API_URL


async def stream(flow_run_id: UUID, api_url: str | None = None) -> None:
    # Filter to a single flow run (includes its task logs)
    log_filter = LogFilter(
        flow_run_id=LogFilterFlowRunId(any_=[flow_run_id]),
        level=LogFilterLevel(ge_=0),  # 0 = all levels; set to 20 for INFO+
    )

    async def run():
        subscriber = get_logs_subscriber(filter=log_filter)  # reads PREFECT_API_URL from settings
        async with subscriber as sub:
            async for log in sub:
                name = getattr(log, "name", "")
                print(f"{log.timestamp} [{log.level}] {name}: {log.message}")

    if api_url:
        # Set PREFECT_API_URL just for this session
        with temporary_settings({PREFECT_API_URL: api_url}):
            await run()
    else:
        await run()


async def main():
    if len(sys.argv) < 2:
        print("Usage: python stream_flow_run_logs.py <FLOW_RUN_ID> [API_URL]")
        raise SystemExit(1)

    flow_run_id = UUID(sys.argv[1])
    api_url = sys.argv[2] if len(sys.argv) > 2 else None

    print(f"Subscribing to logs for flow run: {flow_run_id}")
    if api_url:
        print(f"Using Prefect API URL (temporary): {api_url}")

    try:
        await stream(flow_run_id, api_url)
    except KeyboardInterrupt:
        print("\nStopped.")


if __name__ == "__main__":
    asyncio.run(main())
How to run - Example (Prefect Server):
python stream_flow_run_logs.py <FLOW_RUN_ID> <http://127.0.0.1:4200/api>
Docs - Logging clients (includes PrefectLogsSubscriber): Prefect 3 API: logging clients
n
hrm close but i'd wrap
main()
in the temp settings ctx (at least somewhere above starting the consumer)
d
interesting - I'll have to go over this to understand it better, thanks
n
Copy code
#[1]
from prefect.settings import temporary_settings, PREFECT_API_URL

#[2]
print("outside: ", PREFECT_API_URL.value())
with temporary_settings({PREFECT_API_URL: "foo"}):
    print("inside: ", PREFECT_API_URL.value())

outside:  <http://127.0.0.1:4200/api>
inside:  foo
im just saying to start the logs consumer (ie
async with get_logs_consumer() as s:
from
inside
the
temporary_settings
ctx so that you use the api url that you want
👍 1
d
@Nate if/when you have a minute - does this look ok? it's not returning any output 😞
Copy code
from uuid import UUID
    
from prefect.logging.clients import get_logs_subscriber
from prefect.server.schemas.filters import LogFilter, LogFilterFlowRunId, LogFilterLevel
from prefect.settings import temporary_settings, PREFECT_API_URL
    
from arte_tools.config import PREFECT_API_URL as ARTE_PREFECT_API_URL

asyncio.run(stream_flow_run_logs(UUID(run_id)))

async def stream_flow_run_logs(flow_run_id: UUID):
    # Filter to a single flow run (includes its task logs)
    log_filter = LogFilter(
        flow_run_id=LogFilterFlowRunId(any_=[flow_run_id]),
        level=LogFilterLevel(ge_=0),  # 0 = all levels; set to 20 for INFO+
    )

    print('prefect api url:', PREFECT_API_URL)
    with temporary_settings({PREFECT_API_URL: ARTE_PREFECT_API_URL}):
        print('prefect api url:', PREFECT_API_URL)
        print('temporary_settings', temporary_settings, type(temporary_settings), dir(temporary_settings))
        async def run():
            subscriber = get_logs_subscriber(filter=log_filter)  # this reads the PREFECT_API_URL from settings
            print('subscriber:', subscriber)
            async with subscriber as sub:
                async for log in sub:
                    name = getattr(log, "name", "")
                    print(f"{log.timestamp} [{log.level}] {name}: {log.message}")

        try:
            print('before run()')
            await run()
            print('after run()')
        except KeyboardInterrupt:
            print("Log streaming interrupted by user.")
n
is
ARTE_PREFECT_API_URL
a constant
str
?
d
yes
n
are you sure there are logs happening 🙂
d
I think so, I can see them in UI
n
i assume you're not actually calling this before your your
async def
?
Copy code
asyncio.run(stream_flow_run_logs(UUID(run_id)))

async def stream_flow_run_logs(flow_run_id: UUID):
also I'd double check for the flow run that you're filtering for here
Copy code
log_filter = LogFilter(
        flow_run_id=LogFilterFlowRunId(any_=[flow_run_id]),
        level=LogFilterLevel(ge_=0),  # 0 = all levels; set to 20 for INFO+
    )
but otherwise im not immediately sure why that'd be empty
d
no I moved the
async def
for demonstration purposes - thanks, I'll keep checking