Abuzar Shakikh
11/06/2025, 2:00 PMMarvin
11/06/2025, 2:00 PMMarvin
11/06/2025, 2:02 PM
1) Create a logging.yml and put it in ~/.prefect (your PREFECT_HOME) or point to it with PREFECT_LOGGING_SETTINGS_PATH=/path/to/logging.yml.
2) Include a file handler (e.g., RotatingFileHandler). Example:
# Prefect logging.yml (Python logging dictConfig schema, version 1).
# Sends `prefect` logger output to the console, the Prefect API, and a
# rotating local file. Indentation restored: the pasted transcript had
# flattened every line to column 0, which is not valid YAML nesting.
version: 1
formatters:
  standard:
    format: "%(asctime)s | %(levelname)s | %(name)s - %(message)s"
handlers:
  console:
    class: prefect.logging.handlers.PrefectConsoleHandler
    level: INFO
    formatter: standard
  api:
    class: prefect.logging.handlers.APILogHandler
    level: INFO
  file:
    class: logging.handlers.RotatingFileHandler
    level: INFO
    formatter: standard
    filename: /var/log/prefect/flow_runs.log
    maxBytes: 10485760  # 10 MB
    backupCount: 5
loggers:
  prefect:
    level: INFO
    handlers: [console, api, file]
    propagate: False
Notes:
- Set PREFECT_LOGGING_SETTINGS_PATH to point at this file if you don’t place it in ~/.prefect.
- To stop sending logs to the API (not common), set PREFECT_LOGGING_TO_API_ENABLED=false.
- Retention: for files, use `RotatingFileHandler`/`TimedRotatingFileHandler`. For self-hosted Server, retention is managed in your database (e.g., cleanup jobs). In Cloud, retention follows your plan.
Docs:
- Logging customization
- Logging-related settings
Abuzar Shakikh
11/06/2025, 2:05 PMMarvin
11/06/2025, 2:05 PMMarvin
11/06/2025, 2:07 PMimport logging
import logging.handlers
from pathlib import Path
from prefect import flow, get_run_logger, runtime
def _sanitize(s: str) -> str:
return "".join(c if c.isalnum() or c in ("-", "_") else "_" for c in s)
def add_per_run_file_handler(base_dir="/var/log/prefect/flows", use_name=False):
    """Attach a per-flow-run RotatingFileHandler to the 'prefect' logger.

    The log file is named from the current Prefect flow run (flow name plus
    run id, or run name when ``use_name=True``) and placed under
    ``base_dir/<flow_name>/``. Returns the handler so the caller can detach
    it with ``remove_handler`` when the run finishes.
    """
    # Safe defaults if not running inside a Prefect flow run
    flow_name = getattr(runtime.flow_run, "flow_name", None) or "unknown_flow"
    run_id = getattr(runtime.flow_run, "id", None) or "no_run_id"
    run_name = getattr(runtime.flow_run, "name", None) or None
    safe_flow = _sanitize(flow_name)
    if use_name and run_name:
        filename = f"{safe_flow}__{_sanitize(run_name)}.log"
    else:
        filename = f"{safe_flow}__{run_id}.log"
    out_dir = Path(base_dir) / safe_flow
    out_dir.mkdir(parents=True, exist_ok=True)
    file_path = out_dir / filename
    handler = logging.handlers.RotatingFileHandler(
        file_path, maxBytes=10 * 1024 * 1024, backupCount=5
    )
    formatter = logging.Formatter(
        "%(asctime)s | %(levelname)s | %(name)s - %(message)s"
    )
    handler.setFormatter(formatter)
    # Fix: the transcript contained a Slack-mangled link
    # `<http://logging.INFO|logging.INFO>`; the intended value is logging.INFO.
    handler.setLevel(logging.INFO)
    # Attach to the Prefect logger tree so flow/task logs land here
    logger = logging.getLogger("prefect")
    logger.addHandler(handler)
    return handler
def remove_handler(handler):
    """Detach *handler* from the 'prefect' logger and release its resources."""
    prefect_logger = logging.getLogger("prefect")
    prefect_logger.removeHandler(handler)
    handler.close()
@flow
def my_flow():
    """Example flow that mirrors its run logs to a per-run file on disk."""
    h = add_per_run_file_handler(
        base_dir="/var/log/prefect/flows",
        use_name=False,  # set True to use run name instead of run id
    )
    logger = get_run_logger()
    try:
        # Fix: the transcript contained a Slack-mangled link
        # `<http://logger.info|logger.info>`; restored to a plain call.
        logger.info("Hello from my flow!")
        # your flow code here
    finally:
        # Always detach so repeated runs don't stack duplicate handlers.
        remove_handler(h)


if __name__ == "__main__":
    my_flow()
B) Centralized logging.yml with a custom handler factory
- Pros: No code changes in flows; keep logging policy in one place
- Cons: Requires a small helper module available on all run environments
1) Create a helper module, e.g. `my_project/logging_helpers.py`:
import logging
import logging.handlers
from pathlib import Path
from prefect import runtime
def _sanitize(s: str) -> str:
return "".join(c if c.isalnum() or c in ("-", "_") else "_" for c in s)
def per_flow_run_file_handler(base_dir="/var/log/prefect/flows", use_name=False):
    """Build (but do not attach) a RotatingFileHandler for the current flow run.

    Intended as a logging.yml handler factory: the file is named from the
    current Prefect flow run (flow name plus run id, or run name when
    ``use_name=True``) and placed under ``base_dir/<flow_name>/``.
    """
    flow_name = getattr(runtime.flow_run, "flow_name", None) or "unknown_flow"
    run_id = getattr(runtime.flow_run, "id", None) or "no_run_id"
    run_name = getattr(runtime.flow_run, "name", None)
    safe_flow = _sanitize(flow_name)
    if use_name and run_name:
        filename = f"{safe_flow}__{_sanitize(run_name)}.log"
    else:
        filename = f"{safe_flow}__{run_id}.log"
    out_dir = Path(base_dir) / safe_flow
    out_dir.mkdir(parents=True, exist_ok=True)
    file_path = out_dir / filename
    handler = logging.handlers.RotatingFileHandler(
        file_path, maxBytes=10 * 1024 * 1024, backupCount=5
    )
    formatter = logging.Formatter(
        "%(asctime)s | %(levelname)s | %(name)s - %(message)s"
    )
    handler.setFormatter(formatter)
    # Fix: the transcript contained a Slack-mangled link
    # `<http://logging.INFO|logging.INFO>`; the intended value is logging.INFO.
    handler.setLevel(logging.INFO)
    return handler
2) Reference it from your logging.yml (in ~/.prefect or via PREFECT_LOGGING_SETTINGS_PATH):
```
version: 1
formatters:
standard:
format: "%(asctime)s | %(levelname)s | %(name)s - %(message)s"
handlers:
console:
class: prefect.logging.handlers.PrefectConsoleHandler
level: INFO
formatter: standard
api:
class: prefect.logging.handlers.APILogHandler
level: INFO
Marvin
11/06/2025, 2:07 PMAbuzar Shakikh
11/06/2025, 2:12 PMMarvin
11/06/2025, 2:12 PMMarvin
11/06/2025, 2:23 PMprefect flow-run ls --state COMPLETED --state FAILED --limit 200
- Docs: CLI: flow-run
- Python Client (precise last-24h query)
- Use filters with the API client to get exactly the last 24 hours and states you care about:
import asyncio
from datetime import datetime, timedelta, timezone
from prefect.client.orchestration import get_client
from prefect.client.schemas.filters import (
FlowRunFilter,
FlowRunFilterStartTime,
FlowRunFilterStateName,
FlowRunFilterName, # optional if you want to target a specific flow
)
# from prefect.client.schemas.sorting import FlowRunSort # optional
async def main():
    """Print flow runs started in the last 24h that ended COMPLETED or FAILED."""
    now = datetime.now(timezone.utc)
    since = now - timedelta(hours=24)
    flow_run_filter = FlowRunFilter(
        start_time=FlowRunFilterStartTime(after_=since, before_=now),
        state_name=FlowRunFilterStateName(any_=["COMPLETED", "FAILED"]),
        # name=FlowRunFilterName(any_=["my-flow-name"]),  # uncomment to scope to one flow
    )
    async with get_client() as client:
        # Fix: the Prefect client's read_flow_runs takes `flow_run_filter=`,
        # not `filters=` — the original keyword would raise a TypeError.
        runs = await client.read_flow_runs(
            limit=500,
            flow_run_filter=flow_run_filter,
            # sort=FlowRunSort.START_TIME_DESC,  # optional
        )
    for r in runs:
        # r.state.name will be 'COMPLETED' or 'FAILED'
        print(r.id, r.name, r.state.name, r.start_time, r.end_time)
asyncio.run(main())
- Docs:
- API client overview
- Filter schema reference
- REST: paginate flow runs
If you tell me whether you prefer UI, CLI, or a Python script (and if you want to scope to a specific flow/deployment), I can tailor the exact steps.
Abuzar Shakikh
11/06/2025, 2:24 PMMarvin
11/06/2025, 2:24 PMMarvin
11/06/2025, 2:25 PMPREFECT_API_URL and PREFECT_API_KEY, or use your Prefect profile).
- Timezone: uses UTC.
# pip install "prefect>=3"
import asyncio
from datetime import datetime, timedelta, timezone
from collections import Counter, defaultdict
from prefect.client.orchestration import get_client
from prefect.client.schemas.filters import (
FlowRunFilter,
FlowRunFilterStartTime,
FlowRunFilterStateName,
FlowRunFilterName, # optional if scoping by flow name
)
from prefect.client.schemas.sorting import FlowRunSort
async def fetch_last_24h_runs(flow_name: str | None = None):
    """Return COMPLETED/FAILED flow runs started in the last 24 hours (UTC).

    If *flow_name* is given, results are scoped to runs with that name.
    """
    now = datetime.now(timezone.utc)
    since = now - timedelta(hours=24)
    filters = FlowRunFilter(
        start_time=FlowRunFilterStartTime(after_=since, before_=now),
        state_name=FlowRunFilterStateName(any_=["COMPLETED", "FAILED"]),
        name=FlowRunFilterName(any_=[flow_name]) if flow_name else None,
    )
    async with get_client() as client:
        # Fix: the Prefect client's read_flow_runs takes `flow_run_filter=`,
        # not `filters=` — the original keyword would raise a TypeError.
        runs = await client.read_flow_runs(
            limit=1000,  # increase if needed
            flow_run_filter=filters,
            sort=FlowRunSort.START_TIME_DESC,
        )
    return runs
async def main(flow_name: str | None = None):
    """Summarize and list the last 24 hours of COMPLETED/FAILED flow runs."""
    runs = await fetch_last_24h_runs(flow_name)

    # Overall summary counts
    state_totals = Counter(run.state.name for run in runs)
    print(f"Last 24h: {len(runs)} runs")
    print(f"Completed: {state_totals.get('COMPLETED', 0)} | Failed: {state_totals.get('FAILED', 0)}")

    # Optional: counts per flow
    per_flow = defaultdict(Counter)
    for run in runs:
        # r.flow_name is available on flow run objects
        per_flow[getattr(run, "flow_name", "unknown")][run.state.name] += 1
    if len(per_flow) > 1:
        print("\nPer-flow totals (last 24h):")
        for fname, tally in per_flow.items():
            print(f"- {fname}: Completed={tally.get('COMPLETED', 0)}, Failed={tally.get('FAILED', 0)}")

    # Detailed listing
    print("\nDetailed runs:")
    for run in runs:
        # Attributes commonly used: r.id, r.name, r.flow_name, r.state.name, r.start_time, r.end_time
        print(f"{run.start_time} | {run.state.name:<9} | {getattr(run, 'flow_name', '')} | {run.name} | {run.id}")


if __name__ == "__main__":
    # Optionally pass a flow name to scope results:
    # asyncio.run(main(flow_name="my-flow"))
    asyncio.run(main())
If you’d like this to also include other terminal states (e.g., CRASHED or CANCELLED), just add them to the any_= list.
Docs for reference:
- API client overview: API client overview
- Filter schema reference: Filter schema reference
- REST: paginate flow runs: Flow runs REST