Fredrik Hoem Grelland
09/18/2025, 8:34 AM
Marvin
09/18/2025, 8:34 AM
Marvin
09/18/2025, 8:43 AM
{{ event.id }}) and fetch the full payload from the events API inside your flow.
Here’s the pattern that works well in Prefect 3.x:
1) In your Automation’s Run Deployment action, add a small parameter
- Add a flow parameter like trigger_event_id with a Jinja template: {{ event.id }}
2) In your flow, look up the event by ID and read its payload
- Use the events client to fetch the event by its ID. Example:
from prefect import flow, get_run_logger
from prefect.events.clients import get_events_client
from prefect.events.filters import EventFilter, EventIDFilter

@flow
async def my_flow(trigger_event_id: str):
    logger = get_run_logger()
    async with get_events_client() as ec:
        events = await ec.read_events(
            EventFilter(id=EventIDFilter(any_=[trigger_event_id]))
        )
    if not events:
        raise RuntimeError(f"No event found for id {trigger_event_id}")
    evt = events[0]  # the triggering event
    payload = evt.payload  # this is the full payload you avoided passing as params
    logger.info(f"Received payload keys: {list(payload.keys())}")
    # ...use payload as needed...
Documentation links:
- Events client (Python): prefect.events.clients
- Read events (REST): /events/read
Notes and alternatives:
- This keeps your parameters tiny (just the event ID) and avoids the 512KB limit.
- If your payloads are extremely large or you need long-term retention, another robust pattern is to have the event source put the payload in durable storage (e.g., S3/GCS/Azure) and pass only a reference (key/URL) to the flow. Then the flow reads the object at runtime.
- There isn’t a built-in “inject the triggering event payload into the flow context” feature today; fetching by event ID is the cleanest approach from within a flow run.
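Because the flow receives only the event ID as a string, a small guard can catch an unrendered template or a malformed value before any API call is made. This is a minimal sketch, assuming event IDs are UUID strings; normalize_event_id is a hypothetical helper name and not part of Prefect.

```python
import uuid


def normalize_event_id(raw: str) -> str:
    """Return the canonical lowercase form of an event ID, or raise ValueError.

    Hypothetical helper: assumes event IDs are UUID strings.
    """
    candidate = raw.strip()
    # An unrendered Jinja template would arrive as the literal "{{ event.id }}".
    if "{{" in candidate or "}}" in candidate:
        raise ValueError(f"Parameter looks like an unrendered template: {raw!r}")
    try:
        return str(uuid.UUID(candidate))
    except ValueError as exc:
        raise ValueError(f"Not a valid event ID: {raw!r}") from exc
```

Calling it at the top of the flow, before the lookup, turns a confusing empty result into an immediate, descriptive failure.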
Would you like a quick example for S3/GCS-based lookup too? Also, are you on Prefect Cloud or a self-hosted server? I can tailor the example accordingly.
Fredrik Hoem Grelland
09/18/2025, 9:12 AM
Marvin
09/18/2025, 9:13 AM
Marvin
09/18/2025, 9:16 AM
read_events method; if you’re not seeing it, you may be on an older 3.x build.
Two ways forward:
- Preferred (latest Prefect 3.x): use get_events_client().read_events(...)
- Portable fallback: call the REST /events/read endpoint directly
Preferred approach (async flow) — fetch by event ID you pass as a tiny param:
from prefect import flow, get_run_logger
from prefect.events.clients import get_events_client
from prefect.events.filters import EventFilter, EventIDFilter

@flow
async def my_flow(trigger_event_id: str):
    logger = get_run_logger()
    async with get_events_client() as ec:
        events = await ec.read_events(
            EventFilter(id=EventIDFilter(any_=[trigger_event_id]))
        )
    if not events:
        raise RuntimeError(f"No event found for id {trigger_event_id}")
    evt = events[0]
    payload = evt.payload
    logger.info(f"Payload keys: {list(payload.keys())}")
    # ... use payload ...
Docs:
- Events client: prefect.events.clients
- Read events (REST): /events/read
- Filters: EventFilter types are referenced from the events client docs
REST fallback (if your client doesn’t have read_events yet):
import os
import httpx

# Prefect Cloud/Server environment should already set PREFECT_API_URL and PREFECT_API_KEY
api_url = os.getenv("PREFECT_API_URL")  # e.g., https://api.prefect.cloud/api/accounts/<aid>/workspaces/<wid>
api_key = os.environ["PREFECT_API_KEY"]

def fetch_event_by_id(event_id: str):
    url = f"{api_url}/events/read"
    body = {"filter": {"id": {"any_": [event_id]}}}
    headers = {"Authorization": f"Bearer {api_key}"}
    r = httpx.post(url, json=body, headers=headers, timeout=30)
    r.raise_for_status()
    events = r.json()
    if not events:
        raise RuntimeError(f"No event found for id {event_id}")
    return events[0]

# inside your flow code
evt = fetch_event_by_id(trigger_event_id)
payload = evt["payload"]
If you’d like, tell me your Prefect version (prefect version) and whether you’re on Cloud or self-hosted, and I’ll tailor the snippet. Also, to avoid the 512KB parameter limit, keep passing only {{ event.id }} (or a storage reference) rather than the full payload.
Fredrik Hoem Grelland
09/18/2025, 9:23 AM
Marvin
09/18/2025, 9:23 AM
Marvin
09/18/2025, 9:23 AM
read_events on the events client. That was my mistake.
What works today:
- Keep passing only the tiny {{ event.id }} as a parameter
- Inside the flow, call the REST endpoint /events/read to fetch the full event (and its payload) by ID
Example (sync flow or inside a task):
import os
import httpx
from prefect import flow, get_run_logger

@flow
def my_flow(trigger_event_id: str):
    logger = get_run_logger()
    api_url = os.getenv("PREFECT_API_URL")  # e.g. https://api.prefect.cloud/api/accounts/<aid>/workspaces/<wid>
    api_key = os.getenv("PREFECT_API_KEY")  # required for Cloud
    if not api_url:
        raise RuntimeError("PREFECT_API_URL is not set")
    headers = {"Authorization": f"Bearer {api_key}"} if api_key else {}
    body = {
        "filter": {
            "id": {"any_": [trigger_event_id]}
        }
    }
    r = httpx.post(f"{api_url}/events/read", json=body, headers=headers, timeout=30)
    r.raise_for_status()
    events = r.json()
    if not events:
        raise RuntimeError(f"No event found for id {trigger_event_id}")
    evt = events[0]
    payload = evt.get("payload", {})
    logger.info(f"Payload keys: {list(payload.keys())}")
    # ... use payload ...
Notes and links:
- REST read endpoint: /events/read
- The events client in 3.x is for emitting/subscribing, not querying historical events; there’s no Python convenience for reads right now.
- This pattern avoids the 512KB parameter limit by passing only {{ event.id }}.
If you’d like, I can share an async-flow version or help wire the Jinja param in your Automation UI. Are you on Cloud or self-hosted?
Fredrik Hoem Grelland
09/18/2025, 9:25 AM
Marvin
09/18/2025, 9:25 AM
Marvin
09/18/2025, 9:26 AM
PREFECT_API_URL and (for Cloud) PREFECT_API_KEY from the environment.
import os
import httpx

def get_event_by_id(event_id: str, *, api_url: str | None = None, api_key: str | None = None, timeout: float = 30.0) -> dict:
    """
    Fetch a Prefect event by its ID using the /events/read endpoint.
    - For Prefect Cloud, ensure PREFECT_API_URL and PREFECT_API_KEY are set.
    - For Prefect Server (self-hosted), PREFECT_API_URL is required; API key is typically not needed.
    """
    if api_url is None:
        api_url = os.getenv("PREFECT_API_URL")
    if not api_url:
        raise RuntimeError("PREFECT_API_URL must be set or passed to get_event_by_id(api_url=...).")
    if api_key is None:
        api_key = os.getenv("PREFECT_API_KEY")
    url = f"{api_url.rstrip('/')}/events/read"
    headers = {"Content-Type": "application/json"}
    if api_key:
        headers["Authorization"] = f"Bearer {api_key}"
    body = {
        "filter": {
            "id": {"any_": [event_id]}
        }
    }
    with httpx.Client(timeout=timeout) as client:
        resp = client.post(url, json=body, headers=headers)
        try:
            resp.raise_for_status()
        except httpx.HTTPStatusError as exc:
            raise RuntimeError(f"Failed to read event {event_id}: {exc.response.text}") from exc
    events = resp.json()
    if not isinstance(events, list):
        raise RuntimeError(f"Unexpected response from /events/read: {events!r}")
    if not events:
        raise LookupError(f"No event found for id {event_id}")
    return events[0]
Example usage inside your flow after passing only {{ event.id }} as a parameter:
from prefect import flow, get_run_logger

@flow
def my_flow(trigger_event_id: str):
    logger = get_run_logger()
    evt = get_event_by_id(trigger_event_id)
    payload = evt.get("payload", {})
    logger.info(f"Payload keys: {list(payload.keys())}")
    # ... use payload ...
Reference: /events/read
If you’d like an async version as well, I can share that too.
Fredrik Hoem Grelland
09/18/2025, 9:44 AM
Marvin
09/18/2025, 9:44 AM
Marvin
09/18/2025, 9:47 AM
PREFECT_API_URL (looks like https://api.prefect.cloud/api/accounts/<aid>/workspaces/<wid>)
- An auth token for the workspace (exposed to the flow so the engine can call the API)
- On self-hosted Server, you’ll get PREFECT_API_URL and typically don’t need an API key unless you’ve added auth.
How to call the REST API from your flow
- The helper I shared earlier will use PREFECT_API_URL and PREFECT_API_KEY from the environment if present.
- This works out of the box in most ECS setups; the worker injects these so your flow can contact the API.
Drop-in sync helper (supports optional override key)
import os
import httpx

def get_event_by_id(
    event_id: str,
    *,
    api_url: str | None = None,
    api_key: str | None = None,
    timeout: float = 30.0,
) -> dict:
    """
    Fetch a Prefect event by its ID using the /events/read endpoint.
    - Uses PREFECT_API_URL and PREFECT_API_KEY from the environment if not provided.
    - For Prefect Cloud, ensure network egress to the Cloud API from your ECS task.
    """
    api_url = api_url or os.getenv("PREFECT_API_URL")
    if not api_url:
        raise RuntimeError("PREFECT_API_URL must be set or passed to get_event_by_id(api_url=...).")
    # Prefer an explicit api_key if you pass one; otherwise use the environment.
    auth_key = api_key or os.getenv("PREFECT_API_KEY")
    url = f"{api_url.rstrip('/')}/events/read"
    headers = {"Content-Type": "application/json"}
    if auth_key:
        headers["Authorization"] = f"Bearer {auth_key}"
    body = {
        "filter": {
            "id": {"any_": [event_id]}
        }
    }
    with httpx.Client(timeout=timeout) as client:
        resp = client.post(url, json=body, headers=headers)
        try:
            resp.raise_for_status()
        except httpx.HTTPStatusError as exc:
            raise RuntimeError(f"Failed to read event {event_id}: {exc.response.text}") from exc
    events = resp.json()
    if not isinstance(events, list):
        raise RuntimeError(f"Unexpected response from /events/read: {events!r}")
    if not events:
        raise LookupError(f"No event found for id {event_id}")
    return events[0]
If you see 401/403 calling /events/read
- It likely means the token in your flow’s environment doesn’t have permission to read events.
- Create a service account in Prefect Cloud with “read events” permission, generate an API key, and inject it as an extra env var into your ECS work pool (e.g., EVENTS_READ_API_KEY). Then call the helper like:
- get_event_by_id(event_id, api_key=os.environ["EVENTS_READ_API_KEY"])
- Do not overwrite Prefect’s own env vars; just add your own in the work pool’s job variables (env) so they’re merged into the task definition.
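The key-selection advice above can be sketched as a small pure function. This is only an illustration: pick_api_key is a hypothetical name, and EVENTS_READ_API_KEY is the example variable name used in this thread, not something Prefect sets for you.

```python
import os
from typing import Mapping, Optional


def pick_api_key(
    env: Mapping[str, str] = os.environ,
    override_var: str = "EVENTS_READ_API_KEY",  # example name from this thread
) -> Optional[str]:
    """Prefer a dedicated events-read key; fall back to Prefect's own key.

    Returns None when neither variable is set (or both are empty), which
    suits a self-hosted server without auth.
    """
    return env.get(override_var) or env.get("PREFECT_API_KEY") or None
```

The result can be passed as api_key to the helper, so Prefect's own PREFECT_API_KEY is read but never overwritten.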
Helpful docs
- ECS worker guide: ECS work pool/worker
- Connect to Cloud / auth: Connect to Cloud
- Read events API: /events/read
If you want, share whether you’re on Cloud or self-hosted and I can suggest the minimal permission set and where to add the env in the ECS work pool UI.
Fredrik Hoem Grelland
09/18/2025, 11:01 AM
Marvin
09/18/2025, 11:02 AM
Marvin
09/18/2025, 11:02 AM
{
  "filter": {
    "id": {
      "any_": [
        "01995c4c-ef0b-74fc-8a66-970328e96a43"
      ]
    }
  }
}
Headers
Authorization: Bearer <your_api_key>
Content-Type: application/json
Example with httpx
import os
import httpx

api_url = f"https://api.prefect.cloud/api/accounts/{os.environ['ACCOUNT_ID']}/workspaces/{os.environ['WORKSPACE_ID']}"
api_key = os.environ["PREFECT_API_KEY"]
event_id = "01995c4c-ef0b-74fc-8a66-970328e96a43"
r = httpx.post(
    f"{api_url}/events/read",
    json={"filter": {"id": {"any_": [event_id]}}},
    headers={"Authorization": f"Bearer {api_key}"},
    timeout=30,
)
r.raise_for_status()
print(r.json())
Privileges
- If you get 403 (not 404), your key lacks permission. Use a Prefect Cloud user or service account key that has permission to read workspace events (Viewer or higher typically suffices).
- 404 is path or workspace/account mismatch. Double-check the account_id and workspace_id and that you didn’t include /api twice (e.g., /api/.../api/...).
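The status-code triage described above could be captured in a tiny lookup so a failing flow logs a hint instead of a bare status. This is a sketch only; explain_status is a hypothetical helper and the messages simply restate the guidance from this thread.

```python
def explain_status(status_code: int) -> str:
    """Map an HTTP status from the events endpoint to a troubleshooting hint."""
    if 200 <= status_code < 300:
        return "ok: request accepted"
    if status_code in (401, 403):
        return "auth: the API key is missing or lacks permission to read workspace events"
    if status_code == 404:
        return "path: check the account/workspace IDs and that /api appears only once"
    return f"other: unexpected status {status_code}"
```

It can be called with r.status_code before raise_for_status() to produce a more actionable log line.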
Docs
- Read events endpoint: /events/read
- API keys: Connect to Cloud and API keys
If you still hit a 404 with the corrected URL, paste the exact URL (with IDs redacted) and we’ll sanity-check it.
Fredrik Hoem Grelland
09/18/2025, 11:04 AM
Marvin
09/18/2025, 11:04 AM
Marvin
09/18/2025, 11:06 AM
PREFECT_API_URL. In Cloud it looks like:
- https://api.prefect.cloud/api/accounts/{account_id}/workspaces/{workspace_id}
- Endpoint path: append /events/read
- Full: https://api.prefect.cloud/api/accounts/{account_id}/workspaces/{workspace_id}/events/read
- Method: POST
- Body: use id.any_, not id.id or id
- Auth: Authorization: Bearer <your_api_key>
Minimal request body
{
  "filter": {
    "id": {
      "any_": [
        "01995c4c-ef0b-74fc-8a66-970328e96a43"
      ]
    }
  }
}
Quick test with curl
curl -sS -X POST \
  -H "Authorization: Bearer $PREFECT_API_KEY" \
  -H "Content-Type: application/json" \
  "$PREFECT_API_URL/events/read" \
  -d '{"filter":{"id":{"any_":["01995c4c-ef0b-74fc-8a66-970328e96a43"]}}}'
Common 404 causes
- Wrong path: using /api/events/... instead of /events/read
- Double “/api”: if your base already includes /api/accounts/.../workspaces/..., don’t add another /api
- Wrong account/workspace IDs: double-check the IDs in the base URL match the workspace where the event occurred
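The first two 404 causes above can be checked mechanically before the request is sent. The following is a minimal sketch, assuming the base URL has the workspace-scoped shape shown in this thread; build_events_url is a hypothetical helper, and the endpoint path is passed in rather than hard-coded.

```python
from urllib.parse import urlsplit


def build_events_url(base_url: str, endpoint: str) -> str:
    """Join a workspace-scoped base URL and an endpoint path.

    Raises ValueError if /api would appear more than once in the path.
    """
    base = base_url.rstrip("/")
    # Inspect path segments only, so the "api." in the hostname is not counted.
    segments = [s for s in urlsplit(base).path.split("/") if s]
    if segments.count("api") > 1:
        raise ValueError(f"Base URL contains /api more than once: {base_url!r}")
    path = endpoint.strip("/")
    if path == "api" or path.startswith("api/"):
        raise ValueError(f"Endpoint should not start with /api: {endpoint!r}")
    return f"{base}/{path}"
```

For example, build_events_url(os.environ["PREFECT_API_URL"], "events/read") fails fast on a malformed base instead of returning a 404 from the server.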
Permissions
- A 404 here is path mismatch, not permissions. If you hit a permission issue, you’ll get 401/403 instead.
- For Prefect Cloud, a user or service account key with Workspace “Viewer” or higher can read events in that workspace.
From code (sync), using the base URL in your flow environment
import os
import httpx

def get_event_by_id(event_id: str) -> dict:
    api_url = os.environ["PREFECT_API_URL"]  # provided in flow runs on Cloud
    api_key = os.getenv("PREFECT_API_KEY")  # provided in most worker setups
    headers = {"Content-Type": "application/json"}
    if api_key:
        headers["Authorization"] = f"Bearer {api_key}"
    url = f"{api_url.rstrip('/')}/events/read"
    body = {"filter": {"id": {"any_": [event_id]}}}
    with httpx.Client(timeout=30) as client:
        r = client.post(url, json=body, headers=headers)
        r.raise_for_status()
    events = r.json()
    if not isinstance(events, list):
        raise RuntimeError(f"Unexpected response: {events!r}")
    if not events:
        raise LookupError(f"No event found for id {event_id}")
    return events[0]
Reference in source (route exists)
- Server events router in the codebase (contains read_events): prefect/server/api/events.py
If you still see a 404 after switching to /events/read, paste the redacted URL you’re hitting (and confirm your PREFECT_API_URL value) and I’ll sanity-check it.
Fredrik Hoem Grelland
09/18/2025, 11:21 AM
Marvin
09/18/2025, 11:21 AM
Marvin
09/18/2025, 11:21 AM
id.any_ in the filter body. Example:
{
  "filter": {
    "id": {
      "any_": [
        "01995c4c-ef0b-74fc-8a66-970328e96a43"
      ]
    }
  }
}
Drop‑in sync helper (handles both list or object responses)
import os
import httpx

def get_event_by_id(
    event_id: str,
    *,
    api_url: str | None = None,
    api_key: str | None = None,
    timeout: float = 30.0,
) -> dict:
    """
    Fetch a Prefect event by its ID using the Cloud REST API:
    POST {PREFECT_API_URL}/events/filter
    - In Cloud, PREFECT_API_URL is workspace-scoped and available in flow runs.
    - Use a user/service account API key with permission to read workspace events.
    """
    api_url = api_url or os.getenv("PREFECT_API_URL")
    if not api_url:
        raise RuntimeError("PREFECT_API_URL must be set or passed to get_event_by_id(api_url=...).")
    api_key = api_key or os.getenv("PREFECT_API_KEY")
    url = f"{api_url.rstrip('/')}/events/filter"
    headers = {"Content-Type": "application/json"}
    if api_key:
        headers["Authorization"] = f"Bearer {api_key}"
    body = {"filter": {"id": {"any_": [event_id]}}}
    with httpx.Client(timeout=timeout) as client:
        resp = client.post(url, json=body, headers=headers)
        try:
            resp.raise_for_status()
        except httpx.HTTPStatusError as exc:
            raise RuntimeError(f"Failed to read event {event_id}: {exc.response.text}") from exc
    data = resp.json()
    # Some deployments return a list, others may wrap results in an object
    if isinstance(data, list):
        events = data
    elif isinstance(data, dict):
        # Try common keys if response is wrapped
        events = data.get("events") or data.get("results") or []
    else:
        raise RuntimeError(f"Unexpected response from /events/filter: {data!r}")
    if not events:
        raise LookupError(f"No event found for id {event_id}")
    return events[0]
Curl sanity check
curl -sS -X POST \
  -H "Authorization: Bearer $PREFECT_API_KEY" \
  -H "Content-Type: application/json" \
  "$PREFECT_API_URL/events/filter" \
  -d '{"filter":{"id":{"any_":["01995c4c-ef0b-74fc-8a66-970328e96a43"]}}}'
Notes
- 404s are usually from the wrong path (e.g., adding /api twice or hitting the wrong workspace). Using PREFECT_API_URL in flow runs avoids this.
- Permissions: a user or service account key with workspace “Viewer” or higher can read events.
- For Server/self-hosted, the route implementation lives in the events router: server events router (paths may differ from Cloud’s public API).
Fredrik Hoem Grelland
09/18/2025, 11:25 AM
Marvin
09/18/2025, 11:25 AM
Marvin
09/18/2025, 11:27 AM
{
  "filter": {
    "id": {
      "any_": ["01995c4c-ef0b-74fc-8a66-970328e96a43"]
    },
    "occurred": {
      "after_": "2025-01-01T00:00:00Z"
    }
  }
}
Pick an after_ time before the event occurred. Optionally add "before_": "2025-12-31T23:59:59Z" to the occurred filter.
Drop-in helper that widens the window automatically (last 7 days)
import os
import httpx
from datetime import datetime, timedelta, timezone

def get_event_by_id(event_id: str, lookback_hours: int = 24*7) -> dict:
    api_url = os.environ["PREFECT_API_URL"]
    api_key = os.getenv("PREFECT_API_KEY")
    headers = {"Content-Type": "application/json"}
    if api_key:
        headers["Authorization"] = f"Bearer {api_key}"
    after_ts = (datetime.now(timezone.utc) - timedelta(hours=lookback_hours)).isoformat()
    body = {
        "filter": {
            "id": {"any_": [event_id]},
            "occurred": {"after_": after_ts}
        }
    }
    url = f"{api_url.rstrip('/')}/events/filter"
    with httpx.Client(timeout=30) as client:
        r = client.post(url, json=body, headers=headers)
        r.raise_for_status()
    data = r.json()
    events = data if isinstance(data, list) else data.get("events") or data.get("results") or []
    if not events:
        raise LookupError(f"No event found for id {event_id} within the last {lookback_hours} hours")
    return events[0]
Docs for the endpoint
- Cloud API: POST /events/filter
Notes
- 404s are typically path/base-URL issues; 403s are permission issues; an empty 200 response usually means the event is outside the default time window.
- I don’t have your workspace’s exact event retention policy handy; if you still can’t find much older events even with a wide window, check your Cloud plan’s retention or ping us and we’ll take a look.
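If an empty 200 response means the event fell outside the time window, one option is to retry with progressively wider windows rather than guessing a single lookback. The sketch below only builds the request bodies and the schedule, with no network calls. The filter keys (id.any_, occurred.after_) are copied from this thread and are an assumption, not verified against the API reference; build_event_filter and lookback_schedule are hypothetical names.

```python
from datetime import datetime, timedelta, timezone
from typing import Optional


def build_event_filter(event_id: str, lookback_hours: int, now: Optional[datetime] = None) -> dict:
    """Build a filter body for one event ID within a lookback window.

    Key names follow the examples in this thread (an assumption).
    """
    now = now or datetime.now(timezone.utc)
    after_ts = (now - timedelta(hours=lookback_hours)).isoformat()
    return {
        "filter": {
            "id": {"any_": [event_id]},
            "occurred": {"after_": after_ts},
        }
    }


def lookback_schedule(start_hours: int = 24, max_hours: int = 24 * 7) -> list[int]:
    """Return doubling lookback windows in hours, capped at max_hours."""
    windows = []
    hours = start_hours
    while hours < max_hours:
        windows.append(hours)
        hours *= 2
    windows.append(max_hours)
    return windows
```

A caller could loop over lookback_schedule(), post build_event_filter(event_id, hours) each time, and stop at the first non-empty result.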