David Martin Calalang
07/24/2025, 3:01 PMMarvin
07/24/2025, 3:01 PMMarvin
07/24/2025, 3:02 PMDavid Martin Calalang
07/24/2025, 3:06 PMMarvin
07/24/2025, 3:09 PMMarvin
07/24/2025, 3:10 PM
```bash
pip install "prefect[aws]" boto3
```
Here's a comprehensive example that shows how to create a Prefect flow that sends messages to an SNS topic:
python
from prefect import flow, task
from prefect_aws import AwsCredentials
import boto3
@task
def get_sns_client():
    """Create a boto3 SNS client from the Prefect ``AwsCredentials`` block named "default".

    Returns:
        The client returned by ``session.client('sns')``.
    """
    # Load AWS credentials from Prefect block
    aws_credentials = AwsCredentials.load("default")
    # Let the block build the session instead of copying fields by hand:
    # the block stores ``aws_secret_access_key`` as a SecretStr, which must not
    # be handed to boto3.Session as-is, and the manual copy also ignored any
    # session token / profile configured on the block.
    session = aws_credentials.get_boto3_session()
    return session.client('sns')
@task
def publish_message(client, topic_arn: str, message: str):
    """Publish ``message`` to ``topic_arn`` through ``client`` and return the response's ``MessageId``."""
    publish_result = client.publish(TopicArn=topic_arn, Message=message)
    return publish_result['MessageId']
@flow
def send_sns_notification(topic_arn: str, message: str):
    """Flow: obtain an SNS client, publish ``message`` to ``topic_arn``, and return the message ID."""
    # The client is created first, then handed to the publishing task.
    message_id = publish_message(get_sns_client(), topic_arn, message)
    print(f"Message sent successfully! Message ID: {message_id}")
    return message_id
Before running this flow, you'll need to set up your AWS credentials in Prefect. Here's how to do that:
1. First, create an AWS Credentials block in Prefect:
python
from prefect_aws import AwsCredentials
# Placeholder values -- substitute your own before running.
_credential_fields = {
    "aws_access_key_id": "your_access_key_id",
    "aws_secret_access_key": "your_secret_access_key",
    "region_name": "your_region",  # e.g., "us-east-1"
}
aws_credentials = AwsCredentials(**_credential_fields)
# Persist the block under the name the flow loads ("default").
aws_credentials.save("default")
2. Then you can run your flow:
python
# Example usage: placeholder ARN and message passed to the flow defined above.
topic_arn, message = "arn:aws:sns:region:account-id:topic-name", "Hello from Prefect!"
send_sns_notification(topic_arn=topic_arn, message=message)
Some important notes about this implementation:
1. Error Handling: You might want to add error handling for AWS-specific exceptions. Here's an enhanced version of the publish task:
python
from botocore.exceptions import ClientError
@task(retries=3, retry_delay_seconds=30)
def publish_message(client, topic_arn: str, message: str):
    """Publish ``message`` to ``topic_arn`` and return the ``MessageId``.

    A ``ClientError`` is printed and re-raised; the task is declared with
    ``retries=3`` and ``retry_delay_seconds=30``.
    """
    try:
        sns_response = client.publish(TopicArn=topic_arn, Message=message)
        return sns_response['MessageId']
    except ClientError as e:
        print(f"Error publishing message: {e}")
        raise
2. Best Practices:
- Store your AWS credentials securely using Prefect blocks
- Use environment variables or secrets management for sensitive information
- Consider implementing retries for resilience (as shown in the enhanced example)
- Use appropriate IAM permissions (minimal required permissions)
3. Additional Features: You can extend this basic example to include:
- Message attributes
- Different message structures (JSON)
- Subject lines
- Message filtering
Here's an example with more features:
python
@task
def publish_detailed_message(
    client,
    topic_arn: str,
    message: str,
    subject: str = None,
    message_attributes: dict = None
):
    """Publish ``message`` to ``topic_arn``, optionally with a subject and message attributes.

    ``Subject`` and ``MessageAttributes`` are only included in the publish
    call when the corresponding argument is truthy. Returns the ``MessageId``.
    """
    optional_fields = {
        "Subject": subject,
        "MessageAttributes": message_attributes,
    }
    request = {"TopicArn": topic_arn, "Message": message}
    # Skip optional fields that were left empty.
    request.update({key: value for key, value in optional_fields.items() if value})
    return client.publish(**request)['MessageId']
@flow
def send_detailed_sns_notification(
    topic_arn: str,
    message: str,
    subject: str = None,
    message_attributes: dict = None
):
    """Flow: publish a message with optional subject/attributes and return its message ID."""
    client = get_sns_client()
    return publish_detailed_message(
        client,
        topic_arn,
        message,
        subject,
        message_attributes,
    )
Usage example with additional features:
```pythonMarvin
07/24/2025, 3:10 PMMarvin
07/24/2025, 3:14 PMMarvin
07/24/2025, 3:14 PMpython
from prefect import flow, task
from prefect.logging import get_run_logger
import boto3
import os
from typing import Optional
@task
def get_sns_client():
    """Return ``boto3.client('sns')``.

    No credentials are passed explicitly; the snippet relies on the
    AWS_ACCESS_KEY_ID, AWS_SECRET_ACCESS_KEY and AWS_DEFAULT_REGION
    environment variables.
    """
    sns = boto3.client('sns')
    return sns
@task
def send_sns_alert(topic_arn: str, error_message: str, task_name: Optional[str] = None):
    """Publish a task-failure alert to an SNS topic.

    Args:
        topic_arn: ARN of the SNS topic to publish to.
        error_message: Text used as the message body.
        task_name: Optional task name appended to the subject line.

    Returns:
        The ``MessageId`` from the publish response.

    Raises:
        Any exception raised by ``sns.publish`` is logged and re-raised.
    """
    logger = get_run_logger()
    sns = get_sns_client()
    subject = f"Prefect Task Failure: {task_name}" if task_name else "Prefect Task Failure"
    try:
        response = sns.publish(
            TopicArn=topic_arn,
            Subject=subject[:100],  # SNS subject has 100 character limit
            Message=error_message
        )
        # Fixed: the chat export had turned ``logger.info`` into a Slack link
        # (``<http://logger.info|logger.info>``), which is not valid Python.
        logger.info(f"SNS alert sent successfully. MessageId: {response['MessageId']}")
        return response['MessageId']
    except Exception as e:
        logger.error(f"Failed to send SNS alert: {str(e)}")
        raise
def _alert_on_risky_task_failure(task, task_run, state):
    """Failure hook for ``risky_task``: forward the failure to SNS.

    Task state hooks are called with three arguments (task, task run, state);
    the previous two-argument lambda ``(task, flow)`` did not match that
    signature and read the error through ``task.get_state()``. The error text
    now comes from ``state.message``.
    """
    send_sns_alert(
        topic_arn="your-sns-topic-arn",
        error_message=f"Task {task.name} failed with error: {state.message}",
        task_name=task.name
    )


# Example task that might fail
@task(on_failure=[_alert_on_risky_task_failure])
def risky_task():
    """Demo task that always raises so the failure hook fires."""
    # Your task logic here
    raise ValueError("Something went wrong!")
@flow
def main_flow():
    """Run ``risky_task`` and print (rather than propagate) any exception it raises."""
    try:
        risky_task()
    except Exception as e:
        print(f"Flow caught error: {str(e)}")
To use this in your actual flows, here's a more complete example with multiple tasks:
python
from prefect import flow, task
from prefect.logging import get_run_logger
import boto3
import os
from typing import Optional, Callable
from functools import partial
def create_failure_handler(topic_arn: str) -> Callable:
    """Creates a failure handler that sends SNS notifications.

    Args:
        topic_arn: ARN of the SNS topic the alerts are published to.

    Returns:
        A function suitable for a task's ``on_failure`` list.
    """
    def failure_handler(task, task_run, state):
        # Task hooks receive (task, task_run, state). The earlier
        # ``(task, flow)`` signature did not match, and it relied on
        # ``task.get_state()`` / ``flow.id``; the error text and flow run ID
        # are now read from ``state`` and ``task_run``.
        error_message = (
            f"Task Failure in Flow\n"
            f"Task: {task.name}\n"
            f"Error: {state.message}\n"
            f"Flow Run ID: {task_run.flow_run_id}"
        )
        # Send the alert through a retrying variant of the alert task.
        send_sns_alert.with_options(retries=3, retry_delay_seconds=5)(
            topic_arn=topic_arn,
            error_message=error_message,
            task_name=task.name
        )
    return failure_handler
@task
def get_sns_client():
    """Return ``boto3.client('sns')``; no credentials are passed explicitly."""
    sns = boto3.client('sns')
    return sns
@task
def send_sns_alert(topic_arn: str, error_message: str, task_name: Optional[str] = None):
    """Publish a task-failure alert to an SNS topic.

    Args:
        topic_arn: ARN of the SNS topic to publish to.
        error_message: Text used as the message body.
        task_name: Optional task name appended to the subject line.

    Returns:
        The ``MessageId`` from the publish response.

    Raises:
        Any exception raised by ``sns.publish`` is logged and re-raised.
    """
    logger = get_run_logger()
    sns = get_sns_client()
    subject = f"Prefect Task Failure: {task_name}" if task_name else "Prefect Task Failure"
    try:
        response = sns.publish(
            TopicArn=topic_arn,
            Subject=subject[:100],  # subject is truncated to 100 characters
            Message=error_message
        )
        # Fixed: the chat export had replaced ``logger.info`` with the Slack
        # link markup ``<http://logger.info|logger.info>`` (a syntax error).
        logger.info(f"SNS alert sent successfully. MessageId: {response['MessageId']}")
        return response['MessageId']
    except Exception as e:
        logger.error(f"Failed to send SNS alert: {str(e)}")
        raise
# Build one shared failure handler bound to your SNS topic (placeholder ARN).
sns_failure_handler = create_failure_handler(topic_arn="arn:aws:sns:region:account-id:your-topic-name")
@task(on_failure=[sns_failure_handler])
def process_data():
    """Placeholder task: put your data processing logic here."""
@task(on_failure=[sns_failure_handler])
def validate_results():
    """Placeholder task: put your validation logic here."""
@flow
def main_flow():
    """Run ``process_data`` then ``validate_results``; log and re-raise any exception."""
    run_logger = get_run_logger()
    try:
        # Your flow logic
        process_data()
        validate_results()
    except Exception as e:
        run_logger.error(f"Flow failed: {str(e)}")
        raise
To use this code:
1. Make sure your environment variables are set:
```bash
export AWS_ACCESS_KEY_ID=your_access_key
export AWS_SECRET_ACCESS_KEY=your_secret_key
```
Marvin
07/24/2025, 3:14 PM
2. Replace the `topic_arn` with your actual SNS topic ARN.
3. Add the failure handler to any task that you want to monitor:
```python
@task(on_failure=[sns_failure_handler])
def your_task():
    """Placeholder task monitored by the SNS failure handler; add your task logic here."""
Key features of this implementation:
1. Environment Variables: Uses AWS credentials from environment variables automatically through boto3
2. Retry Logic: The SNS alert task has retry capabilities
3. Detailed Error Messages: Includes task name, flow name, and error details in the notification
4. Logging: Proper logging for debugging and monitoring
5. Reusable: The failure handler can be easily added to any task
You can customize the error message format by modifying the failure_handler function. For example, you might want to add:
- Timestamp
- Environment information
- Link to the Prefect UI for the failed run
- Custom tags or metadata
Here's how you could enhance the error message:
python
from datetime import datetime
def create_failure_handler(topic_arn: str, environment: str = "production") -> Callable:
    """Create a task failure handler that sends a detailed SNS alert.

    Args:
        topic_arn: ARN of the SNS topic the alerts are published to.
        environment: Label included in the alert body (default "production").

    Returns:
        A function suitable for a task's ``on_failure`` list.
    """
    def failure_handler(task, task_run, state):
        # Task hooks receive (task, task_run, state). The earlier
        # ``(task, flow)`` signature did not match, and it relied on
        # ``task.get_state()``, ``flow.id`` and ``task.id``; the values are
        # now read from ``state`` and ``task_run``.
        timestamp = datetime.utcnow().isoformat()
        error_message = (
            f":warning: Task Failure Alert :warning:\n"
            f"Timestamp (UTC): {timestamp}\n"
            f"Environment: {environment}\n"
            f"Task: {task.name}\n"
            f"Error: {state.message}\n"
            f"Flow Run ID: {task_run.flow_run_id}\n"
            f"Task Run ID: {task_run.id}\n"
            # If you're using Prefect Cloud or have a UI URL:
            # f"UI Link: {your_prefect_ui_base_url}/flow-runs/{task_run.flow_run_id}\n"
        )
        # Send the alert through a retrying variant of the alert task.
        send_sns_alert.with_options(retries=3, retry_delay_seconds=5)(
            topic_arn=topic_arn,
            error_message=error_message,
            task_name=task.name
        )
    return failure_handler
Would you like me to explain any part of this implementation in more detail or show you how to add any specific additional features to the notifications?
David Martin Calalang
07/24/2025, 3:38 PMMarvin
07/24/2025, 3:38 PMMarvin
07/24/2025, 3:38 PMpython
from prefect import flow, task
from prefect.logging import get_run_logger
import boto3
from typing import Optional, Callable
@task
def get_sns_client():
    """Return ``boto3.client('sns')``; no credentials are passed explicitly."""
    sns = boto3.client('sns')
    return sns
@task
def send_sns_alert(topic_arn: str, error_message: str, task_name: Optional[str] = None):
    """Publish a task-failure alert to an SNS topic.

    Args:
        topic_arn: ARN of the SNS topic to publish to.
        error_message: Text used as the message body.
        task_name: Optional task name appended to the subject line.

    Returns:
        The ``MessageId`` from the publish response.

    Raises:
        Any exception raised by ``sns.publish`` is logged and re-raised.
    """
    logger = get_run_logger()
    sns = get_sns_client()
    subject = f"Prefect Task Failure: {task_name}" if task_name else "Prefect Task Failure"
    try:
        response = sns.publish(
            TopicArn=topic_arn,
            Subject=subject[:100],  # SNS subject has 100 character limit
            Message=error_message
        )
        # Fixed: Slack link markup (``<http://logger.info|logger.info>``) had
        # replaced the plain ``logger.info`` call, making the snippet unparsable.
        logger.info(f"SNS alert sent successfully. MessageId: {response['MessageId']}")
        return response['MessageId']
    except Exception as e:
        logger.error(f"Failed to send SNS alert: {str(e)}")
        raise
def create_failure_handler(topic_arn: str) -> Callable:
    """Return a task ``on_failure`` hook that reports the failure to ``topic_arn`` via SNS."""

    def failure_handler(task, task_run, state):  # three-parameter hook signature
        error_message = (
            f"Task Failure in Flow\n"
            f"Task: {task.name}\n"
            f"Error: {state.message}\n"
            f"Flow Run ID: {task_run.flow_run_id}"
        )
        # Derive a retrying variant of the alert task, then invoke it.
        retrying_alert = send_sns_alert.with_options(retries=3, retry_delay_seconds=5)
        retrying_alert(
            topic_arn=topic_arn,
            error_message=error_message,
            task_name=task.name,
        )

    return failure_handler
# Build the failure handler for your SNS topic.
# Replace the placeholder below with your actual SNS topic ARN.
sns_failure_handler = create_failure_handler(topic_arn="your-sns-topic-arn")
@task(on_failure=[sns_failure_handler])
def risky_task():
    """Demo task that always raises, so the SNS failure handler is triggered."""
    # Your task logic here
    raise ValueError("Something went wrong!")
@flow
def main_flow():
    """Run ``risky_task`` and print (rather than propagate) any exception it raises."""
    try:
        risky_task()
    except Exception as e:
        print(f"Flow caught error: {str(e)}")
if __name__ == "__main__":
    # Run the demo flow only when this file is executed as a script.
    main_flow()
The key changes are:
1. Updated the failure handler signature to accept three parameters:
- `task`: The task object
- `task_run`: The task run object
- `state`: The state object containing error information
2. Updated how we access the error information using state.message instead of trying to get it from the task directly
If you want a more detailed error message, you can enhance the failure handler like this:
python
from datetime import datetime
def create_failure_handler(topic_arn: str, environment: str = "production") -> Callable:
    """Return a task ``on_failure`` hook that sends a detailed SNS alert.

    The alert body carries a UTC timestamp, the ``environment`` label, the
    task name, ``state.message``, and the flow-run / task-run IDs.
    """

    def failure_handler(task, task_run, state):
        timestamp = datetime.utcnow().isoformat()
        error_message = (
            f"⚠️ Task Failure Alert ⚠️\n"
            f"Timestamp (UTC): {timestamp}\n"
            f"Environment: {environment}\n"
            f"Task: {task.name}\n"
            f"Error: {state.message}\n"
            f"Flow Run ID: {task_run.flow_run_id}\n"
            f"Task Run ID: {task_run.id}\n"
        )
        # Derive a retrying variant of the alert task, then invoke it.
        retrying_alert = send_sns_alert.with_options(retries=3, retry_delay_seconds=5)
        retrying_alert(
            topic_arn=topic_arn,
            error_message=error_message,
            task_name=task.name,
        )

    return failure_handler
To use this in your code:
1. Replace the topic_arn with your actual SNS topic ARN
2. Make sure your AWS credentials are properly set in your environment variables
3. You can add the failure handler to any task you want to monitor:
python
@task(on_failure=[sns_failure_handler])
def your_task():
    """Placeholder task monitored by the SNS failure handler; add your task logic here."""
This should now work correctly with Prefect 3.x. Would you like me to explain any part of this implementation in more detail or help you with any specific customization?