Bruno Nirello
08/28/2025, 1:16 PM
Marvin
08/28/2025, 1:17 PM
Marvin
08/28/2025, 1:23 PM
import json
import os
import urllib.request

WEBHOOK_URL = os.environ["PREFECT_WEBHOOK_URL"]        # from Prefect UI
WEBHOOK_SECRET = os.environ["PREFECT_WEBHOOK_SECRET"]  # from Prefect UI

def handler(event, context):
    # event["Records"] is the list of SQS messages
    for record in event.get("Records", []):
        payload = {
            # You can send the raw SQS record as the payload; your Automation can filter on it
            "payload": record,
            # Optionally add your own event name or resource identifiers
            "event": "aws.sqs.message.received",
            "resource": {
                "id": record.get("eventSourceARN", "aws.sqs.unknown")
            },
        }
        data = json.dumps(payload).encode("utf-8")
        req = urllib.request.Request(
            WEBHOOK_URL,
            data=data,
            headers={
                "Content-Type": "application/json",
                "Authorization": f"Bearer {WEBHOOK_SECRET}",
            },
            method="POST",
        )
        with urllib.request.urlopen(req) as resp:
            # 2xx indicates the webhook was accepted
            _ = resp.read()
    return {"status": "ok", "records": len(event.get("Records", []))}
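If you want to sanity-check the handler before wiring it up to SQS, you can invoke it locally with a hand-built, SQS-shaped event. The field names below follow the standard SQS-to-Lambda record format, but the values are made up, and PREFECT_WEBHOOK_URL / PREFECT_WEBHOOK_SECRET must be set in your environment:

if __name__ == "__main__":
    # Hypothetical local smoke test; not part of the deployed Lambda
    fake_event = {
        "Records": [
            {
                "messageId": "test-message-id",
                "body": "{\"order_id\": 123}",
                "eventSourceARN": "arn:aws:sqs:us-east-1:123456789012:example-queue",
            }
        ]
    }
    print(handler(fake_event, context=None))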
In the Automation’s “Run deployment” action, map flow parameters from the incoming event payload, e.g.:
- message_id: {{ event.payload.messageId }}
- body: {{ event.payload.body }}
- queue_arn: {{ event.payload.eventSourceARN }}
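For reference, the deployment's underlying flow just needs parameters with matching names; a minimal sketch (the flow name and default value here are illustrative, not something Prefect requires):

from prefect import flow

@flow
def process_sqs_message(message_id: str, body: str, queue_arn: str = ""):
    # Parameter names must match the keys mapped in the Automation's
    # "Run deployment" action above
    print(f"Received {message_id} from {queue_arn}: {body}")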
2) Lambda directly runs the deployment (bypasses Automations)
- In the same Lambda that’s triggered by SQS, call the Prefect API or use the Prefect client library to start a deployment run. This skips Automations but is straightforward if you just want “one message → one run”.
- Set environment variables PREFECT_API_URL and PREFECT_API_KEY in Lambda, then:
import asyncio
import os

from prefect.client.orchestration import get_client

DEPLOYMENT_ID = os.environ["PREFECT_DEPLOYMENT_ID"]

def handler(event, context):
    async def run():
        async with get_client() as client:
            for record in event.get("Records", []):
                params = {
                    "message_id": record.get("messageId"),
                    "body": record.get("body"),
                }
                await client.create_flow_run_from_deployment(
                    deployment_id=DEPLOYMENT_ID,
                    parameters=params,
                )

    asyncio.run(run())
    return {"status": "started", "records": len(event.get("Records", []))}
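If you'd rather have failed records retried individually instead of failing the whole batch, a hedged variation is to report partial batch failures back to SQS. This assumes the event source mapping has ReportBatchItemFailures enabled (an AWS-side setting, not a Prefect one):

import asyncio
import os

from prefect.client.orchestration import get_client

DEPLOYMENT_ID = os.environ["PREFECT_DEPLOYMENT_ID"]

def handler(event, context):
    failed = []  # messageIds that SQS should re-deliver

    async def run():
        async with get_client() as client:
            for record in event.get("Records", []):
                try:
                    await client.create_flow_run_from_deployment(
                        deployment_id=DEPLOYMENT_ID,
                        parameters={
                            "message_id": record.get("messageId"),
                            "body": record.get("body"),
                        },
                    )
                except Exception:
                    # Report only this record as failed so SQS retries it alone
                    failed.append({"itemIdentifier": record["messageId"]})

    asyncio.run(run())
    return {"batchItemFailures": failed}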
Notes and considerations
- There’s no native SQS trigger in Prefect 3; the webhook or Lambda bridge is the recommended approach.
- Webhooks + Automations give you Prefect-native filtering, retries, notifications, and parameter mapping.
- If your queue can burst, consider setting Prefect concurrency limits on the deployment to control parallelism.
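One hedged way to bound parallelism, if you prefer to enforce the limit inside the flow rather than on the deployment itself, is a named global concurrency limit. This sketch assumes you have already created a limit (for example with "prefect gcl create sqs-processing --limit 5") and are on a Prefect 3 version that ships prefect.concurrency:

from prefect import flow
from prefect.concurrency.sync import concurrency

@flow
def process_sqs_message(message_id: str, body: str):
    # Assumption: a global concurrency limit named "sqs-processing" already exists
    with concurrency("sqs-processing", occupy=1):
        ...  # do the actual message processing while holding one slot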
Marvin
08/28/2025, 1:23 PM
Bruno Nirello
08/28/2025, 2:15 PM
Marvin
08/28/2025, 2:15 PM
Marvin
08/28/2025, 2:15 PM
import asyncio
import os

from prefect.client.orchestration import get_client

DEPLOYMENT_ID = os.environ["PREFECT_DEPLOYMENT_ID"]
# Also set PREFECT_API_URL and PREFECT_API_KEY in the environment

def handler(event, context=None):
    async def run():
        async with get_client() as client:
            for record in event.get("Records", []):
                await client.create_flow_run_from_deployment(
                    deployment_id=DEPLOYMENT_ID,
                    parameters={
                        "message_id": record.get("messageId"),
                        "body": record.get("body"),
                    },
                )

    asyncio.run(run())
    return {"status": "started", "records": len(event.get("Records", []))}
References
- Event-driven triggers overview: Event triggers in Prefect 3
- Example patterns with serverless: Orchestrating event-driven serverless data pipelines (Prefect + Pulumi + AWS)
Are you using Prefect Cloud or self-hosted? If Cloud, I can outline the exact steps to create the webhook + automation; if Server, I can share a minimal AWS setup (EventBridge Pipes → Lambda → Prefect) you can drop in.