Niko
09/18/2025, 9:49 PM
Marvin
09/18/2025, 9:49 PM
Marvin
09/18/2025, 9:58 PM
- Set `timeout_seconds` on your flow so long-running runs fail automatically after X, then alert on failure (via Automations on Cloud or a flow hook).
  - This is simple and works anywhere.
- “Watchdog” flow (works in both Cloud and OSS)
  - Schedule a small monitoring flow that queries for flow runs still not finished after X, then calls a webhook or sends a Slack notification.
- Subscribe to the events stream (advanced)
  - Build a lightweight service that consumes Prefect events and alerts when no “flow-run completed” event has arrived within X of a run starting; useful for complex org policies.
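The events-stream option boils down to deadline tracking: remember when each run started, clear it when a terminal event arrives, and periodically report runs that are past the window. Below is a minimal sketch of just that bookkeeping, assuming events arrive as `(event_name, run_id, occurred)` tuples. The event names assume Prefect's `prefect.flow-run.<StateName>` pattern, and `OverdueTracker` is a hypothetical helper; wiring it to Prefect's events websocket is not shown.

```python
from dataclasses import dataclass, field
from datetime import datetime, timedelta, timezone

# Assumed event names, following Prefect's "prefect.flow-run.<StateName>" pattern
STARTED_EVENT = "prefect.flow-run.Running"
TERMINAL_EVENTS = {
    "prefect.flow-run.Completed",
    "prefect.flow-run.Failed",
    "prefect.flow-run.Crashed",
    "prefect.flow-run.Cancelled",
}


@dataclass
class OverdueTracker:
    """Hypothetical helper: flags runs that started but emitted no terminal event within `window`."""

    window: timedelta
    # run id -> time of the first "Running" event seen for that run
    started: dict[str, datetime] = field(default_factory=dict)
    # run ids already reported, so each overdue run is alerted only once
    alerted: set[str] = field(default_factory=set)

    def observe(self, event: str, run_id: str, occurred: datetime) -> None:
        if event == STARTED_EVENT:
            # setdefault keeps the first start time if a run re-enters Running (e.g. on retry)
            self.started.setdefault(run_id, occurred)
        elif event in TERMINAL_EVENTS:
            # the run finished (in any terminal state), so stop tracking it
            self.started.pop(run_id, None)
            self.alerted.discard(run_id)

    def overdue(self, now: datetime) -> list[str]:
        """Return run ids past the window that have not been reported yet."""
        late = [
            run_id
            for run_id, t in self.started.items()
            if now - t > self.window and run_id not in self.alerted
        ]
        self.alerted.update(late)
        return late


if __name__ == "__main__":
    t0 = datetime(2025, 9, 18, 21, 0, tzinfo=timezone.utc)
    tracker = OverdueTracker(window=timedelta(minutes=60))
    tracker.observe(STARTED_EVENT, "run-a", t0)
    tracker.observe(STARTED_EVENT, "run-b", t0)
    tracker.observe("prefect.flow-run.Completed", "run-b", t0 + timedelta(minutes=10))
    print(tracker.overdue(t0 + timedelta(minutes=61)))  # ['run-a']
```

Treating Failed/Crashed/Cancelled as "done" is a design choice here: those runs are already covered by failure alerts, so only runs that are genuinely still open get flagged. In a real service you would call `overdue()` on a timer and send whatever it returns to Slack or a webhook.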
Examples
1) Flow timeout + Slack alert (simple, works everywhere)
```python
from prefect import flow
from prefect.blocks.notifications import SlackWebhook

# One-time setup (or create the block via the UI)
slack = SlackWebhook(url="https://hooks.slack.com/services/XXX/YYY/ZZZ")
slack.save("team-alerts", overwrite=True)

# State hooks are called with (flow, flow_run, state)
def notify_failure(flow, flow_run, state):
    SlackWebhook.load("team-alerts").notify(
        f":rotating_light: Flow {flow.name} (run {flow_run.name}) failed: {state.message}"
    )

# A timeout puts the run in a Failed state, which triggers the on_failure hook
@flow(name="etl", timeout_seconds=3600, on_failure=[notify_failure])
def etl():
    ...
```
- Here, if the run hasn’t completed within 60 minutes, it times out, fails, and your failure handler notifies Slack.
- Slack docs: prefect-slack
2) Watchdog flow that checks for overdue runs and calls a webhook/Slack
```python
import asyncio
from datetime import datetime, timedelta, timezone

from prefect import flow
from prefect.blocks.notifications import SlackWebhook
from prefect.client.orchestration import get_client
from prefect.client.schemas.filters import (
    FlowFilter,
    FlowFilterName,
    FlowRunFilter,
    FlowRunFilterStartTime,
    FlowRunFilterState,
    FlowRunFilterStateType,
)
from prefect.client.schemas.objects import StateType

OVERDUE_AFTER = timedelta(minutes=60)
SLACK_BLOCK = "team-alerts"  # created in example 1

@flow(name="flow-overdue-watchdog")
async def watchdog(flow_name: str | None = None):
    cutoff = datetime.now(timezone.utc) - OVERDUE_AFTER
    async with get_client() as client:
        runs = await client.read_flow_runs(
            # Optionally restrict to one flow by name (FlowRun objects carry flow_id, not flow_name)
            flow_filter=FlowFilter(name=FlowFilterName(any_=[flow_name])) if flow_name else None,
            flow_run_filter=FlowRunFilter(
                # "Late" is a Scheduled-type state name, not a StateType; runs that never
                # started have no start_time, so they would not match the filter below anyway
                state=FlowRunFilterState(
                    type=FlowRunFilterStateType(any_=[StateType.PENDING, StateType.RUNNING])
                ),
                start_time=FlowRunFilterStartTime(before_=cutoff),
            ),
            limit=200,
        )

    if runs:
        slack = await SlackWebhook.load(SLACK_BLOCK)
        lines = [
            f"- {r.name} ({r.id}) state={r.state_type.value} started={r.start_time}"
            for r in runs
        ]
        await slack.notify(
            f":alarm_clock: {len(runs)} flow run(s) have not completed within {OVERDUE_AFTER}:\n"
            + "\n".join(lines)
        )

if __name__ == "__main__":
    asyncio.run(watchdog())
    # To run it on a schedule instead, e.g. every 10 minutes:
    # watchdog.serve(name="flow-overdue-watchdog", interval=600)
```
- Schedule this watchdog every 5–10 minutes with a deployment. It alerts on any run that’s still not finished after your SLA window.
- Swap Slack for a generic webhook by posting with `httpx` or by using a Custom Webhook notification block.
Marvin
09/18/2025, 9:58 PM
`prefect block register` if needed.
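For the generic-webhook route, the POST itself is small. This sketch uses the standard library's `urllib.request` in place of `httpx` so it has no dependencies, and it assumes the receiving endpoint accepts a JSON body of the form `{"text": ...}` (the shape Slack incoming webhooks accept; other endpoints may expect a different schema). `build_webhook_request` and `post_webhook` are hypothetical helper names.

```python
import json
import urllib.request


def build_webhook_request(url: str, text: str) -> urllib.request.Request:
    """Build a JSON POST whose body is {"text": text}."""
    body = json.dumps({"text": text}).encode("utf-8")
    return urllib.request.Request(
        url,
        data=body,
        headers={"Content-Type": "application/json"},
        method="POST",
    )


def post_webhook(url: str, text: str, timeout: float = 10.0) -> int:
    """Send the request and return the HTTP status (urlopen raises HTTPError on 4xx/5xx)."""
    req = build_webhook_request(url, text)
    with urllib.request.urlopen(req, timeout=timeout) as resp:
        return resp.status
```

Because `post_webhook` blocks, calling it from the async watchdog would look something like `await asyncio.to_thread(post_webhook, "https://example.com/alerts", message)` (the URL is a placeholder).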
3) Prefect Cloud Automations (UI-driven)
- In Cloud, create an Automation that fires if a flow run does not complete within your window. Add a “Send notification” action (Slack, Teams, email) or “Call webhook” to POST to an external endpoint.
- If you don’t see a “create automation” option in the CLI, that’s expected: creation is currently done in the UI, while the CLI can list, inspect, pause, and resume existing automations:
```bash
prefect automations ls
prefect automations inspect "<automation name>"
prefect automations pause "<automation name>"
prefect automations resume "<automation name>"
```
- Docs: Automations and triggers
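The “does not complete within your window” condition corresponds to a proactive event trigger: start a clock when a run starts, and fire if the expected event does not follow in time. The sketch below just builds that trigger as JSON, e.g. for the UI's custom trigger editor. The field names reflect Prefect's event-trigger schema as far as I know, so check them against the Automations docs before use; `proactive_completion_trigger` is a hypothetical helper name.

```python
import json
from datetime import timedelta


def proactive_completion_trigger(window: timedelta) -> dict:
    """Fire when a flow run enters Running but no Completed event follows within `window`."""
    return {
        "type": "event",
        # Any flow run; match_related can narrow this (e.g. to one deployment)
        "match": {"prefect.resource.id": "prefect.flow-run.*"},
        "match_related": {},
        # Start the clock when a run starts...
        "after": ["prefect.flow-run.Running"],
        # ...and expect a Completed event for that same run
        "expect": ["prefect.flow-run.Completed"],
        # Evaluate each flow run separately
        "for_each": ["prefect.resource.id"],
        # Proactive: fire on the absence of the expected event
        "posture": "Proactive",
        "threshold": 1,
        "within": int(window.total_seconds()),
    }


if __name__ == "__main__":
    print(json.dumps(proactive_completion_trigger(timedelta(minutes=60)), indent=2))
```

Pair the trigger with a “Send notification” or “Call webhook” action as described above.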
A few notes and caveats
- Cloud vs OSS: Full-featured Automations (including proactive “absence of event” behavior) are available in Prefect Cloud. On OSS, the watchdog flow pattern is the most flexible approach.
- Choosing the pattern: If your goal is “notify but let the run continue,” use a watchdog or Cloud Automation. If your goal is “hard cap runtime,” use `timeout_seconds` and alert on failure.
Would you like me to tailor one of these to your setup? If so:
- Are you on Prefect Cloud or OSS?
- Which notifier do you prefer (Slack, webhook URL, email, PagerDuty, etc.)?
- Do you want to hard-stop long runs or just notify?