@Marvin do you have any example or best pract...
# ask-marvin
@Marvin do you have any example or best practices for building an alerting / notification system with prefect? Essentially, I would like to define alerts in python as a sql query and a schedule, for prefect to run the queries on schedule and send slack notifications if queries return any results
thought for 1098.8 seconds
Absolutely, this is a great fit for Prefect. In Prefect 3.x, the simplest pattern is:
- Define each alert as a SQL query + schedule + (optional) parameters
- Run the query on schedule
- Send a Slack notification only when results are found

Below is a production-ready pattern with best practices and a complete example. It uses:
- `prefect-sqlalchemy` for DB access
- `prefect-slack` for Slack webhooks
- One deployment per alert (so each alert has its own schedule)

Key docs:
- prefect-sqlalchemy docs
- prefect-slack docs
- Create deployments
- Schedules

Install
```
pip install "prefect[sqlalchemy,slack]"
```
One-time setup for blocks (credentials)
```
from prefect.blocks.system import Secret
from prefect_slack.credentials import SlackWebhook

# Store your DB URL as a Secret block
Secret(value="postgresql+psycopg2://user:pass@host:5432/dbname").save("prod-db-url", overwrite=True)

# Store your Slack Incoming Webhook URL
SlackWebhook(url="https://hooks.slack.com/services/XXX/YYY/ZZZ").save("alerts-slack", overwrite=True)
```
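To confirm the blocks saved correctly, you can load them back before wiring anything up (a quick optional sanity check):
```
from prefect.blocks.system import Secret
from prefect_slack.credentials import SlackWebhook

# Both loads should succeed if the blocks above were saved
db_url = Secret.load("prod-db-url").get()
slack = SlackWebhook.load("alerts-slack")
print("DB driver:", db_url.split("://", 1)[0])
```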
Alerting flow (SQL + Slack)
```
from dataclasses import dataclass
from typing import Optional, Dict, Any, List
from prefect import flow, task, get_run_logger
from prefect.blocks.system import Secret
from prefect_sqlalchemy.database import SqlAlchemyConnector
from prefect_slack.credentials import SlackWebhook as SlackCreds
from prefect_slack.messages import send_incoming_webhook_message

@dataclass
class AlertSpec:
    name: str
    sql: str
    cron: str
    parameters: Optional[Dict[str, Any]] = None

@task(retries=2, retry_delay_seconds=30)
def fetch_rows(db_url: str, sql: str, params: Optional[dict]) -> List[tuple]:
    # Use the SqlAlchemyConnector context manager for safe cleanup
    with SqlAlchemyConnector(connection_info=db_url) as sa:
        return sa.fetch_all(operation=sql, parameters=params)

@flow
def sql_alert_flow(
    name: str,
    sql: str,
    params: Optional[dict] = None,
    db_url_secret_block: str = "prod-db-url",
    slack_block_name: str = "alerts-slack",
    preview_rows: int = 5,
):
    logger = get_run_logger()

    # Load secrets/blocks
    db_url = Secret.load(db_url_secret_block).get()
    slack = SlackCreds.load(slack_block_name)

    rows = fetch_rows.submit(db_url=db_url, sql=sql, params=params).result()
    count = len(rows)

    if count > 0:
        # Build a concise Slack message with a small preview
        text = f":rotating_light: Alert '{name}' found {count} row(s).\nSQL:\n```{sql}```"
        if rows:
            # Render a small preview
            preview = rows[:preview_rows]
            # Simple CSV-like preview; adjust formatting as needed
            lines = "\n".join([", ".join(map(str, r)) for r in preview])
            text += f"\nPreview (first {len(preview)}):\n```{lines}```"
        send_incoming_webhook_message(slack_webhook=slack, text=text)
        logger.info("Sent Slack alert for %s (%s rows)", name, count)
    else:
        logger.info("No results for alert %s", name)
```
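Before creating deployments, you can sanity-check a single alert by calling the flow as a regular function. A minimal local test, assuming the blocks above exist and the database is reachable; the query is illustrative:
```
# Ad-hoc local run of one alert definition
sql_alert_flow(
    name="orders-stuck-test",
    sql="SELECT id FROM orders WHERE status = 'stuck' LIMIT 10",
)
```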
Programmatic deployments per alert (each with its own schedule)
```
# Put this at the bottom of the same file, e.g., alerts.py
ALERTS = [
    AlertSpec(
        name="orders-stuck",
        sql="SELECT id FROM orders WHERE status='stuck' AND created_at < now() - interval '30 minutes'",
        cron="*/10 * * * *",  # every 10 minutes
    ),
    AlertSpec(
        name="failed-logins",
        sql="""
            SELECT user_id, count(*) AS failures
            FROM logins
            WHERE success = false AND ts > now() - interval '5 minutes'
            GROUP BY 1
            HAVING count(*) > 20
        """,
        cron="*/5 * * * *",  # every 5 minutes
    ),
]

if __name__ == "__main__":
    WORK_POOL = "default"  # or your own work pool name
    for alert in ALERTS:
        sql_alert_flow.deploy(
            name=f"alert-{alert.name}",
            work_pool_name=WORK_POOL,
            cron=alert.cron,
            parameters={
                "name": alert.name,
                "sql": alert.sql,
                "params": alert.parameters,
                "db_url_secret_block": "prod-db-url",
                "slack_block_name": "alerts-slack",
                "preview_rows": 5,
            },
            tags=["alerts"],
        )
```
Note: depending on your work pool type and how your code is shipped, `.deploy()` may also require an image or `Flow.from_source(...)`; adjust to your infrastructure.
How to run it
- Run the script once to create the deployments:
```
python alerts.py
```
- Start a worker for the target work pool (adjust pool name/type to your infra):
```
prefect worker start --pool default
```
- You'll see scheduled runs for each alert appear in the UI, each posting to Slack when results are found.

Notes and best practices
- Use parameterized SQL and limit result volume. For example, add time windows or a `LIMIT` in preview queries to avoid flooding Slack.
- Keep sensitive info in Prefect blocks (as shown above).
- Add alert-specific context in messages (deployment name, flow run link). You can pull runtime info from `prefect.runtime` if desired: runtime context.
- For large result sets, consider `fetch_many()` for batching; see the `SqlAlchemyConnector` methods in the prefect-sqlalchemy docs.
- If you prefer to decouple notification routing from your flow code, emit a custom event when `count > 0` and attach a Prefect Automation to send the Slack message (see Event triggers, Automations, and Notification blocks); a sketch follows this list.
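For that last bullet, here's a minimal sketch of the event-emitting variant (the event name and resource ID scheme are illustrative, not prescribed by Prefect):
```
from prefect.events import emit_event

# Inside sql_alert_flow, instead of calling Slack directly:
if count > 0:
    emit_event(
        event="sql-alert.triggered",  # illustrative event name
        resource={"prefect.resource.id": f"sql-alert.{name}"},
        payload={"count": count, "sql": sql},
    )
```
An Automation with a custom event trigger can then route notifications (Slack, email, etc.) without touching flow code.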
Would you like this pattern as a single deployment that fans out over a list of alerts on each run, or do you prefer one deployment per alert (as above)? Also, which database are you querying and how often do you expect these to run? I can tailor the SQL connector setup and schedules accordingly.
Yes @Marvin anything with a single deployment?
thought for 61.1 seconds
Totally, here's a single-deployment pattern that fans out over many alerts on each run. It keeps all alert definitions in a JSON block so you can edit alerts in the UI without redeploying. One schedule drives the whole set.

Key docs:
- prefect-sqlalchemy
- prefect-slack
- Deployments
- Schedules

Install
```
pip install "prefect[sqlalchemy,slack]"
```
One-time: store credentials and alert config as Blocks
```
from prefect.blocks.system import Secret, JSON
from prefect_slack.credentials import SlackWebhook

# DB connection string
Secret(value="postgresql+psycopg2://user:pass@host:5432/dbname").save("prod-db-url", overwrite=True)

# Slack Incoming Webhook URL
SlackWebhook(url="https://hooks.slack.com/services/XXX/YYY/ZZZ").save("alerts-slack", overwrite=True)

# Alert config JSON — edit later in the UI without redeploying
JSON(
    value={
        "alerts": [
            {
                "name": "orders-stuck",
                "sql": "SELECT id FROM orders WHERE status='stuck' AND created_at < now() - interval '30 minutes'",
                "parameters": None,
                "enabled": True
            },
            {
                "name": "failed-logins",
                "sql": """
                    SELECT user_id, count(*) AS failures
                    FROM logins
                    WHERE success = false AND ts > now() - interval '5 minutes'
                    GROUP BY 1
                    HAVING count(*) > 20
                """,
                "parameters": None,
                "enabled": True
            }
        ]
    }
).save("alert-config", overwrite=True)
```
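Because the JSON block can be edited freely in the UI, a light validation pass when loading it can catch malformed alerts early. An optional sketch; `load_alerts` is a hypothetical helper, not part of the flow below:
```
from prefect.blocks.system import JSON

REQUIRED_KEYS = {"name", "sql"}

def load_alerts(block_name: str = "alert-config") -> list:
    # Fail fast if a UI edit left an alert without its required fields
    alerts = JSON.load(block_name).value.get("alerts", [])
    for alert in alerts:
        missing = REQUIRED_KEYS - alert.keys()
        if missing:
            raise ValueError(f"Alert {alert.get('name')!r} is missing {sorted(missing)}")
    return alerts
```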
Single-deployment flow (fans out over all alerts concurrently)
```
from typing import Optional, Dict, Any, List

from prefect import flow, task, get_run_logger
from prefect.blocks.system import Secret, JSON
from prefect_sqlalchemy.database import SqlAlchemyConnector
from prefect_slack.credentials import SlackWebhook as SlackCreds
from prefect_slack.messages import send_incoming_webhook_message

@task(retries=2, retry_delay_seconds=30)
def fetch_rows(db_url: str, sql: str, params: Optional[dict]) -> List[tuple]:
    with SqlAlchemyConnector(connection_info=db_url) as sa:
        return sa.fetch_all(operation=sql, parameters=params)

@task
def send_slack(slack_block_name: str, text: str):
    slack = SlackCreds.load(slack_block_name)
    send_incoming_webhook_message(slack_webhook=slack, text=text)

@task
def run_one_alert(
    alert: Dict[str, Any],
    db_url: str,
    slack_block_name: str,
    preview_rows: int,
) -> Dict[str, Any]:
    logger = get_run_logger()
    if not alert.get("enabled", True):
        logger.info("Alert %s disabled; skipping", alert.get("name"))
        return {"name": alert.get("name"), "skipped": True, "count": 0}

    name = alert["name"]
    sql = alert["sql"]
    params = alert.get("parameters")

    rows = fetch_rows.submit(db_url=db_url, sql=sql, params=params).result()
    count = len(rows)

    if count > 0:
        preview = rows[:preview_rows]
        lines = "\n".join([", ".join(map(str, r)) for r in preview])
        text = (
            f"🚨 Alert '{name}' found {count} row(s).\n"
            f"SQL:\n```{sql}```"
        )
        if preview:
            text += f"\nPreview (first {len(preview)}):\n```{lines}```"
        send_slack.submit(slack_block_name=slack_block_name, text=text)
        logger.info("Sent Slack alert for %s (%s rows)", name, count)
    else:
        logger.info("No results for alert %s", name)

    return {"name": name, "count": count, "skipped": False}

@flow
def run_all_alerts(
    config_block: str = "alert-config",
    db_url_secret_block: str = "prod-db-url",
    slack_block_name: str = "alerts-slack",
    preview_rows: int = 5,
) -> List[Dict[str, Any]]:
    logger = get_run_logger()
    config = JSON.load(config_block).value
    alerts = config.get("alerts", [])
    if not alerts:
        logger.warning("No alerts configured in JSON block '%s'", config_block)
        return []

    db_url = Secret.load(db_url_secret_block).get()

    results_futures = []
    for alert in alerts:
        fut = run_one_alert.submit(
            alert=alert,
            db_url=db_url,
            slack_block_name=slack_block_name,
            preview_rows=preview_rows,
        )
        results_futures.append(fut)

    results = [f.result() for f in results_futures]
    counts = {r["name"]: r["count"] for r in results if not r.get("skipped")}
    logger.info("Alert run summary: %s", counts)
    return results

if __name__ == "__main__":
    # Single deployment on a single cadence for all alerts
    run_all_alerts.deploy(
        name="alerts-monolith",
        work_pool_name="default",
        cron="*/5 * * * *",  # run every 5 minutes
        parameters={
            "config_block": "alert-config",
            "db_url_secret_block": "prod-db-url",
            "slack_block_name": "alerts-slack",
            "preview_rows": 5,
        },
        tags=["alerts"],
    )
```
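If you'd rather not run a work pool and worker at all, `Flow.serve()` is a lighter-weight alternative: it keeps a single long-lived process alive that executes the schedule. A sketch using the same parameters as the `.deploy()` call above:
```
if __name__ == "__main__":
    # Serve the flow from this process: no work pool, image, or
    # separate worker required; stopping the process stops the schedule
    run_all_alerts.serve(
        name="alerts-monolith",
        cron="*/5 * * * *",
        parameters={"config_block": "alert-config"},
        tags=["alerts"],
    )
```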
Run it
- Create the deployment:
```
python alerts_monolith.py
```
- Start a worker:
```
prefect worker start --pool default
```
Notes and trade-offs
- Single deployment = single schedule for all alerts. If some alerts need different cadences, use multiple deployments or encode logic (e.g., "lookback windows") per alert.
- You can add fields to each alert like `lookback_minutes` and include them in the SQL's time filter to approximate different cadences while keeping one schedule.
- Keep messages concise and add a result preview limit to avoid flooding Slack.
- For very large result sets, switch to `fetch_many()` to iterate in batches; see the sketch after this list.
- All sensitive info should live in blocks; update the JSON block to change alert definitions without redeploying.
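For the batching bullet, here's what draining a large result set with `fetch_many` can look like (a sketch; per the prefect-sqlalchemy docs, repeated calls with the same operation continue from the same cursor):
```
from prefect_sqlalchemy.database import SqlAlchemyConnector

def count_rows_in_batches(db_url: str, sql: str, size: int = 500) -> int:
    # Pull rows in fixed-size batches instead of one big fetch_all
    total = 0
    with SqlAlchemyConnector(connection_info=db_url) as sa:
        while True:
            batch = sa.fetch_many(operation=sql, size=size)
            if not batch:
                break
            total += len(batch)
    return total
```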
If you share which database you're on (Postgres, Snowflake, etc.) and any time window logic you need, I can tailor the connector config and example SQL to match.