<@ULVA73B9P> can we do event driven flows?
# ask-marvin
b
@Marvin can we do event driven flows?
m
Yes, Prefect allows for event-driven flows using automations and triggers. Here are some useful references: 1. Automations and Event Triggers: You can create compound triggers to run a deployment only after specific events occur. For example, the following trigger waits for three different flows to write their results to a remote filesystem. Once these events have all been detected within a specified timeframe (e.g., one hour), the trigger fires and executes the desired deployment.
Copy code
json
    {
      "type": "compound",
      "require": "all",
      "within": 3600,
      "triggers": [
        {
          "type": "event",
          "posture": "Reactive",
          "expect": ["prefect.block.remote-file-system.write_path.called"],
          "match_related": {
            "prefect.resource.name": "daily-customer-export",
            "prefect.resource.role": "flow"
          }
        },
        {
          "type": "event",
          "posture": "Reactive",
          "expect": ["prefect.block.remote-file-system.write_path.called"],
          "match_related": {
            "prefect.resource.name": "daily-revenue-export",
            "prefect.resource.role": "flow"
          }
        },
        {
          "type": "event",
          "posture": "Reactive",
          "expect": ["prefect.block.remote-file-system.write_path.called"],
          "match_related": {
            "prefect.resource.name": "daily-expenses-export",
            "prefect.resource.role": "flow"
          }
        }
      ]
    }
Prefect Documentation on Automations 2. GitHub Issue with Additional Context: There's an ongoing discussion on making event-driven deployment triggers more convenient, potentially via CLI commands or flow decorators. GitHub Issue: Event-driven Deployment It's always a bit of a dull existence, but at least this allows some flexibility if you ever find yourself needing to automate based on events.
b
@Marvin can this work with AWS SNS or RabbitMQ message?
m
Currently, there are no direct references in the Prefect documentation or GitHub issues specifically addressing AWS SNS or RabbitMQ as triggers for event-driven flows. However, you can potentially achieve this by using Prefect's event-driven capabilities together with custom integrations. Here are the general steps: 1. Use SNS or RabbitMQ to Trigger Execution: Set up an AWS SNS Topic or RabbitMQ Queue that publishes events or messages when certain conditions are met. 2. Custom Listener Service: Create a custom service or Lambda function that listens for these SNS or RabbitMQ events. When an event is received, this service can trigger a Prefect flow run. 3. Prefect API: Use the Prefect API to programmatically trigger flows based on these custom integrations. Here is a simplistic example of how you might set this up using AWS Lambda and Prefect: AWS Lambda Example (Python):
Copy code
python
import json
import requests

def lambda_handler(event, context):
    # Extract event details
    message = json.loads(event['Records'][0]['Sns']['Message'])
    
    # Prefect API url to trigger flows (replace <endpoint>)
    prefect_api_url = '<https://api.prefect.io/api/flow_runs/>'
    
    # Payload to trigger a flow
    payload = {
        "flow_id": "YOUR_FLOW_ID",
        "parameters": message.get('parameters', {})
    }
    
    # Trigger the flow run
    response = <http://requests.post|requests.post>(prefect_api_url, json=payload)
    response.raise_for_status()
    
    return {
        'statusCode': 200,
        'body': json.dumps('Flow triggered successfully!')
    }
This approach allows you to use the flexibility of SNS or RabbitMQ events combined with the automation capabilities of Prefect. For detailed setups and more complex integrations, you might have to dive into both the AWS and Prefect documentation to tailor the solution to your specific needs. Here is some further reading that might be useful: - Prefect Automations Documentation - AWS Lambda Documentation - RabbitMQ Documentation This solution, though a bit tedious, would lend itself to various kinds of event-driven architectures. Marvelous, isn't it?
n
hi @Bruno Nirello yeah you can have AWS hit our webhook, which puts an event in the prefect ecosystem that you can trigger deployments off of in the simplest case, you can do
Copy code
@flow
def my_flow(foo: str):
  pass

my_flow.serve(
  triggers=[DeploymentTrigger(expect={"aws.pinged"}, # event name you'd choose for webhook
  parameters={"foo": "{{ event.resource.get('some-label') }}"}
)
@Marvin do thorough search and then concisely describe the pieces of a system like • file added to s3 bucket • aws hits prefect webhook -> emits
aws.pinged
event • deployment triggered fired, templates event into
parameteters
• flow runs
m
To integrate AWS S3 and Prefect using webhooks, follow these pieces: 1. File Added to S3 Bucket: - Setup: Configure an S3 bucket to trigger an SNS topic or Lambda function whenever a file is added. - Event: The S3 event notification sends a message to SNS or directly triggers the Lambda function. 2. AWS Hits Prefect Webhook: - Webhook: The SNS topic or Lambda function sends an HTTP POST request to the Prefect webhook. - Event Emission: The Prefect webhook emits an
aws.pinged
event. 3. Deployment Trigger: - Automation Configuration: Create an automation in Prefect that listens for the
aws.pinged
event. Automations Guide - Parameters: Use templating to pass event details into the deployment's parameters. 4. Flow Execution: - Deployment: The deployment associated with the trigger starts a flow run using the provided parameters. - Flow Run: The flow is executed to process the newly added file in the S3 bucket. Here is an example of handling the webhook in Prefect: Webhook Configuration Example (Curl):
Copy code
bash
curl -X POST <https://api.prefect.cloud/hooks/34iV2SFke3mVa6y5Y-YUoA> -d "model_id=adhoc" -d "run_count=10" -d "friendly_name=test-user-input"
This allows you to dynamically set webhook values and trigger Prefect events, leading to automated workflow runs based on S3 events. For more detailed configurations, refer to: - Prefect Webhooks Guide - Example Use Case with a Flow By building this pipeline, you can ensure that your data processing begins as soon as new files are added to your S3 bucket. Ah, the monotony of endless automation.