Wolfgang Steitz
09/04/2025, 2:00 PM
{
"id": "132-fa2a-77d6-8be8-abc",
"event": "some.webhook.one",
"occurred": "2025-09-04T13:09:48.202Z",
"payload": {
"webhook_payload": {
"foo": 1,
"bar": 1087
}
},
"received": "2025-09-04T13:09:48.219Z",
"related": [],
"resource": {
"prefect.resource.id": "webhook/9b8c23e0-cecc-4ccd-92a5-7b0bf5f2b7a6"
}
}
This is the automation trigger:
{
"type": "event",
"match": {},
"match_related": {},
"after": [],
"expect": [
"some.webhook.one"
],
"for_each": [
"event"
],
"posture": "Reactive",
"threshold": 1,
"within": 240
}
Marvin
09/04/2025, 2:00 PM
Marvin
09/04/2025, 2:01 PM
Marvin
09/04/2025, 2:13 PM
for_each: ["event"]. That means all events named some.webhook.one share a single scope. If the webhook is retried or the same payload is posted more than once (very common with webhook providers), each delivery is treated as a fresh event and can cause another fire. Because there’s no unique key to group duplicates, Prefect cannot deduplicate them.
- In OSS, each POST to the webhook endpoint becomes a distinct Prefect event with a new top-level id. So relying on the Prefect event id won’t deduplicate repeated deliveries; each retry will still be a new event.
How to make it fire once per webhook delivery (idempotent)
The most reliable approach is to scope the trigger by a stable, unique identifier from the webhook source — something that is the same across retries of the same delivery.
1) Add an external identifier to the posted payload (if you don’t have one already)
- Many webhook providers include a unique event/delivery id; if yours does, include it as-is in the JSON you POST to Prefect’s webhook.
- If your provider does not include one, add a deterministic key you control (e.g., a GUID your system generates, or a hash of fields that uniquely define an event in your domain).
Example incoming event (note the external_id):
{
"id": "132-fa2a-77d6-8be8-abc",
"event": "some.webhook.one",
"occurred": "2025-09-04T13:09:48.202Z",
"payload": {
"webhook_payload": {
"foo": 1,
"bar": 1087,
"external_id": "evt_2025-09-04_1087" // stable across retries
}
},
"received": "2025-09-04T13:09:48.219Z",
"related": [],
"resource": {
"prefect.resource.id": "webhook/9b8c23e0-cecc-4ccd-92a5-7b0bf5f2b7a6"
}
}
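If the provider sends no delivery id, the "hash of fields" idea from step 1 could look roughly like this. It is only a sketch: the helper name deterministic_key, the "evt_" prefix and the choice of foo and bar as identifying fields are assumptions for illustration, not anything Prefect requires.

```python
import hashlib
import json


def deterministic_key(payload: dict, fields: list) -> str:
    """Build a stable key from the fields that identify one delivery."""
    # Keep only the identifying fields; a missing field raises KeyError so
    # incomplete payloads are noticed instead of silently sharing a key.
    subset = {name: payload[name] for name in fields}
    # sort_keys and fixed separators produce the same bytes on every retry.
    canonical = json.dumps(subset, sort_keys=True, separators=(",", ":"))
    digest = hashlib.sha256(canonical.encode("utf-8")).hexdigest()
    return "evt_" + digest[:16]


first = deterministic_key({"foo": 1, "bar": 1087}, ["foo", "bar"])
retry = deterministic_key({"bar": 1087, "foo": 1}, ["foo", "bar"])  # same delivery, different key order
other = deterministic_key({"foo": 1, "bar": 1088}, ["foo", "bar"])  # a different delivery
```

The sender would compute this before posting and include the result as external_id, so a retry of the same delivery carries the same value.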
2) Scope your automation to that identifier with for_each
- Use a dot path into the event. For the JSON above, this would be:
- for_each: ["payload.webhook_payload.external_id"]
- This ensures that any duplicate deliveries for the same external_id are considered the same scope and will only fire once per id.
3) Tighten matching
- Keep expect: ["some.webhook.one"].
- Optionally add a match on the webhook endpoint resource so you only respond to your specific hook:
- match: {"prefect.resource.id": "webhook/9b8c23e0-cecc-4ccd-92a5-7b0bf5f2b7a6"}
Note: match filters on the labels of the event’s resource, so the key is prefect.resource.id, exactly as it appears under resource in the event JSON.
Suggested trigger (JSON)
{
"type": "event",
"expect": [
"some.webhook.one"
],
"match": {
"prefect.resource.id": "webhook/9b8c23e0-cecc-4ccd-92a5-7b0bf5f2b7a6"
},
"match_related": {},
"after": [],
"for_each": [
"payload.webhook_payload.external_id"
],
"posture": "Reactive",
"threshold": 1,
"within": 600
}
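To make the for_each path in the trigger above concrete, here is a small sketch of what a dot path such as payload.webhook_payload.external_id points at in the example event. The resolve_path helper is hypothetical and only mimics the idea; it is not how Prefect resolves the value internally.

```python
from typing import Any, Optional


def resolve_path(event: dict, path: str) -> Optional[Any]:
    """Follow a dot-separated path through nested dicts; None if a step is missing."""
    current: Any = event
    for part in path.split("."):
        if not isinstance(current, dict) or part not in current:
            return None
        current = current[part]
    return current


event = {
    "event": "some.webhook.one",
    "payload": {
        "webhook_payload": {"foo": 1, "bar": 1087, "external_id": "evt_2025-09-04_1087"}
    },
}

scope = resolve_path(event, "payload.webhook_payload.external_id")
missing = resolve_path(event, "payload.webhook_payload.someID")  # field not in this event
```

Running the same check against a real event copied from the event feed is a quick way to confirm the path is spelled correctly before putting it in the trigger.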
Notes on the fields you’re using
- expect: list of event names you require to see (you’re using this correctly).
- match/match_related: additional filtering on the event’s fields or related resources; great to narrow to a specific webhook endpoint.
- for_each: the dedup “scope” keys. Use a stable, source-provided id if at all possible. You can use nested paths like payload.webhook_payload.external_id. Avoid using "event" here because it groups all events of that name together and won’t prevent duplicates per delivery.
- threshold + within: the number of matching events required within a rolling time window before firing. With threshold: 1, the first matching event for a given for_each scope will fire immediately; any subsequent events with the same for_each value within the window will be coalesced. This is why a stable for_each key is crucial for deduplication.
If you cannot add a stable id from the source
- Use a deterministic key from fields that uniquely identify the delivery (e.g., ["payload.webhook_payload.bar"] if bar is unique per event).
Marvin
09/04/2025, 2:13 PM
some.webhook.one to see if multiple events are being ingested per delivery, and inspect their payloads/ids.
- Inspect the automation definition and activity to confirm which values it’s using for scoping:
- prefect automations inspect "<your automation name>" --json
Docs for deeper reference
- Event triggers concepts: Event triggers
- Automations API (schemas): Automations schema API
- Creating and using webhooks: Create a webhook
If you can share:
- Whether your webhook provider retries deliveries
- A couple of event examples from the timeline (IDs and payloads)
- The full automation (inspect output)
…I can help pinpoint whether the duplicates are coming from multiple deliveries or from trigger scoping, and suggest the exact for_each path that fits your payload.
Marvin
09/04/2025, 2:17 PM
for_each: ["event"], you’re effectively saying “fire once per event name per occurrence.” That means every duplicate of some.webhook.one can create another action.
Ways to make this robust and idempotent
1) Make the action idempotent (strongly recommended)
- In the “Run deployment” action, set an idempotency key that’s unique to the webhook occurrence, e.g. {{ event.id }} (or a unique value from your payload).
- Result: even if the automation fires twice, Prefect will only create one flow run; subsequent attempts with the same key will return the same run.
- In the UI, this is the “Idempotency key” field on the action. If you configure via JSON, add the idempotency key on the run-deployment action using a template like:
{
"type": "run-deployment",
"deployment_id": "<your-deployment-id>",
"parameters": {...},
"idempotency_key": "{{ event.id }}"
}
- Docs on event triggers and action templating: Event triggers and automations
2) Tighten the trigger scope
- Add a Match that pins the resource to your specific webhook so only that webhook can satisfy the trigger.
- In the UI: Automation -> Trigger -> Match -> Resource -> select your webhook (ID shown in your event as webhook/9b8c23e0-...).
- Change for_each to “resource” (instead of “event”) so the counter/state is partitioned by the webhook resource, not just the event name.
- In the UI: Automation -> Trigger -> Advanced -> For each -> Resource.
- Conceptually equivalent JSON:
{
"type": "event",
"expect": [
"some.webhook.one"
],
"match": {
"prefect.resource.id": "webhook/9b8c23e0-cecc-4ccd-92a5-7b0bf5f2b7a6"
},
"for_each": [
"resource"
],
"posture": "Reactive",
"threshold": 1,
"within": 240
}
Notes:
- This doesn’t deduplicate duplicates by itself — it just narrows scope. Keep the idempotency key above for real protection.
3) Verify the sender isn’t retrying
- Check the Events UI to see if there are multiple distinct some.webhook.one events for that time. If there are, the sender retried. Ensure your webhook responds with 2xx quickly; Prefect acknowledges the event before running actions, so slow actions won’t block the webhook response.
- If the sender always retries, the idempotency key will still protect you.
4) Optional guardrails
- If you want to suppress creating many concurrent runs from duplicates, add a deployment or work pool concurrency limit. This won’t dedup, but it prevents duplicate runs from executing in parallel while you harden the trigger/action.
Key takeaways
- “threshold: 1” + “within” does not deduplicate — it will fire for each delivery.
- Use an idempotency key on the Run deployment action, typically {{ event.id }}.
- Narrow match by the specific webhook resource and use for_each: ["resource"] to scope evaluation.
If you can share:
- The action configuration (are you using Run deployment?), and
- The Prefect version (assuming 3.x OSS),
I can suggest an exact config that you can paste in.
Wolfgang Steitz
09/04/2025, 2:52 PM
{
"name": "trigger run",
"description": "",
"enabled": true,
"trigger": {
"type": "event",
"match": {},
"match_related": {},
"after": [],
"expect": [
"some.webhook.om"
],
"for_each": [
"payload.webhook_payload.someID"
],
"posture": "Reactive",
"threshold": 1,
"within": 600.0
},
"actions": [
{
"type": "run-deployment",
"source": "selected",
"deployment_id": "f49705f5-85ca-413e-be2b-6a88eef8b2c5",
"parameters": {},
"job_variables": {}
},
{
"type": "run-deployment",
"source": "selected",
"deployment_id": "92b4581d-4fd6-4f04-9f82-e5852af5dbc1",
"parameters": {},
"job_variables": {}
}
],
"actions_on_trigger": [],
"actions_on_resolve": [],
"owner_resource": null,
"id": "c57bc606-fe79-48d9-9fbf-2d2fd82c0e86"
}
]
Marvin
09/04/2025, 2:52 PM
Marvin
09/04/2025, 2:53 PM
expect is some.webhook.om but the event you showed earlier was some.webhook.one. If that’s a typo, the automation won’t match at all. Please confirm which one is correct.
2) Your for_each choice is good (with a caveat)
- for_each: ["payload.webhook_payload.someID"] is a solid way to deduplicate retries as long as someID is:
- Present on every event, and
- Stable (same across retries of the same delivery)
- If someID can be missing, those events will all share a “null” scope and could still fire multiple times. In that case, consider falling back to event.id for idempotency (see next point).
3) Add idempotency keys to each action (prevents duplicate flow runs even if the trigger fires again)
- This is the most reliable guard. Use the same unique identifier you used in for_each. Because you have two different deployments, give each action a distinct key to avoid collisions between them.
Example:
"actions": [
{
"type": "run-deployment",
"source": "selected",
"deployment_id": "f49705f5-85ca-413e-be2b-6a88eef8b2c5",
"parameters": {},
"job_variables": {},
"idempotency_key": "{{ payload.webhook_payload.someID }}-depA"
},
{
"type": "run-deployment",
"source": "selected",
"deployment_id": "92b4581d-4fd6-4f04-9f82-e5852af5dbc1",
"parameters": {},
"job_variables": {},
"idempotency_key": "{{ payload.webhook_payload.someID }}-depB"
}
]
- If you do not have a stable someID, use {{ event.id }} instead and add a suffix per deployment (e.g., {{ event.id }}-depA / -depB).
4) Narrow the trigger to your specific webhook (optional but recommended)
- This avoids accidental matches if you emit other events with the same name:
"match": {
"prefect.resource.id": "webhook/9b8c23e0-cecc-4ccd-92a5-7b0bf5f2b7a6"
}
5) Threshold/within semantics
- With threshold: 1 and within: 600, the first event for a given for_each key fires immediately; additional events with the same someID within 10 minutes coalesce. Outside that window, a repeat with the same someID can fire again, which is why the action-level idempotency_key is important if you truly want “once per ID ever.”
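A toy model of the window behaviour described in point 5 may help. It is an illustration of the description only, assuming one "last fired" timestamp per scope key; the class WindowedScopes is hypothetical and is not Prefect's implementation.

```python
from datetime import datetime, timedelta, timezone


class WindowedScopes:
    """Toy model: fire at most once per scope key within `within` seconds."""

    def __init__(self, within: float) -> None:
        self.within = timedelta(seconds=within)
        self.last_fired = {}  # scope key -> time of the last fire

    def should_fire(self, key: str, occurred: datetime) -> bool:
        previous = self.last_fired.get(key)
        if previous is not None and occurred - previous < self.within:
            return False  # same key inside the window: coalesced
        self.last_fired[key] = occurred
        return True


start = datetime(2025, 9, 4, 13, 9, 48, tzinfo=timezone.utc)
scopes = WindowedScopes(within=600)
results = [
    scopes.should_fire("id-1", start),                          # first delivery
    scopes.should_fire("id-1", start + timedelta(seconds=30)),  # retry inside the window
    scopes.should_fire("id-2", start + timedelta(seconds=31)),  # different ID
    scopes.should_fire("id-1", start + timedelta(minutes=11)),  # same ID after the window
]
print(results)  # [True, False, True, True]
```

The last entry is the case the window alone cannot cover: a late repeat with the same ID fires again.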
Verification tips
- Inspect the automation to confirm the effective config:
prefect automations inspect "trigger run" --json
- Check the Event Timeline to verify whether there are multiple some.webhook.* events at the same time (indicating webhook retries).
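Once a few events are copied out of the timeline, a quick grouping shows whether the same someID arrived more than once. This is a sketch over hand-copied event dicts; the ids a1..a3 and the values x-1 and x-2 are made up, and the helper names are hypothetical.

```python
from collections import defaultdict


def deliveries_by_id(events: list, id_field: str = "someID") -> dict:
    """Map each payload ID to the Prefect event ids that carried it."""
    grouped = defaultdict(list)
    for event in events:
        body = event.get("payload", {}).get("webhook_payload", {})
        grouped[body.get(id_field)].append(event["id"])
    return dict(grouped)


def repeated(grouped: dict) -> dict:
    """Keep only the IDs that were delivered more than once."""
    return {key: ids for key, ids in grouped.items() if len(ids) > 1}


events = [
    {"id": "a1", "event": "some.webhook.one", "payload": {"webhook_payload": {"someID": "x-1"}}},
    {"id": "a2", "event": "some.webhook.one", "payload": {"webhook_payload": {"someID": "x-1"}}},
    {"id": "a3", "event": "some.webhook.one", "payload": {"webhook_payload": {"someID": "x-2"}}},
]

duplicates = repeated(deliveries_by_id(events))
print(duplicates)  # {'x-1': ['a1', 'a2']}
```

Several Prefect event ids under one someID point to sender retries; one event id with several flow runs points to the trigger or action configuration instead.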
Helpful docs
- Event triggers concepts: Event triggers
- Automations schema/API: Automations schema API
- Webhooks: Create a webhook
If you can confirm the correct event name and whether someID is always present, I can provide an exact JSON you can paste in.
Wolfgang Steitz
09/04/2025, 2:57 PM
Marvin
09/04/2025, 2:57 PM
Marvin
09/04/2025, 2:59 PM
some.webhook.one
- You want to scope by payload.webhook_payload.someID
- You want idempotency keys on each run-deployment action
Python (requests)
import os
import requests
API_URL = os.getenv("PREFECT_API_URL", "http://127.0.0.1:4200/api")
headers = {
"Content-Type": "application/json",
# For Cloud only:
# "Authorization": f"Bearer {os.environ['PREFECT_API_KEY']}",
}
automation = {
"name": "trigger run",
"description": "",
"enabled": True,
"trigger": {
"type": "event",
"expect": ["some.webhook.one"], # confirm this is the exact event name
"match": {},
"match_related": {},
"after": [],
"for_each": ["payload.webhook_payload.someID"],
"posture": "Reactive",
"threshold": 1,
"within": 600
},
"actions": [
{
"type": "run-deployment",
"source": "selected",
"deployment_id": "f49705f5-85ca-413e-be2b-6a88eef8b2c5",
"parameters": {},
"job_variables": {},
"idempotency_key": "{{ payload.webhook_payload.someID }}-depA"
},
{
"type": "run-deployment",
"source": "selected",
"deployment_id": "92b4581d-4fd6-4f04-9f82-e5852af5dbc1",
"parameters": {},
"job_variables": {},
"idempotency_key": "{{ payload.webhook_payload.someID }}-depB"
}
],
"actions_on_trigger": [],
"actions_on_resolve": []
}
resp = requests.post(f"{API_URL}/automations/", json=automation, headers=headers, timeout=30)
resp.raise_for_status()
print("Created automation:", resp.json())
cURL
curl -X POST "$PREFECT_API_URL/automations/" \
-H "Content-Type: application/json" \
-d '{
"name": "trigger run",
"description": "",
"enabled": true,
"trigger": {
"type": "event",
"expect": ["some.webhook.one"],
"match": {},
"match_related": {},
"after": [],
"for_each": ["payload.webhook_payload.someID"],
"posture": "Reactive",
"threshold": 1,
"within": 600
},
"actions": [
{
"type": "run-deployment",
"source": "selected",
"deployment_id": "f49705f5-85ca-413e-be2b-6a88eef8b2c5",
"parameters": {},
"job_variables": {},
"idempotency_key": "{{ payload.webhook_payload.someID }}-depA"
},
{
"type": "run-deployment",
"source": "selected",
"deployment_id": "92b4581d-4fd6-4f04-9f82-e5852af5dbc1",
"parameters": {},
"job_variables": {},
"idempotency_key": "{{ payload.webhook_payload.someID }}-depB"
}
],
"actions_on_trigger": [],
"actions_on_resolve": []
}'
Verify it was created
prefect automations ls
prefect automations inspect "trigger run" --json
Optional: restrict to a specific webhook endpoint
- If you want to ensure only your webhook endpoint triggers this automation, add a match on the webhook resource ID you saw in your event:
"match": {
"resource": {
"prefect.resource.id": "webhook/9b8c23e0-cecc-4ccd-92a5-7b0bf5f2b7a6"
}
}
Notes
- The CLI does not have a “create” command for automations (you can inspect, pause/resume, delete).
- If someID might be missing, switch the idempotency keys to {{ event.id }} instead.
- Docs for reference: Event triggers and Automations schema API.
If you’d like, share your API URL and I’ll tailor the snippet exactly (OSS default is http://127.0.0.1:4200/api).
Wolfgang Steitz
09/04/2025, 3:15 PM
Marvin
09/04/2025, 3:15 PM
Marvin
09/04/2025, 3:17 PM
PREFECT_API_AUTH_STRING to requests as HTTP Basic Auth. Two easy options:
Option A — let requests build the header
import os
import requests
from requests.auth import HTTPBasicAuth
API_URL = os.getenv("PREFECT_API_URL", "http://127.0.0.1:4200/api")
AUTH_STRING = os.getenv("PREFECT_API_AUTH_STRING") # format: "username:password"
auth = None
if AUTH_STRING:
    if ":" not in AUTH_STRING:
        raise ValueError("PREFECT_API_AUTH_STRING must be 'username:password'")
    user, password = AUTH_STRING.split(":", 1)
    auth = HTTPBasicAuth(user, password)
headers = {"Content-Type": "application/json"}
automation = {
"name": "trigger run",
"enabled": True,
"trigger": {
"type": "event",
"expect": ["some.webhook.one"],
"match": {},
"match_related": {},
"after": [],
"for_each": ["payload.webhook_payload.someID"],
"posture": "Reactive",
"threshold": 1,
"within": 600
},
"actions": [
{
"type": "run-deployment",
"source": "selected",
"deployment_id": "f49705f5-85ca-413e-be2b-6a88eef8b2c5",
"parameters": {},
"job_variables": {},
"idempotency_key": "{{ payload.webhook_payload.someID }}-depA"
},
{
"type": "run-deployment",
"source": "selected",
"deployment_id": "92b4581d-4fd6-4f04-9f82-e5852af5dbc1",
"parameters": {},
"job_variables": {},
"idempotency_key": "{{ payload.webhook_payload.someID }}-depB"
}
]
}
resp = requests.post(f"{API_URL}/automations/", json=automation, headers=headers, auth=auth, timeout=30)
resp.raise_for_status()
print(resp.json())
Option B — build the Basic header yourself
import os
import base64
import requests
API_URL = os.getenv("PREFECT_API_URL", "http://127.0.0.1:4200/api")
AUTH_STRING = os.getenv("PREFECT_API_AUTH_STRING") # "username:password"
headers = {"Content-Type": "application/json"}
if AUTH_STRING:
    token = base64.b64encode(AUTH_STRING.encode()).decode()
    headers["Authorization"] = f"Basic {token}"
# ... same automation dict as above ...
resp = requests.post(f"{API_URL}/automations/", json=automation, headers=headers, timeout=30)
resp.raise_for_status()
print(resp.json())
Notes
- PREFECT_API_AUTH_STRING format is username:password (Basic Auth).
- If you also set a Prefect Cloud API key, use Bearer auth instead (not needed for OSS).
- If your password contains a colon, split on the first colon only (as shown).
- Docs: Security settings
Wolfgang Steitz
09/04/2025, 5:31 PM
Marvin
09/04/2025, 5:31 PM
Marvin
09/04/2025, 5:32 PM
match: { "payload.webhook_payload.source": "my-system" }.
- Keep expect: ["some.webhook.one"] and your for_each: ["payload.webhook_payload.someID"].
Given your situation, I’d recommend:
- Keep match empty or use a payload marker.
- Keep for_each: ["payload.webhook_payload.someID"].
- Add idempotency keys to each Run deployment action:
- idempotency_key: "{{ payload.webhook_payload.someID }}-depA" and ...-depB
- If someID can be absent or non-unique, switch those to {{ event.id }}.
If you do want a stable resource filter later, make sure you:
- Create one webhook in the UI and reuse that single URL long-term.
- Avoid recreating the webhook or resetting the server DB; otherwise the resource id will change.
Wolfgang Steitz
09/04/2025, 5:34 PM
Marvin
09/04/2025, 5:34 PM
Marvin
09/04/2025, 5:35 PM
integration/my-webhook) instead of relying on an auto-generated value. Then you can safely add a match on that resource if you want. Keep your for_each on a stable payload key and add idempotency keys on actions as we discussed.
Example: minimal “forwarder” that turns an external webhook into a Prefect event
- This runs anywhere that can reach your Prefect OSS API (PREFECT_API_URL)
- It uses emit_event to publish into your OSS server
- Set a stable resource id so you can optionally match on it later
# pip install fastapi uvicorn prefect
import os
from fastapi import FastAPI, Request
from prefect.events.utilities import emit_event
# Configure this process to talk to your OSS server
# export PREFECT_API_URL="http://127.0.0.1:4200/api"
# Optional (if you enabled auth on the API):
# export PREFECT_API_AUTH_STRING="username:password"
app = FastAPI()
STABLE_RESOURCE_ID = "integration/my-webhook" # choose a constant identifier for this webhook source
@app.post("/my-webhook")
async def my_webhook(request: Request):
    body = await request.json()
    # Extract or compute a stable id for dedup; use whatever you were calling "someID"
    some_id = (
        body.get("someID")
        or body.get("id")  # or any field that is stable across retries
    )
    # Emit the Prefect event. someID is written into the payload so that the path
    # payload.webhook_payload.someID resolves even when the source only sent "id".
    emit_event(
        event="some.webhook.one",  # must match your automation's 'expect'
        resource={"prefect.resource.id": STABLE_RESOURCE_ID},
        payload={"webhook_payload": {**body, "someID": some_id}},  # original payload plus someID
    )
    return {"status": "ok", "forwarded": True, "someID": some_id}
Automation setup to pair with the above
- You no longer need to match on a changing resource; either:
- Leave match empty, or
- Match the stable resource you chose:
- match: { "prefect.resource.id": "integration/my-webhook" }
- Keep your dedup scope and idempotency keys:
- for_each: ["payload.webhook_payload.someID"] (or another stable field)
- Action `idempotency_key`: {{ payload.webhook_payload.someID }}-depA and ...-depB
- If your source can’t provide a stable someID, switch the idempotency keys to {{ event.id }}.
Why this works
- In OSS, you control the event structure. By fixing the resource id yourself, you avoid the “changing resource id” problem and can tighten matching if you want.
- Dedup is best handled via:
- for_each on a stable field from the payload, and
- idempotency keys on each Run deployment action, so even if the trigger fires again, you won’t get duplicate flow runs.
Environment and security
- Ensure your forwarder process has:
- PREFECT_API_URL pointing at OSS (e.g., `http://127.0.0.1:4200/api`)
- If you enabled API auth, set PREFECT_API_AUTH_STRING="username:password" in the environment; Prefect’s client will use Basic Auth automatically.
Docs for reference
- Event triggers and automations concepts: Event triggers
- Automations schema/API: Automations schema API
- Security settings (for API auth): Security settings
If you share an example of the incoming third‑party webhook payload, I can suggest the exact for_each path and the idempotency_key template to use.
Wolfgang Steitz
09/04/2025, 5:46 PM
Marvin
09/04/2025, 5:46 PM
Marvin
09/04/2025, 5:49 PM
from datetime import datetime, timedelta, timezone
from prefect.events.utilities import emit_event
def handle_webhook(payload: dict):
    # Compute the target start time once, at ingestion, as an ISO 8601 UTC string
    run_at = (datetime.now(tz=timezone.utc) + timedelta(minutes=20)).isoformat()
    emit_event(
        event="some.webhook.one",
        resource={"prefect.resource.id": "integration/my-webhook"},
        payload={"webhook_payload": payload, "run_at": run_at},
    )
Automation JSON (update your two actions)
"actions": [
{
"type": "run-deployment",
"source": "selected",
"deployment_id": "f49705f5-85ca-413e-be2b-6a88eef8b2c5",
"parameters": {},
"job_variables": {},
"idempotency_key": "{{ payload.webhook_payload.someID }}-depA",
"scheduled_time": "{{ payload.run_at }}"
},
{
"type": "run-deployment",
"source": "selected",
"deployment_id": "92b4581d-4fd6-4f04-9f82-e5852af5dbc1",
"parameters": {},
"job_variables": {},
"idempotency_key": "{{ payload.webhook_payload.someID }}-depB",
"scheduled_time": "{{ payload.run_at }}"
}
]
Notes
- scheduled_time must be an ISO 8601 timestamp (UTC recommended).
- With this approach, you avoid relying on template-time arithmetic and it will work consistently in Prefect OSS 3.4.14.
- Keep your for_each and idempotency keys as you have them to deduplicate retries.
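For the ISO 8601 note, a small sketch of computing run_at from a known receive time and checking that it round-trips. The helper compute_run_at is hypothetical; it only wraps the arithmetic already shown in handle_webhook so it can be checked in isolation.

```python
from datetime import datetime, timedelta, timezone


def compute_run_at(now: datetime, delay_minutes: int = 20) -> str:
    """Return an ISO 8601 UTC timestamp `delay_minutes` after `now`."""
    if now.tzinfo is None:
        # A naive datetime would produce a string without an offset.
        raise ValueError("now must be timezone-aware")
    run_at = now.astimezone(timezone.utc) + timedelta(minutes=delay_minutes)
    return run_at.isoformat()


received = datetime(2025, 9, 4, 13, 9, 48, tzinfo=timezone.utc)
run_at = compute_run_at(received)
parsed = datetime.fromisoformat(run_at)  # round-trips, so the string is well formed
print(run_at)  # 2025-09-04T13:29:48+00:00
```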
Optional: if your instance supports datetime arithmetic in templates, you can compute it directly from the event time, e.g.:
"scheduled_time": "{{ event.received | add('20m') }}"
However, support for time filters like add may vary by version; the “compute run_at in payload” approach above is the safest.
Wolfgang Steitz
09/09/2025, 2:21 PM
Marvin
09/09/2025, 2:21 PM
Marvin
09/09/2025, 2:27 PM
idempotency_key to a RunDeployment automation action won’t work today.
- Is there a specific Prefect version required? No, this isn’t a versioning issue. As of current Prefect 3.x, the RunDeployment automation action does not accept or pass through an idempotency_key.
- Is your JSON correct? Likely yes, but the field isn’t supported on the RunDeployment action, so it’s ignored.
- Does this work with the open-source version? The underlying idempotency feature works in both Prefect Server (open source) and Prefect Cloud, but only when you create flow runs via the API or Python helper, not via the Automation RunDeployment action.
What does support idempotency today
- Python helper (works on Server and Cloud):
from prefect.deployments import run_deployment
run = run_deployment(
name="my-flow/my-deployment",
idempotency_key="my-dedupe-key",
parameters={"x": 1},
flow_run_name="deduped-run",
)
We verified the callable supports idempotency_key.
- REST API for creating flow runs supports idempotency (Server and Cloud): see Create flow run API.
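A rough sketch of the REST route, using only the standard library. It assumes the create-flow-run endpoint for a deployment is POST /deployments/{id}/create_flow_run and that the body accepts idempotency_key and parameters; please confirm both against your server's API reference. The key value abc-123-depA is made up, and the request is only built here, not sent.

```python
import json
import os
import urllib.request
from typing import Optional

API_URL = os.getenv("PREFECT_API_URL", "http://127.0.0.1:4200/api")


def build_create_flow_run(
    deployment_id: str, idempotency_key: str, parameters: Optional[dict] = None
) -> urllib.request.Request:
    """Build (but do not send) the request that creates a flow run for a deployment."""
    body = {"parameters": parameters or {}, "idempotency_key": idempotency_key}
    return urllib.request.Request(
        url=f"{API_URL}/deployments/{deployment_id}/create_flow_run",  # assumed endpoint path
        data=json.dumps(body).encode("utf-8"),
        headers={"Content-Type": "application/json"},
        method="POST",
    )


request = build_create_flow_run("f49705f5-85ca-413e-be2b-6a88eef8b2c5", "abc-123-depA")
# To send it: urllib.request.urlopen(request, timeout=30)
```

If API auth is enabled, the Authorization header from the Basic Auth snippets earlier in the thread would need to be added to headers.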
Current Automations limitation
- The RunDeployment action’s schema does not include idempotency_key. That’s why putting it in your automation JSON has no effect. References: Automations API/model docs: Automations (API) and Automations models.
Workarounds if you need idempotency from an Automation
- Replace RunDeployment with a CallWebhook action that invokes the Prefect REST API endpoint to create a flow run with an idempotency_key. This works for both Cloud (with a service account token) and Server (no auth by default). I can help you craft the exact URL/body for your deployment.
- Or trigger a small “router” flow (via RunDeployment or any trigger you like) that calls prefect.deployments.run_deployment(..., idempotency_key=...) for the target deployment.
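The “router” idea could be sketched like this. The deployment names and the key format are placeholders, and route would still need to be wrapped with Prefect's @flow decorator and deployed; how the automation hands some_id to it is not shown here.

```python
def dedupe_key(some_id: str, deployment_name: str) -> str:
    """One key per (webhook ID, target deployment) pair."""
    return f"{some_id}-{deployment_name}"


def route(some_id: str, targets: list) -> list:
    """Start each target deployment at most once per some_id; returns the keys used."""
    # Imported here so dedupe_key can be used and tested without Prefect installed.
    from prefect.deployments import run_deployment

    keys = []
    for name in targets:
        key = dedupe_key(some_id, name)
        # timeout=0 returns once the run is created instead of waiting for it to finish.
        run_deployment(name=name, idempotency_key=key, timeout=0)
        keys.append(key)
    return keys


# Hypothetical deployment names; replace with your own "flow-name/deployment-name".
example_key = dedupe_key("abc-123", "my-flow/dep-a")
```

Calling route twice with the same some_id should hand the same keys to run_deployment, which is what lets the server return the existing run rather than create a second one.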
If you can share your end goal (what you’re deduplicating and how you want keys generated), I can suggest a concrete webhook payload or a minimal “router” flow example tailored to your setup.