ivo
08/11/2025, 2:33 PM
CloudEventSchemaV1_0 (can’t change schema).
• Prefect Webhook:
◦ URL: https://api.prefect.cloud/hooks/<hook-id>
◦ Template: {{ body|from_cloud_event(headers) }}
◦ No service account auth for now.
Attempts & Results
1️⃣ Using CloudEvents schema (required by Graph)
az eventgrid partner topic event-subscription create \
--resource-group mailwatcher-rg \
--partner-topic-name company-watcher \
--name mailwatcher-to-prefect \
--endpoint "https://api.prefect.cloud/hooks/<hook-id>" \
--event-delivery-schema cloudeventschemav1_0
Error:
URL validation handshake failed
Http OPTIONS request → 500 InternalServerError from Prefect
This is Azure’s initial webhook validation handshake — Prefect returned 500.
2️⃣ Using EventGridSchema (same command, schema flag changed)
--event-delivery-schema eventgridschema
Error:
InvalidRequest: EventGridSchema cannot be used with topic's input schema CloudEventSchemaV1_0
Graph API partner topics only support CloudEvents v1.0 → can’t use EventGridSchema.
Root Cause
• Graph API partner topics = CloudEvents v1.0 only.
• Prefect webhook appears not to handle CloudEvents v1.0 validation handshake (OPTIONS + validationCode).
• No alternative schema option.
Need from Prefect
• Support CloudEvents v1.0 validation for Azure Event Grid webhooks, or
• Provide a documented proxy/workaround that bridges CloudEvents → Prefect.
Impact
Currently can’t connect Graph API Partner Topic → Prefect directly due to handshake failure.
Marvin
08/11/2025, 2:34 PM
ivo
08/11/2025, 2:39 PM
ivo
08/11/2025, 2:40 PM
Marvin
08/11/2025, 2:40 PM
Marvin
08/11/2025, 2:42 PM
from_cloud_event(headers) filter isn’t documented in Prefect 3.x; there isn’t a built-in CloudEvents-to-Prefect filter right now.
Relevant Prefect docs
- Prefect 3.x webhooks concept
- Create a webhook
- Jinja filters reference (no CloudEvents filter listed)
For reference on Azure’s handshake (so you can see what Prefect isn’t responding to):
- Event Grid webhook validation with CloudEvents
What you can do now
Option A: Tiny proxy that speaks the handshake and forwards to Prefect
- Put a minimal HTTP endpoint in front (Azure Function, Logic App, or API Management) that:
  - Responds to OPTIONS by echoing the WebHook-Request-Origin value back in WebHook-Allowed-Origin and, if present, calls the WebHook-Request-Callback URL.
  - On POST, transforms the CloudEvent(s) into a Prefect event JSON and forwards to your Prefect webhook.
- In Prefect, set your webhook template to simply {{ body }} so the proxy can send a fully formed Prefect event.
Example: Azure Function (Python) to bridge Event Grid → Prefect webhook
import os

import requests
import azure.functions as func

PREFECT_WEBHOOK_URL = os.environ["PREFECT_WEBHOOK_URL"]


def _handle_handshake(req: func.HttpRequest) -> func.HttpResponse:
    # CloudEvents webhook validation: echo the requesting origin back.
    origin = req.headers.get('WebHook-Request-Origin')
    callback = req.headers.get('WebHook-Request-Callback')
    headers = {}
    if origin:
        headers['WebHook-Allowed-Origin'] = origin
        # Optionally: headers['WebHook-Allowed-Rate'] = '500'  # requests per minute
    if callback:
        try:
            requests.get(callback, timeout=5)
        except Exception:
            # Best effort per docs; you can log the exception if desired
            pass
    return func.HttpResponse(status_code=200, headers=headers)


def _as_list(body):
    # Event Grid may deliver a single CloudEvent or a batch (JSON array).
    return body if isinstance(body, list) else [body]


def _cloud_event_to_prefect_event(ce: dict) -> dict:
    # Map CloudEvent to Prefect event
    event_name = ce.get('type') or 'azure.eventgrid.event'
    resource_id = ce.get('subject') or ce.get('source') or ce.get('id') or 'azure.eventgrid.unknown'
    return {
        "event": event_name,
        "resource": {"prefect.resource.id": f"azure.eventgrid::{resource_id}"},
        "payload": ce,  # keep original CloudEvent in payload for downstream use
    }


def main(req: func.HttpRequest) -> func.HttpResponse:
    if req.method == 'OPTIONS':
        return _handle_handshake(req)
    if req.method == 'POST':
        try:
            body = req.get_json()
        except ValueError:
            # Reject unparseable bodies instead of forwarding an empty event.
            return func.HttpResponse("Invalid JSON body", status_code=400)
        for ce in _as_list(body):
            evt = _cloud_event_to_prefect_event(ce)
            r = requests.post(PREFECT_WEBHOOK_URL, json=evt, timeout=10)
            r.raise_for_status()
        # 202 means accepted by the proxy
        return func.HttpResponse(status_code=202)
    return func.HttpResponse(status_code=405)
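To make the mapping above concrete, here is a small standalone sketch that applies the same rule to a made-up CloudEvent. The function is re-declared under a slightly different name only so the snippet runs without the Azure Functions runtime, and every value in `sample` is an invented placeholder, not real Graph output.

```python
def cloud_event_to_prefect_event(ce: dict) -> dict:
    """Same mapping as the proxy above, re-declared so this runs standalone."""
    event_name = ce.get("type") or "azure.eventgrid.event"
    resource_id = (
        ce.get("subject")
        or ce.get("source")
        or ce.get("id")
        or "azure.eventgrid.unknown"
    )
    return {
        "event": event_name,
        "resource": {"prefect.resource.id": f"azure.eventgrid::{resource_id}"},
        "payload": ce,
    }


# Hypothetical CloudEvent; all values are placeholders for illustration.
sample = {
    "specversion": "1.0",
    "type": "com.example.mail.received",
    "source": "/example/source",
    "subject": "users/example-user/messages/example-id",
    "id": "00000000-0000-0000-0000-000000000001",
    "data": {"note": "placeholder"},
}

prefect_event = cloud_event_to_prefect_event(sample)
print(prefect_event["event"])
print(prefect_event["resource"]["prefect.resource.id"])
```

Because `subject` wins over `source` and `id`, the resource ID stays stable per mailbox item as long as the sender fills in `subject`; if it does not, the fallback order decides what your Automations will match on.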
- In Azure, set PREFECT_WEBHOOK_URL to your Prefect webhook URL.
- In Prefect, set the webhook Template to {{ body }} so the proxy’s JSON is accepted as-is.
- Minimal Prefect event shape you must produce:
```
{
  "event": "some.event.name",
  "resource": { "prefect.resource.id": "some-stable-id" },
```
Marvin
08/11/2025, 2:42 PM
See: webhook event shape (https://docs-3.prefect.io/v3/concepts/webhooks)
Option B: Use Prefect Events API with auth from the proxy
- If you’re okay with a service account, the proxy can send directly to the Prefect Events API with a SA token instead of going through a webhook. This eliminates the webhook template entirely and gives you retries you control. Happy to sketch that if you want it.
About your current template
- `{{ body|from_cloud_event(headers) }}` isn’t a supported filter in Prefect 3.x. If you keep a direct integration later, you’ll want a template that extracts fields directly:
```
{
  "event": "{{ body.type or headers['ce-type'] }}",
  "resource": { "prefect.resource.id": "azure.eventgrid::{{ body.subject or body.source or body.id }}" },
  "payload": {{ body | tojson }}
}
```
But you’ll only reach this if Prefect supports the handshake first, so the proxy route is the practical fix today.
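For Option B, a rough sketch of what the proxy would build is shown here. It only constructs the request and never sends it. The endpoint URL, the bearer-token header, and the `id`/`occurred` fields are assumptions about the Prefect Events API that should be checked against the Prefect API reference; `build_events_request` is a hypothetical helper name.

```python
import json
import uuid
import urllib.request
from datetime import datetime, timezone


def build_events_request(ce: dict, api_url: str, token: str) -> urllib.request.Request:
    """Build (but do not send) a POST carrying one Prefect event.

    ASSUMPTION: api_url is your workspace's events endpoint and it accepts
    a JSON list of events. Verify both before relying on this.
    """
    resource_id = ce.get("subject") or ce.get("source") or ce.get("id") or "unknown"
    event = {
        "id": str(uuid.uuid4()),
        "occurred": datetime.now(timezone.utc).isoformat(),
        "event": ce.get("type") or "azure.eventgrid.event",
        "resource": {"prefect.resource.id": f"azure.eventgrid::{resource_id}"},
        "payload": ce,
    }
    return urllib.request.Request(
        api_url,
        data=json.dumps([event]).encode("utf-8"),
        headers={
            "Authorization": f"Bearer {token}",
            "Content-Type": "application/json",
        },
        method="POST",
    )


# Placeholder values only; nothing goes over the network here.
req = build_events_request(
    {"type": "com.example.test", "id": "abc"},
    api_url="https://example.invalid/events",
    token="<service-account-token>",
)
body = json.loads(req.data)
```

Keeping request construction separate from sending makes it easy to wrap the send step in whatever retry policy you prefer.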
Would you like me to open a GitHub feature request on your behalf to add CloudEvents v1.0 WebHook handshake support for Prefect webhooks? I can include your repro details and error. If you prefer to file it yourself: create an issue.
If you want, I can also tailor the proxy mapping to the exact Graph event types you care about and suggest event/resource naming conventions for your Automations.
Marvin
08/11/2025, 2:44 PM
{{ body|from_cloud_event(headers) }} template is fine once the POSTs actually arrive.
What Prefect does and doesn’t do right now
- Prefect 3.x webhooks don’t implement the CloudEvents webhook validation (OPTIONS) that Event Grid requires. I can’t find any official support docs or examples indicating otherwise.
- Once you get POST deliveries through, Prefect’s from_cloud_event(headers) filter will parse CloudEvents (structured/binary) from the body/headers into a Prefect event payload.
- Docs for context: Prefect 3.x webhooks and Create a webhook.
Recommended workaround: a 30‑line proxy
Handle OPTIONS by echoing Webhook-Request-Origin in Webhook-Allowed-Origin and return 200; forward POSTs to Prefect unchanged so your Jinja filter can do its thing. Here’s a minimal Azure Functions v4 (Python) example:
# requirements.txt
azure-functions
httpx==0.27.*

# function_app.py (the entry-point file name the decorator-based Python model expects)
import logging
import os

import httpx
import azure.functions as func

PREFECT_WEBHOOK_URL = os.environ["PREFECT_WEBHOOK_URL"]
# Optional: set PREFECT_AUTH_HEADER to "Authorization: Bearer <token>" if you enable webhook auth later.
PREFECT_AUTH_HEADER = os.environ.get("PREFECT_AUTH_HEADER")

app = func.FunctionApp(http_auth_level=func.AuthLevel.ANONYMOUS)


@app.route(route="prefect-eg-proxy", methods=["OPTIONS", "POST"])
def prefect_eventgrid_proxy(req: func.HttpRequest) -> func.HttpResponse:
    method = req.method.upper()

    if method == "OPTIONS":
        # Validation handshake: echo the requesting origin back to Event Grid.
        origin = req.headers.get("WebHook-Request-Origin") or req.headers.get("Webhook-Request-Origin")
        headers = {}
        if origin:
            headers["WebHook-Allowed-Origin"] = origin
            # Optional: rate hint. Safe to omit.
            headers["WebHook-Allowed-Rate"] = "120"
        return func.HttpResponse(status_code=200, headers=headers)

    # Forward CloudEvents POST to Prefect
    try:
        # Drop headers that must be recomputed for the outgoing request.
        forward_headers = {k: v for k, v in req.headers.items()
                           if k.lower() not in {"host", "content-length"}}
        if PREFECT_AUTH_HEADER:
            # Ensure the Prefect auth header is set/overridden when needed
            name, value = PREFECT_AUTH_HEADER.split(":", 1)
            forward_headers[name.strip()] = value.strip()
        with httpx.Client(timeout=15) as client:
            resp = client.post(
                PREFECT_WEBHOOK_URL,
                content=req.get_body(),
                headers=forward_headers,
            )
        return func.HttpResponse(status_code=resp.status_code, body=resp.content)
    except Exception as e:
        logging.exception("Failed to forward to Prefect")
        return func.HttpResponse(status_code=500, body=str(e))
If you prefer no Functions, API Management can do this inline with a policy:
<policies>
  <inbound>
    <choose>
      <when condition="@(context.Request.Method == "OPTIONS" && context.Request.Headers.ContainsKey("Webhook-Request-Origin"))">
        <return-response>
          <set-status code="200" reason="OK" />
          <!-- Set the header on the returned response, not on the inbound request -->
          <set-header name="WebHook-Allowed-Origin" exists-action="override">
            <value>@(context.Request.Headers.GetValueOrDefault("Webhook-Request-Origin"))</value>
          </set-header>
        </return-response>
      </when>
    </choose>
    <set-backend-service base-url="https://<your-prefect-webhook-url>" />
  </inbound>
  <backend />
  <outbound />
  <on-error />
</policies>
How to test quickly
- OPTIONS handshake:
curl -i -X OPTIONS https://<your-proxy>/prefect-eg-proxy \
  -H "Webhook-Request-Origin: eventgrid.azure.net"
Marvin
08/11/2025, 2:44 PM
Webhook-Allowed-Origin: eventgrid.azure.net.
- Then point your Event Grid subscription (CloudEvents schema) at the proxy URL. POST deliveries should flow to Prefect and your template {{ body|from_cloud_event(headers) }} will parse them.
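The curl check above covers only the handshake. Before wiring up Event Grid, it may help to push one hand-made CloudEvent through the proxy. The sketch below builds such a POST in structured mode using only the standard library; the URL, event type, and all attribute values are placeholders, and `build_test_post` is a hypothetical helper name.

```python
import json
import urllib.request


def build_test_post(proxy_url: str) -> urllib.request.Request:
    """Build a structured-mode CloudEvents v1.0 POST for a manual smoke test."""
    cloud_event = {
        # Required CloudEvents v1.0 context attributes.
        "specversion": "1.0",
        "id": "test-0001",
        "source": "/manual/smoke-test",
        "type": "com.example.smoke-test",
        # Optional attributes and payload; placeholder values.
        "subject": "example-subject",
        "datacontenttype": "application/json",
        "data": {"hello": "prefect"},
    }
    return urllib.request.Request(
        proxy_url,
        data=json.dumps(cloud_event).encode("utf-8"),
        # Structured mode: the whole event travels in the body with this media type.
        headers={"Content-Type": "application/cloudevents+json; charset=utf-8"},
        method="POST",
    )


request = build_test_post("https://example.invalid/prefect-eg-proxy")

# To actually send it, point the URL at your proxy and uncomment:
# with urllib.request.urlopen(request, timeout=15) as resp:
#     print(resp.status)
```

If the proxy answers this POST with the status code Prefect returned, the forwarding path works independently of Event Grid.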
Notes on the Jinja filter
- Keep the CloudEvents body and headers intact. The filter uses both to normalize into Prefect’s event shape. It works for single events and (if Event Grid batches) arrays of CloudEvents.
- If/when you enable Prefect webhook auth, inject the Authorization header in the proxy before forwarding (environment variable shown above).
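To illustrate why both body and headers need to survive the proxy, here is a minimal sketch of how the two CloudEvents HTTP content modes differ. It is not Prefect's implementation of the filter, only an illustration based on the CloudEvents HTTP binding: in structured mode the attributes live in the JSON body, while in binary mode they arrive as `ce-` prefixed headers. `normalize_cloud_event` is a hypothetical name, and batch mode and percent-decoding of header values are deliberately left out.

```python
import json


def normalize_cloud_event(headers: dict, body: bytes) -> dict:
    """Return CloudEvent attributes as one dict for either HTTP content mode."""
    lowered = {k.lower(): v for k, v in headers.items()}
    content_type = lowered.get("content-type", "")

    if content_type.startswith("application/cloudevents+json"):
        # Structured mode: attributes and data are all in the JSON body.
        return json.loads(body)

    # Binary mode: attributes arrive as ce-* headers, the body is the data.
    event = {k[3:]: v for k, v in lowered.items() if k.startswith("ce-")}
    if content_type:
        event["datacontenttype"] = content_type
    if body:
        event["data"] = json.loads(body) if "json" in content_type else body.decode("utf-8")
    return event


structured = normalize_cloud_event(
    {"Content-Type": "application/cloudevents+json; charset=utf-8"},
    b'{"specversion": "1.0", "type": "com.example.test", "id": "1", "source": "/s"}',
)
binary = normalize_cloud_event(
    {
        "ce-specversion": "1.0",
        "ce-type": "com.example.test",
        "ce-id": "1",
        "ce-source": "/s",
        "Content-Type": "application/json",
    },
    b'{"k": "v"}',
)
```

A proxy that strips the `ce-` headers or rewrites Content-Type would turn a valid binary-mode event into an anonymous JSON blob, which is why the forwarding example passes headers through.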
If you want, I can sketch Bicep/Terraform for the Function + Event Grid subscription wiring. Also, if you’d like this supported natively by Prefect, I can open a feature request; here’s the form: Open a Prefect issue.
References
- Prefect 3.x webhooks
- Prefect Cloud: create a webhook
- Azure Event Grid webhook validation (handshake)
- CloudEvents WebHook spec
Want the proxy in Node/Express instead of Python, or do you prefer APIM/Logic Apps?