<@ULVA73B9P> How can I configure the automation t...
# ask-marvin
m
@Marvin How can I configure the automation trigger to properly match Azure Event Grid webhook events? The event matching doesn't seem to recognize any of the payload fields or headers. Is there a specific syntax or approach for filtering webhook events that come from external sources like Event Grid?
m
thought for 335.3 seconds
Short answer: Automations don’t filter on the raw webhook request. They only see the Prefect event that your webhook template produces. To match fields from Azure Event Grid, first map the headers/body you care about into the event’s name/resource/data in the webhook template, then create an Event trigger that filters on those mapped event fields.

What usually trips folks up with Event Grid:
- Event Grid POSTs an array of events; a Prefect webhook turns one HTTP request into one Prefect event. Reference the first item with `body[0]` (or fan out upstream if you need one Prefect event per item).
- Headers are not available to automations unless you copy them into `event.data` via the webhook template.
- There’s no JSONPath/JMESPath in automations; you filter by event name/resource and simple key/value matches on `event.data`, plus a text search filter if needed.

A working pattern for Azure Event Grid

1) Configure the webhook template to extract fields
- Event Name (constant or dynamic): `azure.event_grid` or `azure.event_grid.{{ body[0]['eventType'] }}`
- Resource Type: `azure.event_grid`
- Resource ID: `{{ body[0]['subject'] }}`
- Data (JSON object; copy any headers/body fields you want to filter on):
{
  "id": "{{ body[0]['id'] }}",
  "eventType": "{{ body[0]['eventType'] }}",
  "subject": "{{ body[0]['subject'] }}",
  "topic": "{{ body[0]['topic'] }}",
  "eventTime": "{{ body[0]['eventTime'] }}",
  "blobUrl": "{{ body[0]['data']['url'] | default('') }}",
  "aegEventType": "{{ headers['aeg-event-type'] | default('') }}",
  "aegSubscription": "{{ headers['aeg-subscription-name'] | default('') }}"
}
Notes:
- `body` is the parsed JSON; Event Grid uses an array, hence `body[0]`.
- `headers[...]` is case-insensitive; use the exact names from Event Grid for clarity.
- Use `| default('')` to avoid failures if a field is missing.

2) Create the Automation trigger
- Trigger on Event
- Event name equals `azure.event_grid` (or the dynamic prefix you chose).
- Add Data filters for the keys you placed in `data`, for example: `eventType` equals `Microsoft.Storage.BlobCreated`
- If you need subject scoping, either:
  - Add a Data filter on `subject` equals an exact value, or
  - Use a Text filter to “contains” part of the subject path (e.g., a container prefix).
- If you need to check headers, filter on the `aegEventType` or `aegSubscription` you placed in `data`.

Troubleshooting tips
- Use the webhook’s Test/Preview in the UI with a sample Event Grid payload to confirm the event that will be emitted, and verify your `event.data` has the keys you expect.
- Temporarily remove filters and confirm you see the incoming Prefect event in the Events view; then add filters back one by one.
- If Event Grid can post batches with multiple items, Prefect will not expand those into multiple events. If you need that, add a tiny proxy (e.g., an Azure Function) to fan out one HTTP POST per event to the Prefect webhook, or adjust your template to pick a specific index (e.g., `body[0]`).

References
- Webhooks concept: Prefect Webhooks
- Create a webhook (Cloud): How to create a webhook
- Event filters API (what you can filter on): Event filters reference

If you can share:
- A sample Event Grid payload (redacted),
- Your webhook template (event name/resource/data),
- Your current automation filter setup,

I can help pinpoint why the fields aren’t matching and suggest exact templates/filters for your case.
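The fan-out idea above can be sketched in plain Python. This is only an illustration of the mapping, not Prefect's webhook engine: the function name `event_grid_to_prefect_events` and the output keys (`event`, `resource`, `data`) are assumptions chosen to mirror the template fields described in this message.

```python
import json


def event_grid_to_prefect_events(raw_body: str, headers: dict) -> list:
    """Map every Event Grid item in one POST body to one event-shaped dict.

    Hypothetical helper: the output shape is an assumption for illustration.
    """
    items = json.loads(raw_body)
    if isinstance(items, dict):  # tolerate a single, non-batched event
        items = [items]
    # Lowercase header names once so lookups are case-insensitive.
    lowered = {key.lower(): value for key, value in headers.items()}
    events = []
    for item in items:
        events.append(
            {
                "event": "azure.event_grid",
                "resource": {
                    "prefect.resource.id": item.get("subject", ""),
                    "eventType": item.get("eventType", ""),
                },
                "data": {
                    "id": item.get("id", ""),
                    "subject": item.get("subject", ""),
                    "aegSubscription": lowered.get("aeg-subscription-name", ""),
                },
            }
        )
    return events
```

A proxy such as an Azure Function could call a helper like this and send one POST per returned item, so that each Event Grid item becomes its own Prefect event.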
m
{
  "id": "068efa68-40a4-7e34-8000-9c207f243f1a",
  "account": "831d4bff-763e-43c6-880a-8399ce235aca",
  "event": "azure.event_grid.blob_created",
  "occurred": "2025-10-15T13:49:56.034Z",
  "payload": {
    "method": "POST",
    "headers": {
      "host": "api.prefect.cloud",
      "content-type": "application/json; charset=utf-8",
      "aeg-subscription-name": "SFTPSMARTPAYTEST",
      "aeg-delivery-count": "0",
      "aeg-metadata-version": "1",
      "aeg-event-type": "Notification",
      "aeg-data-version": "",
      "content-length": "864",
      "x-request-id": "14295f4d-fcff-91e2-bffe-6993296aab63"
    },
    "body": "[{\"topic\":\"/subscriptions/ba54f597-a412-43d9-88b4-2834362ebd08/resourceGroups/SFTP_ROOT_Storage/providers/Microsoft.Storage/storageAccounts/sftprootstoragetest\",\"subject\":\"/blobServices/default/containers/az1-sft-gt3/blobs/sftpsmartpay/uploads/TestingSP.csv\",\"eventType\":\"Microsoft.Storage.BlobCreated\",\"id\":\"2740f40d-901e-008e-4bda-3da2b2061360\",\"data\":{\"api\":\"PutBlob\",\"requestId\":\"2740f40d-901e-008e-4bda-3da2b2000000\",\"eTag\":\"0x8DE0BF1B9333FFF\",\"contentType\":\"text/csv\",\"contentLength\":507,\"blobType\":\"BlockBlob\",\"accessTier\":\"Default\",\"url\":\"https://sftprootstoragetest.blob.core.windows.net/az1-sft-gt3/sftpsmartpay/uploads/TestingSP.csv\",\"sequencer\":\"0000000000000000000000000001619100000000000030a9\",\"storageDiagnostics\":{\"batchId\":\"93a5c754-0006-00ee-00da-3dde2d000000\"}},\"dataVersion\":\"\",\"metadataVersion\":\"1\",\"eventTime\":\"2025-10-15T13:49:55.5366662Z\"}]",
    "raw_body": "[{\"topic\":\"/subscriptions/ba54f597-a412-43d9-88b4-2834362ebd08/resourceGroups/SFTP_ROOT_Storage/providers/Microsoft.Storage/storageAccounts/sftprootstoragetest\",\"subject\":\"/blobServices/default/containers/az1-sft-gt3/blobs/sftpsmartpay/uploads/TestingSP.csv\",\"eventType\":\"Microsoft.Storage.BlobCreated\",\"id\":\"2740f40d-901e-008e-4bda-3da2b2061360\",\"data\":{\"api\":\"PutBlob\",\"requestId\":\"2740f40d-901e-008e-4bda-3da2b2000000\",\"eTag\":\"0x8DE0BF1B9333FFF\",\"contentType\":\"text/csv\",\"contentLength\":507,\"blobType\":\"BlockBlob\",\"accessTier\":\"Default\",\"url\":\"https://sftprootstoragetest.blob.core.windows.net/az1-sft-gt3/sftpsmartpay/uploads/TestingSP.csv\",\"sequencer\":\"0000000000000000000000000001619100000000000030a9\",\"storageDiagnostics\":{\"batchId\":\"93a5c754-0006-00ee-00da-3dde2d000000\"}},\"dataVersion\":\"\",\"metadataVersion\":\"1\",\"eventTime\":\"2025-10-15T13:49:55.5366662Z\"}]"
  },
  "received": "2025-10-15T13:49:56.040Z",
  "related": [
    {
      "prefect.resource.id": "prefect-cloud.webhook.8f5d2893-956e-460c-8fd3-4f368700ddc1",
      "prefect.resource.name": "smartpay-event-dynamic",
      "prefect.resource.role": "webhook"
    }
  ],
  "resource": {
    "prefect.resource.id": "azure.storage.blob./blobServices/default/containers/az1-sft-gt3/blobs/sftpsmartpay/uploads/TestingSP.csv",
    "prefect.resource.name": "TestingSP.csv",
    "eventType": "Microsoft.Storage.BlobCreated"
  },
  "workspace": "e1d5c674-55bf-4815-b9a2-ebfc7f375545"
}
@Marvin This is the event I see when I upload a blob to my container. I want the deployment to be triggered as soon as I receive this event. What should be in the JSON of my custom trigger? Is the following trigger correct? { "type": "event", "match": {}, "match_related": { "prefect.resource.id": [ "prefect-cloud.webhook.8f5d2893-956e-460c-8fd3-4f368700ddc1" ] }, "after": [], "expect": [], "for_each": [], "posture": "Reactive", "threshold": 1, "within": 60 }
m
thought for 99.5 seconds
Great news: the event you shared is already in the right shape to trigger a deployment. Your trigger JSON will work, but it’s very broad: it will fire for any event that’s “related” to that webhook. I recommend also matching the event name to avoid accidental triggers.

What you have (works, but broad):
{
  "type": "event",
  "match": {},
  "match_related": {
    "prefect.resource.id": [
      "prefect-cloud.webhook.8f5d2893-956e-460c-8fd3-4f368700ddc1"
    ]
  },
  "after": [],
  "expect": [],
  "for_each": [],
  "posture": "Reactive",
  "threshold": 1,
  "within": 60
}
Safer version (recommended)
- Also match the event name emitted by your webhook: `azure.event_grid.blob_created`
- Still scope to the specific webhook via `match_related`
{
  "type": "event",
  "match": {
    "event": [
      "azure.event_grid.blob_created"
    ]
  },
  "match_related": {
    "prefect.resource.id": [
      "prefect-cloud.webhook.8f5d2893-956e-460c-8fd3-4f368700ddc1"
    ]
  },
  "after": [],
  "expect": [],
  "for_each": [],
  "posture": "Reactive",
  "threshold": 1,
  "within": 60
}
Notes
- Automations don’t filter on raw webhook headers/body; they filter on the Prefect event fields. Your webhook template is already producing:
  - event: `azure.event_grid.blob_created`
  - related: includes your webhook id
  - resource: points to the blob path

  so this recommended trigger will match cleanly.
- If you later want to restrict to a specific container or path, you have two good options:
  - Keep the current template and add a text filter (via the UI) for a path substring you expect (e.g., `az1-sft-gt3/blobs/sftpsmartpay/uploads/`), or
  - Adjust the webhook template so the resource id is the container (not the full blob) and match that exactly.

Refs
- Webhooks concept: Prefect Webhooks
- Create a webhook (Cloud): How to create a webhook
- Event filters reference: Event filters API

If you want, paste your current deployment’s trigger block or confirm whether you want to scope by container prefix, and I’ll tailor the exact JSON.
m
The JSON you shared for the custom trigger is not working; the broader one is working: { "type": "event", "match": {}, "match_related": { "prefect.resource.id": [ "prefect-cloud.webhook.8f5d2893-956e-460c-8fd3-4f368700ddc1" ] }, "after": [], "expect": [], "for_each": [], "posture": "Reactive", "threshold": 1, "within": 60 } @Marvin The one above works, but as you said, it will be triggered by any event, and I don’t want that. The one you shared later is not working: { "type": "event", "match": { "event": ["azure.event_grid.blob_created"] }, "match_related": { "prefect.resource.id": [ "prefect-cloud.webhook.8f5d2893-956e-460c-8fd3-4f368700ddc1" ] }, "after": [], "expect": [], "for_each": [], "posture": "Reactive", "threshold": 1, "within": 60 } It did not trigger the deployment.
m
thought for 171.0 seconds
Thanks for the detail, and you’re right: my earlier JSON used the wrong key for the event name. In Prefect 3, the event name must go in the `expect` array. The `match` object is for resource fields, and `match_related` is for related resources (like your webhook).

Use this instead:
- Narrow by the event name and your webhook:
{
  "type": "event",
  "match": {},
  "match_related": {
    "prefect.resource.id": [
      "prefect-cloud.webhook.8f5d2893-956e-460c-8fd3-4f368700ddc1"
    ]
  },
  "after": [],
  "expect": [
    "azure.event_grid.blob_created"
  ],
  "for_each": [],
  "posture": "Reactive",
  "threshold": 1,
  "within": 60
}
- Optionally also scope to your blob path (resource id) to prevent other blob events in the future:
{
  "type": "event",
  "match": {
    "prefect.resource.id": [
      "azure.storage.blob./blobServices/default/containers/az1-sft-gt3/blobs/sftpsmartpay/uploads/*"
    ]
  },
  "match_related": {
    "prefect.resource.id": [
      "prefect-cloud.webhook.8f5d2893-956e-460c-8fd3-4f368700ddc1"
    ]
  },
  "after": [],
  "expect": [
    "azure.event_grid.blob_created"
  ],
  "for_each": [],
  "posture": "Reactive",
  "threshold": 1,
  "within": 60
}
Why your previous attempt didn’t fire
- I mistakenly suggested putting the event name under `match.event`; that field doesn’t exist. Since `match` only filters the `event.resource` (not the event name), it didn’t match anything and the trigger never fired.
- `expect` is the correct place to list the event names that should count toward the threshold.

A couple of checks
- Confirm the webhook template emits event = `azure.event_grid.blob_created` (your sample shows it does).
- If you’ve edited the template to vary the name, you can use a wildcard in `expect` like `["azure.event_grid.*"]`.

Docs
- Webhooks concept: Prefect Webhooks
- Event triggers/filters overview: Event triggers
- Filters reference: Event filters API

If it still doesn’t trigger with the corrected JSON, please paste your current webhook template’s event name line and I’ll align the `expect` value exactly.
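To make the difference between `expect`, `match`, and `match_related` concrete, here is a simplified model of the matching in plain Python. It is a sketch for reasoning only, not Prefect’s implementation: the helper names are hypothetical, and the wildcard handling (a trailing `*` treated as a prefix match) is an assumption based on the patterns used in this thread.

```python
def _value_matches(value, patterns):
    """True if value equals a pattern, or starts with the prefix of a 'prefix*' pattern."""
    if value is None:
        return False
    if isinstance(patterns, str):
        patterns = [patterns]
    for pattern in patterns:
        if pattern.endswith("*"):
            if value.startswith(pattern[:-1]):
                return True
        elif value == pattern:
            return True
    return False


def _resource_matches(resource, spec):
    """Every label in the spec must match the same label on the resource."""
    return all(_value_matches(resource.get(label), pats) for label, pats in spec.items())


def trigger_matches(trigger, event):
    """Simplified check: event name via expect, resource via match, related via match_related."""
    expect = trigger.get("expect") or []
    if expect and not _value_matches(event["event"], expect):
        return False
    if not _resource_matches(event["resource"], trigger.get("match") or {}):
        return False
    related_spec = trigger.get("match_related") or {}
    if related_spec and not any(
        _resource_matches(rel, related_spec) for rel in event.get("related", [])
    ):
        return False
    return True


# Trimmed copy of the event shared earlier in the thread.
sample_event = {
    "event": "azure.event_grid.blob_created",
    "resource": {
        "prefect.resource.id": "azure.storage.blob./blobServices/default/containers/az1-sft-gt3/blobs/sftpsmartpay/uploads/TestingSP.csv",
        "eventType": "Microsoft.Storage.BlobCreated",
    },
    "related": [
        {"prefect.resource.id": "prefect-cloud.webhook.8f5d2893-956e-460c-8fd3-4f368700ddc1"}
    ],
}
```

In this model, a trigger with `"match": {"event": [...]}` fails because the event’s resource has no `event` label, while the same name placed in `expect` matches.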
m
@Marvin I am not using prefect 3, will this still work?
m
thought for 79.0 seconds
Thanks for clarifying. If you’re on Prefect 2 (Cloud), this approach still applies. Automations and their event trigger schema are Cloud-side and essentially the same between 2.x and 3.x. You should:
- Put the event name in the `expect` array
- Scope to your webhook with `match_related`
- Optionally scope by resource id to your container/path

Given your event sample, try this first with a wildcard to validate event-name matching:
{
  "type": "event",
  "match": {},
  "match_related": {
    "prefect.resource.id": [
      "prefect-cloud.webhook.8f5d2893-956e-460c-8fd3-4f368700ddc1"
    ]
  },
  "after": [],
  "expect": [
    "azure.event_grid.*"
  ],
  "for_each": [],
  "posture": "Reactive",
  "threshold": 1,
  "within": 60
}
If that triggers, tighten it to the exact name:
{
  "type": "event",
  "match": {},
  "match_related": {
    "prefect.resource.id": [
      "prefect-cloud.webhook.8f5d2893-956e-460c-8fd3-4f368700ddc1"
    ]
  },
  "after": [],
  "expect": [
    "azure.event_grid.blob_created"
  ],
  "for_each": [],
  "posture": "Reactive",
  "threshold": 1,
  "within": 60
}
Optionally scope to your blob path to avoid future noise:
{
  "type": "event",
  "match": {
    "prefect.resource.id": [
      "azure.storage.blob./blobServices/default/containers/az1-sft-gt3/blobs/sftpsmartpay/uploads/*"
    ]
  },
  "match_related": {
    "prefect.resource.id": [
      "prefect-cloud.webhook.8f5d2893-956e-460c-8fd3-4f368700ddc1"
    ]
  },
  "after": [],
  "expect": [
    "azure.event_grid.blob_created"
  ],
  "for_each": [],
  "posture": "Reactive",
  "threshold": 1,
  "within": 60
}
If the exact-name version still doesn’t fire:
- Copy/paste the event name from the Events view to avoid subtle typos/case differences.
- Keep the wildcard `expect` to confirm the trigger wiring works, then narrow.
- Verify the webhook template really emits event = `azure.event_grid.blob_created` for these posts.

If you can share a screenshot or snippet of your webhook template’s event name line, I’ll align the `expect` value precisely. Also, if you’re configuring this as a per-deployment trigger in the Deployment UI (vs a standalone Automation with a “Run deployment” action), let me know and I can tailor the JSON to that screen specifically.
m
perfect that worked, thanks!
@Marvin You have a Prefect flow that runs only when a file is uploaded to an Azure container. You want a daily data-quality (DQ) check that runs at 7 PM — if the flow did not run at all today, you should be notified automatically (email/Slack), without creating a new flow or triggering the existing one again.
m
thought for 378.6 seconds
Short answer: Use a Prefect Cloud Automation with a proactive “expect event” trigger that checks at 7 PM whether your flow had any runs today. If none, the automation sends a Slack/email notification. This does not start any flow runs.

A couple of quick questions to tailor it:
- Are you on Prefect Cloud or self-hosted Prefect Server?
- What timezone should “7 PM” use?
- Slack or email (or both) for the notification?

If you’re on Prefect Cloud (recommended)

This is exactly what Automations are for. Create a proactive Automation that expects at least one flow run event for your flow/deployment by 7 PM. If zero events, it fires a notification.

High-level steps in the UI:
1) Go to Automations → New Automation
2) Name it, e.g., “DQ: alert if no runs today by 7 PM”
3) Trigger
- Posture: Proactive (Expect events)
- Condition: Expect ≥1 of the event “prefect.flow-run.Completed” (or “prefect.flow-run.Created”) filtered to your specific flow/deployment.
- Time window: Set it to evaluate at 7 PM in your timezone and look back from midnight (local) to 7 PM. In the Automations UI, use the window controls to define the daily lookback window and local timezone so it evaluates once a day at 7 PM.
4) Action
- Send Notification → choose Slack or Email and configure the channel/recipient(s)
5) Save and enable the automation

Notes:
- This approach does not schedule or trigger your flow. It only sends a notification if the expected event did not happen by the specified time.
- Filter by deployment (preferred) rather than just flow name to avoid false positives if you have multiple deployments of the same flow.
- Automations are created in the Cloud UI or API. There is no CLI “create” command (confirmed by `prefect automation --help`).

Relevant docs:
- Event triggers and proactive automations: Event triggers (Prefect 3)
- Background on absence-of-event monitoring: Expect the unexpected: automations and observability

If you’re on self-hosted Prefect Server (or prefer a code-based fallback)

If Automations aren’t available in your environment, the next-lightest option is a tiny “check” job (e.g., a cron or CI job) that runs at 7 PM and queries Prefect for “any runs of Flow/Deployment X since midnight”; if count == 0, it posts to Slack/email. This does not trigger your data flow.

Example outline (pseudo-Python):
# Run this on a system scheduler at 7 PM local time; it will NOT start any Prefect flow runs.

from datetime import datetime
from zoneinfo import ZoneInfo
import os
import httpx

TZ = ZoneInfo("America/New_York")
now = datetime.now(TZ)
start_of_day = now.replace(hour=0, minute=0, second=0, microsecond=0)

# Prefect Cloud API URL for your workspace
API_URL = os.environ["PREFECT_API_URL"]  # e.g. https://api.prefect.cloud/api/accounts/<acct>/workspaces/<ws>
API_KEY = os.environ["PREFECT_API_KEY"]
DEPLOYMENT_ID = os.environ["DEPLOYMENT_ID"]  # preferred; or filter by flow name

# Ask for at most one run of this deployment that started between local midnight and now
payload = {
  "limit": 1,
  "deployments": {"id": {"any_": [DEPLOYMENT_ID]}},
  "flow_runs": {"start_time": {"after_": start_of_day.isoformat(), "before_": now.isoformat()}}
}
headers = {"Authorization": f"Bearer {API_KEY}"}

r = httpx.post(f"{API_URL}/flow_runs/filter", json=payload, headers=headers, timeout=30)
r.raise_for_status()
runs = r.json()

if len(runs) == 0:
    # Notify (Slack example via webhook)
    webhook = os.environ["SLACK_WEBHOOK_URL"]
    # %H:%M is used instead of %-I:%M %p because the latter is not supported on every platform
    httpx.post(webhook, json={"text": f"Alert: No runs for deployment {DEPLOYMENT_ID} today by {now.strftime('%H:%M %Z')}."})
Why this solution matches your constraints
- No new flow is created, and the existing flow is not triggered.
- You get a single notification at 7 PM if no runs occurred that day.
- Works cleanly with event-triggered flows (like your Azure upload-triggered flow).
If you confirm Cloud vs Server, timezone, and notification channel, I can give you exact UI clicks (Cloud) or a ready-to-run script tailored to your workspace.
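The “did anything run today?” decision in the script can also be isolated into a small pure function, which makes it easy to test without calling the Prefect API. This is a minimal sketch; the names `ran_today` and `should_alert` are hypothetical, and it assumes the run start times have already been parsed into timezone-aware `datetime` objects.

```python
from datetime import datetime, timedelta, timezone


def ran_today(run_start_times, now):
    """True if any run started between local midnight and `now`.

    `now` must be timezone-aware; its timezone defines what "today" means.
    """
    if now.tzinfo is None:
        raise ValueError("now must be timezone-aware")
    start_of_day = now.replace(hour=0, minute=0, second=0, microsecond=0)
    # Aware datetimes compare correctly even when their timezones differ.
    return any(start_of_day <= started <= now for started in run_start_times)


def should_alert(run_start_times, now):
    """Alert only when no run started today."""
    return not ran_today(run_start_times, now)


# Example: a 7 PM check in a fixed UTC-4 offset (used here to avoid needing tz data).
local_tz = timezone(timedelta(hours=-4))
check_time = datetime(2025, 10, 15, 19, 0, tzinfo=local_tz)
upload_run = datetime(2025, 10, 15, 13, 49, 56, tzinfo=timezone.utc)
print(should_alert([upload_run], check_time))
```

Keeping the time-window logic separate from the HTTP call means the boundary cases (a run just before local midnight, an empty result) can be checked in isolation.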
m
If you’re on Prefect Cloud (recommended) This is exactly what Automations are for. Create a proactive Automation that expects at least one flow run event for your flow/deployment by 7 PM. If zero events, it fires a notification. High-level steps in the UI: 1) Go to Automations → New Automation 2) Name it, e.g., “DQ: alert if no runs today by 7 PM” 3) Trigger - Posture: Proactive (Expect events) - Condition: Expect ≥1 of the event “prefect.flow-run.Completed” (or “prefect.flow-run.Created”) filtered to your specific flow/deployment. - Time window: Set it to evaluate at 7 PM in your timezone and look back from midnight (local) to 7 PM. In the Automations UI, use the window controls to define the daily lookback window and local timezone so it evaluates once a day at 7 PM. 4) Action - Send Notification → choose Slack or Email and configure the channel/recipient(s) 5) Save and enable the automation Notes: - This approach does not schedule or trigger your flow. It only sends a notification if the expected event did not happen by the specified time. - Filter by deployment (preferred) rather than just flow name to avoid false positives if you have multiple deployments of the same flow. - Automations are created in the Cloud UI or API. There is no CLI “create” command (confirmed by `prefect automation --help`).
@Marvin I want these steps in detail. How can I set up my trigger’s JSON?
m
thought for 278.8 seconds
Great, here’s how to set this up in Prefect Cloud with enough detail to get the exact JSON for your trigger.

Important notes before we start
- This uses a proactive “Expect events” Automation; it will not trigger or create runs.
- The simplest, most reliable way to get the exact JSON shape for your workspace is: build it in the UI → export with `prefect automation inspect --json`. You can then keep that JSON in version control or POST it to the API to recreate.
- For targeting, filter by Deployment (preferred) so you only alert for the specific pipeline that should run after your Azure upload.

Step-by-step in the Prefect Cloud UI
1) Open Automations → New Automation
2) Name: DQ: alert if no runs today by 7 PM
3) Trigger
- Posture: Proactive (Expect events)
- What to expect: At least 1 event
- Event to expect: “prefect.flow-run.Completed”
  - If your flow can start and then fail quickly, you may prefer “prefect.flow-run.Created” to mean “it started today” rather than “it finished.” Choose based on your intent.
- Filter by resource:
  - Choose “Related resource” → Deployment
  - Select your specific deployment (e.g., “my-flow/my-deployment”)
- Window / evaluation time:
  - Window: Today 00:00 → Today 19:00
  - Evaluate at: 19:00 in your local timezone
  - Frequency: Daily
  - Timezone: Set explicitly (e.g., America/New_York)
- Result: The trigger will evaluate once at 7 PM daily; if no matching events occurred between midnight and 7 PM local time, it will fire.
4) Action
- Send Notification
- Choose Slack or Email as configured in Cloud Notifications, and set your recipients.
- Message example:
  - Title: DQ check: no run today by 7 PM
  - Body: No runs were observed for deployment <your-deployment> between 00:00 and 19:00 <TZ> today.
5) Save and Enable

Export the exact JSON (canonical)
- After creating, run:
prefect automation inspect "DQ: alert if no runs today by 7 PM" --json
- This prints the canonical JSON your workspace uses for this automation, including the trigger, window, timezone, and action wiring. Use this JSON as the source of truth if you want to recreate via API or keep it in VCS.

What the JSON will roughly contain (for orientation)
- Do not copy this verbatim; use the JSON you export with the inspect command. This is only to show the shape.
{
  "name": "DQ: alert if no runs today by 7 PM",
  "description": "Notify if the deployment did not run today by 7 PM local time",
  "enabled": true,
  "posture": "proactive",
  "trigger": {
    "type": "event",
    "expect": {
      "count": 1
    },
    "match": {
      "event": "prefect.flow-run.Completed",
      "related": {
        "prefect.resource.id": "prefect.deployment.<deployment_id>"
      }
    },
    "within": {
      "window": {
        "type": "daily",
        "start": "00:00",
        "end": "19:00",
        "timezone": "America/New_York"
      }
    }
  },
  "actions": [
    {
      "type": "send-notification",
      "recipient": "<your-configured-slack-or-email-destination>",
      "subject": "DQ check: no run today by 7 PM",
      "body": "No runs for deployment <name> between 00:00 and 19:00 America/New_York."
    }
  ]
}
Again: the real field names may differ; rely on the output of `prefect automation inspect --json`.

API (if you want to create programmatically)
- Create it in the UI once, export JSON with the CLI, then POST that JSON to your workspace’s Automations endpoint to recreate in other environments. The inspect output is the safest payload to use.
- Tip: If you prefer not to use Prefect Cloud Notifications and want Slack-only, you can choose the “Call Webhook” action in the UI instead of “Send Notification.” That action payload is simple and posts a JSON body to your Slack Incoming Webhook URL.

Testing tips
- Temporarily set the evaluation time to a few minutes from now and the window to “last hour” to validate behavior quickly.
- Switch the expected event to “prefect.flow-run.Created” if you only need to know it started at least once.

Docs
- Event triggers and proactive automations: Event triggers (Prefect 3)
- Background: Expect the unexpected: Automations and observability

If you share your timezone and whether you want to match on Completed vs Created, I can suggest the exact UI filter settings to use and what to look for in the exported JSON.
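For comparison with the exported JSON, here is a hedged sketch that builds a proactive trigger using the same fields as the reactive custom triggers that worked earlier in this thread. It rests on assumptions this thread does not confirm: that proactive triggers reuse those fields with `"posture": "Proactive"`, and that `within` is a rolling duration in seconds rather than a clock-aligned daily window. The helper name `build_proactive_trigger` is hypothetical, and the `prefect.deployment.<deployment_id>` resource id form is taken from the orientation JSON above.

```python
import json


def build_proactive_trigger(
    deployment_id,
    event_name="prefect.flow-run.Completed",
    within_seconds=24 * 60 * 60,
):
    """Build a trigger dict meant to fire when the expected event is NOT seen in the window.

    Assumption: same field names as the reactive triggers used earlier in this thread.
    """
    return {
        "type": "event",
        "match": {},
        "match_related": {
            "prefect.resource.id": [f"prefect.deployment.{deployment_id}"]
        },
        "after": [],
        "expect": [event_name],
        "for_each": [],
        "posture": "Proactive",
        "threshold": 1,
        "within": within_seconds,
    }


if __name__ == "__main__":
    # Placeholder id; replace it with the real deployment id from your workspace.
    print(json.dumps(build_proactive_trigger("<deployment_id>"), indent=2))
```

If the output of `prefect automation inspect --json` differs from this shape, treat the exported JSON as authoritative.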
m
@Marvin Give me the Python code to check whether the Prefect flow/deployment has run at least once on that day.
m
thought for 295.9 seconds
Here’s a small, reusable helper that checks whether a given deployment has at least one flow run that started “today” (with a configurable timezone). This assumes Prefect 3.x.
from datetime import date, datetime, time, timedelta, timezone
from zoneinfo import ZoneInfo

from prefect.client.orchestration import get_client
from prefect.client.schemas.filters import (
    FlowRunFilter,
    FlowRunFilterStartTime,
    FlowRunFilterDeploymentId,
)
from prefect.exceptions import ObjectNotFound

def _utc_bounds_for_day(day: date | None = None, tz: str = "UTC"):
    """Return the UTC start and end of the given local calendar day."""
    local_tz = ZoneInfo(tz)
    if day is None:
        # Use "today" in the target timezone, not the machine's local date
        day = datetime.now(local_tz).date()
    start_local = datetime.combine(day, time.min).replace(tzinfo=local_tz)
    end_local = start_local + timedelta(days=1)
    return start_local.astimezone(timezone.utc), end_local.astimezone(timezone.utc)

def deployment_ran_on_day(deployment_name: str, day: date | None = None, tz: str = "UTC") -> bool:
    """
    Returns True if the deployment had at least one run that started between
    local midnight and the next midnight for the given day/timezone.
    """
    start_utc, end_utc = _utc_bounds_for_day(day, tz)

    with get_client(sync_client=True) as client:
        # Look up the deployment id from its fully-qualified name: "flow_name/deployment_name"
        try:
            deployment = client.read_deployment_by_name(deployment_name)
        except ObjectNotFound as exc:
            # The client raises ObjectNotFound for an unknown deployment rather than returning None
            raise ValueError(f"Deployment not found: {deployment_name}") from exc

        fr_filter = FlowRunFilter(
            start_time=FlowRunFilterStartTime(after_=start_utc, before_=end_utc),
            deployment_id=FlowRunFilterDeploymentId(any_=[deployment.id]),
        )
        # Limit=1 for a fast existence check
        runs = client.read_flow_runs(flow_run_filter=fr_filter, limit=1)
        return len(runs) > 0

# Examples:
# Check “today” in UTC
print(deployment_ran_on_day("my-flow/my-deployment"))

# Check a specific date in a specific timezone (e.g., US Eastern)
# from datetime import date
# print(deployment_ran_on_day("my-flow/my-deployment", day=date(2025, 10, 16), tz="America/New_York"))
Notes:
- This checks actual execution start (`start_time`), not the schedule time. If you need “scheduled today,” swap `FlowRunFilterStartTime` for `FlowRunFilterExpectedStartTime`.
- The `deployment_name` should be the fully-qualified name like `"<flow_name>/<deployment_name>"`.
- If you already have the `deployment_id`, skip the lookup and pass it to the filter directly.

Docs for reference:
- API client (Prefect 3)
- get_client
- Filter classes

If you instead want to check “any deployment of a given flow ran today,” say the word and I’ll share a variant that aggregates all deployments for a flow first.