Rama Chandar
08/15/2025, 10:08 AM

Marvin
08/15/2025, 10:12 AM

1) Python logging handler (pyodbc)
Create a file on the worker’s PYTHONPATH, e.g. `C:\prefect\prefect_mssql_logging.py`:
```
import datetime
import logging
import threading

import pyodbc


class MSSQLHandler(logging.Handler):
    def __init__(self, connection_string, table="prefect_logs"):
        super().__init__()
        self.connection_string = connection_string
        self.table = table
        self._local = threading.local()
        self._ensure_table()

    def _get_conn(self):
        # One connection per thread; pyodbc connections are not thread-safe
        if getattr(self._local, "conn", None) is None:
            self._local.conn = pyodbc.connect(self.connection_string, autocommit=True)
        return self._local.conn

    def _ensure_table(self):
        try:
            conn = self._get_conn()
            cursor = conn.cursor()
            cursor.execute(f"""
                IF NOT EXISTS (
                    SELECT * FROM sysobjects WHERE name='{self.table}' AND xtype='U'
                )
                CREATE TABLE [{self.table}] (
                    id INT IDENTITY(1,1) PRIMARY KEY,
                    created_at DATETIME2 NOT NULL,
                    level NVARCHAR(20) NOT NULL,
                    logger NVARCHAR(255) NOT NULL,
                    flow_run_id NVARCHAR(50) NULL,
                    task_run_id NVARCHAR(50) NULL,
                    worker NVARCHAR(255) NULL,
                    message NVARCHAR(MAX) NOT NULL
                )
            """)
        except Exception:
            # Swallow table-ensure issues; logging should not crash the worker
            pass

    def emit(self, record):
        try:
            msg = self.format(record)
            conn = self._get_conn()
            cursor = conn.cursor()
            # Prefect attaches these as extra attributes on the log record
            flow_run_id = getattr(record, "flow_run_id", None)
            task_run_id = getattr(record, "task_run_id", None)
            worker = getattr(record, "worker", None)
            cursor.execute(
                f"""
                INSERT INTO [{self.table}]
                    (created_at, level, logger, flow_run_id, task_run_id, worker, message)
                VALUES (?, ?, ?, ?, ?, ?, ?)
                """,
                # timezone-aware UTC timestamp (utcfromtimestamp is deprecated)
                datetime.datetime.fromtimestamp(record.created, tz=datetime.timezone.utc),
                record.levelname,
                record.name,
                str(flow_run_id) if flow_run_id else None,
                str(task_run_id) if task_run_id else None,
                worker,
                msg,
            )
        except Exception:
            self.handleError(record)
```
Notes:
- This is minimal. In production, consider batching/in-memory queue to reduce DB round-trips, and add retry/backoff.
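The batching idea can be sketched with the stdlib's `logging.handlers.MemoryHandler`, which buffers records and forwards them to a target handler only when the buffer fills or a severe record arrives. The `ListHandler` below is a stand-in for testing; a real setup would wrap the `MSSQLHandler` above:

```python
import logging
import logging.handlers


class ListHandler(logging.Handler):
    """Stand-in target for the demo; a real setup would use MSSQLHandler."""
    def __init__(self):
        super().__init__()
        self.records = []

    def emit(self, record):
        self.records.append(record)


def make_buffered(target, capacity=100):
    # Records accumulate in memory and reach `target` only when the buffer
    # fills or a record at ERROR level (or above) arrives, cutting DB
    # round-trips from one per record to one per batch.
    return logging.handlers.MemoryHandler(
        capacity=capacity, flushLevel=logging.ERROR, target=target
    )


target = ListHandler()
logger = logging.getLogger("buffer_demo")
logger.setLevel(logging.INFO)
logger.addHandler(make_buffered(target, capacity=3))

logger.info("one")
logger.info("two")
print(len(target.records))  # 0 -- still buffered
logger.info("three")        # buffer reaches capacity, flushes to target
print(len(target.records))  # 3
```

For real retry/backoff you would still need to catch insert failures inside the target handler; `MemoryHandler` only solves the round-trip volume.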
2) Prefect logging config (YAML)
Create `C:\prefect\logging.yml`:
```
version: 1
disable_existing_loggers: false
formatters:
  plain:
    format: "%(asctime)s | %(levelname)s | %(name)s | %(message)s"
  json:
    "()": pythonjsonlogger.jsonlogger.JsonFormatter
    format: "%(asctime)s %(levelname)s %(name)s %(message)s %(flow_run_id)s %(task_run_id)s"
handlers:
  console:
    class: logging.StreamHandler
    level: INFO
    formatter: plain
    stream: ext://sys.stdout
  mssql:
    class: prefect_mssql_logging.MSSQLHandler
    level: INFO
    formatter: json
    connection_string: "<your ODBC connection string>"
loggers:
  prefect:
    level: INFO
    handlers: [console, mssql]
  prefect.workers:
    level: INFO
    handlers: [console, mssql]
```
Notes:
- Using logging.StreamHandler for the console removes color/ANSI sequences. If NSSM still captures stdout to a file, it'll be readable.
- The JSON formatter will make DB rows more consistent and easier to parse.
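As a quick sanity check, the YAML can be loaded with the stdlib's `dictConfig`, which is essentially what Prefect does at startup. A minimal sketch, assuming PyYAML is installed and using a trimmed console-only subset so it runs without pyodbc or pythonjsonlogger:

```python
import logging
import logging.config

import yaml  # PyYAML; assumed installed

# Trimmed, console-only subset of the config above, inlined for the check;
# a real run would read C:\prefect\logging.yml instead.
config_yaml = """
version: 1
disable_existing_loggers: false
formatters:
  plain:
    format: "%(asctime)s | %(levelname)s | %(name)s | %(message)s"
handlers:
  console:
    class: logging.StreamHandler
    level: INFO
    formatter: plain
    stream: ext://sys.stdout
loggers:
  prefect:
    level: INFO
    handlers: [console]
"""
logging.config.dictConfig(yaml.safe_load(config_yaml))
logging.getLogger("prefect").info("logging config loaded OK")
```

If `dictConfig` raises, the error message usually names the exact key or handler that is malformed, which is much easier to debug here than from worker startup logs.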
3) Tell Prefect to use your logging config
Set this environment variable in the NSSM service:
- PREFECT_LOGGING_SETTINGS_PATH=C:\prefect\logging.yml
If your build uses a different name, the alternative is PREFECT_LOGGING_CONFIG_PATH. Use whichever your version supports. See:
- Logging customization (Prefect 3)
- Settings reference
Also ensure your Prefect Cloud env vars are set on the service:
- PREFECT_API_URL=https://api.prefect.cloud/api/accounts/<account_id>/workspaces/<workspace_id>
- PREFECT_API_KEY=<your_key>
With NSSM, add env vars via AppEnvironmentExtra in the service config.
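For example, from an elevated prompt (hypothetical service name `prefect-worker`; the `^` line continuations are cmd.exe syntax, and PYTHONPATH is included so the handler module is importable):

```
nssm set prefect-worker AppEnvironmentExtra ^
  PYTHONPATH=C:\prefect ^
  PREFECT_LOGGING_SETTINGS_PATH=C:\prefect\logging.yml ^
  PREFECT_API_URL=https://api.prefect.cloud/api/accounts/<account_id>/workspaces/<workspace_id> ^
  PREFECT_API_KEY=<your_key>
nssm restart prefect-worker
```

Note that `nssm set ... AppEnvironmentExtra` replaces the whole extra-environment block, so list every variable the service needs in one call.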
4) Restart the service
After setting env vars and placing the files on PYTHONPATH, restart the NSSM service that runs your worker.
Why your current file log looks garbled
- Prefect’s console handler uses rich/ANSI styling by default. When stdout is redirected to a file by NSSM, those escape sequences land in the file. Replacing the console handler with a plain formatter (as above) fixes that. Or just stop capturing stdout to a file once your DB handler is working.
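If you need to salvage a file that already contains escape sequences, a simple regex strip recovers readable text. A minimal sketch; the pattern covers common CSI sequences (colors, cursor movement) only:

```python
import re

# Matches ANSI CSI escape sequences: ESC [ params intermediates final-byte
ANSI_RE = re.compile(r"\x1b\[[0-9;?]*[ -/]*[@-~]")


def strip_ansi(text: str) -> str:
    """Remove ANSI CSI sequences, leaving the plain log text."""
    return ANSI_RE.sub("", text)


garbled = "\x1b[32m10:12:01\x1b[0m | \x1b[1mINFO\x1b[0m | worker started"
print(strip_ansi(garbled))  # 10:12:01 | INFO | worker started
```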
A few practical tips
- If you want both worker logs and flow/task logs in MSSQL, pointing both loggers.prefect and loggers.prefect.workers at the mssql handler covers it.
- Write volume can be high. If the DB becomes a bottleneck, switch the handler to buffer and batch inserts.
- You already get logs in Prefect Cloud; if you only need central searchability, that might suffice without duplicating to MSSQL. But if MSSQL is a hard requirement, the handler pattern is the right way.
Want me to tailor the handler to your exact table schema or include specific Prefect fields you care about? Also, confirm: do you want just worker process logs, flow/task logs, or both?