Gregory Hunt
07/15/2024, 5:44 PM
Gregory Hunt
07/15/2024, 5:47 PM
Kevin Grismore
07/15/2024, 6:07 PM
Kevin Grismore
07/15/2024, 6:09 PM
from prefect.events import get_events_subscriber
it takes an EventFilter as input and you can use it as a context manager. it's an infinite loop, so you'll need to add your own logic to exit it once you've seen the events you're looking for
Gregory Hunt
07/15/2024, 6:10 PM
Kevin Grismore
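07/15/2024, 6:12 PM
seen_events = set()
# assumes the filter lists exact event names in event_filter.event.name
filtered_events = set(event_filter.event.name)
async with get_events_subscriber(filter=event_filter) as subscriber:
    async for event in subscriber:
        logger.info(f"📬 Received event {event.event!r}")
        # track the event name, not the event object, so the superset check can succeed
        seen_events.add(event.event)
        if seen_events.issuperset(filtered_events):
            return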
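The exit logic above can be tried without a Prefect server. The following is only a sketch under stated assumptions: `fake_subscriber` and `wait_for_events` are hypothetical names, and a plain async generator of event-name strings stands in for the real subscriber. It shows the same pattern of consuming an endless stream and returning once every expected name has been seen.

```python
import asyncio
from typing import AsyncIterator, Iterable, Set


async def fake_subscriber(names: Iterable[str]) -> AsyncIterator[str]:
    """Hypothetical stand-in for an event subscription: replays names, then never ends."""
    for name in names:
        yield name
    while True:  # mimic a stream that does not stop on its own
        await asyncio.sleep(0)
        yield "heartbeat"


async def wait_for_events(stream: AsyncIterator[str], expected: Set[str]) -> Set[str]:
    """Consume the stream until every expected event name has been seen."""
    seen: Set[str] = set()
    async for name in stream:
        seen.add(name)
        if seen.issuperset(expected):
            return seen  # returning is what ends the otherwise infinite iteration
    return seen


expected = {"prefect.flow-run.Running", "prefect.flow-run.Completed"}
stream = fake_subscriber(
    ["prefect.flow-run.Pending", "prefect.flow-run.Running", "prefect.flow-run.Completed"]
)
seen = asyncio.run(wait_for_events(stream, expected))
```

If an expected event might never arrive, wrapping the call in `asyncio.wait_for(..., timeout=...)` is one way to avoid waiting forever.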
Kevin Grismore
07/15/2024, 6:12 PM
Gregory Hunt
07/15/2024, 6:13 PM
Gregory Hunt
07/15/2024, 6:16 PM
Gregory Hunt
07/15/2024, 6:17 PM
Kevin Grismore
07/15/2024, 6:22 PM
EventFilter
Kevin Grismore
07/15/2024, 6:22 PM
Gregory Hunt
07/15/2024, 6:23 PM
Kevin Grismore
07/15/2024, 6:25 PM
since: DateTime = Field(
    default_factory=lambda: cast(
        DateTime,
        pendulum.now("UTC").start_of("day").subtract(days=180),
    ),
    description="Only include events after this time (inclusive)",
)
until: DateTime = Field(
    default_factory=lambda: cast(DateTime, pendulum.now("UTC")),
    description="Only include events prior to this time (inclusive)",
)
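To see what those `default_factory` defaults amount to, here is a rough stdlib-only sketch. It is an illustration, not Prefect's code: `OccurredWindow`, `_default_since`, and `_default_until` are hypothetical names, and `dataclasses` plus `datetime` stand in for the pydantic `Field` and `pendulum` calls quoted above. The point is that both bounds are computed when the object is created, and that passing an explicit value overrides the default.

```python
from dataclasses import dataclass, field
from datetime import datetime, timedelta, timezone


def _default_since() -> datetime:
    # start of the current UTC day, minus 180 days
    now = datetime.now(timezone.utc)
    start_of_day = now.replace(hour=0, minute=0, second=0, microsecond=0)
    return start_of_day - timedelta(days=180)


def _default_until() -> datetime:
    # "now" at the moment the object is instantiated
    return datetime.now(timezone.utc)


@dataclass
class OccurredWindow:
    """Hypothetical stand-in for a filter with time-window defaults."""

    since: datetime = field(default_factory=_default_since)
    until: datetime = field(default_factory=_default_until)


first = OccurredWindow()   # both bounds computed at creation time
second = OccurredWindow()  # computed again, so `until` is not shared with `first`
custom = OccurredWindow(since=datetime(2024, 7, 15, tzinfo=timezone.utc))  # explicit lower bound
```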
Kevin Grismore
07/15/2024, 6:25 PM
Gregory Hunt
07/15/2024, 6:26 PM
Kevin Grismore
07/15/2024, 6:26 PM