# ask-marvin
i
hey @Marvin i set this up in my azure but when trying to add the endpoint to my create event subscription it's still failing. here's a summary

Context
Trying to integrate Microsoft Graph API events → Azure Event Grid Partner Topic → Prefect Cloud webhook.

Setup Done
• Partner Topic: company-watcher (Microsoft Graph API)
  ◦ Forced to use `CloudEventSchemaV1_0` (can't change schema).
• Prefect Webhook:
  ◦ URL: `https://api.prefect.cloud/hooks/<hook-id>`
  ◦ Template: `{{ body|from_cloud_event(headers) }}`
  ◦ No service account auth for now.

Attempts & Results
1️⃣ Using CloudEvents schema (required by Graph)
```
az eventgrid partner topic event-subscription create \
  --resource-group mailwatcher-rg \
  --partner-topic-name company-watcher \
  --name mailwatcher-to-prefect \
  --endpoint "https://api.prefect.cloud/hooks/<hook-id>" \
  --event-delivery-schema cloudeventschemav1_0
```
Error:
```
URL validation handshake failed
Http OPTIONS request → 500 InternalServerError from Prefect
```
This is Azure's initial webhook validation handshake; Prefect returned 500.
```
--event-delivery-schema eventgridschema
```
Error:
```
InvalidRequest: EventGridSchema cannot be used with topic's input schema CloudEventSchemaV1_0
```
Graph API partner topics only support CloudEvents v1.0 → can't use EventGridSchema.

Root Cause
• Graph API partner topics = CloudEvents v1.0 only.
• Prefect webhook appears not to handle CloudEvents v1.0 validation handshake (OPTIONS + validationCode).
• No alternative schema option.

Need from Prefect
• Support CloudEvents v1.0 validation for Azure Event Grid webhooks, or
• Provide a documented proxy/workaround that bridges CloudEvents → Prefect.

Impact
Currently can't connect Graph API Partner Topic → Prefect directly due to handshake failure.
m
thought for 477.9 seconds
i
?
@Marvin so what do you think?
m
thought for 232.7 seconds
Thanks for the crisp write-up. You're not imagining things.

Summary
- Your diagnosis is correct: Azure Event Grid partner topics for Microsoft Graph only deliver CloudEvents v1.0 and require the WebHook handshake.
- Prefect Cloud webhooks today do not implement the CloudEvents WebHook validation handshake (OPTIONS with WebHook-Request-Origin and optional WebHook-Request-Callback), which is why Azure's subscription validation fails with a 500 from Prefect.
- Also, that `from_cloud_event(headers)` filter isn't documented in Prefect 3.x; there isn't a built-in CloudEvents-to-Prefect filter right now.

Relevant Prefect docs
- Prefect 3.x webhooks concept
- Create a webhook
- Jinja filters reference (no CloudEvents filter listed)

For reference on Azure's handshake (so you can see what Prefect isn't responding to):
- Event Grid webhook validation with CloudEvents

What you can do now

Option A: Tiny proxy that speaks the handshake and forwards to Prefect
- Put a minimal HTTP endpoint in front (Azure Function, Logic App, or API Management) that:
  - Responds to OPTIONS by echoing the WebHook-Request-Origin value back in WebHook-Allowed-Origin and, if WebHook-Request-Callback is present, calls that callback URL.
  - On POST, transforms the CloudEvent(s) into a Prefect event JSON and forwards to your Prefect webhook.
- In Prefect, set your webhook template to simply:
```
{{ body }}
```
  so the proxy can send a fully formed Prefect event.

Example: Azure Function (Python) to bridge Event Grid → Prefect webhook
```python
import logging
import os

import requests
import azure.functions as func

PREFECT_WEBHOOK_URL = os.environ["PREFECT_WEBHOOK_URL"]


def _handle_handshake(req: func.HttpRequest) -> func.HttpResponse:
    # CloudEvents validation: echo the requesting origin to allow delivery.
    origin = req.headers.get("WebHook-Request-Origin")
    callback = req.headers.get("WebHook-Request-Callback")
    headers = {}
    if origin:
        headers["WebHook-Allowed-Origin"] = origin
    # Optionally: headers["WebHook-Allowed-Rate"] = "500"  # requests per minute
    if callback:
        try:
            requests.get(callback, timeout=5)
        except requests.RequestException:
            # Best effort per docs; log and carry on
            logging.exception("Handshake callback request failed")
    return func.HttpResponse(status_code=200, headers=headers)


def _as_list(body):
    # Event Grid may deliver a single CloudEvent or a batch (JSON array)
    return body if isinstance(body, list) else [body]


def _cloud_event_to_prefect_event(ce: dict) -> dict:
    # Map CloudEvent to Prefect event
    event_name = ce.get("type") or "azure.eventgrid.event"
    resource_id = ce.get("subject") or ce.get("source") or ce.get("id") or "azure.eventgrid.unknown"
    return {
        "event": event_name,
        "resource": {"prefect.resource.id": f"azure.eventgrid::{resource_id}"},
        "payload": ce,  # keep original CloudEvent in payload for downstream use
    }


def main(req: func.HttpRequest) -> func.HttpResponse:
    if req.method == "OPTIONS":
        return _handle_handshake(req)

    if req.method == "POST":
        try:
            body = req.get_json()
        except ValueError:
            # Reject unparseable bodies instead of forwarding an empty event
            return func.HttpResponse("Invalid JSON body", status_code=400)
        for ce in _as_list(body):
            evt = _cloud_event_to_prefect_event(ce)
            r = requests.post(PREFECT_WEBHOOK_URL, json=evt, timeout=10)
            r.raise_for_status()
        # 202 means accepted by the proxy
        return func.HttpResponse(status_code=202)

    return func.HttpResponse(status_code=405)
```
- In Azure, set `PREFECT_WEBHOOK_URL` to your Prefect webhook URL.
- In Prefect, set the webhook Template to `{{ body }}` so the proxy's JSON is accepted as-is.
- Minimal Prefect event shape you must produce:
```
{
  "event": "some.event.name",
  "resource": { "prefect.resource.id": "some-stable-id" },
  "payload": { ... optional ... }
}
```
See: [webhook event shape](https://docs-3.prefect.io/v3/concepts/webhooks)

Option B: Use Prefect Events API with auth from the proxy
- If you're okay with a service account, the proxy can send directly to the Prefect Events API with a SA token instead of going through a webhook. This eliminates the webhook template entirely and gives you retries you control. Happy to sketch that if you want it.

About your current template
- `{{ body|from_cloud_event(headers) }}` isn't a supported filter in Prefect 3.x. If you keep a direct integration later, you'll want a template that extracts fields directly:
```
{
  "event": "{{ body.type or headers['ce-type'] }}",
  "resource": { "prefect.resource.id": "azure.eventgrid::{{ body.subject or body.source or body.id }}" },
  "payload": {{ body | tojson }}
}
```
But you'll only reach this if Prefect supports the handshake first, so the proxy route is the practical fix today.

Would you like me to open a GitHub feature request on your behalf to add CloudEvents v1.0 WebHook handshake support for Prefect webhooks? I can include your repro details and error. If you prefer to file it yourself: create an issue.

If you want, I can also tailor the proxy mapping to the exact Graph event types you care about and suggest event/resource naming conventions for your Automations.
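The field-extraction template above reads the event type from either `body.type` or the `ce-type` header, which corresponds to the two CloudEvents HTTP modes (structured: attributes in the JSON body; binary: attributes in `ce-*` headers). Here is a minimal sketch of the same normalization in Python, as it might run inside a proxy. The function names and the sample values in the usage note are hypothetical, and the `azure.eventgrid::` prefix and fallbacks simply mirror the mapping used earlier in this thread.

```python
from typing import Any, Mapping


def cloud_event_attributes(body: Any, headers: Mapping[str, str]) -> dict:
    """Collect CloudEvent attributes from ce-* headers and/or a structured body."""
    attrs: dict = {}
    # Binary mode: attributes arrive as ce-<name> headers (names are case-insensitive).
    for name, value in headers.items():
        if name.lower().startswith("ce-"):
            attrs[name.lower()[3:]] = value
    # Structured mode: attributes are top-level keys of the JSON body.
    if isinstance(body, dict) and "specversion" in body:
        attrs.update({k: v for k, v in body.items() if k != "data"})
    return attrs


def to_prefect_event(body: Any, headers: Mapping[str, str]) -> dict:
    """Build the minimal Prefect event shape (event, resource, payload)."""
    attrs = cloud_event_attributes(body, headers)
    event_name = attrs.get("type") or "azure.eventgrid.event"
    resource_id = (
        attrs.get("subject") or attrs.get("source") or attrs.get("id") or "unknown"
    )
    return {
        "event": event_name,
        "resource": {"prefect.resource.id": f"azure.eventgrid::{resource_id}"},
        "payload": body,  # keep the original delivery for downstream use
    }
```

For example, `to_prefect_event({"specversion": "1.0", "type": "example.mail.received", "subject": "inbox/1"}, {})` and `to_prefect_event({"k": 1}, {"ce-type": "example.mail.received", "ce-subject": "inbox/1"})` both yield an event named `example.mail.received` on the resource `azure.eventgrid::inbox/1`.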
Short version: Prefect shrugs at CloudEvents' OPTIONS handshake. Azure Event Grid insists on it. Result: 500 and no subscription. You'll need a tiny proxy that politely answers the handshake, then forwards POST deliveries to your Prefect webhook unchanged. Your `{{ body|from_cloud_event(headers) }}` template is fine once the POSTs actually arrive.

What Prefect does and doesn't do right now
- Prefect 3.x webhooks don't implement the CloudEvents webhook validation (OPTIONS) that Event Grid requires. I can't find any official support docs or examples indicating otherwise.
- Once you get POST deliveries through, Prefect's `from_cloud_event(headers)` filter will parse CloudEvents (structured/binary) from the body/headers into a Prefect event payload.
- Docs for context: Prefect 3.x webhooks and Create a webhook.

Recommended workaround: a 30-line proxy
Handle OPTIONS by echoing `WebHook-Request-Origin` in `WebHook-Allowed-Origin` and return 200; forward POSTs to Prefect unchanged so your Jinja filter can do its thing. Here's a minimal Azure Functions v4 (Python) example:
```
# requirements.txt
azure-functions
httpx==0.27.*
```
```python
# function_app.py (the entry-point file name the decorator-based Python model expects)
import logging
import os

import httpx
import azure.functions as func

PREFECT_WEBHOOK_URL = os.environ["PREFECT_WEBHOOK_URL"]
# Optional: set PREFECT_AUTH_HEADER to "Authorization: Bearer <token>" if you enable webhook auth later.
PREFECT_AUTH_HEADER = os.environ.get("PREFECT_AUTH_HEADER")

app = func.FunctionApp(http_auth_level=func.AuthLevel.ANONYMOUS)


@app.route(route="prefect-eg-proxy", methods=["OPTIONS", "POST"])
def prefect_eventgrid_proxy(req: func.HttpRequest) -> func.HttpResponse:
    method = req.method.upper()

    if method == "OPTIONS":
        # Validation handshake: echo the requesting origin back
        origin = req.headers.get("WebHook-Request-Origin") or req.headers.get("Webhook-Request-Origin")
        headers = {}
        if origin:
            headers["WebHook-Allowed-Origin"] = origin
        # Optional: rate hint. Safe to omit.
        headers["WebHook-Allowed-Rate"] = "120"
        return func.HttpResponse(status_code=200, headers=headers)

    # Forward CloudEvents POST to Prefect
    try:
        # Keep the original headers (the filter needs them) except hop-specific ones
        forward_headers = {k: v for k, v in req.headers.items()
                           if k.lower() not in {"host", "content-length"}}
        if PREFECT_AUTH_HEADER:
            # Ensure the Prefect auth header is set/overridden when needed
            name, value = PREFECT_AUTH_HEADER.split(":", 1)
            forward_headers[name.strip()] = value.strip()

        with httpx.Client(timeout=15) as client:
            resp = client.post(
                PREFECT_WEBHOOK_URL,
                content=req.get_body(),
                headers=forward_headers,
            )
        return func.HttpResponse(status_code=resp.status_code, body=resp.content)
    except Exception as e:
        logging.exception("Failed to forward to Prefect")
        return func.HttpResponse(status_code=500, body=str(e))
```
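The OPTIONS branch above looks the origin header up under two spellings because HTTP header names are case-insensitive. As a rough sketch, the same decision can be pulled out into a framework-free helper that lowercases header names once, which also makes the handshake logic unit-testable without deploying a Function. The helper name `build_handshake_response` is hypothetical; returning 200 even when no origin header is present mirrors the proxy code in this thread and is an assumption, not something taken from the Event Grid docs.

```python
from typing import Mapping, Tuple


def build_handshake_response(request_headers: Mapping[str, str]) -> Tuple[int, dict]:
    """Return (status, response_headers) for a webhook validation OPTIONS request."""
    # Normalize once so "WebHook-...", "Webhook-..." and "webhook-..." all match.
    lowered = {name.lower(): value for name, value in request_headers.items()}
    origin = lowered.get("webhook-request-origin")

    response_headers = {}
    if origin:
        # Echo the requesting origin back to signal that delivery is allowed.
        response_headers["WebHook-Allowed-Origin"] = origin
    return 200, response_headers
```

Inside the Function, the OPTIONS branch could then call `build_handshake_response(dict(req.headers))` and pass the result to `func.HttpResponse`.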
If you prefer no Functions, API Management can do this inline with a policy:
```xml
<policies>
  <inbound>
    <choose>
      <when condition="@(context.Request.Method == "OPTIONS" && context.Request.Headers.ContainsKey("Webhook-Request-Origin"))">
        <!-- The header must be set inside return-response so it lands on the response, not the request -->
        <return-response>
          <set-status code="200" reason="OK" />
          <set-header name="WebHook-Allowed-Origin" exists-action="override">
            <value>@(context.Request.Headers.GetValueOrDefault("Webhook-Request-Origin", ""))</value>
          </set-header>
        </return-response>
      </when>
    </choose>
    <set-backend-service base-url="https://<your-prefect-webhook-url>" />
  </inbound>
  <backend />
  <outbound />
  <on-error />
</policies>
```
How to test quickly
- OPTIONS handshake:
```
curl -i -X OPTIONS https://<your-proxy>/prefect-eg-proxy \
  -H "WebHook-Request-Origin: eventgrid.azure.net"
```
  Expect 200 and a response header `WebHook-Allowed-Origin: eventgrid.azure.net`.
- Then point your Event Grid subscription (CloudEvents schema) at the proxy URL. POST deliveries should flow to Prefect and your template `{{ body|from_cloud_event(headers) }}` will parse them.

Notes on the Jinja filter
- Keep the CloudEvents body and headers intact. The filter uses both to normalize into Prefect's event shape. It works for single events and (if Event Grid batches) arrays of CloudEvents.
- If/when you enable Prefect webhook auth, inject the Authorization header in the proxy before forwarding (environment variable shown above).

If you want, I can sketch Bicep/Terraform for the Function + Event Grid subscription wiring. Also, if you'd like this supported natively by Prefect, I can open a feature request; here's the form: Open a Prefect issue.

References
- Prefect 3.x webhooks
- Prefect Cloud: create a webhook
- Azure Event Grid webhook validation (handshake)
- CloudEvents WebHook spec

Want the proxy in Node/Express instead of Python, or do you prefer APIM/Logic Apps?
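The curl check above can also be scripted, which is handy for re-running it after each proxy deploy. The sketch below uses only the Python standard library; `probe_handshake` and `handshake_ok` are hypothetical helper names, and treating `*` as an acceptable `WebHook-Allowed-Origin` value is my reading of the CloudEvents webhook spec rather than something confirmed in this thread.

```python
import urllib.error
import urllib.request
from typing import Optional, Tuple


def probe_handshake(
    url: str, origin: str = "eventgrid.azure.net", timeout: float = 10.0
) -> Tuple[int, Optional[str]]:
    """Send a validation-style OPTIONS request; return (status, allowed_origin)."""
    request = urllib.request.Request(
        url, method="OPTIONS", headers={"WebHook-Request-Origin": origin}
    )
    try:
        with urllib.request.urlopen(request, timeout=timeout) as response:
            return response.status, response.headers.get("WebHook-Allowed-Origin")
    except urllib.error.HTTPError as err:
        # 4xx/5xx responses still carry a status and headers worth reporting.
        return err.code, err.headers.get("WebHook-Allowed-Origin")


def handshake_ok(status: int, allowed_origin: Optional[str], origin: str) -> bool:
    """True when the endpoint answered 200 and allowed the requesting origin."""
    return status == 200 and allowed_origin in (origin, "*")
```

Calling `probe_handshake("https://<your-proxy>/prefect-eg-proxy")` and passing the result to `handshake_ok` should report success for a working proxy. Pointed directly at the Prefect webhook URL, it would be expected to reproduce the 500 described at the top of this thread.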