@Marvin I am using a Windows VM as a worker pool ...
# ask-marvin
@Marvin I am using a Windows VM as a worker pool, and I have the API details of my Prefect Cloud workspace. Right now I'm running a batch script through NSSM to keep the worker running, and the logs go into a text file. Can I write these automatically into a MSSQL database? Right now the file is full of encoded characters and difficult to read.
thought for 238.3 seconds
Short answer: yes. Don’t pipe worker stdout to a text file; have Prefect log directly to MSSQL via a custom logging handler and a logging config. That also avoids the “encoded” junk you’re seeing (those are ANSI color codes from the console logger). Here’s a practical setup for Prefect 3.x on Windows/NSSM: What you’ll do - Create a custom Python logging handler that inserts records into MSSQL. - Point Prefect to a dictConfig YAML that uses your handler and a plain (non-color) console formatter. - Set the env var for logging config on your NSSM service (plus your Cloud API env vars). - Optional: remove stdout/stderr file capture in NSSM once DB logging works. 1) Custom MSSQL logging handler Install dependencies on the VM (ODBC driver + pyodbc): - ODBC Driver: Microsoft ODBC Driver for SQL Server 17/18 - Python package:
pyodbc
Create a file on the worker’s PYTHONPATH, e.g. `C:\prefect\prefect_mssql_logging.py`:
```
import logging
import datetime
import threading
import pyodbc

class MSSQLHandler(logging.Handler):
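    """Logging handler that writes each record as a row in a MSSQL table via pyodbc."""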
    def __init__(self, connection_string, table="prefect_logs"):
        super().__init__()
        self.connection_string = connection_string
        self.table = table
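        # one connection per thread; pyodbc connections should not be shared across threads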
        self._local = threading.local()
        self._ensure_table()

    def _get_conn(self):
        if getattr(self._local, "conn", None) is None:
            self._local.conn = pyodbc.connect(self.connection_string, autocommit=True)
        return self._local.conn

    def _ensure_table(self):
        try:
            conn = self._get_conn()
            cursor = conn.cursor()
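            # note: the table name is interpolated into the SQL, so it must come from trusted config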
            cursor.execute(f"""
                IF NOT EXISTS (
                  SELECT * FROM sysobjects WHERE name='{self.table}' AND xtype='U'
                )
                CREATE TABLE [{self.table}] (
                    id INT IDENTITY(1,1) PRIMARY KEY,
                    created_at DATETIME2 NOT NULL,
                    level NVARCHAR(20) NOT NULL,
                    logger NVARCHAR(255) NOT NULL,
                    flow_run_id NVARCHAR(50) NULL,
                    task_run_id NVARCHAR(50) NULL,
                    worker NVARCHAR(255) NULL,
                    message NVARCHAR(MAX) NOT NULL
                )
            """)
        except Exception:
            # Swallow table-ensure issues; logging should not crash the worker
            pass

    def emit(self, record):
        try:
            msg = self.format(record)
            conn = self._get_conn()
            cursor = conn.cursor()
            flow_run_id = getattr(record, "flow_run_id", None)
            task_run_id = getattr(record, "task_run_id", None)
            worker = getattr(record, "worker", None)
            cursor.execute(
                f"""
                INSERT INTO [{self.table}]
                    (created_at, level, logger, flow_run_id, task_run_id, worker, message)
                VALUES (?, ?, ?, ?, ?, ?, ?)
                """,
                datetime.datetime.utcfromtimestamp(record.created),
                record.levelname,
                record.name,
                str(flow_run_id) if flow_run_id else None,
                str(task_run_id) if task_run_id else None,
                worker,
                msg,
            )
        except Exception:
            self.handleError(record)
```
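If per-record inserts prove too chatty for the DB, the standard library’s queue handler/listener pair can buffer them onto a background thread. A minimal sketch, assuming the module above is on PYTHONPATH (the connection string is a placeholder, mirroring the YAML below):
```
# run the DB inserts on a background thread so logging calls never block on MSSQL
import logging
import queue
from logging.handlers import QueueHandler, QueueListener

from prefect_mssql_logging import MSSQLHandler  # the module defined above

log_queue = queue.Queue(-1)  # unbounded in-memory buffer
db_handler = MSSQLHandler(
    "Driver={ODBC Driver 17 for SQL Server};Server=YOURSERVER;Database=YOURDB;Trusted_Connection=yes;"
)
listener = QueueListener(log_queue, db_handler, respect_handler_level=True)
listener.start()  # pair with listener.stop() at shutdown

# the logger enqueues instantly; the listener thread does the actual insert
logging.getLogger("prefect").addHandler(QueueHandler(log_queue))
```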
Notes:
- This is minimal. In production, consider batching or an in-memory queue (like the sketch above) to reduce DB round-trips, and add retry/backoff.

2) Prefect logging config (YAML)
Create `C:\prefect\logging.yml`:
```
version: 1
disable_existing_loggers: false

formatters:
  plain:
    format: "%(asctime)s | %(levelname)s | %(name)s | %(message)s"
  json:
    "()": pythonjsonlogger.jsonlogger.JsonFormatter
    format: "%(asctime)s %(levelname)s %(name)s %(message)s %(flow_run_id)s %(task_run_id)s"

handlers:
  console:
    class: logging.StreamHandler
    level: INFO
    formatter: plain
    stream: ext://sys.stdout
  mssql:
    class: prefect_mssql_logging.MSSQLHandler
    level: INFO
    formatter: json
    connection_string: "Driver={ODBC Driver 17 for SQL Server};Server=YOURSERVER;Database=YOURDB;Trusted_Connection=yes;"
    table: "prefect_logs"

loggers:
  prefect:
    level: INFO
    handlers: [console, mssql]
    propagate: false
  prefect.workers:
    level: INFO
    handlers: [console, mssql]
    propagate: false

root:
  level: WARNING
  handlers: [console]
```
- Using `logging.StreamHandler` for the console removes the color/ANSI sequences; if NSSM still captures stdout to a file, it’ll be readable.
- The JSON formatter makes DB rows more consistent and easier to parse (it requires the `python-json-logger` package).
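Optional: before wiring up the service, a quick sanity check (a sketch; run it on the worker VM with the database reachable) that the YAML parses, the handler imports, and a row lands in the table:
```
# load the dictConfig YAML and emit one test record through the "prefect" logger
import logging
import logging.config

import yaml  # PyYAML, already a Prefect dependency

with open(r"C:\prefect\logging.yml") as f:
    logging.config.dictConfig(yaml.safe_load(f))

logging.getLogger("prefect").info("smoke test from the worker VM")
```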
3) Tell Prefect to use your logging config
Set this environment variable on the NSSM service:
- `PREFECT_LOGGING_SETTINGS_PATH=C:\prefect\logging.yml`
If your build uses a different name, the alternative is `PREFECT_LOGGING_CONFIG_PATH`. Use whichever your version supports. See:
- Logging customization (Prefect 3)
- Settings reference

Also ensure your Prefect Cloud env vars are set on the service:
- `PREFECT_API_URL=https://api.prefect.cloud/api/accounts/<account_id>/workspaces/<workspace_id>`
- `PREFECT_API_KEY=<your_key>`

With NSSM, add env vars via AppEnvironmentExtra in the service config.
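For example (a sketch: `prefect-worker` is a placeholder service name, and `^` is cmd.exe line continuation):
```
:: set extra environment variables on the NSSM service (cmd.exe syntax)
nssm set prefect-worker AppEnvironmentExtra ^
  PREFECT_LOGGING_SETTINGS_PATH=C:\prefect\logging.yml ^
  PREFECT_API_URL=https://api.prefect.cloud/api/accounts/<account_id>/workspaces/<workspace_id> ^
  PREFECT_API_KEY=<your_key>
```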
4) Restart the service
After setting env vars and placing the files on PYTHONPATH, restart the NSSM service that runs your worker (e.g. `nssm restart <service>`).

Why your current file log looks garbled
Prefect’s console handler uses rich/ANSI styling by default. When stdout is redirected to a file by NSSM, those escape sequences land in the file. Replacing the console handler with a plain formatter (as above) fixes that. Or just stop capturing stdout to a file once your DB handler is working.

A few practical tips
- If you want both worker logs and flow/task logs in MSSQL, keeping `loggers.prefect` and `loggers.prefect.workers` both pointing to `mssql` covers it.
- Write volume can be high. If the DB becomes a bottleneck, switch the handler to buffer and batch inserts (the queue-based sketch above is one starting point).
- You already get logs in Prefect Cloud; if you only need central searchability, that might suffice without duplicating to MSSQL. But if MSSQL is a hard requirement, the handler pattern is the right way.

Want me to tailor the handler to your exact table schema, or include specific Prefect fields you care about? Also, confirm: do you want just worker process logs, flow/task logs, or both?