Abrar Hussain
09/18/2025, 4:58 PMMarvin
09/18/2025, 4:58 PMMarvin
09/18/2025, 5:01 PMfrom prefect import flow
from prefect.blocks.notifications import SlackWebhook
from prefect.client.orchestration import get_client
from prefect.client.schemas import filters
@flow
async def alert_on_late_runs(threshold: int = 1000):
async with get_client() as client:
late_filter = filters.FlowRunFilter(
state=filters.FlowRunFilterState(
name=filters.FlowRunFilterStateName(any_=["LATE"])
)
)
page_size = 200
offset = 0
total = 0
while True:
batch = await client.read_flow_runs(
flow_run_filter=late_filter,
limit=page_size,
offset=offset,
)
total += len(batch)
if len(batch) < page_size:
break
offset += page_size
if total >= threshold:
slack = SlackWebhook.load("my-slack-webhook") # create/load your block
await slack.notify(f"Late flow runs = {total} (>= {threshold})")
# Deploy this flow with an Interval or Cron schedule
Helpful references:
- Using the Prefect 2 client
- SlackWebhook block (Prefect 2.x)
- Cloud API rate limits (be mindful if you have very large volumes)
Notes:
- The filter/type names above reflect typical 2.x imports; depending on your exact 2.18 patch, the filter class names may vary slightly. If you hit an import/enum mismatch, I can help adjust the snippet to your exact version.
- If upgrading is an option: Prefect 3 introduces first‑class Metric automations that can trigger on aggregate conditions and send notifications, which may match this use case more directly.
Questions to tailor the solution:
- Are you on Prefect Cloud 2 or self‑hosted OSS?
- How often do you want to check, and do you want “first time over threshold” vs. “every time over threshold” behavior?
- Do you already have a Slack Notification block set up, or would you prefer Cloud Automations to send Slack?Abrar Hussain
09/18/2025, 5:23 PM