<@ULVA73B9P> Could you give me an python example o...
# ask-community
a
@Marvin Could you give me an python example of how to use custom events with automation to trigger a run deployment. I want the full
payload
dictionary to be passed into the deployment as one type
dict
argument
m
thought for 162.9 seconds
Let me create a complete example that shows how to: 1. Define a flow that accepts a dictionary parameter 2. Create a deployment for this flow 3. Set up an automation to trigger the deployment with a custom event 4. Send a custom event that triggers the deployment Here's the complete example:
Copy code
python
from prefect import flow, serve
from prefect.events import Event
from prefect.deployments import Deployment
from prefect.client.schemas.objects import DeploymentEventTrigger
from prefect.events.schemas import DeploymentTriggerTypes

# First, define your flow that accepts a dictionary parameter
@flow
def process_event_data(payload: dict):
    print(f"Received payload: {payload}")
    # Your processing logic here
    return payload

# Create and apply the deployment with an event trigger
def create_deployment():
    # Create the deployment
    deployment = Deployment(
        name="event-triggered-deployment",
        flow=process_event_data,
        triggers=[
            DeploymentEventTrigger(
                name="custom-event-trigger",
                # Match any event with the specified event names
                match_related_names=["my-custom-event"],
                parameters={"payload": "{{ event.payload }}"}  # Pass the entire payload
            )
        ]
    )
    
    # Apply the deployment
    deployment.apply()
    return deployment

# Function to simulate sending a custom event
async def send_custom_event():
    # Create and emit a custom event with a payload
    event = Event(
        "my-custom-event",  # Event name that matches the trigger
        payload={
            "key1": "value1",
            "key2": "value2",
            "nested": {
                "data": "example"
            }
        }
    )
    await event.emit()

if __name__ == "__main__":
    # Create the deployment
    deployment = create_deployment()
    
    # Start the deployment server
    serve(deployment)
    
    # In a separate process or context, you would emit the event:
    # import asyncio
    # asyncio.run(send_custom_event())
To use this example: 1. Save the code in a file (e.g.,
event_deployment.py
) 2. Start the Prefect server if it's not already running:
Copy code
bash
prefect server start
3. Run your deployment script:
Copy code
bash
python event_deployment.py
4. In a separate Python script or interactive session, you can trigger the deployment by emitting an event:
Copy code
python
import asyncio
from prefect import flow
from prefect.events import Event

async def trigger_deployment():
    event = Event(
        "my-custom-event",
        payload={
            "key1": "value1",
            "key2": "value2",
            "nested": {
                "data": "example"
            }
        }
    )
    await event.emit()

# Run the event emission
asyncio.run(trigger_deployment())
Key points about this implementation: 1. The flow
process_event_data
accepts a single dictionary parameter named
payload
2. The deployment trigger is configured to match events named "my-custom-event" 3. The trigger's parameters configuration
{"payload": "{{ event.payload }}"}
ensures the entire event payload is passed as a single dictionary 4. When the event is emitted, the deployment will automatically trigger with the event's payload as the argument This setup allows you to: - Trigger the deployment by emitting custom events from anywhere in your application - Pass complex nested data structures as a single dictionary parameter - Process the entire payload in your flow function Remember that you'll need to have the Prefect server running and be authenticated to use this functionality. The event system requires the server to be running to handle the event triggers and deployment executions.
a
@Marvin this line
parameters={"payload": "{{ event.payload }}"}  # Pass the entire payload
result in the value for
payload
to be a literal string
{{ event.payload }}
not populated from jinja template
m
thought for 75.6 seconds
Let me correct the example. You're right - we need to use the correct syntax for accessing the event data. Here's the corrected version:
Copy code
python
from prefect import flow, serve
from prefect.events import Event
from prefect.deployments import Deployment
from prefect.client.schemas.objects import DeploymentEventTrigger

# First, define your flow that accepts a dictionary parameter
@flow
def process_event_data(payload: dict):
    print(f"Received payload: {payload}")
    # Your processing logic here
    return payload

# Create and apply the deployment with an event trigger
def create_deployment():
    # Create the deployment
    deployment = Deployment(
        name="event-triggered-deployment",
        flow=process_event_data,
        triggers=[
            DeploymentEventTrigger(
                name="custom-event-trigger",
                match_related_names=["my-custom-event"],
                parameters={"payload": "{{ event }}"}  # This will pass the entire event object
            )
        ]
    )
    
    # Apply the deployment
    deployment.apply()
    return deployment

# Function to simulate sending a custom event
async def send_custom_event():
    # Create and emit a custom event with a payload
    event = Event(
        "my-custom-event",
        payload={
            "key1": "value1",
            "key2": "value2",
            "nested": {
                "data": "example"
            }
        }
    )
    await event.emit()
The key change is in the parameters line:
Copy code
python
parameters={"payload": "{{ event }}"}  # This will pass the entire event object
If you specifically want just the payload portion of the event, you can use:
Copy code
python
parameters={"payload": "{{ event.payload }}"}  # This will pass just the payload
Both of these templates will be properly evaluated at runtime, unlike the previous version. The event object contains various attributes including
payload
,
timestamp
,
id
, etc. Choose the template based on what data you need in your flow. To test this, you can use the same triggering code as before:
Copy code
python
import asyncio
from prefect.events import Event

async def trigger_deployment():
    event = Event(
        "my-custom-event",
        payload={
            "key1": "value1",
            "key2": "value2",
            "nested": {
                "data": "example"
            }
        }
    )
    await event.emit()

# Run the event emission
asyncio.run(trigger_deployment())
When the event is emitted, the template will be evaluated and the actual event data will be passed to your flow, not the template string. Let me know if you'd like to see what data is actually available in the event object, or if you need any clarification on how the template evaluation works!