<@ULVA73B9P> Is there any way in prefect 2.18 to s...
# ask-marvin
a
@Marvin Is there any way in Prefect 2.18 to send a notification in Slack if the number of flows in the Late state increases above 1000?
m
Short answer: Prefect 2.18 has no built-in automation that triggers when the total number of "Late" flow runs exceeds a threshold. Automations in 2.x can notify on per-run state changes (e.g., "when a flow run becomes Late, send Slack"), but they don't aggregate counts across runs. You have two practical options:

1) Per-run notifications (built-in)
- Create an Automation: trigger "Flow run enters Late" → action "Send Slack notification".
- This fires once per Late run, so it's noisy if many runs go Late.
- Docs: Automations (Prefect 2.x)

2) Threshold-based monitor (custom flow)
- Create a small monitoring flow on a schedule (e.g., every 5 minutes) that:
  - Uses the Prefect 2 client to list flow runs filtered to the Late state
  - Counts them (paginating if needed)
  - Sends a single Slack message if the count >= 1000

Example skeleton you can adapt:
from prefect import flow
from prefect.blocks.notifications import SlackWebhook
from prefect.client.orchestration import get_client
from prefect.client.schemas import filters

@flow
async def alert_on_late_runs(threshold: int = 1000):
    async with get_client() as client:
        late_filter = filters.FlowRunFilter(
            state=filters.FlowRunFilterState(
                # State names are matched exactly; Late runs carry the name "Late"
                name=filters.FlowRunFilterStateName(any_=["Late"])
            )
        )

        page_size = 200
        offset = 0
        total = 0
        while True:
            batch = await client.read_flow_runs(
                flow_run_filter=late_filter,
                limit=page_size,
                offset=offset,
            )
            total += len(batch)
            if len(batch) < page_size:
                break
            offset += page_size

    if total >= threshold:
        # Create the block first; inside an async flow, load() must be awaited
        slack = await SlackWebhook.load("my-slack-webhook")
        await slack.notify(f"Late flow runs = {total} (>= {threshold})")

# Deploy this flow with an Interval or Cron schedule
Helpful references:
- Using the Prefect 2 client
- SlackWebhook block (Prefect 2.x)
- Cloud API rate limits (be mindful if you have very large volumes)

Notes:
- The filter/type names above reflect typical 2.x imports; depending on your exact 2.18 patch, the filter class names may vary slightly. If you hit an import/enum mismatch, I can help adjust the snippet to your exact version.
- If upgrading is an option: Prefect 3 introduces first-class Metric automations that can trigger on aggregate conditions and send notifications, which may match this use case more directly.

Questions to tailor the solution:
- Are you on Prefect Cloud 2 or self-hosted OSS?
- How often do you want to check, and do you want "first time over threshold" vs. "every time over threshold" behavior?
- Do you already have a Slack Notification block set up, or would you prefer Cloud Automations to send Slack?
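On the "first time over threshold" vs. "every time over threshold" question, here is a minimal sketch of the decision logic in plain Python. The names `AlertDecision` and `decide_alert` are hypothetical and not part of Prefect. The sketch assumes the monitoring flow keeps the previous `over` flag somewhere between runs; where it is stored is left open.

```python
from dataclasses import dataclass


@dataclass
class AlertDecision:
    send: bool  # whether to send a Slack message on this check
    over: bool  # whether the count is currently at or above the threshold


def decide_alert(
    count: int,
    threshold: int,
    was_over: bool,
    every_time: bool = False,
) -> AlertDecision:
    """Decide whether this check should notify.

    every_time=False: notify only when the count crosses the threshold
    (previous check was below, this one is at or above).
    every_time=True: notify on every check that is at or above the threshold.
    """
    over = count >= threshold
    if every_time:
        return AlertDecision(send=over, over=over)
    return AlertDecision(send=over and not was_over, over=over)
```

In the monitoring flow, the result's `send` flag would gate the Slack call, and `over` would be saved as `was_over` for the next scheduled run.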
a
We are on Prefect 2.18 server, not Cloud. It runs over a Kedro pipeline. Can we easily upgrade to Prefect 3 without many changes?