Bruno Nirello
06/27/2024, 8:43 PM
Marvin
06/27/2024, 8:43 PM
json
{
  "type": "compound",
  "require": "all",
  "within": 3600,
  "triggers": [
    {
      "type": "event",
      "posture": "Reactive",
      "expect": ["prefect.block.remote-file-system.write_path.called"],
      "match_related": {
        "prefect.resource.name": "daily-customer-export",
        "prefect.resource.role": "flow"
      }
    },
    {
      "type": "event",
      "posture": "Reactive",
      "expect": ["prefect.block.remote-file-system.write_path.called"],
      "match_related": {
        "prefect.resource.name": "daily-revenue-export",
        "prefect.resource.role": "flow"
      }
    },
    {
      "type": "event",
      "posture": "Reactive",
      "expect": ["prefect.block.remote-file-system.write_path.called"],
      "match_related": {
        "prefect.resource.name": "daily-expenses-export",
        "prefect.resource.role": "flow"
      }
    }
  ]
}
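For context: the prefect.block.*.called events matched above are emitted automatically whenever a block method runs inside a flow, so no extra code is needed for them. If you wanted to match on events of your own instead, you could emit them yourself with emit_event; a minimal sketch, where the event name and resource id are illustrative rather than from this thread:
python
from prefect.events import emit_event

# Emit a custom event that an automation trigger could `expect`.
# Both the event name and the resource id are placeholders.
emit_event(
    event="daily-customer-export.completed",
    resource={"prefect.resource.id": "export.daily-customer"},
)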
Prefect Documentation on Automations
2. GitHub Issue with Additional Context:
There's an ongoing discussion on making event-driven deployment triggers more convenient, potentially via CLI commands or flow decorators.
GitHub Issue: Event-driven Deployment
It's always a bit of a dull existence, but at least this allows some flexibility if you ever find yourself needing to automate based on events.
Bruno Nirello
06/27/2024, 8:45 PM
Marvin
06/27/2024, 8:45 PM
python
import json
import requests

def lambda_handler(event, context):
    # Extract event details from the SNS record
    message = json.loads(event['Records'][0]['Sns']['Message'])

    # Prefect API URL to trigger flow runs (replace with your endpoint)
    prefect_api_url = 'https://api.prefect.io/api/flow_runs/'

    # Payload to trigger a flow run
    payload = {
        "flow_id": "YOUR_FLOW_ID",
        "parameters": message.get('parameters', {})
    }

    # Trigger the flow run
    response = requests.post(prefect_api_url, json=payload)
    response.raise_for_status()

    return {
        'statusCode': 200,
        'body': json.dumps('Flow triggered successfully!')
    }
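To sanity-check the handler locally before wiring up SNS, you can feed it a hand-built SNS-shaped payload; the Records/Sns/Message nesting is the standard SNS-to-Lambda envelope, and the parameter values here are made up:
python
import json

# Fabricated SNS envelope for a local smoke test; values are placeholders,
# and lambda_handler is the function defined above.
fake_event = {
    "Records": [
        {"Sns": {"Message": json.dumps({"parameters": {"foo": "bar"}})}}
    ]
}
print(lambda_handler(fake_event, context=None))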
This approach allows you to use the flexibility of SNS or RabbitMQ events combined with the automation capabilities of Prefect.
For detailed setups and more complex integrations, you might have to dive into both the AWS and Prefect documentation to tailor the solution to your specific needs. Here is some further reading that might be useful:
- Prefect Automations Documentation
- AWS Lambda Documentation
- RabbitMQ Documentation
This solution, though a bit tedious, would lend itself to various kinds of event-driven architectures. Marvelous, isn't it?
Nate
06/27/2024, 9:25 PM
python
from prefect import flow
# import path varies across Prefect 2.x releases (newer: DeploymentEventTrigger)
from prefect.events.schemas.deployment_triggers import DeploymentTrigger

@flow
def my_flow(foo: str):
    pass

my_flow.serve(
    triggers=[DeploymentTrigger(
        expect={"aws.pinged"},  # event name you'd choose for webhook
        parameters={"foo": "{{ event.resource.get('some-label') }}"},
    )],
)
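One way to watch this trigger fire before the webhook exists is to emit the event yourself; the resource id and label value below are placeholders:
python
from prefect.events import emit_event

# Simulate the webhook: the served deployment above matches "aws.pinged"
# and templates 'some-label' from the event resource into the `foo` parameter.
emit_event(
    event="aws.pinged",
    resource={
        "prefect.resource.id": "aws.s3.my-bucket",  # placeholder resource id
        "some-label": "hello",                      # becomes foo="hello"
    },
)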
Nate
06/27/2024, 9:26 PM
• aws.pinged event
• deployment trigger fired, templates event into parameters
• flow runs
Marvin
06/27/2024, 9:27 PM
aws.pinged event.
3. Deployment Trigger:
- Automation Configuration: Create an automation in Prefect that listens for the aws.pinged event. Automations Guide
- Parameters: Use templating to pass event details into the deployment's parameters.
4. Flow Execution:
- Deployment: The deployment associated with the trigger starts a flow run using the provided parameters.
- Flow Run: The flow is executed to process the newly added file in the S3 bucket.
Here is an example of handling the webhook in Prefect:
Webhook Configuration Example (Curl):
bash
curl -X POST https://api.prefect.cloud/hooks/34iV2SFke3mVa6y5Y-YUoA -d "model_id=adhoc" -d "run_count=10" -d "friendly_name=test-user-input"
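The same call from Python, for anyone scripting it (same webhook slug and form fields as the curl above):
python
import requests

# POST form fields to the Prefect Cloud webhook endpoint shown above;
# the webhook renders them into a Prefect event via its template.
resp = requests.post(
    "https://api.prefect.cloud/hooks/34iV2SFke3mVa6y5Y-YUoA",
    data={"model_id": "adhoc", "run_count": "10", "friendly_name": "test-user-input"},
)
resp.raise_for_status()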
This allows you to dynamically set webhook values and trigger Prefect events, leading to automated workflow runs based on S3 events.
For more detailed configurations, refer to:
- Prefect Webhooks Guide
- Example Use Case with a Flow
By building this pipeline, you can ensure that your data processing begins as soon as new files are added to your S3 bucket. Ah, the monotony of endless automation.