# ask-community
g
Has anyone worked with Prefect 3 Events? Can you query or subscribe to events from within a flow run?
I have an upstream process that emits events to Prefect, and those events create flow runs that process files. I then get a trailing, or caboose, event that should trigger another flow, but only once all the previous flow runs have finished as either completed or failed (max retries reached).
k
you absolutely can!
from prefect.events import get_events_subscriber
it takes an EventFilter as input and you can use it as a context manager. it's an infinite loop so you'll need to add your own logic to exit it once you've seen the events you're looking for
g
This is awesome and what I was really hoping we could do with Prefect 3
k
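import logging

from prefect.events import get_events_subscriber
from prefect.events.filters import EventFilter

logger = logging.getLogger(__name__)

async def wait_for_events(event_filter: EventFilter) -> None:
    seen_events = set()
    # exact event names to wait for (assumes event_filter.event.name is set)
    expected_events = set(event_filter.event.name)

    async with get_events_subscriber(filter=event_filter) as subscriber:
        async for event in subscriber:
            logger.info(f"📬 Received event {event.event!r}")
            seen_events.add(event.event)  # track names, not whole event objects
            if seen_events.issuperset(expected_events):
                return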
something like that
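For the original question (wait until every upstream flow run has completed or failed), the exit condition has to track flow runs rather than event names. Here is a minimal stdlib-only sketch of that logic. `CompletionTracker`, `wait_until_all_finished`, and `fake_stream` are hypothetical names, the stream is a canned stand-in for the real subscriber, and it assumes a `prefect.flow-run.Failed` event is only emitted once retries are exhausted.

```python
import asyncio
from dataclasses import dataclass, field
from typing import AsyncIterator, Iterable, Set, Tuple

# Event names for the two outcomes mentioned in the question.
TERMINAL_EVENTS = {"prefect.flow-run.Completed", "prefect.flow-run.Failed"}


@dataclass
class CompletionTracker:
    """Tracks which expected flow runs have reached a terminal state."""

    expected: Set[str]
    finished: Set[str] = field(default_factory=set)

    def observe(self, event_name: str, resource_id: str) -> bool:
        """Record one event; return True once every expected run is done."""
        if event_name in TERMINAL_EVENTS and resource_id in self.expected:
            self.finished.add(resource_id)
        return self.finished >= self.expected


async def wait_until_all_finished(
    events: AsyncIterator[Tuple[str, str]], expected: Iterable[str]
) -> Set[str]:
    """Consume (event_name, resource_id) pairs until all expected runs finish."""
    tracker = CompletionTracker(expected=set(expected))
    async for event_name, resource_id in events:
        if tracker.observe(event_name, resource_id):
            break  # leave the otherwise endless subscriber loop
    return tracker.finished


async def fake_stream() -> AsyncIterator[Tuple[str, str]]:
    """Hypothetical stand-in for the subscriber, yielding canned events."""
    for item in [
        ("prefect.flow-run.Running", "prefect.flow-run.a"),
        ("prefect.flow-run.Completed", "prefect.flow-run.a"),
        ("prefect.flow-run.Failed", "prefect.flow-run.b"),
        ("prefect.flow-run.Completed", "prefect.flow-run.c"),
    ]:
        yield item


finished = asyncio.run(
    wait_until_all_finished(fake_stream(), ["prefect.flow-run.a", "prefect.flow-run.b"])
)
print(sorted(finished))  # ['prefect.flow-run.a', 'prefect.flow-run.b']
```

With a real subscriber, the pair passed to `observe` would presumably come from `event.event` and `event.resource.id`; how the caboose flow learns the set of expected flow run IDs is left open here.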
g
Awesome I am gonna do some testing in my local env
@Kevin Grismore Does this only catch events from the time the subscriber is started, or does it scan all events?
i.e., is it more like Kafka than Pub/Sub?
k
you can specify a start time (including in the past) at which the subscriber should look for events as part of the EventFilter
it's a little nested in there but if you dig through the data models you'll find it
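To make the nesting concrete, here is a rough sketch of the filter's shape as a plain dict, built with the standard library only. `build_filter_payload` is a hypothetical helper, and the 6-hour lookback is just an example value. The field names `occurred.since` and `event.name` follow the data models discussed here, but check them against your installed Prefect version.

```python
from datetime import datetime, timedelta, timezone
from typing import Any, Dict, List


def build_filter_payload(event_names: List[str], lookback: timedelta) -> Dict[str, Any]:
    """Build a nested dict mirroring EventFilter's shape (hypothetical helper)."""
    now = datetime.now(timezone.utc)
    return {
        # EventFilter.occurred holds the time window; only "since" is set here
        "occurred": {"since": now - lookback},
        # EventFilter.event.name lists exact event names to match
        "event": {"name": list(event_names)},
    }


payload = build_filter_payload(["prefect.flow-run.Completed"], timedelta(hours=6))
# Assumption: with Prefect installed, EventFilter.model_validate(payload)
# would turn this dict into the filter passed to get_events_subscriber.
print(payload["occurred"]["since"].isoformat())
```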
g
OK cool, if I don't add the filter, does it scan all events? Or is there a default?
k
since: DateTime = Field(
    default_factory=lambda: cast(
        DateTime,
        pendulum.now("UTC").start_of("day").subtract(days=180),
    ),
    description="Only include events after this time (inclusive)",
)
until: DateTime = Field(
    default_factory=lambda: cast(DateTime, pendulum.now("UTC")),
    description="Only include events prior to this time (inclusive)",
)
so by default it looks back 180 days (from the start of that UTC day), it seems
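As a quick check of what that default evaluates to, here is a stdlib-only equivalent of the `pendulum` expression in the snippet above. `default_since` is a hypothetical name, and the fixed date is only there to make the result reproducible.

```python
from datetime import datetime, timedelta, timezone
from typing import Optional


def default_since(now: Optional[datetime] = None) -> datetime:
    """Stdlib equivalent of pendulum.now("UTC").start_of("day").subtract(days=180)."""
    now = now or datetime.now(timezone.utc)
    # Truncate to midnight UTC, then step back 180 days
    start_of_day = now.replace(hour=0, minute=0, second=0, microsecond=0)
    return start_of_day - timedelta(days=180)


fixed_now = datetime(2024, 12, 31, 15, 30, tzinfo=timezone.utc)
print(default_since(fixed_now).isoformat())  # 2024-07-04T00:00:00+00:00
```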
g
Thanks, Great info
k
of course!