Ravish Mallya
07/16/2025, 8:15 PMMarvin
07/16/2025, 8:16 PMMarvin
07/16/2025, 8:16 PMpython
from prefect import flow, task
@task(log_prints=True) # Captures print statements as logs
def my_task():
print("This will be captured in Prefect logs")
@flow(log_prints=True)
def my_flow():
print("Flow-level logging")
my_task()
2. Custom Logging
You can configure custom logging handlers to send logs to external systems:
python
import logging
from prefect import flow
# Configure custom logger
custom_handler = logging.StreamHandler()
custom_handler.setFormatter(logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s'))
@flow
def my_flow():
logger = get_run_logger() # Get Prefect logger
logger.addHandler(custom_handler)
<http://logger.info|logger.info>("This will go to both Prefect and custom handler")
3. Metrics and Events
- Prefect automatically tracks various metrics like flow run duration, task run counts, and state transitions
- You can view these metrics in the Prefect UI
- You can set up notifications based on these metrics using the UI or programmatically
4. Best Practices for Observability:
- Use meaningful flow and task names for better tracking
- Implement appropriate logging levels (DEBUG, INFO, WARNING, ERROR)
- Set up notifications for critical state changes
- Use tags to organize and filter flows
- Leverage flow run metadata for custom metrics
5. Monitoring Through Prefect UI
- Access detailed logs for each flow and task run
- View flow run history and performance metrics
- Monitor infrastructure health and dependencies
- Track state changes and transitions
6. Integration with External Tools
You can integrate Prefect with various observability platforms:
- Send logs to ElasticSearch or Splunk
- Export metrics to Prometheus
- Set up custom webhooks for notifications
- Integrate with APM tools
Here's an example combining several observability features:
python
from prefect import flow, task, get_run_logger
from datetime import timedelta
@task(
retries=3,
retry_delay_seconds=60,
log_prints=True,
tags=["critical"]
)
def monitored_task():
logger = get_run_logger()
<http://logger.info|logger.info>("Starting critical operation")
# Your task logic here
logger.debug("Operation details...")
@flow(
log_prints=True,
timeout_seconds=3600, # Set timeout for monitoring
description="Flow with enhanced observability"
)
def observable_flow():
logger = get_run_logger()
<http://logger.info|logger.info>("Flow started")
monitored_task()
For more detailed information, you can refer to:
- Troubleshooting Guide
- Prefect Metrics Changelog
Remember that Prefect Cloud provides additional observability features like:
- Global flow run tracking
- Team-wide analytics
- Advanced notification settings
- SLA monitoring
- Infrastructure health metrics
Would you like me to elaborate on any specific aspect of observability in Prefect, such as setting up specific integrations or configuring advanced monitoring features?