Florian Giroud
07/08/2025, 5:40 PM
Marvin
07/08/2025, 5:40 PM
Marvin
07/08/2025, 5:41 PM
python
from prefect import flow
from typing import Dict
@flow
def process_webhook_data(webhook_body: Dict):
    """Log the incoming webhook payload and hand it back unchanged.

    Args:
        webhook_body: Payload supplied as the flow's ``webhook_body`` parameter.

    Returns:
        The same ``webhook_body`` object that was passed in.
    """
    message = f"Received webhook data: {webhook_body}"
    print(message)
    # Webhook-specific processing would be added at this point.
    return webhook_body
# Deploy the flow to a work pool when this file is run as a script.
if __name__ == "__main__":
    process_webhook_data.deploy(
        name="webhook-processor",
        # Flow.deploy names this keyword `work_pool_name`; `work_pool` is not
        # an accepted argument and would raise TypeError.
        work_pool_name="my-work-pool",
        # NOTE(review): deploy() presumably also needs `image=` (or a flow
        # loaded from remote storage) for most work pool types - confirm for
        # "my-work-pool".
    )
     {
         "webhook_body": "{{ event.payload }}"
     }
webhook_body
json
{
    "customer": "acme",
    "order_id": "12345",
    "items": ["widget", "gadget"]
}
webhook_body
curl
Nate
07/08/2025, 5:42 PM
Florian Giroud
07/18/2025, 6:33 PM
Marvin
07/18/2025, 6:33 PM
Marvin
07/18/2025, 6:34 PM
python
from prefect.events import Event
from prefect.automations import create_automation
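# Define an automation that waits for a sequence of events.
# NOTE(review): confirm that `prefect.automations` actually exports
# `create_automation`; the documented entry point appears to be the
# `Automation` class with typed trigger objects - verify before use.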
automation = create_automation(
    name="Batch Events Example",
    description="Triggers after accumulating multiple specific events",
    triggers=[
        {
            "type": "sequence",
            "triggers": [
                {"type": "event", "match": {"name": "first_event"}},
                {"type": "event", "match": {"name": "second_event"}},
                {"type": "event", "match": {"name": "third_event"}}
            ],
            "within": "1h"  # All events must occur within 1 hour
        }
    ],
    actions=[
        # Your actions here
    ]
)
python
automation = create_automation(
    name="Composite Events Example",
    triggers=[
        {
            "type": "composite",
            "operator": "and",
            "triggers": [
                {"type": "event", "match": {"name": "event_a"}},
                {"type": "event", "match": {"name": "event_b"}},
            ]
        }
    ],
    actions=[
        # Your actions here
    ]
)
sequence
composite
within
Florian Giroud
07/18/2025, 6:35 PM
Marvin
07/18/2025, 6:35 PM
Marvin
07/18/2025, 6:36 PM
python
from prefect import flow, task
import redis
from datetime import timedelta
@task
def store_event(event_data):
    """Queue one event for later bulk processing.

    Pushes ``event_data`` onto the head of the ``pending_events`` Redis list
    using a client created with the default connection settings.

    Args:
        event_data: Value handed to ``LPUSH`` as-is.
    """
    redis.Redis().lpush("pending_events", event_data)
@task
def get_accumulated_events():
    """Atomically drain the ``pending_events`` Redis list.

    Reads every entry in the list and deletes the list in a single
    MULTI/EXEC transaction.

    Returns:
        The list returned by ``LRANGE pending_events 0 -1`` (empty when
        nothing is pending).
    """
    client = redis.Redis()
    # A transactional pipeline runs both commands inside MULTI/EXEC, so an
    # LPUSH from store_event cannot land between the read and the delete and
    # then be discarded without ever having been returned.
    with client.pipeline(transaction=True) as pipe:
        pipe.lrange("pending_events", 0, -1)
        pipe.delete("pending_events")
        events, _deleted = pipe.execute()
    return events
@flow(retries=3)
def process_events():
    """Fetch the pending events and process them as one batch.

    Does nothing when no events have accumulated. Always returns ``None``.

    NOTE(review): get_accumulated_events deletes the list as it reads it, so
    a retry of this flow presumably only sees events stored after the failed
    attempt - confirm that this is acceptable.
    """
    batch = get_accumulated_events()
    if batch:
        # Handle the whole batch in a single pass.
        for item in batch:
            # Per-event processing logic goes here.
            pass
# Deploy with a schedule
if __name__ == "__main__":
    process_events.serve(
        name="bulk-event-processor",
        interval=timedelta(minutes=30)  # Adjust based on your needs
    )
python
from prefect import flow, task
from prefect.events import Event
from prefect.automations import create_automation
@flow
def process_bulk_events(events):
    """Walk over a batch of events; per-event handling is left as a stub.

    Args:
        events: Iterable of events to process.
    """
    # Bulk processing of the batch happens in this loop.
    for single_event in events:
        # Per-event processing logic goes here.
        pass
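# Module-level list that accumulates events inside this Python process.
# NOTE(review): the "python" and "condition" actions below pass lambdas that
# close over this list; confirm that automation actions can execute local
# callables, since automations are presumably evaluated by the Prefect
# server rather than in this process - verify before use.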
events_list = []
automation = create_automation(
    name="Bulk Event Processor",
    triggers=[
        {
            "type": "event",
            "match": {"name": "your_event_name"}
        }
    ],
    actions=[
        {
            "type": "python",
            "function": lambda event: events_list.append(event)
        },
        {
            "type": "condition",
            "condition": lambda: len(events_list) >= 100,  # Adjust threshold as needed
            "then": [
                {
                    "type": "run_deployment",
                    "deployment": "process-bulk-events/production",
                    "parameters": {"events": events_list.copy()},
                    "after": lambda: events_list.clear()
                }
            ]
        }
    ]
)
python
from prefect import flow
from prefect.automations import create_automation
from datetime import timedelta
automation = create_automation(
    name="Time Window Event Processor",
    triggers=[
        {
            "type": "sequence",
            "triggers": [
                {"type": "event", "match": {"name": "your_event_name"}}
            ],
            "within": "1h",  # Collect all events within 1 hour
            "reset_on_completion": True
        }
    ],
    actions=[
        {
            "type": "run_deployment",
            "deployment": "process-bulk-events/production",
            "parameters": {"events": "${{ trigger.events }}"}
        }
    ]
)
Marvin
07/18/2025, 6:36 PM