<@ULVA73B9P> where the flow runs logs are stored o...
# marvin-ai
a
@Marvin where are flow run logs stored or written by default on the machine, and how can I read them?
m
thought for 141.9 seconds
Short answer: they aren't written to local files by default.
- In Prefect 3.x, flow and task logs are sent to the Prefect API and shown in the UI. On Prefect Cloud they're stored in Cloud; on self-hosted Prefect Server they're stored in your Server's PostgreSQL database (in the logs table).
- Workers and flow processes only emit to stdout/stderr and forward logs to the API; they do not write to a file unless you configure it.
If you want local log files, add a custom logging config:
1) Create a `logging.yml` and put it in `~/.prefect` (your `PREFECT_HOME`), or point to it with `PREFECT_LOGGING_SETTINGS_PATH=/path/to/logging.yml`.
2) Include a file handler (e.g., `RotatingFileHandler`). Example:
```yaml
version: 1
formatters:
  standard:
    format: "%(asctime)s | %(levelname)s | %(name)s - %(message)s"
handlers:
  console:
    class: prefect.logging.handlers.PrefectConsoleHandler
    level: INFO
    formatter: standard
  api:
    class: prefect.logging.handlers.APILogHandler
    level: INFO
  file:
    class: logging.handlers.RotatingFileHandler
    level: INFO
    formatter: standard
    filename: /var/log/prefect/flow_runs.log
    maxBytes: 10485760   # 10 MB
    backupCount: 5
loggers:
  prefect:
    level: INFO
    handlers: [console, api, file]
    propagate: False
```
Notes:
- Set `PREFECT_LOGGING_SETTINGS_PATH` to point at this file if you don't place it in `~/.prefect`.
- To stop sending logs to the API (not common), set `PREFECT_LOGGING_TO_API_ENABLED=false`.
- Retention: for files, use `RotatingFileHandler`/`TimedRotatingFileHandler`. For self-hosted Server, retention is managed in your database (e.g., cleanup jobs). In Cloud, retention follows your plan.
Docs:
- Logging customization
- Logging-related settings
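For example, pointing Prefect at the custom config from a shell could look like this (the path is a placeholder; `prefect config set` persists the value in your active profile):
```bash
# Point Prefect at a custom logging config (example path)
export PREFECT_LOGGING_SETTINGS_PATH=/etc/prefect/logging.yml

# Or persist it in the active Prefect profile
prefect config set PREFECT_LOGGING_SETTINGS_PATH=/etc/prefect/logging.yml

# Optional: stop forwarding logs to the API (uncommon)
# prefect config set PREFECT_LOGGING_TO_API_ENABLED=false
```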
a
@Marvin can you give me an example of how I can write log files segregated by flow run name or by the unique flow run ID?
m
thought for 95.8 seconds
Absolutely. There are two common ways:
A) Add a per-run file handler in your flow code (simple and explicit)
B) Use a custom handler factory in logging.yml (centralized config)
Both use Prefect's runtime context to get the flow run name or ID.
A) In-flow setup (per-run file)
- Pros: easiest to read/maintain; full control.
- Cons: you must add/remove the handler in your flow code.
```python
import logging
import logging.handlers
from pathlib import Path
from prefect import flow, get_run_logger, runtime

def _sanitize(s: str) -> str:
    return "".join(c if c.isalnum() or c in ("-", "_") else "_" for c in s)

def add_per_run_file_handler(base_dir="/var/log/prefect/flows", use_name=False):
    # Safe defaults if not running inside a Prefect flow run
    flow_name = getattr(runtime.flow_run, "flow_name", None) or "unknown_flow"
    run_id = getattr(runtime.flow_run, "id", None) or "no_run_id"
    run_name = getattr(runtime.flow_run, "name", None) or None

    safe_flow = _sanitize(flow_name)
    if use_name and run_name:
        filename = f"{safe_flow}__{_sanitize(run_name)}.log"
    else:
        filename = f"{safe_flow}__{run_id}.log"

    out_dir = Path(base_dir) / safe_flow
    out_dir.mkdir(parents=True, exist_ok=True)
    file_path = out_dir / filename

    handler = logging.handlers.RotatingFileHandler(
        file_path, maxBytes=10 * 1024 * 1024, backupCount=5
    )
    formatter = logging.Formatter(
        "%(asctime)s | %(levelname)s | %(name)s - %(message)s"
    )
    handler.setFormatter(formatter)
    handler.setLevel(logging.INFO)

    # Attach to the Prefect logger tree so flow/task logs land here
    logger = logging.getLogger("prefect")
    logger.addHandler(handler)
    return handler

def remove_handler(handler):
    logger = logging.getLogger("prefect")
    logger.removeHandler(handler)
    handler.close()

@flow
def my_flow():
    h = add_per_run_file_handler(
        base_dir="/var/log/prefect/flows",
        use_name=False,  # set True to use run name instead of run id
    )
    logger = get_run_logger()
    try:
        logger.info("Hello from my flow!")
        # your flow code here
    finally:
        remove_handler(h)

if __name__ == "__main__":
    my_flow()
```
B) Centralized logging.yml with a custom handler factory
- Pros: no code changes in flows; logging policy stays in one place.
- Cons: requires a small helper module available in all run environments.
1) Create a helper module, e.g. `my_project/logging_helpers.py`:
```python
import logging
import logging.handlers
from pathlib import Path
from prefect import runtime

def _sanitize(s: str) -> str:
    return "".join(c if c.isalnum() or c in ("-", "_") else "_" for c in s)

def per_flow_run_file_handler(base_dir="/var/log/prefect/flows", use_name=False):
    flow_name = getattr(runtime.flow_run, "flow_name", None) or "unknown_flow"
    run_id = getattr(runtime.flow_run, "id", None) or "no_run_id"
    run_name = getattr(runtime.flow_run, "name", None)

    safe_flow = _sanitize(flow_name)
    if use_name and run_name:
        filename = f"{safe_flow}__{_sanitize(run_name)}.log"
    else:
        filename = f"{safe_flow}__{run_id}.log"

    out_dir = Path(base_dir) / safe_flow
    out_dir.mkdir(parents=True, exist_ok=True)
    file_path = out_dir / filename

    handler = logging.handlers.RotatingFileHandler(
        file_path, maxBytes=10 * 1024 * 1024, backupCount=5
    )
    formatter = logging.Formatter(
        "%(asctime)s | %(levelname)s | %(name)s - %(message)s"
    )
    handler.setFormatter(formatter)
    handler.setLevel(logging.INFO)
    return handler
```
2) Reference it from your logging.yml (in `~/.prefect` or via `PREFECT_LOGGING_SETTINGS_PATH`):
```yaml
version: 1
formatters:
  standard:
    format: "%(asctime)s | %(levelname)s | %(name)s - %(message)s"
handlers:
  console:
    class: prefect.logging.handlers.PrefectConsoleHandler
    level: INFO
    formatter: standard
  api:
    class: prefect.logging.handlers.APILogHandler
    level: INFO
  per_run_file:
    (): my_project.logging_helpers.per_flow_run_file_handler
    level: INFO
    base_dir: /var/log/prefect/flows
    use_name: false  # set true to use the flow run name
loggers:
  prefect:
    level: INFO
    handlers: [console, api, per_run_file]
    propagate: False
```
Tips and caveats:
- Flow run name vs ID: names can contain spaces/slashes; sanitize them before using them as filenames (the examples above do this). IDs are UUIDs and always safe.
- Where files end up: files are written on the machine/container where the flow actually runs (e.g., a Kubernetes pod, ECS task, or a local worker), not on your laptop unless that's where the run executes.
- Retention: the examples use RotatingFileHandler (10 MB, 5 backups). Adjust to your needs or use TimedRotatingFileHandler.
- Outside a flow: if you run code outside a Prefect run, runtime.flow_run attributes may be missing; the examples fall back to safe defaults.
Docs:
- Logging customization
- Logging-related settings
If you tell me your execution environment (local, Docker, K8s, ECS, etc.), I can tailor the base directory and any permissions/mounts you might need.
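Before Prefect picks this up, you can sanity-check that the YAML parses and the factory resolves by applying it with the standard library (a quick local test; assumes PyYAML is installed and `my_project` is importable):
```python
import logging
import logging.config

import yaml  # PyYAML, assumed installed

# Apply the same logging.yml via the stdlib
with open("logging.yml") as f:
    logging.config.dictConfig(yaml.safe_load(f))

# Outside a flow run the factory falls back to "unknown_flow"/"no_run_id"
logging.getLogger("prefect").info("file handler smoke test")
```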
a
@Marvin where can I find info about flow status for the last 24 hours, i.e., whether a flow has succeeded or failed? How can I access that?
m
thought for 630.8 seconds
You've got a few options depending on how you want to access it:
- UI (Cloud or self-hosted Server)
  - Go to the Runs page and set the time filter to "Last 24 hours", then filter by state (Completed, Failed). You can save this as a Saved Search for reuse.
  - This is the fastest way to eyeball success/failure counts over a time window.
- CLI (quick glance, but no time filter)
  - The CLI doesn't expose a time-range flag right now. You can filter by state and limit the number of rows:
```bash
prefect flow-run ls --state COMPLETED --state FAILED --limit 200
```
  - Docs: CLI: flow-run
- Python client (precise last-24h query)
  - Use filters with the API client to get exactly the last 24 hours and the states you care about:
```python
import asyncio
from datetime import datetime, timedelta, timezone

from prefect.client.orchestration import get_client
from prefect.client.schemas.filters import (
    FlowRunFilter,
    FlowRunFilterStartTime,
    FlowRunFilterState,
    FlowRunFilterStateName,
    FlowRunFilterName,  # optional: matches flow *run* names
)
# from prefect.client.schemas.sorting import FlowRunSort  # optional

async def main():
    now = datetime.now(timezone.utc)
    since = now - timedelta(hours=24)

    filters = FlowRunFilter(
        start_time=FlowRunFilterStartTime(after_=since, before_=now),
        # State *names* are title-case ("Completed"); the all-caps values
        # (COMPLETED, FAILED) are state *types*
        state=FlowRunFilterState(
            name=FlowRunFilterStateName(any_=["Completed", "Failed"])
        ),
        # name=FlowRunFilterName(any_=["my-run-name"]),  # uncomment to match run names
    )

    async with get_client() as client:
        runs = await client.read_flow_runs(
            flow_run_filter=filters,
            limit=500,
            # sort=FlowRunSort.START_TIME_DESC,  # optional
        )

    for r in runs:
        # r.state.name will be 'Completed' or 'Failed'
        print(r.id, r.name, r.state.name, r.start_time, r.end_time)

asyncio.run(main())
```
Docs:
- API client overview
- Filter schema reference
- REST: paginate flow runs
If you tell me whether you prefer UI, CLI, or a Python script (and whether you want to scope to a specific flow/deployment), I can tailor the exact steps.
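For reference, a raw REST call to a self-hosted Server might look like the sketch below (the filter endpoint and body shape mirror the Python filters above; for Cloud you'd use your account/workspace URL plus an `Authorization: Bearer` header):
```bash
# POST to the flow runs filter endpoint (default self-hosted Server URL)
curl -s -X POST "http://127.0.0.1:4200/api/flow_runs/filter" \
  -H "Content-Type: application/json" \
  -d '{
        "flow_runs": {
          "start_time": {"after_": "2024-01-01T00:00:00Z"},
          "state": {"name": {"any_": ["Completed", "Failed"]}}
        },
        "limit": 200
      }'
```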
a
@Marvin I will be using a Python script to do the same.
m
thought for 46.1 seconds
Great, here's a ready-to-run Prefect 3.x script that lists flow runs from the last 24 hours and shows whether each succeeded or failed. It also prints totals and can optionally scope to a specific flow name.
Notes:
- Authentication: make sure your environment is configured to talk to your workspace (e.g., set `PREFECT_API_URL` and `PREFECT_API_KEY`, or use your Prefect profile).
- Timezone: uses UTC.
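For example, a one-time profile setup could look like this (values are placeholders):
```bash
# Prefect Cloud: interactive login stores the API URL and key in your profile
prefect cloud login

# Or set the values explicitly
prefect config set PREFECT_API_URL="https://api.prefect.cloud/api/accounts/<account-id>/workspaces/<workspace-id>"
prefect config set PREFECT_API_KEY="<your-api-key>"
```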
```python
# pip install "prefect>=3"
import asyncio
from collections import Counter, defaultdict
from datetime import datetime, timedelta, timezone

from prefect.client.orchestration import get_client
from prefect.client.schemas.filters import (
    FlowFilter,
    FlowFilterName,  # scopes to flows by name
    FlowRunFilter,
    FlowRunFilterStartTime,
    FlowRunFilterState,
    FlowRunFilterStateName,
)
from prefect.client.schemas.sorting import FlowRunSort


async def fetch_last_24h_runs(flow_name: str | None = None):
    now = datetime.now(timezone.utc)
    since = now - timedelta(hours=24)

    flow_run_filter = FlowRunFilter(
        start_time=FlowRunFilterStartTime(after_=since, before_=now),
        # State *names* are title-case ("Completed"), unlike state types
        state=FlowRunFilterState(
            name=FlowRunFilterStateName(any_=["Completed", "Failed"])
        ),
    )
    # FlowRunFilter.name matches flow *run* names; use FlowFilter for flow names
    flow_filter = FlowFilter(name=FlowFilterName(any_=[flow_name])) if flow_name else None

    async with get_client() as client:
        runs = await client.read_flow_runs(
            flow_filter=flow_filter,
            flow_run_filter=flow_run_filter,
            sort=FlowRunSort.START_TIME_DESC,
            limit=1000,  # increase if needed
        )
        # FlowRun objects carry flow_id, not flow_name; resolve names for reporting
        flow_names = {
            fid: (await client.read_flow(fid)).name
            for fid in {r.flow_id for r in runs}
        }
    return runs, flow_names


async def main(flow_name: str | None = None):
    runs, flow_names = await fetch_last_24h_runs(flow_name)

    # Summary counts overall
    counts = Counter(r.state.name for r in runs)
    print(f"Last 24h: {len(runs)} runs")
    print(f"Completed: {counts.get('Completed', 0)} | Failed: {counts.get('Failed', 0)}")

    # Optional: counts per flow
    by_flow = defaultdict(Counter)
    for r in runs:
        by_flow[flow_names.get(r.flow_id, "unknown")][r.state.name] += 1

    if len(by_flow) > 1:
        print("\nPer-flow totals (last 24h):")
        for fname, c in by_flow.items():
            print(f"- {fname}: Completed={c.get('Completed', 0)}, Failed={c.get('Failed', 0)}")

    # Detailed listing
    print("\nDetailed runs:")
    for r in runs:
        print(f"{r.start_time} | {r.state.name:<9} | {flow_names.get(r.flow_id, '')} | {r.name} | {r.id}")


if __name__ == "__main__":
    # Optionally pass a flow name to scope results:
    # asyncio.run(main(flow_name="my-flow"))
    asyncio.run(main())
```
If you'd like this to also include other terminal states (e.g., Crashed or Cancelled), just add them to the `any_=` list, as in the sketch below.
Docs for reference:
- API client overview
- Filter schema reference
- REST: paginate flow runs
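A minimal sketch of the widened filter (same imports as the script above):
```python
from prefect.client.schemas.filters import (
    FlowRunFilter,
    FlowRunFilterState,
    FlowRunFilterStateName,
)

# The same filter, widened to every common terminal state
terminal_runs = FlowRunFilter(
    state=FlowRunFilterState(
        name=FlowRunFilterStateName(
            any_=["Completed", "Failed", "Crashed", "Cancelled"]
        )
    )
)
```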