Arthur Ren
07/14/2025, 10:57 PM
`payload` dictionary to be passed into the deployment as one type dict argument
Marvin
07/14/2025, 10:57 PM
Marvin
07/14/2025, 11:00 PM
```python
from prefect import flow, serve
from prefect.events import DeploymentEventTrigger, emit_event


# First, define your flow that accepts a dictionary parameter
@flow
def process_event_data(payload: dict):
    print(f"Received payload: {payload}")
    # Your processing logic here
    return payload


# Create the deployment with an event trigger
def create_deployment():
    return process_event_data.to_deployment(
        name="event-triggered-deployment",
        triggers=[
            DeploymentEventTrigger(
                name="custom-event-trigger",
                # Match events with the specified event name
                expect={"my-custom-event"},
                parameters={"payload": "{{ event.payload }}"},  # Pass the entire payload
            )
        ],
    )


# Function to simulate sending a custom event
def send_custom_event():
    # Emit a custom event with a payload
    emit_event(
        event="my-custom-event",  # Event name that matches the trigger
        # emit_event requires a resource; this ID is only a placeholder
        resource={"prefect.resource.id": "my-app.custom-event"},
        payload={
            "key1": "value1",
            "key2": "value2",
            "nested": {"data": "example"},
        },
    )


if __name__ == "__main__":
    # Create the deployment
    deployment = create_deployment()
    # Start serving the deployment (this registers it and then blocks)
    serve(deployment)
    # In a separate process or context, you would emit the event:
    # send_custom_event()
```
To use this example:
1. Save the code in a file (e.g., `event_deployment.py`)
2. Start the Prefect server if it's not already running:
```bash
prefect server start
```
3. Run your deployment script:
```bash
python event_deployment.py
```
4. In a separate Python script or interactive session, you can trigger the deployment by emitting an event:
```python
from prefect.events import emit_event


def trigger_deployment():
    emit_event(
        event="my-custom-event",
        # emit_event requires a resource; this ID is only a placeholder
        resource={"prefect.resource.id": "my-app.custom-event"},
        payload={
            "key1": "value1",
            "key2": "value2",
            "nested": {"data": "example"},
        },
    )


# Run the event emission
trigger_deployment()
```
Key points about this implementation:
1. The flow `process_event_data` accepts a single dictionary parameter named `payload`
2. The deployment trigger is configured to match events named "my-custom-event"
3. The trigger's parameters configuration `{"payload": "{{ event.payload }}"}` ensures the entire event payload is passed as a single dictionary
4. When the event is emitted, the deployment will automatically trigger with the event's payload as the argument
This setup allows you to:
- Trigger the deployment by emitting custom events from anywhere in your application
- Pass complex nested data structures as a single dictionary parameter
- Process the entire payload in your flow function
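If the payload might reach the flow as JSON text rather than as a dictionary, a small normalizing step at the top of the flow can make that visible early. The sketch below is only an illustration: `coerce_payload` is a hypothetical helper, not part of Prefect, and it assumes the flow parameter is annotated loosely enough (for example `dict | str`) for the raw value to reach the flow body.

```python
import json
from typing import Any


def coerce_payload(value: Any) -> dict:
    """Return the payload as a dict, whatever form it arrived in (hypothetical helper)."""
    if isinstance(value, dict):
        return value
    if isinstance(value, str):
        # A value that still starts with "{{" is an unrendered template, not data
        if value.strip().startswith("{{"):
            raise ValueError(f"Template was not rendered: {value!r}")
        parsed = json.loads(value)  # raises json.JSONDecodeError on non-JSON text
        if isinstance(parsed, dict):
            return parsed
    raise TypeError(f"Expected a dict or a JSON object string, got {type(value).__name__}")
```

Calling `coerce_payload(payload)` as the first line of the flow then fails loudly on an unrendered template instead of letting it flow through your processing logic.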
Remember that you'll need a running Prefect server (or an authenticated Prefect Cloud workspace) to use this functionality, because the server is what evaluates the event triggers and starts the deployment runs.
Arthur Ren
07/14/2025, 11:05 PM
`parameters={"payload": "{{ event.payload }}"}  # Pass the entire payload` results in the value for `payload` being the literal string `{{ event.payload }}`, not populated from the Jinja template
Marvin
07/14/2025, 11:05 PM
Marvin
07/14/2025, 11:06 PM
```python
from prefect import flow, serve
from prefect.events import DeploymentEventTrigger, emit_event


# First, define your flow that accepts a dictionary parameter
@flow
def process_event_data(payload: dict):
    print(f"Received payload: {payload}")
    # Your processing logic here
    return payload


# Create the deployment with an event trigger
def create_deployment():
    return process_event_data.to_deployment(
        name="event-triggered-deployment",
        triggers=[
            DeploymentEventTrigger(
                name="custom-event-trigger",
                expect={"my-custom-event"},
                parameters={"payload": "{{ event }}"},  # This will pass the entire event object
            )
        ],
    )


# Function to simulate sending a custom event
def send_custom_event():
    # Emit a custom event with a payload
    emit_event(
        event="my-custom-event",
        # emit_event requires a resource; this ID is only a placeholder
        resource={"prefect.resource.id": "my-app.custom-event"},
        payload={
            "key1": "value1",
            "key2": "value2",
            "nested": {"data": "example"},
        },
    )


if __name__ == "__main__":
    # Register and serve the deployment (this call blocks)
    serve(create_deployment())
```
The key change is in the parameters line:
```python
parameters={"payload": "{{ event }}"}  # This will pass the entire event object
```
If you specifically want just the payload portion of the event, you can use:
```python
parameters={"payload": "{{ event.payload }}"}  # This will pass just the payload
```
Note that the second form is the same template as in the previous version; the two differ only in which part of the event they select. If the value still arrives as the literal template text, the template is not being rendered at all, and switching between these two forms will not change that. The event object contains various attributes including `payload`, `occurred`, `id`, etc. Choose the template based on what data you need in your flow.
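Another option worth trying is to mark the value explicitly as a template whose rendered text should be parsed as JSON, so the flow receives a real dictionary rather than text. The sketch below assumes your Prefect version supports the `__prefect_kind` parameter wrappers (`"jinja"` and `"json"`); please check that against the documentation for your version. `json_template` is a hypothetical helper that only builds the dictionary.

```python
def json_template(expression: str) -> dict:
    """Wrap a Jinja expression so its rendered text is parsed as JSON (hypothetical helper)."""
    return {
        "__prefect_kind": "json",
        "value": {
            "__prefect_kind": "jinja",
            # `tojson` makes Jinja emit valid JSON instead of a Python repr
            "template": "{{ " + expression + " | tojson }}",
        },
    }


# Intended use (assumption): DeploymentEventTrigger(..., parameters=parameters)
parameters = {"payload": json_template("event.payload")}
```

The helper performs no rendering itself; it only produces the nested structure that would be passed as the trigger's `parameters`.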
To test this, you can use the same triggering code as before:
```python
from prefect.events import emit_event


def trigger_deployment():
    emit_event(
        event="my-custom-event",
        # emit_event requires a resource; this ID is only a placeholder
        resource={"prefect.resource.id": "my-app.custom-event"},
        payload={
            "key1": "value1",
            "key2": "value2",
            "nested": {"data": "example"},
        },
    )


# Run the event emission
trigger_deployment()
```
When the event is emitted, the template will be evaluated and the actual event data will be passed to your flow, not the template string.
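One caveat to keep in mind: a rendered Jinja template is always text, and a dictionary rendered without a JSON filter comes out in Python's own notation. The standalone sketch below uses only the standard library, no Prefect or Jinja, and takes `str()` as a stand-in for plain rendering; `parses_as_json` is a hypothetical helper name. It shows why that text cannot simply be parsed back as JSON.

```python
import json

payload = {"key1": "value1", "nested": {"data": "example"}}

as_repr = str(payload)         # stand-in for plain text rendering of a dict
as_json = json.dumps(payload)  # what a JSON-encoding filter would emit


def parses_as_json(text: str) -> bool:
    """Report whether the text can be parsed back as JSON."""
    try:
        json.loads(text)
        return True
    except json.JSONDecodeError:
        return False
```

Here `parses_as_json(as_repr)` is false because the Python notation uses single quotes, while `as_json` round-trips to the original dictionary.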
Let me know if you'd like to see what data is actually available in the event object, or if you need any clarification on how the template evaluation works!