Mugdha Sonare
10/15/2025, 1:16 PM
Marvin
10/15/2025, 1:17 PM
Marvin
10/15/2025, 1:22 PM
body[0] (or fan out upstream if you need one Prefect event per item).
- Headers are not available to automations unless you copy them into event.data via the webhook template.
- There’s no JSONPath/JMESPath in automations; you filter by event name/resource and simple key/value matches on event.data, plus a text search filter if needed.
A working pattern for Azure Event Grid
1) Configure the webhook template to extract fields
- Event Name (constant or dynamic):
azure.event_grid
or
azure.event_grid.{{ body[0]['eventType'] }}
- Resource Type:
azure.event_grid
- Resource ID:
{{ body[0]['subject'] }}
- Data (JSON object; copy any headers/body fields you want to filter on):
{
"id": "{{ body[0]['id'] }}",
"eventType": "{{ body[0]['eventType'] }}",
"subject": "{{ body[0]['subject'] }}",
"topic": "{{ body[0]['topic'] }}",
"eventTime": "{{ body[0]['eventTime'] }}",
"blobUrl": "{{ body[0]['data']['url'] | default('') }}",
"aegEventType": "{{ headers['aeg-event-type'] | default('') }}",
"aegSubscription": "{{ headers['aeg-subscription-name'] | default('') }}"
}
Notes:
- body is the parsed JSON; Event Grid uses an array, hence body[0].
- headers[...] is case-insensitive; use the exact names from Event Grid for clarity.
- Use | default('') to avoid failures if a field is missing.
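To sanity-check which keys the template would place in event.data, the same extraction can be mirrored in plain Python. This is only a sketch: extract_event_data is a hypothetical helper (not a Prefect API), the sample payload and container name are made up for illustration, and it assumes the request body is a JSON array as Event Grid sends it.

```python
import json


def extract_event_data(raw_body: str, headers: dict[str, str]) -> dict[str, str]:
    """Mirror the webhook template: flatten the first Event Grid item plus two headers."""
    body = json.loads(raw_body)  # Event Grid posts a JSON array
    first = body[0]              # same item the template reads via body[0]
    # Lower-case header names so lookups do not depend on the sender's casing.
    lowered = {name.lower(): value for name, value in headers.items()}
    return {
        "id": first.get("id", ""),
        "eventType": first.get("eventType", ""),
        "subject": first.get("subject", ""),
        "topic": first.get("topic", ""),
        "eventTime": first.get("eventTime", ""),
        "blobUrl": first.get("data", {}).get("url", ""),  # plays the role of | default('')
        "aegEventType": lowered.get("aeg-event-type", ""),
        "aegSubscription": lowered.get("aeg-subscription-name", ""),
    }


# Illustrative payload only; field values are placeholders.
sample_body = json.dumps([{
    "id": "example-id",
    "eventType": "Microsoft.Storage.BlobCreated",
    "subject": "/blobServices/default/containers/my-container/blobs/uploads/file.csv",
    "data": {"url": "https://example.blob.core.windows.net/my-container/uploads/file.csv"},
}])
sample_headers = {"Aeg-Event-Type": "Notification"}

data = extract_event_data(sample_body, sample_headers)
print(data["eventType"], data["aegEventType"], repr(data["topic"]))
```

Missing fields (here topic, eventTime and the subscription header) come back as empty strings, which is the same behavior the default('') filter is meant to give in the template.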
2) Create the Automation trigger
- Trigger on Event
- Event name equals azure.event_grid (or the dynamic prefix you chose).
- Add Data filters for the keys you placed in data, for example:
- eventType equals Microsoft.Storage.BlobCreated
- If you need subject scoping, either:
- Add a Data filter on subject equals an exact value, or
- Use a Text filter to “contains” part of the subject path (e.g., a container prefix).
- If you need to check headers, filter on the aegEventType or aegSubscription you placed in data.
Troubleshooting tips
- Use the webhook’s Test/Preview in the UI with a sample Event Grid payload to confirm the event that will be emitted—verify your event.data has the keys you expect.
- Temporarily remove filters and confirm you see the incoming Prefect event in the Events view; then add filters back one by one.
- If Event Grid can post batches with multiple items, Prefect will not expand those into multiple events. If you need that, add a tiny proxy (e.g., an Azure Function) to fan out one HTTP POST per event to the Prefect webhook, or adjust your template to pick a specific index (e.g., body[0]).
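The fan-out idea can be sketched as a small pure function. This is a minimal illustration, not an Azure Function: fan_out is a hypothetical name, the batch below is made up, and the actual HTTP POST to the Prefect webhook URL is left out on purpose.

```python
import json


def fan_out(raw_body: str) -> list[str]:
    """Split one Event Grid batch into one single-item JSON array per event."""
    events = json.loads(raw_body)
    if isinstance(events, dict):  # tolerate a lone object instead of an array
        events = [events]
    # Keep each payload as a one-element array so a body[0] template still works.
    return [json.dumps([event]) for event in events]


# Illustrative batch with two events.
batch = json.dumps([
    {"id": "a", "eventType": "Microsoft.Storage.BlobCreated"},
    {"id": "b", "eventType": "Microsoft.Storage.BlobDeleted"},
])

payloads = fan_out(batch)
for payload in payloads:
    # In a real proxy, each payload would be POSTed separately to the webhook URL.
    print(payload)
```

Because every forwarded payload is still an array, the webhook template does not need to change when the proxy is added.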
References
- Webhooks concept: Prefect Webhooks
- Create a webhook (Cloud): How to create a webhook
- Event filters API (what you can filter on): Event filters reference
If you can share:
- A sample Event Grid payload (redacted),
- Your webhook template (event name/resource/data),
- Your current automation filter setup,
I can help pinpoint why the fields aren’t matching and suggest exact templates/filters for your case.
Mugdha Sonare
10/15/2025, 1:55 PM
{
"id": "068efa68-40a4-7e34-8000-9c207f243f1a",
"account": "831d4bff-763e-43c6-880a-8399ce235aca",
"event": "azure.event_grid.blob_created",
"occurred": "2025-10-15T13:49:56.034Z",
"payload": {
"method": "POST",
"headers": {
"host": "api.prefect.cloud",
"content-type": "application/json; charset=utf-8",
"aeg-subscription-name": "SFTPSMARTPAYTEST",
"aeg-delivery-count": "0",
"aeg-metadata-version": "1",
"aeg-event-type": "Notification",
"aeg-data-version": "",
"content-length": "864",
"x-request-id": "14295f4d-fcff-91e2-bffe-6993296aab63"
},
"body": "[{\"topic\":\"/subscriptions/ba54f597-a412-43d9-88b4-2834362ebd08/resourceGroups/SFTP_ROOT_Storage/providers/Microsoft.Storage/storageAccounts/sftprootstoragetest\",\"subject\":\"/blobServices/default/containers/az1-sft-gt3/blobs/sftpsmartpay/uploads/TestingSP.csv\",\"eventType\":\"Microsoft.Storage.BlobCreated\",\"id\":\"2740f40d-901e-008e-4bda-3da2b2061360\",\"data\":{\"api\":\"PutBlob\",\"requestId\":\"2740f40d-901e-008e-4bda-3da2b2000000\",\"eTag\":\"0x8DE0BF1B9333FFF\",\"contentType\":\"text/csv\",\"contentLength\":507,\"blobType\":\"BlockBlob\",\"accessTier\":\"Default\",\"url\":\"<https://sftprootstoragetest.blob.core.windows.net/az1-sft-gt3/sftpsmartpay/uploads/TestingSP.csv>\",\"sequencer\":\"0000000000000000000000000001619100000000000030a9\",\"storageDiagnostics\":{\"batchId\":\"93a5c754-0006-00ee-00da-3dde2d000000\"}},\"dataVersion\":\"\",\"metadataVersion\":\"1\",\"eventTime\":\"2025-10-15T13:49:55.5366662Z\"}]",
"raw_body": "[{\"topic\":\"/subscriptions/ba54f597-a412-43d9-88b4-2834362ebd08/resourceGroups/SFTP_ROOT_Storage/providers/Microsoft.Storage/storageAccounts/sftprootstoragetest\",\"subject\":\"/blobServices/default/containers/az1-sft-gt3/blobs/sftpsmartpay/uploads/TestingSP.csv\",\"eventType\":\"Microsoft.Storage.BlobCreated\",\"id\":\"2740f40d-901e-008e-4bda-3da2b2061360\",\"data\":{\"api\":\"PutBlob\",\"requestId\":\"2740f40d-901e-008e-4bda-3da2b2000000\",\"eTag\":\"0x8DE0BF1B9333FFF\",\"contentType\":\"text/csv\",\"contentLength\":507,\"blobType\":\"BlockBlob\",\"accessTier\":\"Default\",\"url\":\"<https://sftprootstoragetest.blob.core.windows.net/az1-sft-gt3/sftpsmartpay/uploads/TestingSP.csv>\",\"sequencer\":\"0000000000000000000000000001619100000000000030a9\",\"storageDiagnostics\":{\"batchId\":\"93a5c754-0006-00ee-00da-3dde2d000000\"}},\"dataVersion\":\"\",\"metadataVersion\":\"1\",\"eventTime\":\"2025-10-15T13:49:55.5366662Z\"}]"
},
"received": "2025-10-15T13:49:56.040Z",
"related": [
{
"prefect.resource.id": "prefect-cloud.webhook.8f5d2893-956e-460c-8fd3-4f368700ddc1",
"prefect.resource.name": "smartpay-event-dynamic",
"prefect.resource.role": "webhook"
}
],
"resource": {
"prefect.resource.id": "azure.storage.blob./blobServices/default/containers/az1-sft-gt3/blobs/sftpsmartpay/uploads/TestingSP.csv",
"prefect.resource.name": "TestingSP.csv",
"eventType": "Microsoft.Storage.BlobCreated"
},
"workspace": "e1d5c674-55bf-4815-b9a2-ebfc7f375545"
}
@Marvin
This is the event I see when I upload a blob to my container. I want the deployment to be triggered as soon as I receive this event. What should be in the JSON of my custom trigger?
Is the following trigger correct?
{
"type": "event",
"match": {},
"match_related": {
"prefect.resource.id": [
"prefect-cloud.webhook.8f5d2893-956e-460c-8fd3-4f368700ddc1"
]
},
"after": [],
"expect": [],
"for_each": [],
"posture": "Reactive",
"threshold": 1,
"within": 60
}
Marvin
10/15/2025, 1:55 PM
Marvin
10/15/2025, 1:57 PM
{
"type": "event",
"match": {},
"match_related": {
"prefect.resource.id": [
"prefect-cloud.webhook.8f5d2893-956e-460c-8fd3-4f368700ddc1"
]
},
"after": [],
"expect": [],
"for_each": [],
"posture": "Reactive",
"threshold": 1,
"within": 60
}
Safer version (recommended)
- Also match the event name emitted by your webhook: azure.event_grid.blob_created
- Still scope to the specific webhook via match_related
{
"type": "event",
"match": {
"event": [
"azure.event_grid.blob_created"
]
},
"match_related": {
"prefect.resource.id": [
"prefect-cloud.webhook.8f5d2893-956e-460c-8fd3-4f368700ddc1"
]
},
"after": [],
"expect": [],
"for_each": [],
"posture": "Reactive",
"threshold": 1,
"within": 60
}
Notes
- Automations don’t filter on raw webhook headers/body; they filter on the Prefect event fields. Since your webhook template is already producing:
- event: azure.event_grid.blob_created
- related: includes your webhook id
- resource: points to the blob path
this recommended trigger will match cleanly.
- If you later want to restrict to a specific container or path, you have two good options:
- Keep the current template and add a text filter (via the UI) for a path substring you expect (e.g., az1-sft-gt3/blobs/sftpsmartpay/uploads/), or
- Adjust the webhook template so the resource id is the container (not the full blob) and match that exactly.
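To see what a container-level resource id would look like, the subject from the sample event can be split into container and blob path. This is a rough Python sketch of the string logic only; split_blob_subject and the azure.storage.container. prefix are hypothetical, and in the webhook itself the equivalent split would have to be written in the template.

```python
def split_blob_subject(subject: str) -> tuple[str, str]:
    """Return (container, blob_path) from an Event Grid blob subject."""
    prefix = "/blobServices/default/containers/"
    if not subject.startswith(prefix):
        raise ValueError(f"Unexpected subject format: {subject}")
    # Everything before "/blobs/" is the container; everything after is the blob path.
    container, sep, blob_path = subject[len(prefix):].partition("/blobs/")
    if not sep:
        raise ValueError(f"No blob path in subject: {subject}")
    return container, blob_path


subject = "/blobServices/default/containers/az1-sft-gt3/blobs/sftpsmartpay/uploads/TestingSP.csv"
container, blob_path = split_blob_subject(subject)

# Hypothetical naming scheme for a container-scoped resource id.
container_resource_id = f"azure.storage.container.{container}"
print(container_resource_id, blob_path)
```

With a container-scoped id, an exact match on prefect.resource.id is enough and no path substring filter is needed.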
Refs
- Webhooks concept: Prefect Webhooks
- Create a webhook (Cloud): How to create a webhook
- Event filters reference: Event filters API
If you want, paste your current deployment’s trigger block or confirm whether you want to scope by container prefix, and I’ll tailor the exact JSON.
Mugdha Sonare
10/15/2025, 3:16 PM
Marvin
10/15/2025, 3:16 PM
Marvin
10/15/2025, 3:19 PM
{
"type": "event",
"match": {},
"match_related": {
"prefect.resource.id": [
"prefect-cloud.webhook.8f5d2893-956e-460c-8fd3-4f368700ddc1"
]
},
"after": [],
"expect": [
"azure.event_grid.blob_created"
],
"for_each": [],
"posture": "Reactive",
"threshold": 1,
"within": 60
}
- Optionally also scope to your blob path (resource id) to prevent other blob events in the future:
{
"type": "event",
"match": {
"prefect.resource.id": [
"azure.storage.blob./blobServices/default/containers/az1-sft-gt3/blobs/sftpsmartpay/uploads/*"
]
},
"match_related": {
"prefect.resource.id": [
"prefect-cloud.webhook.8f5d2893-956e-460c-8fd3-4f368700ddc1"
]
},
"after": [],
"expect": [
"azure.event_grid.blob_created"
],
"for_each": [],
"posture": "Reactive",
"threshold": 1,
"within": 60
}
Why your previous attempt didn’t fire
- I mistakenly suggested putting the event name under match.event — that field doesn’t exist. Since match only filters the event.resource (not the event name), it didn’t match anything and the trigger never fired.
- expect is the correct place to list the event names that should count toward the threshold.
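The difference between match and expect can be illustrated with a toy matcher. This is a simplified model written for this thread, not Prefect's implementation: would_match and its helpers are hypothetical, and wildcard support is assumed to be a trailing * only. The event is trimmed from the sample posted above.

```python
def _value_ok(value: str, pattern: str) -> bool:
    """Exact match, or prefix match when the pattern ends with '*'."""
    if pattern.endswith("*"):
        return value.startswith(pattern[:-1])
    return value == pattern


def _labels_ok(labels: dict, spec: dict) -> bool:
    """Every key in spec must exist in labels and match one allowed pattern."""
    for key, allowed in spec.items():
        patterns = [allowed] if isinstance(allowed, str) else allowed
        value = labels.get(key)
        if value is None or not any(_value_ok(value, p) for p in patterns):
            return False
    return True


def would_match(trigger: dict, event: dict) -> bool:
    """Simplified model of how a trigger's filters apply to one event."""
    expect = trigger.get("expect") or []
    if expect and not any(_value_ok(event["event"], p) for p in expect):
        return False  # event name is not one of the expected names
    if not _labels_ok(event["resource"], trigger.get("match") or {}):
        return False  # match looks only at the event's resource labels
    related_spec = trigger.get("match_related") or {}
    if related_spec and not any(_labels_ok(r, related_spec) for r in event["related"]):
        return False  # no related resource satisfies match_related
    return True


WEBHOOK = "prefect-cloud.webhook.8f5d2893-956e-460c-8fd3-4f368700ddc1"
event = {
    "event": "azure.event_grid.blob_created",
    "resource": {
        "prefect.resource.id": "azure.storage.blob./blobServices/default/containers/az1-sft-gt3/blobs/sftpsmartpay/uploads/TestingSP.csv",
        "prefect.resource.name": "TestingSP.csv",
        "eventType": "Microsoft.Storage.BlobCreated",
    },
    "related": [{"prefect.resource.id": WEBHOOK, "prefect.resource.role": "webhook"}],
}

wrong = {"match": {"event": ["azure.event_grid.blob_created"]},
         "match_related": {"prefect.resource.id": [WEBHOOK]}, "expect": []}
fixed = {"match": {}, "match_related": {"prefect.resource.id": [WEBHOOK]},
         "expect": ["azure.event_grid.blob_created"]}

print(would_match(wrong, event))  # False: the resource has no "event" label
print(would_match(fixed, event))  # True
```

In this model the first trigger fails because match is compared against the resource labels (prefect.resource.id, prefect.resource.name, eventType), none of which is called event.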
A couple of checks
- Confirm the webhook template emits event = azure.event_grid.blob_created (your sample shows it does).
- If you’ve edited the template to vary the name, you can use a wildcard in expect like ["azure.event_grid.*"].
Docs
- Webhooks concept: Prefect Webhooks
- Event triggers/filters overview: Event triggers
- Filters reference: Event filters API
If it still doesn’t trigger with the corrected JSON, please paste your current webhook template’s event name line and I’ll align the expect value exactly.
Mugdha Sonare
10/15/2025, 3:24 PM
Marvin
10/15/2025, 3:24 PM
Marvin
10/15/2025, 3:25 PM
{
"type": "event",
"match": {},
"match_related": {
"prefect.resource.id": [
"prefect-cloud.webhook.8f5d2893-956e-460c-8fd3-4f368700ddc1"
]
},
"after": [],
"expect": [
"azure.event_grid.*"
],
"for_each": [],
"posture": "Reactive",
"threshold": 1,
"within": 60
}
If that triggers, tighten it to the exact name:
{
"type": "event",
"match": {},
"match_related": {
"prefect.resource.id": [
"prefect-cloud.webhook.8f5d2893-956e-460c-8fd3-4f368700ddc1"
]
},
"after": [],
"expect": [
"azure.event_grid.blob_created"
],
"for_each": [],
"posture": "Reactive",
"threshold": 1,
"within": 60
}
Optionally scope to your blob path to avoid future noise:
{
"type": "event",
"match": {
"prefect.resource.id": [
"azure.storage.blob./blobServices/default/containers/az1-sft-gt3/blobs/sftpsmartpay/uploads/*"
]
},
"match_related": {
"prefect.resource.id": [
"prefect-cloud.webhook.8f5d2893-956e-460c-8fd3-4f368700ddc1"
]
},
"after": [],
"expect": [
"azure.event_grid.blob_created"
],
"for_each": [],
"posture": "Reactive",
"threshold": 1,
"within": 60
}
If the exact-name version still doesn’t fire:
- Copy/paste the event name from the Events view to avoid subtle typos/case differences.
- Keep the wildcard expect to confirm the trigger wiring works, then narrow.
- Verify the webhook template really emits event = azure.event_grid.blob_created for these posts.
If you can share a screenshot or snippet of your webhook template’s event name line, I’ll align the expect value precisely. Also, if you’re configuring this as a per-deployment trigger in the Deployment UI (vs a standalone Automation with a “Run deployment” action), let me know and I can tailor the JSON to that screen specifically.
Mugdha Sonare
10/15/2025, 3:28 PM
Mugdha Sonare
10/16/2025, 1:44 PM
Marvin
10/16/2025, 1:44 PM
Marvin
10/16/2025, 1:50 PM
prefect automation --help).
Relevant docs:
- Event triggers and proactive automations: Event triggers (Prefect 3)
- Background on absence-of-event monitoring: Expect the unexpected: automations and observability
If you’re on self-hosted Prefect Server (or prefer a code-based fallback)
If Automations aren’t available in your environment, the next-lightest option is a tiny “check” job (e.g., a cron or CI job) that runs at 7 PM and queries Prefect for “any runs of Flow/Deployment X since midnight”; if count == 0, it posts to Slack/email. This does not trigger your data flow.
Example outline (pseudo-Python):
# Run this on a system scheduler at 7 PM local time; it will NOT start any Prefect flow runs.
from datetime import datetime
from zoneinfo import ZoneInfo
import os

import httpx

TZ = ZoneInfo("America/New_York")
now = datetime.now(TZ)
start_of_day = now.replace(hour=0, minute=0, second=0, microsecond=0)

# Prefect Cloud API URL for your workspace
API_URL = os.environ["PREFECT_API_URL"]  # e.g. https://api.prefect.cloud/api/accounts/<acct>/workspaces/<ws>
API_KEY = os.environ["PREFECT_API_KEY"]
DEPLOYMENT_ID = os.environ["DEPLOYMENT_ID"]  # preferred; or filter by flow name

# Ask for at most one flow run of this deployment that started today.
payload = {
    "limit": 1,
    "deployments": {"id": {"any_": [DEPLOYMENT_ID]}},
    "flow_runs": {"start_time": {"after_": start_of_day.isoformat(), "before_": now.isoformat()}},
}
headers = {"Authorization": f"Bearer {API_KEY}"}

r = httpx.post(f"{API_URL}/flow_runs/filter", json=payload, headers=headers, timeout=30)
r.raise_for_status()
runs = r.json()

if len(runs) == 0:
    # Notify (Slack example via webhook)
    webhook = os.environ["SLACK_WEBHOOK_URL"]
    # %I (zero-padded hour) is used because %-I is not supported on every platform.
    httpx.post(webhook, json={"text": f"Alert: No runs for deployment {DEPLOYMENT_ID} today by {now.strftime('%I:%M %p %Z')}."})
Why this solution matches your constraints
- No new flow is created, and the existing flow is not triggered.
- You get a single notification at 7 PM if no runs occurred that day.
- Works cleanly with event-triggered flows (like your Azure upload-triggered flow).
Marvin
10/16/2025, 1:50 PM
Mugdha Sonare
10/16/2025, 1:52 PM
prefect automation --help).
@Marvin I want these steps in detail. How can I set up my trigger's JSON?
Marvin
10/16/2025, 1:55 PM
Marvin
10/16/2025, 2:00 PM
prefect automation inspect --json. You can then keep that JSON in version control or POST it to the API to recreate.
- For targeting, filter by Deployment (preferred) so you only alert for the specific pipeline that should run after your Azure upload.
Step-by-step in the Prefect Cloud UI
1) Open Automations → New Automation
2) Name: DQ: alert if no runs today by 7 PM
3) Trigger
- Posture: Proactive (Expect events)
- What to expect: At least 1 event
- Event to expect: “prefect.flow-run.Completed”
- If your flow can start and then fail quickly, you may prefer “prefect.flow-run.Created” to mean “it started today” rather than “it finished.” Choose based on your intent.
- Filter by resource:
- Choose “Related resource” → Deployment
- Select your specific deployment (e.g., “my-flow/my-deployment”)
- Window / evaluation time:
- Window: Today 00:00 → Today 19:00
- Evaluate at: 19:00 in your local timezone
- Frequency: Daily
- Timezone: Set explicitly (e.g., America/New_York)
- Result: The trigger will evaluate once at 7 PM daily; if no matching events occurred between midnight and 7 PM local time, it will fire.
4) Action
- Send Notification
- Choose Slack or Email as configured in Cloud Notifications, and set your recipients.
- Message example:
- Title: DQ check: no run today by 7 PM
- Body: No runs were observed for deployment <your-deployment> between 00:00 and 19:00 <TZ> today.
5) Save and Enable
Export the exact JSON (canonical)
- After creating, run:
prefect automation inspect "DQ: alert if no runs today by 7 PM" --json
- This prints the canonical JSON your workspace uses for this automation, including the trigger, window, timezone, and action wiring. Use this JSON as the source of truth if you want to recreate via API or keep it in VCS.
What the JSON will roughly contain (for orientation)
- Do not copy this verbatim; use the JSON you export with the inspect command. This is only to show the shape.
{
"name": "DQ: alert if no runs today by 7 PM",
"description": "Notify if the deployment did not run today by 7 PM local time",
"enabled": true,
"posture": "proactive",
"trigger": {
"type": "event",
"expect": {
"count": 1
},
"match": {
"event": "prefect.flow-run.Completed",
"related": {
"prefect.resource.id": "prefect.deployment.<deployment_id>"
}
},
"within": {
"window": {
"type": "daily",
"start": "00:00",
"end": "19:00",
"timezone": "America/New_York"
}
}
},
"actions": [
{
"type": "send-notification",
"recipient": "<your-configured-slack-or-email-destination>",
"subject": "DQ check: no run today by 7 PM",
"body": "No runs for deployment <name> between 00:00 and 19:00 America/New_York."
}
]
}
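Again: the real field names may differ; rely on the output of prefect automation inspect --json. For example, the event triggers earlier in this thread keep posture inside the trigger object, list event names under expect, and give within as a number (60) rather than a nested window.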
API (if you want to create programmatically)
- Create it in the UI once, export JSON with the CLI, then POST that JSON to your workspace’s Automations endpoint to recreate in other environments. The inspect output is the safest payload to use.
- Tip: If you prefer not to use Prefect Cloud Notifications and want Slack-only, you can choose the “Call Webhook” action in the UI instead of “Send Notification.” That action payload is simple and posts a JSON body to your Slack Incoming Webhook URL.
Testing tips
- Temporarily set the evaluation time to a few minutes from now and the window to “last hour” to validate behavior quickly.
Marvin
10/16/2025, 2:00 PM
Mugdha Sonare
10/16/2025, 5:24 PM
Marvin
10/16/2025, 5:25 PM
Marvin
10/16/2025, 5:29 PM
from datetime import date, datetime, time, timedelta, timezone
from zoneinfo import ZoneInfo

from prefect.client.orchestration import get_client
from prefect.client.schemas.filters import (
    FlowRunFilter,
    FlowRunFilterStartTime,
    FlowRunFilterDeploymentId,
)


def _utc_bounds_for_day(day: date | None = None, tz: str = "UTC"):
    """Return the UTC start and end of the given local calendar day."""
    local_tz = ZoneInfo(tz)
    if day is None:
        # "Today" in the requested timezone, not the machine's local date
        day = datetime.now(local_tz).date()
    start_local = datetime.combine(day, time.min, tzinfo=local_tz)
    end_local = start_local + timedelta(days=1)
    return start_local.astimezone(timezone.utc), end_local.astimezone(timezone.utc)


def deployment_ran_on_day(deployment_name: str, day: date | None = None, tz: str = "UTC") -> bool:
    """
    Returns True if the deployment had at least one run that started between
    local midnight and the next midnight for the given day/timezone.
    """
    start_utc, end_utc = _utc_bounds_for_day(day, tz)
    with get_client(sync_client=True) as client:
        # Look up the deployment id from its fully-qualified name: "flow_name/deployment_name"
        deployment = client.read_deployment_by_name(deployment_name)
        if deployment is None:
            raise ValueError(f"Deployment not found: {deployment_name}")
        fr_filter = FlowRunFilter(
            start_time=FlowRunFilterStartTime(after_=start_utc, before_=end_utc),
            deployment_id=FlowRunFilterDeploymentId(any_=[deployment.id]),
        )
        # Limit=1 for a fast existence check
        runs = client.read_flow_runs(flow_run_filter=fr_filter, limit=1)
    return len(runs) > 0


# Examples:
# Check “today” in UTC
print(deployment_ran_on_day("my-flow/my-deployment"))
# Check a specific date in a specific timezone (e.g., US Eastern)
# from datetime import date
# print(deployment_ran_on_day("my-flow/my-deployment", day=date(2025, 10, 16), tz="America/New_York"))
Notes:
- This checks actual execution start (start_time), not the schedule time. If you need “scheduled today,” swap FlowRunFilterStartTime for FlowRunFilterExpectedStartTime and pass it as expected_start_time instead of start_time.
- The deployment_name should be the fully-qualified name like "<flow_name>/<deployment_name>".
- If you already have the deployment_id, skip the lookup and pass it to the filter directly.
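Because the filter compares start times in UTC, the local-day boundaries decide which runs count as "today". The sketch below shows that conversion with the standard library only. It is an illustration under assumptions: utc_bounds is a hypothetical helper, and a fixed UTC-4 offset stands in for US Eastern in October to keep it dependency-free; real code should use a named zone as in the function above.

```python
from datetime import date, datetime, time, timedelta, timezone


def utc_bounds(day: date, utc_offset_hours: int) -> tuple[datetime, datetime]:
    """UTC start and end of a local calendar day at a fixed UTC offset."""
    local_tz = timezone(timedelta(hours=utc_offset_hours))
    start_local = datetime.combine(day, time.min, tzinfo=local_tz)
    end_local = start_local + timedelta(days=1)
    return start_local.astimezone(timezone.utc), end_local.astimezone(timezone.utc)


# Fixed offset of -4 hours is an assumption standing in for US Eastern (EDT).
start_utc, end_utc = utc_bounds(date(2025, 10, 16), utc_offset_hours=-4)

# A run that starts at 01:30 UTC on Oct 17 is 9:30 PM local time on Oct 16.
late_run = datetime(2025, 10, 17, 1, 30, tzinfo=timezone.utc)

print(start_utc.isoformat(), end_utc.isoformat())
print(start_utc <= late_run < end_utc)
```

A run late in the local evening therefore still falls inside the local day, even though its UTC date is already the next day.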
Docs for reference:
- API client (Prefect 3)
- get_client
- Filter classes
If you instead want to check “any deployment of a given flow ran today,” say the word and I’ll share a variant that aggregates all deployments for a flow first.