Wolfgang Steitz
07/11/2025, 7:34 AM

Marvin
07/11/2025, 7:34 AM

Marvin
07/11/2025, 7:36 AM
python
from prefect import flow, serve
from prefect.events import Event


@flow
def my_webhook_flow(event_details: dict):
    print(f"Processing webhook event: {event_details}")
    # Your flow logic here


async def handle_webhook(event: Event):
    # Extract a unique identifier from your webhook payload
    # This could be a transaction ID, timestamp, or other unique field
    event_id = event.payload.get("event_id")  # adjust based on your webhook payload

    # Create a deployment
    deployment = my_webhook_flow.from_source(
        source=".",  # Assuming flow is in current directory
        name="webhook-triggered-deployment",
    ).deploy()

    # Create a unique run name using the event_id
    run_name = f"webhook-trigger-{event_id}"

    # Start the flow run with the unique name
    flow_run = await deployment.run_deployment(
        name=run_name,
        parameters={"event_details": event.payload},
    )
    return flow_run


if __name__ == "__main__":
    # Start the webhook server
    serve(
        my_webhook_flow,
        event_handlers={
            "my-webhook-event": handle_webhook,
        },
    )
Key points about this implementation:
1. The flow is defined with the @flow decorator and accepts the webhook payload as a parameter.
2. The handle_webhook function:
   - Extracts a unique identifier from your webhook payload
   - Creates a deployment
   - Uses a unique run name based on the event ID
   - Triggers the flow run with the webhook payload
3. The serve() function sets up the webhook listener and maps events to your handler.
To use this:
1. Save this script (e.g., as webhook_flow.py)
2. Run it with: python webhook_flow.py
3. Send webhook events to: http://your-server:4200/webhook/my-webhook-event (a test sketch follows below)
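As a quick manual test, a minimal sketch like the one below posts a payload to that endpoint using httpx (which ships as a Prefect dependency). The URL is the placeholder above, and the event_id field is an assumption matching what handle_webhook reads from the payload:
python
# Sketch only: "event_id" is assumed to be the unique field that
# handle_webhook extracts from the webhook payload.
import uuid

import httpx

payload = {"event_id": str(uuid.uuid4()), "message": "hello from a test sender"}

response = httpx.post(
    "http://your-server:4200/webhook/my-webhook-event",
    json=payload,
    timeout=10,
)
response.raise_for_status()
print(f"Webhook accepted with status {response.status_code}")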
The unique run name makes repeated deliveries of the same webhook event easy to spot; if you need Prefect to actually skip duplicates, pass an idempotency key as well (see the sketch after the checklist below).
If you're still seeing multiple triggers, check:
1. That your webhook sender isn't retrying automatically
2. That you're using a truly unique identifier for each webhook event
3. That your webhook endpoint isn't being called multiple times by the source system
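For stricter de-duplication, a sketch like the following keys the run off the webhook's unique identifier so retried deliveries resolve to the same flow run. It assumes your Prefect 3.x version's run_deployment accepts the idempotency_key argument, and the deployment name shown is a hypothetical "flow-name/deployment-name" built from the example above:
python
# Sketch only: assumes run_deployment supports idempotency_key and that
# "my-webhook-flow/webhook-triggered-deployment" is your deployment's full name.
from prefect.deployments import run_deployment


async def handle_webhook_idempotent(payload: dict):
    event_id = payload["event_id"]  # unique identifier from the webhook body
    return await run_deployment(
        name="my-webhook-flow/webhook-triggered-deployment",
        parameters={"event_details": payload},
        flow_run_name=f"webhook-trigger-{event_id}",
        # Deliveries that reuse the same key resolve to the same flow run
        idempotency_key=f"webhook-{event_id}",
    )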
You can also add logging to debug the webhook handling:
python
from prefect import flow, serve
from prefect.events import Event
import logging

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)


@flow
def my_webhook_flow(event_details: dict):
    logger.info(f"Starting flow run with event: {event_details}")
    # Your flow logic here


async def handle_webhook(event: Event):
    logger.info(f"Received webhook event: {event.payload}")
    # ... rest of the handler code ...
This implementation follows Prefect 3.x patterns and provides a robust way to handle webhook-triggered flows. The key is deriving a unique run name (and, if needed, an idempotency key) from each event.

Wolfgang Steitz
07/11/2025, 7:44 AM

Marvin
07/11/2025, 7:45 AM

Marvin
07/11/2025, 7:45 AM
python
from prefect import flow
from prefect.client.orchestration import get_client
from prefect.events import Event
from prefect.automations import create_automation


async def trigger_multiple_flows(event: Event):
    async with get_client() as client:
        # Trigger first flow
        await client.create_flow_run_from_deployment(
            deployment_id="your-deployment-id-1",
            name=f"flow-run-1-{event.id}",
            # Optional: Pass event data as parameters
            parameters={"event_data": event.payload},
        )
        # Trigger second flow
        await client.create_flow_run_from_deployment(
            deployment_id="your-deployment-id-2",
            name=f"flow-run-2-{event.id}",
            parameters={"event_data": event.payload},
        )


# Create the automation
automation = create_automation(
    name="multi-flow-trigger",
    description="Triggers multiple flows when an event is received",
    match={
        "prefect.resource.id": "your-resource-id",  # Match your webhook resource
        "event": "your-event-name",  # The event name you're listening for
    },
    actions=[trigger_multiple_flows],
)
To use this:
1. Replace "your-deployment-id-1" and "your-deployment-id-2" with the actual deployment IDs you want to trigger. You can find these in the Prefect UI or via the CLI using prefect deployment ls.
2. Update the match criteria to match your webhook (an example of a matching event's shape follows below):
   - `prefect.resource.id`: The identifier for your webhook resource
   - `event`: The specific event name you're listening for
3. Save this script (e.g., as flow_automation.py) and create the automation:
bash
python flow_automation.py
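For reference, the match criteria are compared against the event your webhook emits. A matching event has the shape sketched here, shown as a plain dict purely for illustration, using the same placeholder values as the script:
python
# Illustration only: the shape of an event that satisfies the match criteria above
matching_event = {
    "event": "your-event-name",
    "resource": {"prefect.resource.id": "your-resource-id"},
    "payload": {"any": "webhook data"},
}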
To make the automation more robust, here's an enhanced version with error handling and logging:
python
from prefect import flow
from prefect.client.orchestration import get_client
from prefect.events import Event
from prefect.automations import create_automation
from prefect.logging import get_logger

logger = get_logger("flow-automation")


async def trigger_multiple_flows(event: Event):
    logger.info(f"Received event: {event.id}")
    async with get_client() as client:
        deployments_to_trigger = [
            "your-deployment-id-1",
            "your-deployment-id-2",
            # Add more deployment IDs as needed
        ]
        for idx, deployment_id in enumerate(deployments_to_trigger, 1):
            try:
                flow_run = await client.create_flow_run_from_deployment(
                    deployment_id=deployment_id,
                    name=f"flow-run-{idx}-{event.id}",
                    parameters={"event_data": event.payload},
                )
                logger.info(f"Successfully created flow run {flow_run.id} for deployment {deployment_id}")
            except Exception as e:
                logger.error(f"Failed to create flow run for deployment {deployment_id}: {str(e)}")
                # Optionally, raise the exception if you want the automation to fail
                # raise


automation = create_automation(
    name="multi-flow-trigger",
    description="Triggers multiple flows when an event is received",
    match={
        "prefect.resource.id": "your-resource-id",
        "event": "your-event-name",
    },
    actions=[trigger_multiple_flows],
)
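On the receiving side, each target deployment's flow needs a parameter matching the event_data key passed above. A minimal sketch of such a flow (the flow name here is hypothetical):
python
from prefect import flow


@flow(log_prints=True)
def process_event_flow(event_data: dict):
    # "event_data" matches the parameters dict passed by trigger_multiple_flows
    print(f"Handling event payload: {event_data}")
    # Your downstream logic here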
Some important notes:
1. Each flow run gets a unique name derived from the event ID, making duplicates easy to identify.
2. The automation will only trigger once per event, solving your multiple-trigger issue.
3. You can add multiple actions to the automation if needed:
python
automation = create_automation(
    name="multi-flow-trigger",
    match={"event": "your-event-name"},
    actions=[
        trigger_multiple_flows,
        another_action,
        # ... more actions
    ],
)
4. To find your deployment IDs, you can use:

Marvin
07/11/2025, 7:45 AM
bash
prefect deployment ls
Or programmatically:
python
from prefect.client.orchestration import get_client

# Run this inside an async function (or an async REPL such as ipython)
async with get_client() as client:
    deployments = await client.read_deployments()
    for deployment in deployments:
        print(f"Name: {deployment.name}, ID: {deployment.id}")
5. To test your automation, you can manually trigger an event:
python
import asyncio

from prefect.events import Event

event = Event(
    event="your-event-name",
    resource={"prefect.resource.id": "your-resource-id"},
    payload={"test": "data"},
)
asyncio.run(trigger_multiple_flows(event))
This implementation ensures each event triggers your flows exactly once and provides proper logging and error handling.

Wolfgang Steitz
07/11/2025, 9:29 AM

Marvin
07/11/2025, 9:29 AM

Marvin
07/11/2025, 9:30 AM
python
from prefect import flow
from prefect.client.orchestration import get_client
from prefect.events import Event
from prefect.automations import Automation, Trigger, Action


async def trigger_flows(event: Event):
    async with get_client() as client:
        # List of deployment IDs to trigger
        deployments = [
            "deployment-id-1",
            "deployment-id-2",
            # Add more deployment IDs as needed
        ]
        for idx, deployment_id in enumerate(deployments, 1):
            try:
                flow_run = await client.create_flow_run_from_deployment(
                    deployment_id=deployment_id,
                    name=f"event-triggered-run-{event.id}-{idx}",
                    parameters={"event_data": event.payload},
                )
                print(f"Created flow run {flow_run.id} for deployment {deployment_id}")
            except Exception as e:
                print(f"Error creating flow run for deployment {deployment_id}: {e}")


# Create the automation
automation = Automation(
    name="multi-flow-trigger",
    description="Triggers multiple flows when a specific event occurs",
    triggers=[
        Trigger(
            match={
                "event": "your-event-name",  # Replace with your event name
                "prefect.resource.id": "your-resource-id",  # Replace with your resource ID
            }
        )
    ],
    actions=[Action(fn=trigger_flows)],
)

# Save the automation
if __name__ == "__main__":
    automation.save()
To use this:
1. Replace the placeholder values:
   - "your-event-name" with the event name you're listening for
   - "your-resource-id" with your webhook resource ID
   - Add your actual deployment IDs to the deployments list
2. Save the script (e.g., as flow_automation.py)
3. Run it to create the automation:
bash
python flow_automation.py
Here's a more detailed version with better error handling and logging:
python
from prefect import flow
from prefect.client.orchestration import get_client
from prefect.events import Event
from prefect.automations import Automation, Trigger, Action
from prefect.logging import get_logger
from typing import Dict, Any

logger = get_logger("flow-automation")


async def trigger_flows(event: Event):
    """
    Trigger multiple flow runs based on an event.
    """
    logger.info(f"Processing event: {event.id}")

    # Map of deployment IDs and their descriptions
    deployments_to_trigger = {
        "deployment-id-1": "First Flow",
        "deployment-id-2": "Second Flow",
        # Add more as needed
    }

    async with get_client() as client:
        for deployment_id, description in deployments_to_trigger.items():
            try:
                # Create a unique name for this flow run
                run_name = f"event-{event.id}-{description.lower().replace(' ', '-')}"

                # Create the flow run
                flow_run = await client.create_flow_run_from_deployment(
                    deployment_id=deployment_id,
                    name=run_name,
                    parameters={
                        "event_data": event.payload,
                        "event_id": event.id,
                        "trigger_time": event.occurred,
                    },
                )
                logger.info(
                    f"Successfully created flow run {flow_run.id} "
                    f"for {description} (deployment: {deployment_id})"
                )
            except Exception as e:
                logger.error(
                    f"Failed to create flow run for {description} "
                    f"(deployment: {deployment_id}): {str(e)}"
                )
                # Depending on your needs, you might want to raise the exception
                # raise
# Create the automation
automation = Automation(

Marvin
07/11/2025, 9:30 AM

To verify your deployment IDs and names, you can use this helper script:
python
from prefect.client.orchestration import get_client
import asyncio


async def list_deployments():
    async with get_client() as client:
        deployments = await client.read_deployments()
        print("\nAvailable Deployments:")
        print("-" * 50)
        for deployment in deployments:
            print(f"Name: {deployment.name}")
            print(f"ID: {deployment.id}")
            print(f"Flow ID: {deployment.flow_id}")
            print("-" * 50)


if __name__ == "__main__":
    asyncio.run(list_deployments())
Some important notes:
1. The automation will only trigger once per event, solving your multiple-trigger issue.
2. Each flow run gets a unique name using the event ID and flow description.
3. The automation passes the event payload, ID, and timestamp to each flow as parameters.
4. You can monitor your automations in the Prefect UI or using the CLI:
bash
prefect automation ls
5. To test your automation, you can manually emit a matching event from Python with emit_event:
python
from prefect.events import emit_event

emit_event(
    event="your-event-name",
    resource={"prefect.resource.id": "your-resource-id"},
    payload={"key": "value"},
)
This implementation follows the current Prefect 3.x patterns and should reliably trigger your flows exactly once per event.