<@ULVA73B9P> i'm seeing this issue: File "/home/ap...
# ask-marvin
j
@Marvin i'm seeing this issue:

```
  File "/home/app/.venv/lib/python3.12/site-packages/anyio/_backends/_asyncio.py", line 781, in __aexit__
  |     raise BaseExceptionGroup(
  | ExceptionGroup: unhandled errors in a TaskGroup (1 sub-exception)
  +-+---------------- 1 ----------------
    | Traceback (most recent call last):
    |   File "/usr/local/lib/python3.12/asyncio/tasks.py", line 314, in __step_run_and_handle_result
    |     result = coro.send(None)
    |              ^^^^^^^^^^^^^^^
    |   File "/tmp/tmptdqpt99xprefect/apps/workflows/clients/prefect_client.py", line 252, in monitor_events
    |     async for event in subscriber:
    |   File "/home/app/.venv/lib/python3.12/site-packages/prefect/events/clients.py", line 643, in __anext__
    |     event: Event = Event.model_validate(message["event"])
    |                                         ~~~~~~~^^^^^^^^^
    | KeyError: 'event'
```

it seems like even though i've set up a subscription to events, i'm getting a KeyError from the event client. how do i deal with this?
m
thought for 284.6 seconds
Thanks for the traceback; that helps. The short version: this usually happens when the events websocket sends a control/keepalive frame that doesn’t contain an `event` payload, but the subscriber tries to parse it as if it did. The result is a `KeyError: 'event'` inside the async iterator.

Why this occurs
- Prefect’s events websocket may include non-event messages (e.g., connection/checkpoint frames); you can see related constants like `EVENT_WEBSOCKET_CHECKPOINTS` in the code.
- Older versions of the subscriber didn’t always filter these frames before trying to validate them as events, which would surface as the error you’re seeing.

A couple of things to check first
1) Which Prefect version are you on (and Cloud vs OSS Server)? This is important: newer 3.x point releases have improved the events client.
2) The code you’re using to subscribe (including your filter). A snippet will help me reproduce quickly.

Recommended next steps
- Upgrade to the latest Prefect 3.x if you’re not already. There have been a number of events/logging improvements recently. Release notes: Prefect releases and the latest summary from our tool: 3.4.20.
- Use the high-level subscriber with a filter and context manager, e.g.:
```python
import asyncio
from prefect.events import get_events_subscriber
from prefect.events.filters import EventFilter, EventNameFilter

async def consume_events():
    # Limit to flow-run events to reduce noise; EventNameFilter matches by name prefix
    f = EventFilter(event=EventNameFilter(prefix=["prefect.flow-run."]))
    async with get_events_subscriber(filter=f) as sub:
        async for event in sub:
            # event is a Prefect Event model; the event name lives in its `event` field
            print(event.event, event.occurred)

if __name__ == "__main__":
    asyncio.run(consume_events())
```
- If you still hit the error on your current version, a temporary workaround is to guard the iteration so control frames don’t crash your task group. Inside the loop the error is a plain `KeyError` (it only shows up as an `ExceptionGroup` once it escapes the anyio task group), so you can catch it there and resume iterating. This assumes the subscriber stays usable after the failed read:
```python
import asyncio
from prefect.events import get_events_subscriber
from prefect.events.filters import EventFilter, EventNameFilter

async def consume_events():
    f = EventFilter(event=EventNameFilter(prefix=["prefect.flow-run."]))
    async with get_events_subscriber(filter=f) as sub:
        while True:
            try:
                async for event in sub:
                    print(event.event)
            except KeyError:
                # Likely a control/checkpoint frame slipped through; skip it and resume
                continue
            else:
                # The stream ended normally, so stop instead of looping forever
                break

if __name__ == "__main__":
    asyncio.run(consume_events())
```
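- If you’re using a lower-level websocket directly, make sure to ignore non-event frames before validation:
```python
# Sketch for consuming raw messages yourself; `websocket` and `handle` are supplied by you
import json
from prefect.events import Event

async def consume_raw(websocket, handle):
    async for raw in websocket:
        msg = json.loads(raw)  # frames are assumed to arrive as JSON text
        if isinstance(msg, dict) and "event" in msg:
            handle(Event.model_validate(msg["event"]))
        # Anything else is a control/keepalive/checkpoint message; ignore it
```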
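To try the filtering idea without a live connection, here is a minimal, standard-library-only sketch. The function name `event_payloads` and the sample frame shapes are assumptions made for illustration; they are not taken from Prefect's wire protocol, so adjust them to whatever your websocket actually sends.

```python
import json
from typing import Any, Dict, Iterable, Iterator


def event_payloads(raw_frames: Iterable[str]) -> Iterator[Dict[str, Any]]:
    """Yield the `event` payload of each frame that carries one; skip the rest.

    Hypothetical helper: frame shapes are assumed, not taken from Prefect.
    """
    for raw in raw_frames:
        try:
            message = json.loads(raw)
        except json.JSONDecodeError:
            continue  # not JSON at all; ignore
        # Only frames that are objects with a dict-valued "event" key count as events
        if isinstance(message, dict) and isinstance(message.get("event"), dict):
            yield message["event"]


if __name__ == "__main__":
    sample = [
        '{"type": "event", "event": {"event": "prefect.flow-run.Completed"}}',
        '{"type": "checkpoint"}',  # assumed shape of a non-event frame
        "not json",
    ]
    for payload in event_payloads(sample):
        print(payload["event"])
```

Each payload that survives the filter could then be passed to `Event.model_validate`, as in the snippet above.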
Useful references
- Events clients API: Events clients API (Prefect 3)
- Blog overview of event-driven patterns: Event-driven flows with Prefect
- Background on heartbeats/zombie detection: Detect zombie flows

If you can share:
- Your Prefect version
- Cloud vs OSS Server (and server version if OSS)
- The code you use to set up `monitor_events` and the filter

…I can try to reproduce and, if needed, file a bug upstream. Based on your traceback, it looks like you are already using `async for event in subscriber`; if you’re on an older 3.x point release, upgrading is the most likely permanent fix here.
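If it helps with gathering the version details, here is a small sketch that reads installed package versions using only the standard library. The helper name `installed_version` is hypothetical, and the idea of also reporting `anyio` is just a guess based on the traceback.

```python
from importlib import metadata
from typing import Optional


def installed_version(package: str) -> Optional[str]:
    """Return the installed version of `package`, or None if it is not installed."""
    try:
        return metadata.version(package)
    except metadata.PackageNotFoundError:
        return None


if __name__ == "__main__":
    # Run this inside the same environment (e.g. /home/app/.venv) as your flow code
    for name in ("prefect", "anyio"):
        print(name, installed_version(name))
```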