Mehdi Lallouache
08/04/2025, 8:49 AM
Marvin
08/04/2025, 8:49 AM
Marvin
08/04/2025, 8:50 AM
```python
from prefect.client.orchestration import get_client
from prefect.events import Event
from prefect.automations import create_automation
from prefect.automations.triggers import when
from prefect.automations.actions import send_notification


async def check_consecutive_states(event: Event, state_to_check: str, n: int = 3):
    """Check if a flow has entered the same state n times in a row"""
    client = get_client()
    # Get recent flow runs for this flow
    flow_runs = await client.read_flow_runs(
        flow_id=event.resource.id,
        sort="-start_time",
        limit=n
    )
    # Check if the last n runs are all in the specified state
    consecutive_count = 0
    for run in flow_runs:
        if run.state.type.value == state_to_check:
            consecutive_count += 1
        else:
            break
    return consecutive_count >= n


# Create the automation
automation = await create_automation(
    name="Alert on Consecutive Failed Runs",
    description="Send alert when flow fails 3 times in a row",
    trigger=when(
        event_trigger=when.flow_run.completed(),
        condition=lambda event: check_consecutive_states(event, "FAILED", 3)
    ),
    actions=[
        send_notification(
            message="⚠️ Alert: Flow '{event.resource.name}' has failed 3 times in a row!",
            # You can use various notification blocks here: email, slack, etc.
            block_name="email-notification"
        )
    ]
)
```
Key points about this implementation:
1. The automation triggers whenever a flow run completes (`when.flow_run.completed()`)
2. It checks the last N runs of the flow to see if they're all in the specified state
3. If the condition is met (N consecutive occurrences of the state), it sends a notification
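The check in point 2 does not depend on Prefect itself, so it can be tried in isolation. Here is a minimal sketch, assuming you already have the state names of the most recent runs as plain strings ordered newest first; the function name `has_consecutive_states` and the sample data are hypothetical and only for illustration.

```python
from typing import Sequence


def has_consecutive_states(
    states: Sequence[str], state_to_check: str, n: int = 3
) -> bool:
    """Return True if the newest n entries of `states` all equal `state_to_check`.

    `states` is assumed to be ordered newest first (an assumption of this sketch).
    """
    if n <= 0:
        raise ValueError("n must be positive")
    consecutive_count = 0
    for state in states:
        if state != state_to_check:
            # The streak is broken by the first run in a different state
            break
        consecutive_count += 1
        if consecutive_count >= n:
            return True
    return False


# Hypothetical sample: three failures, then an older successful run
recent = ["FAILED", "FAILED", "FAILED", "COMPLETED"]
print(has_consecutive_states(recent, "FAILED", 3))  # True
```

Stopping at the first non-matching state is what makes this a "consecutive" check rather than a simple count of failures.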
To use this, you'll need to:
1. Set up a notification block in Prefect (email, Slack, etc.)
2. Adjust the `state_to_check` parameter to match your needs (e.g., "FAILED", "CRASHED", etc.)
3. Modify the `n` parameter to specify how many consecutive occurrences you want to check for
You can also customize the notification message and add additional actions if needed.
Some variations you might consider:
- Check for different states (not just failures)
- Add more sophisticated conditions (like time windows)
- Send different types of notifications (Slack, email, webhooks, etc.)
- Include more details in the notification (run IDs, timestamps, etc.)
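As an illustration of the time-window variation, here is a rough sketch in plain Python. It assumes you have `(end_time, state)` pairs for recent runs, ordered newest first; the function name `states_repeat_within_window` and the input shape are hypothetical, not part of any Prefect API.

```python
from datetime import datetime, timedelta
from typing import Sequence, Tuple


def states_repeat_within_window(
    runs: Sequence[Tuple[datetime, str]],
    n: int,
    window: timedelta,
    state_to_check: str = "FAILED",
) -> bool:
    """True if the newest n runs are all in `state_to_check` and the time
    between the oldest and newest of them is at most `window`.

    `runs` is assumed to be sorted newest first (an assumption of this sketch).
    """
    if n <= 0:
        raise ValueError("n must be positive")
    if len(runs) < n:
        return False
    newest_n = runs[:n]
    if any(state != state_to_check for _, state in newest_n):
        return False
    newest_time = newest_n[0][0]
    oldest_time = newest_n[-1][0]
    return newest_time - oldest_time <= window
```

With this shape, three failures spread over a week would not alert under a one-hour window, while three failures within the same hour would.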
For setting up notification blocks, you can use the Prefect UI or create them programmatically. For example, for email notifications:
```python
from prefect.blocks.notifications import EmailServerNotifications

email_block = EmailServerNotifications(
    username="your-email@example.com",
    password="your-password",
    host="smtp.example.com",
    port=587,
    from_address="alerts@example.com"
)
email_block.save("email-notification")
```
Would you like me to provide more specific details about any part of this implementation, or would you like to see examples for different types of notifications?
Mehdi Lallouache
08/04/2025, 8:52 AM
Marvin
08/04/2025, 8:52 AM
Marvin
08/04/2025, 8:52 AM
Flow '{{event.resource.name}}' has entered state {{event.payload.state.type.value}}
However, I should note that while the UI makes it easy to set up basic automations, the specific requirement of checking for "n consecutive states" is more complex and currently requires a custom implementation through code (as shown in my previous response). The UI doesn't have a built-in way to look at the history of previous runs.
Alternative Approaches Through the UI:
1. You could set up simpler automations that trigger on each state change and then manually monitor the frequency
2. Create multiple automations for different scenarios you want to monitor
Would you like me to:
1. Show you how to set up a basic state-change automation through the UI?
2. Explain how to create and configure the notification blocks in the UI?
3. Or would you prefer to explore a different approach to monitoring consecutive states?
Mehdi Lallouache
08/04/2025, 9:30 AM
Marvin
08/04/2025, 9:30 AM
Marvin
08/04/2025, 9:31 AM
"retention_period": "P7D"
)
- For Prefect Cloud, the exact retention periods aren't publicly documented, but they are tied to your subscription level
3. Accessing Historical Logs:
- Recent logs are available in real-time through the UI and API
- The retention period starts after a flow run reaches a terminal state
- For paid tiers, you typically get access to longer historical data
If you need specific retention period information for your use case, I'd recommend:
1. Checking your current plan details in Prefect Cloud
2. Contacting Prefect support for exact retention periods for your tier
3. For critical logs, consider implementing your own log archival solution by forwarding logs to a long-term storage system
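For the third option, one possible shape of a do-it-yourself archive is a standard-library logging handler that appends every record to a file you control. This is only a sketch: the class name `JsonLinesArchiveHandler`, the helper `attach_archive`, and the local file target are all hypothetical, and which logger name Prefect run logs actually arrive on is something to confirm against the Prefect logging documentation.

```python
import json
import logging
from pathlib import Path


class JsonLinesArchiveHandler(logging.Handler):
    """Append each log record to a file as one JSON object per line."""

    def __init__(self, archive_path: str) -> None:
        super().__init__()
        self.archive_path = Path(archive_path)
        # Create the target directory up front so emit() can simply append
        self.archive_path.parent.mkdir(parents=True, exist_ok=True)

    def emit(self, record: logging.LogRecord) -> None:
        entry = {
            "created": record.created,  # Unix timestamp of the record
            "logger": record.name,
            "level": record.levelname,
            "message": record.getMessage(),
        }
        try:
            with self.archive_path.open("a", encoding="utf-8") as fh:
                fh.write(json.dumps(entry) + "\n")
        except Exception:
            # Defer to logging's standard error handling instead of raising
            self.handleError(record)


def attach_archive(logger_name: str, archive_path: str) -> logging.Logger:
    """Attach the archive handler to the named logger and return that logger."""
    logger = logging.getLogger(logger_name)
    logger.setLevel(logging.INFO)
    logger.addHandler(JsonLinesArchiveHandler(archive_path))
    return logger
```

Calling something like `attach_archive("my_flows", "archive/flow_logs.jsonl")` (both values are placeholders) would then keep a local copy of every record at INFO level or above, and the resulting file can be shipped to whatever long-term storage you already use.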
Would you like me to:
1. Explain how to set up custom log handling for longer retention?
2. Show you how to query available logs through the API?
3. Provide information about different Prefect Cloud tiers and their features?
Mehdi Lallouache
08/04/2025, 9:32 AM
Marvin
08/04/2025, 9:32 AM
Marvin
08/04/2025, 9:33 AM
Mehdi Lallouache
08/08/2025, 7:41 AM
Marvin
08/08/2025, 7:41 AM
explore_module_offerings
📊 Tools used:
• check_cli_command (1x)
• display_callable_signature (15x)
• explore_module_offerings (8x)
• get_latest_prefect_release_notes (3x)
• research_prefect_topic (2x)
• review_common_3x_gotchas (2x)
• search_prefect_3x_docs (17x)
Marvin
08/08/2025, 7:44 AM