<@ULVA73B9P> is there a way to use Automate in pre...
# ask-marvin
b
@Marvin is there a way to use Automate in prefect 3.0 to start a flow when a AWS SQS arrives?
m
thought for 364.5 seconds
Short answer: Yes — with Prefect 3 Automations you can trigger a flow run when an SQS message arrives. There isn’t a direct “SQS trigger” built into Prefect, but you can bridge SQS to Prefect using either a Prefect Webhook or a small Lambda that calls Prefect. Recommended patterns 1) Prefect Webhook + AWS (zero/low code in AWS) - Create a Prefect Webhook (in Prefect Cloud UI) and copy its URL and secret. - Create an Automation with an Event Trigger that listens for your webhook’s events and add a “Run deployment” action. You can filter on payload fields (e.g., queue name, message attributes) and pass them as flow parameters. - Wire SQS to the webhook: - Easiest: Use AWS EventBridge Pipes with SQS as source and an API Destination pointing to the Prefect webhook URL. Configure the Authorization header (Bearer <webhook-secret>) and forward the SQS message as the HTTP body. - Alternative: Use an AWS Lambda subscribed to your SQS queue; in the Lambda, POST each record to the Prefect webhook URL. Example Lambda (Python) posting to a Prefect Webhook
Copy code
import json
import os
import urllib.request

WEBHOOK_URL = os.environ["PREFECT_WEBHOOK_URL"]        # from Prefect UI
WEBHOOK_SECRET = os.environ["PREFECT_WEBHOOK_SECRET"]  # from Prefect UI

def handler(event, context):
    # event["Records"] is the list of SQS messages
    for record in event.get("Records", []):
        payload = {
            # You can send the raw SQS record as payload; your Automation can filter on it
            "payload": record,
            # Optionally add your own event name or resource identifiers
            "event": "aws.sqs.message.received",
            "resource": {
                "id": record.get("eventSourceARN", "aws.sqs.unknown")
            },
        }
        data = json.dumps(payload).encode("utf-8")
        req = urllib.request.Request(
            WEBHOOK_URL,
            data=data,
            headers={
                "Content-Type": "application/json",
                "Authorization": f"Bearer {WEBHOOK_SECRET}",
            },
            method="POST",
        )
        with urllib.request.urlopen(req) as resp:
            # 2xx indicates the webhook was accepted
            _ = resp.read()

    return {"status": "ok", "records": len(event.get("Records", []))}
In the Automation’s “Run deployment” action, map flow parameters from the incoming event payload, e.g.: - message_id:
{{ event.payload.messageId }}
- body:
{{ event.payload.body }}
- queue_arn:
{{ event.payload.eventSourceARN }}
2) Lambda directly runs the deployment (bypasses Automations) - In the same Lambda that’s triggered by SQS, call the Prefect API or use the Prefect client library to start a deployment run. This skips Automations but is straightforward if you just want “one message → one run”. - Set environment variables
PREFECT_API_URL
and
PREFECT_API_KEY
in Lambda, then:
Copy code
import asyncio
import os
from prefect.client.orchestration import get_client

DEPLOYMENT_ID = os.environ["PREFECT_DEPLOYMENT_ID"]

def handler(event, context):
    async def run():
        async with get_client() as client:
            for record in event.get("Records", []):
                params = {
                    "message_id": record.get("messageId"),
                    "body": record.get("body"),
                }
                await client.create_flow_run_from_deployment(
                    deployment_id=DEPLOYMENT_ID,
                    parameters=params,
                )
    asyncio.run(run())
    return {"status": "started", "records": len(event.get("Records", []))}
Notes and considerations - There’s no native SQS trigger in Prefect 3; the webhook or Lambda bridge is the recommended approach. - Webhooks + Automations give you Prefect-native filtering, retries, notifications, and parameter mapping. - If your queue can burst, consider setting Prefect concurrency limits on the deployment to control parallelism.
- For Lambda, ensure the function can scale and that you handle batching or per-record invocation as needed. Docs and references - Prefect 3 Automations, events, and triggers: Event triggers in Prefect 3 - Event-driven pipelines overview: Event-driven vs scheduled data pipelines - Patterns for microservice-style orchestration: Scalable microservices orchestration with Prefect Questions to tailor the setup - Are you using Prefect Cloud or self-hosted Prefect Server? - Do you prefer a serverless bridge (EventBridge Pipes/Lambda) or are you okay with a small, long-running poller? - What parameters do you want to pass from the SQS message into your flow?
b
@Marvin the webhook can only be created with prefect cloud ?
m
thought for 51.3 seconds
Short answer: Yes — inbound Automations Webhooks are a Prefect Cloud feature. They aren’t available in self‑hosted Prefect Server today. If you’re on Server/self-hosted and want to trigger flows from SQS, use a small bridge: - API Gateway + Lambda (triggered by SQS) that calls the Prefect API to start a deployment - Or a lightweight service/poller that reads SQS and uses the Prefect client to create runs Example (Python) for a poller or Lambda calling Prefect directly:
Copy code
import asyncio
import os
from prefect.client.orchestration import get_client

DEPLOYMENT_ID = os.environ["PREFECT_DEPLOYMENT_ID"]
# Also set PREFECT_API_URL and PREFECT_API_KEY in the environment

def handler(event, context=None):
    async def run():
        async with get_client() as client:
            for record in event.get("Records", []):
                await client.create_flow_run_from_deployment(
                    deployment_id=DEPLOYMENT_ID,
                    parameters={
                        "message_id": record.get("messageId"),
                        "body": record.get("body"),
                    },
                )
    asyncio.run(run())
    return {"status": "started", "records": len(event.get("Records", []))}
References - Event-driven triggers overview: Event triggers in Prefect 3 - Example patterns with serverless: Orchestrating event-driven serverless data pipelines (Prefect + Pulumi + AWS) Are you using Prefect Cloud or self-hosted? If Cloud, I can outline the exact steps to create the webhook + automation; if Server, I can share a minimal AWS setup (EventBridge Pipes → Lambda → Prefect) you can drop in.