Fabien Punin
10/16/2025, 4:32 PMMarvin
10/16/2025, 4:32 PMMarvin
10/16/2025, 4:50 PM
prefect-sqlalchemy
prefect-slack
pip install "prefect[sqlalchemy,slack]"
from prefect.blocks.system import Secret
from prefect_slack.credentials import SlackWebhook

# One-off setup: register the blocks that the alert flow loads by name.
# The values below are placeholders; the chat export had wrapped them in
# Slack's <...> link markup, which is not part of the URL and is removed here.

# Store your DB URL as a Secret block
Secret(value="postgresql+psycopg2://user:pass@host:5432/dbname").save("prod-db-url", overwrite=True)

# Store your Slack Incoming Webhook URL
SlackWebhook(url="https://hooks.slack.com/services/XXX/YYY/ZZZ").save("alerts-slack", overwrite=True)
from dataclasses import dataclass
from typing import Optional, Dict, Any, List
from prefect import flow, task, get_run_logger
from prefect.blocks.system import Secret
from prefect_sqlalchemy.database import SqlAlchemyConnector
from prefect_slack.credentials import SlackWebhook as SlackCreds
from prefect_slack.messages import send_incoming_webhook_message
@dataclass
class AlertSpec:
    """Declarative description of a single SQL alert."""

    # Identifier for the alert.
    name: str
    # Query text associated with the alert.
    sql: str
    # Cron expression string; not consumed by any code visible in this snippet.
    cron: str
    # Optional mapping of bind-parameter names to values; None when the
    # query takes no parameters.
    parameters: Optional[Dict[str, Any]] = None
@task(retries=2, retry_delay_seconds=30)
def fetch_rows(db_url: str, sql: str, params: Optional[dict]) -> List[tuple]:
    """Run ``sql`` against the database at ``db_url`` and return all rows.

    The task is configured with two retries, spaced 30 seconds apart.

    Args:
        db_url: Connection URL handed to ``SqlAlchemyConnector``.
        sql: Statement to execute.
        params: Bind parameters for the statement, or ``None``.

    Returns:
        Whatever ``SqlAlchemyConnector.fetch_all`` returns for the query.
    """
    connector = SqlAlchemyConnector(connection_info=db_url)
    # Entering the connector as a context manager lets it clean up after
    # itself once the rows have been fetched.
    with connector as conn:
        fetched = conn.fetch_all(operation=sql, parameters=params)
        return fetched
@flow
def sql_alert_flow(
    name: str,
    sql: str,
    params: Optional[dict] = None,
    db_url_secret_block: str = "prod-db-url",
    slack_block_name: str = "alerts-slack",
    preview_rows: int = 5,
):
    """Run one SQL alert query and post to Slack when it returns rows.

    Args:
        name: Alert name, used in the Slack message and in log lines.
        sql: Query to execute; any returned row triggers the alert.
        params: Optional bind parameters passed through to ``fetch_rows``.
        db_url_secret_block: Name of the Secret block holding the database URL.
        slack_block_name: Name of the Slack webhook block to post to.
        preview_rows: Maximum number of rows rendered in the message preview.
            Zero or a negative value omits the preview section.
    """
    logger = get_run_logger()

    # Load secrets/blocks
    db_url = Secret.load(db_url_secret_block).get()
    slack = SlackCreds.load(slack_block_name)

    rows = fetch_rows.submit(db_url=db_url, sql=sql, params=params).result()
    count = len(rows)
    if not count:
        logger.info("No results for alert %s", name)
        return

    # Build a concise Slack message with a small preview
    text = f":rotating_light: Alert '{name}' found {count} row(s).\nSQL:\n```{sql}```"

    # Clamp so a negative preview_rows cannot turn into an "all but the last
    # N rows" slice; an empty preview is left out of the message entirely.
    preview = rows[:max(preview_rows, 0)]
    if preview:
        # Simple CSV-like preview; adjust formatting as needed
        lines = "\n".join(", ".join(map(str, r)) for r in preview)
        text += f"\nPreview (first {len(preview)}):\n```{lines}```"

    send_incoming_webhook_message(slack_webhook=slack, text=text)
    logger.info("Sent Slack alert for %s (%s rows)", name, count)
Marvin
10/16/2025, 4:50 PM
How to run it
- Run the script once to create the deployments:
- Start a worker for the target work pool (adjust pool name/type to your infra):
`LIMIT` `prefect.runtime` `fetch_many()` `SqlAlchemyConnector` `count > 0`
Fabien Punin
10/16/2025, 4:57 PMMarvin
10/16/2025, 4:57 PMMarvin
10/16/2025, 4:58 PM
pip install "prefect[sqlalchemy,slack]"
from prefect.blocks.system import Secret, JSON
from prefect_slack.credentials import SlackWebhook

# One-off setup: register the blocks by name. The URL values below are
# placeholders; the chat export had wrapped them in Slack's <...> link
# markup, which is not part of the URL and is removed here.

# DB connection string
Secret(value="postgresql+psycopg2://user:pass@host:5432/dbname").save("prod-db-url", overwrite=True)

# Slack Incoming Webhook URL
SlackWebhook(url="https://hooks.slack.com/services/XXX/YYY/ZZZ").save("alerts-slack", overwrite=True)

# Alert config JSON — edit later in the UI without redeploying
JSON(
    value={
        "alerts": [
            {
                "name": "orders-stuck",
                "sql": "SELECT id FROM orders WHERE status='stuck' AND created_at < now() - interval '30 minutes'",
                "parameters": None,
                "enabled": True
            },
            {
                "name": "failed-logins",
                "sql": """
                    SELECT user_id, count(*) AS failures
                    FROM logins
                    WHERE success = false AND ts > now() - interval '5 minutes'
                    GROUP BY 1
                    HAVING count(*) > 20
                """,
                "parameters": None,
                "enabled": True
            }
        ]
    }
).save("alert-config", overwrite=True)
Marvin
10/16/2025, 4:58 PM
Run it
- Create the deployment:
- Start a worker:
`lookback_minutes` `fetch_many()`