<@ULVA73B9P> What are some options for setting up ...
# ask-marvin
n
@Marvin What are some options for setting up alerts and/or webhooks that fire when a flow has not completed over a given window of time?
m
thought for 537.3 seconds
Great question. There are a few solid patterns depending on whether you’re on Prefect Cloud or running open-source Prefect. Quick summary first, then examples.

High-level options
- Prefect Cloud Automations (recommended on Cloud)
  - Create an automation that alerts when a flow run doesn’t complete within X time. Use an event/sequence trigger and add a Call Webhook or Send Notification action (Slack, Teams, email, etc.).
  - Actions support Slack, Teams, PagerDuty, Opsgenie, email (SendGrid), and generic webhooks via notification blocks.
  - Links:
    - Automations and triggers
    - Slack integration
- Flow-level timeout + alert
  - Set `timeout_seconds` on your flow so long-running runs fail automatically after X. Then alert on failure (via Automations on Cloud or a flow hook).
  - This is simple and works anywhere.
- “Watchdog” flow (works in both Cloud and OSS)
  - Schedule a small monitoring flow that queries for flow runs that are still not finished after X time, then calls a webhook or sends a Slack notification.
- Subscribe to events stream (advanced)
  - Build a lightweight service that consumes Prefect events and alerts when there’s no “flow-run completed” event after X. This is useful for complex org policies.

Examples

1) Flow timeout + Slack alert (simple, works everywhere)
```python
from prefect import flow
from prefect.blocks.notifications import SlackWebhook

# One-time setup (or create the block via the UI). Replace the placeholder URL.
slack = SlackWebhook(url="https://hooks.slack.com/services/XXX/YYY/ZZZ")
slack.save("team-alerts", overwrite=True)

# Flow state hooks are called with (flow, flow_run, state).
def notify_failure(flow, flow_run, state):
    # on_failure fires for any failed state, including a timeout.
    SlackWebhook.load("team-alerts").notify(
        f":rotating_light: Flow {flow.name} run {flow_run.name} failed: {state.message}"
    )

@flow(name="etl", timeout_seconds=3600, on_failure=[notify_failure])
def etl():
    ...
```
- Here, if the run hasn’t completed within 60 minutes, it times out, fails, and your failure handler notifies Slack.
- Slack docs: prefect-slack

2) Watchdog flow that checks for overdue runs and calls a webhook/Slack
```python
import asyncio
from datetime import datetime, timedelta, timezone

from prefect import flow
from prefect.blocks.notifications import SlackWebhook
from prefect.client.orchestration import get_client
from prefect.client.schemas.filters import (
    FlowFilter,
    FlowFilterName,
    FlowRunFilter,
    FlowRunFilterStartTime,
    FlowRunFilterState,
    FlowRunFilterStateType,
)
from prefect.client.schemas.objects import StateType

OVERDUE_AFTER = timedelta(minutes=60)
SLACK_BLOCK = "team-alerts"  # created earlier

@flow(name="flow-overdue-watchdog")
async def watchdog(flow_name: str | None = None):
    cutoff = datetime.now(timezone.utc) - OVERDUE_AFTER
    # FlowRun objects carry flow_id rather than the flow name,
    # so filter by flow name in the query itself.
    flow_filter = (
        FlowFilter(name=FlowFilterName(any_=[flow_name])) if flow_name else None
    )
    async with get_client() as client:
        runs = await client.read_flow_runs(
            flow_filter=flow_filter,
            flow_run_filter=FlowRunFilter(
                # "Late" is a state name under the SCHEDULED type, not a StateType,
                # and late runs have no start_time, so only started runs match here.
                state=FlowRunFilterState(
                    type=FlowRunFilterStateType(
                        any_=[StateType.RUNNING, StateType.PENDING]
                    )
                ),
                start_time=FlowRunFilterStartTime(before_=cutoff),
            ),
            limit=200,
        )

    if runs:
        # Inside an async flow, block loading and notify() must be awaited.
        slack = await SlackWebhook.load(SLACK_BLOCK)
        lines = [
            f"- {r.name} ({r.id}) state={r.state_type.value} started={r.start_time}"
            for r in runs
        ]
        await slack.notify(
            f":alarm_clock: {len(runs)} flow run(s) have not completed within {OVERDUE_AFTER}:\n"
            + "\n".join(lines)
        )

if __name__ == "__main__":
    asyncio.run(watchdog())
```
- Schedule this watchdog every 5–10 minutes with a deployment. It alerts on any run that’s still not finished after your SLA window.
- Swap Slack for a generic webhook by posting with `httpx` or by using a Custom Webhook notification block.
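As a rough sketch of the generic-webhook variant, the helper below builds a JSON alert body and POSTs it. It uses the standard library's `urllib.request` instead of `httpx` only to stay dependency-free. The function names, the payload shape, and the dict keys (`id`, `name`, `start_time`) are hypothetical, so adapt them to whatever your receiving endpoint expects.

```python
import json
import urllib.request
from datetime import datetime, timedelta, timezone


def build_overdue_payload(runs, overdue_after):
    """Build a JSON-serializable alert body (hypothetical shape) for overdue runs."""
    return {
        "text": f"{len(runs)} flow run(s) have not completed within {overdue_after}",
        "runs": [
            {
                "id": str(r["id"]),
                "name": r["name"],
                "start_time": r["start_time"].isoformat(),
            }
            for r in runs
        ],
    }


def post_webhook(url, payload, timeout=10.0):
    """POST the payload as JSON and return the HTTP status code."""
    request = urllib.request.Request(
        url,
        data=json.dumps(payload).encode("utf-8"),
        headers={"Content-Type": "application/json"},
        method="POST",
    )
    with urllib.request.urlopen(request, timeout=timeout) as response:
        return response.status
```

Inside the watchdog you would map each overdue flow run to a small dict and call `post_webhook(your_url, build_overdue_payload(rows, OVERDUE_AFTER))` in place of the Slack call.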
- Notification block reference (Slack, Teams, PagerDuty, Opsgenie, email): you’ll see these in the Prefect UI under Blocks, or via `prefect block register` if needed.

3) Prefect Cloud Automations (UI-driven)
- In Cloud, create an Automation that fires if a flow run does not complete within your window. Add a “Send notification” action (Slack, Teams, email) or “Call webhook” to POST to an external endpoint.
- If you don’t see a “create automation” option in CLI, that’s expected: creation is currently done in the UI; the CLI can list, inspect, pause, and resume existing automations:
```shell
prefect automations ls
prefect automations inspect "<automation name>"
prefect automations pause "<automation name>"
prefect automations resume "<automation name>"
```
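For the "did not complete within the window" automation itself, here is a hedged sketch of what a proactive custom trigger could look like, built as a plain Python dict and printed as JSON for pasting into the automation editor's custom trigger field. The field names (`posture`, `after`, `expect`, `for_each`, `within`, `match`, `match_related`) reflect my understanding of Prefect's custom trigger schema and the helper `overdue_trigger` is hypothetical, so verify both against the Automations docs for your version.

```python
import json


def overdue_trigger(window_seconds, flow_name=None):
    """Sketch of a proactive trigger: fire when a run that entered Running
    has not emitted a Completed event within `window_seconds`."""
    trigger = {
        "type": "event",
        "posture": "Proactive",  # react to the absence of an expected event
        "match": {"prefect.resource.id": "prefect.flow-run.*"},
        "after": ["prefect.flow-run.Running"],
        "expect": ["prefect.flow-run.Completed"],
        "for_each": ["prefect.resource.id"],  # evaluate each flow run separately
        "threshold": 1,
        "within": window_seconds,
    }
    if flow_name is not None:
        # Assumption: restrict to one flow via its related resource.
        trigger["match_related"] = {
            "prefect.resource.role": "flow",
            "prefect.resource.name": flow_name,
        }
    return trigger


print(json.dumps(overdue_trigger(3600, "etl"), indent=2))
```

Because only `Completed` is expected, a run that fails or crashes inside the window would also count as "not completed"; add those event names to `expect` if you want the alert for still-running flows only.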
- Docs: Automations and triggers

A few notes and caveats
- Cloud vs OSS: Full-featured Automations (including proactive “absence of event” behavior) are available in Prefect Cloud. Recent open-source releases (Prefect 3.x) also ship events and automations, so check what your server version supports. On older OSS versions, the watchdog flow pattern is the most flexible approach.
- Choosing the pattern: If your goal is “notify but let the run continue,” use a watchdog or Cloud Automation. If your goal is “hard cap runtime,” use `timeout_seconds` and alert on failure.

Would you like me to tailor one of these to your setup? If so:
- Are you on Prefect Cloud or OSS?
- Which notifier do you prefer (Slack, webhook URL, email, PagerDuty, etc.)?
- Do you want to hard-stop long runs or just notify?