Josh Stone
10/01/2025, 3:08 PM
Marvin
10/01/2025, 3:08 PM
Marvin
10/01/2025, 3:13 PM
event payload, but the subscriber tries to parse it as if it did. The result is a `KeyError: 'event'` inside the async iterator.
Why this occurs
- Prefect’s events websocket may include non-event messages (e.g., connection/checkpoint frames) — you can see related constants like EVENT_WEBSOCKET_CHECKPOINTS in the code.
- Older versions of the subscriber didn’t always filter these frames before trying to validate them as events, which would surface as the error you’re seeing.
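To make the failure mode concrete, here is a minimal sketch that reproduces the same kind of `KeyError` without Prefect. It assumes frames arrive as JSON objects and that control frames simply lack an "event" key; the frame shapes and the helper names (`parse_unguarded`, `try_parse`) are illustrative assumptions, not Prefect's actual wire format or API.

```python
import json

# Hypothetical frames; the shapes are illustrative, not Prefect's actual wire format.
event_frame = json.dumps({"type": "event", "event": {"event": "prefect.flow-run.Completed"}})
control_frame = json.dumps({"type": "checkpoint"})  # carries no "event" key


def parse_unguarded(raw: str) -> dict:
    """Index straight into the frame without checking that "event" exists."""
    return json.loads(raw)["event"]


def try_parse(raw: str) -> str:
    """Return a short description of what happened when parsing one frame."""
    try:
        payload = parse_unguarded(raw)
    except KeyError as exc:
        return f"KeyError: {exc}"
    return f"parsed {payload['event']}"


if __name__ == "__main__":
    print(try_parse(event_frame))
    print(try_parse(control_frame))
```

The second call fails the same way your traceback does: the lookup assumes every frame is an event.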
A couple of things to check first
1) Which Prefect version are you on (and Cloud vs OSS Server)? This is important — newer 3.x point releases have improved the events client.
2) The code you’re using to subscribe (including your filter). A snippet will help me reproduce quickly.
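For the version question, running `prefect version` in your shell should print it. If you'd rather check from Python, here is a minimal sketch that reads the installed version from package metadata; the helper name `installed_version` is hypothetical.

```python
from importlib.metadata import PackageNotFoundError, version
from typing import Optional


def installed_version(package: str) -> Optional[str]:
    """Return the installed version of `package`, or None if it is not installed."""
    try:
        return version(package)
    except PackageNotFoundError:
        return None


if __name__ == "__main__":
    print("prefect:", installed_version("prefect") or "not installed")
```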
Recommended next steps
- Upgrade to the latest Prefect 3.x if you haven't already; recent releases include a number of events and logging improvements. See the Prefect release notes. The latest release summarized by our tool is 3.4.20.
- Use the high-level subscriber with a filter and context manager, e.g.:
import asyncio

from prefect.events import get_events_subscriber
from prefect.events.filters import EventFilter, EventNameFilter


async def consume_events():
    # Limit to flow-run events to reduce noise. EventNameFilter matches on
    # `prefix` (or exact `name`); it has no `any_` field, and wildcards such
    # as "prefect.flow-run.*" belong in `prefix` without the trailing "*".
    f = EventFilter(event=EventNameFilter(prefix=["prefect.flow-run."]))
    async with get_events_subscriber(filter=f) as sub:
        async for event in sub:
            # `event` is a Prefect Event model; its name is stored in `event.event`
            print(event.event, event.occurred)


if __name__ == "__main__":
    asyncio.run(consume_events())
- If you still hit the error on your current version, a temporary workaround is to guard the iteration so control frames don't crash your task group. On Python 3.11 or later, you can catch the KeyError with `except*` (which also handles one wrapped in an ExceptionGroup) and resume iterating:
import asyncio

from prefect.events import get_events_subscriber
from prefect.events.filters import EventFilter, EventNameFilter


async def consume_events():
    f = EventFilter(event=EventNameFilter(prefix=["prefect.flow-run."]))
    async with get_events_subscriber(filter=f) as sub:
        while True:
            try:
                async for event in sub:
                    print(event.event)
            except* KeyError:
                # Likely a control/checkpoint frame slipped through. Fall through
                # so the while loop resumes; `continue` is a SyntaxError inside
                # an `except*` block.
                pass
            else:
                break  # the subscriber finished without error


if __name__ == "__main__":
    asyncio.run(consume_events())
- If you’re using a lower-level websocket directly, make sure to ignore non-event frames before validation:
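# Sketch if you're consuming raw messages yourself.
# `websocket` and `handle` are placeholders for your own connection and callback.
import json

from prefect.events import Event

async for raw in websocket:
    msg = json.loads(raw)  # frames arrive as JSON text, not as dicts
    if isinstance(msg, dict) and "event" in msg:
        event = Event.model_validate(msg["event"])
        handle(event)
    else:
        # control/keepalive/checkpoint message: ignore
        continue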
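If you want to try the filtering logic without a live connection, here is a self-contained sketch that runs it against a fake websocket. Everything in it is an assumption for illustration: `fake_websocket`, `event_payloads`, and `collect` are hypothetical helpers, and the frame shapes are made up rather than taken from Prefect's protocol.

```python
import asyncio
import json
from typing import Any, AsyncIterator, Dict, Iterable, List


async def fake_websocket(frames: Iterable[str]) -> AsyncIterator[str]:
    """Hypothetical stand-in for a real websocket connection."""
    for frame in frames:
        yield frame


async def event_payloads(websocket: AsyncIterator[str]) -> AsyncIterator[Dict[str, Any]]:
    """Yield only the "event" payloads, skipping anything that is not an event frame."""
    async for raw in websocket:
        try:
            msg = json.loads(raw)
        except json.JSONDecodeError:
            continue  # not JSON at all: skip
        if isinstance(msg, dict) and isinstance(msg.get("event"), dict):
            yield msg["event"]
        # otherwise: control/keepalive/checkpoint frame, silently ignored


async def collect(frames: Iterable[str]) -> List[Dict[str, Any]]:
    """Drain the filtered stream into a list (handy for testing)."""
    return [payload async for payload in event_payloads(fake_websocket(frames))]


if __name__ == "__main__":
    demo_frames = [
        json.dumps({"type": "event", "event": {"event": "prefect.flow-run.Completed"}}),
        json.dumps({"type": "checkpoint"}),
    ]
    print(asyncio.run(collect(demo_frames)))
```

With real Prefect events you would pass each yielded payload to `Event.model_validate` instead of collecting raw dicts.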
Useful references
- Events clients API (Prefect 3)
- Blog overview of event-driven patterns: Event-driven flows with Prefect
- Background on heartbeats/zombie detection: Detect zombie flows
If you can share:
- Your Prefect version
- Cloud vs OSS Server (and server version if OSS)
- The code you use to set up monitor_events and the filter
…I can try to reproduce and, if needed, file a bug upstream. Based on your traceback, it looks like you are already using `async for event in subscriber`; if you’re on an older 3.x point release, upgrading is the most likely permanent fix here.