Ben T
07/09/2025, 6:07 PMfrom prefect_slack import SlackWebhook
slack_webhook_block = SlackWebhook.load("slack-notifications")
what is the automation for if i have to do this?
from prefect import flow
from prefect.blocks.core import Block
from prefect.settings import PREFECT_API_URL
def notify_slack(flow, flow_run, state):
slack_webhook_block = Block.load("slack-webhook/my-slack-webhook")
slack_webhook_block.notify(
(
f"Your job {flow_run.name} entered {state.name} "
f"with message:\n\n"
f"See <https://{PREFECT_API_URL.value()}/flow-runs/"
f"flow-run/{flow_run.id}|the flow run in the UI>\n\n"
f"Tags: {flow_run.tags}\n\n"
f"Scheduled start: {flow_run.expected_start_time}"
)
)
@flow(on_failure=[notify_slack], retries=1)
def failing_flow():
raise ValueError("oops!")
if __name__ == "__main__":
failing_flow()
Marvin
07/09/2025, 6:07 PMNate
07/09/2025, 6:08 PMMarvin
07/09/2025, 6:09 PMpython
from prefect.blocks.notifications import SlackWebhook
# Create and save the block
slack_webhook = SlackWebhook(
url="your-webhook-url" # Get this from Slack
)
slack_webhook.save("slack-notifications")
2. For basic flow state notifications, you don't need to write any notification code in your flow. The automation handles everything! Here's a minimal flow:
python
from prefect import flow
@flow
def my_flow():
# Your flow logic here
pass
3. The automation you set up in the UI will automatically send notifications based on the triggers you defined. For example, you can set up an automation to:
- Trigger when any flow enters a "Failed" state
- Send a notification using your "slack-notifications" block
- Include details about the flow run, state, and a link to the UI
The beauty of using automations is that:
- You don't need to modify your flow code
- You can change notification settings without touching code
- You can apply the same notification rules to multiple flows
- You get consistent formatting across all notifications
If you need more customized notifications, you can still use the block directly:
python
from prefect import flow
from prefect.blocks.notifications import SlackWebhook
@flow
def my_flow():
# Load the saved block
slack_webhook = SlackWebhook.load("slack-notifications")
# Send a custom notification
slack_webhook.notify("Custom message here!")
The automation approach is generally preferred because it:
1. Separates concerns (keeps notification logic out of flow code)
2. Provides consistent notifications across your entire workflow
3. Allows for centralized notification management
4. Can be modified without redeploying flows
For more details on setting up Slack integrations, check out the prefect-slack documentation.Ben T
07/10/2025, 1:11 AMMarvin
07/10/2025, 1:11 AMMarvin
07/10/2025, 1:14 AMpython
from prefect.blocks.notifications import SlackWebhook
# Test the block directly
slack_webhook = SlackWebhook.load("slack-notifications")
slack_webhook.notify("Test message") # This should send a message to your Slack channel
2. For the automation setup in the UI, here are the exact steps that should work:
a. Go to the Automations page in the Prefect UI
b. Click "Create Automation"
c. Set up the trigger:
- Select "Flow run state update" as the trigger type
- Choose the state you want to monitor (e.g., "Failed")
- Optionally add any flow name filters
d. Set up the action:
- Select "Send a notification" as the action type
- Choose your "slack-notifications" block
- In the message body, use this template:
Flow: {{ flow.name }}
State: {{ flow_run.state.name }}
Message: {{ flow_run.state.message }}
Time: {{ flow_run.state.timestamp }}
3. Create a test flow to verify:
python
from prefect import flow
@flow
def test_notifications():
raise Exception("Testing notifications")
if __name__ == "__main__":
test_notifications()
If this still isn't working, let's troubleshoot:
1. What version of Prefect are you running? (prefect version
)
2. Are you using Prefect Cloud or self-hosted?
3. When you run the test code to send a message directly via the block (step 1 above), does it work?
4. In the UI, after setting up the automation, does it show as "Active"?
5. When you run a flow that should trigger the automation, do you see any errors in the Prefect logs?
The most common issues are:
- Incorrect webhook URL in the block
- Automation not properly activated
- Missing permissions in the Slack app configuration
- Network connectivity issues between Prefect and Slack
Let me know which of these tests work and which don't, and we can narrow down the problem.Ben T
07/10/2025, 1:53 AMMarvin
07/10/2025, 1:53 AMMarvin
07/10/2025, 1:54 AM<http://hooks.slack.com|hooks.slack.com>
- *.<http://hooks.slack.com|hooks.slack.com>
- The IP ranges can be found in Slack's IP ranges documentation
2. Logging and Troubleshooting:
You can check several places for failure logs:
a. Prefect logs:
bash
prefect logs -f "flow-run/YOUR_FLOW_RUN_ID"
b. Enable debug logging in your Prefect deployment:
python
import logging
from prefect import flow
logging.basicConfig(level=logging.DEBUG)
@flow(log_prints=True)
def your_flow():
...
c. Test connectivity directly:
python
import requests
import logging
logging.basicConfig(level=logging.DEBUG)
# This will show detailed HTTP request/response info
logging.getLogger('urllib3').setLevel(logging.DEBUG)
webhook_url = "your_slack_webhook_url"
response = <http://requests.post|requests.post>(webhook_url, json={"text": "Test message"})
print(f"Status Code: {response.status_code}")
print(f"Response: {response.text}")
3. Quick Verification Steps:
1. First, verify basic connectivity:
bash
# Test if you can reach Slack's webhook domain
curl -v <https://hooks.slack.com>
2. Check if your Prefect worker has internet access:
python
from prefect import flow
import requests
@flow
def test_connectivity():
try:
response = requests.get("<https://hooks.slack.com>")
print(f"Status code: {response.status_code}")
except Exception as e:
print(f"Connection failed: {e}")
test_connectivity()
4. Common VNet Issues:
- Missing DNS resolution for <http://hooks.slack.com|hooks.slack.com>
- Outbound traffic blocked by default NSG rules
- Proxy requirements in your environment
- Missing route table entries for internet access
If you're still having issues after adding the NSG rules, we can dig deeper into the logs. Let me know what you find when you check the connectivity and logs.Ben T
07/10/2025, 3:10 AMfrom prefect.blocks.notifications import SlackWebhook
try:
# Test the block directly
slack_webhook = SlackWebhook.load("slack-notifications")
slack_webhook.notify("Test message")
except Exception as e:
logger.error(f"❌ Failed to send test Slack notification: {e}")
Marvin
07/10/2025, 3:10 AMMarvin
07/10/2025, 3:10 AMnotify
method is asynchronous in Prefect 3.x. You need to either use await
in an async function or use the sync version. Here's both approaches:
Async Version (preferred):
python
from prefect.blocks.notifications import SlackWebhook
import asyncio
async def test_slack():
try:
slack_webhook = await SlackWebhook.load("slack-notifications")
await slack_webhook.notify("Test message")
print("✅ Notification sent successfully!")
except Exception as e:
print(f"❌ Failed to send test Slack notification: {e}")
# Run the async function
asyncio.run(test_slack())
Sync Version (if you need to use it in a synchronous context):
python
from prefect.blocks.notifications import SlackWebhook
from prefect.utilities.asyncutils import sync_compatible
try:
slack_webhook = SlackWebhook.load("slack-notifications")
slack_webhook.notify_sync("Test message") # Note the _sync suffix
print("✅ Notification sent successfully!")
except Exception as e:
print(f"❌ Failed to send test Slack notification: {e}")
The error you're seeing is because you're trying to call the async notify
method without awaiting it. Choose either of the above approaches based on your context. The async version is generally preferred in Prefect 3.x, but both will work.
If you're testing this inside a flow, you can use the async version like this:
python
from prefect import flow
from prefect.blocks.notifications import SlackWebhook
@flow
async def test_flow():
try:
slack_webhook = await SlackWebhook.load("slack-notifications")
await slack_webhook.notify("Test message from flow")
print("✅ Notification sent successfully!")
except Exception as e:
print(f"❌ Failed to send test Slack notification: {e}")
if __name__ == "__main__":
asyncio.run(test_flow())