João Moreira
08/02/2024, 10:53 AMfrom prefect.blocks.notifications import SlackWebhook
# Provide your Slack Webhook URL
slack_webhook_block = SlackWebhook(
url="<https://hooks.slack.com/services/XXXXX>"
)
# Save Block with a name for reusability
slack_webhook_block.save(
name="notification-block",
overwrite=True
)
#2 Get notification UUID and create a flow notification policy:
import asyncio
from prefect.client import get_client
from prefect.client.schemas.actions import FlowRunNotificationPolicyCreate
async def get_block_document_and_create_policy():
async with get_client() as client:
block_document = await client.read_block_document_by_name(
name="notification-block",
block_type_slug="slack-webhook"
)
# Check if block_document is retrieved successfully
if block_document is None:
print("Error: Could not retrieve block document. Aborting notification policy creation.")
return
# Access the ID of the block document
block_document_id = block_document.id
# Print the block document ID to check its type
print(f"Block document ID: {block_document_id}, type: {type(block_document_id)}")
# Ensure the block_document_id is a valid UUID string
block_document_uuid = str(block_document_id)
print(f"Formatted block document UUID: {block_document_uuid}")
# Create the notification policy object
policy = FlowRunNotificationPolicyCreate(
tags=["daily-etl"], # Add relevant tags
is_active=True,
state_names=["FAILED"],
block_document_id=block_document_uuid,
message_template="Flow run {flow_run_name} with id {flow_run_id} entered state {flow_run_state_name}."
)
# Create the notification policy
await client.create_flow_run_notification_policy(policy)
print("Flow run notification policy created successfully!")
if __name__ == "__main__":
asyncio.run(get_block_document_and_create_policy())
This raises an error:
11:49:42.442 | DEBUG | prefect.profiles - Using profile 'production'
11:49:42.533 | DEBUG | prefect.client - Connecting to API at <http://127.0.0.1:4200/api/>
Block document ID: 338d998e-442e-46ce-a99d-55dbc1e7770d, type: <class 'uuid.UUID'>
Formatted block document UUID: 338d998e-442e-46ce-a99d-55dbc1e7770d
Traceback (most recent call last):
File "/opt/sportmultimedia/Repo/prefect-sportmultimedia/api-FlowRunNotificationPolicyCreate.py", line 42, in <module>
asyncio.run(get_block_document_and_create_policy())
File "/usr/lib/python3.10/asyncio/runners.py", line 44, in run
return loop.run_until_complete(main)
File "/usr/lib/python3.10/asyncio/base_events.py", line 649, in run_until_complete
return future.result()
File "/opt/sportmultimedia/Repo/prefect-sportmultimedia/api-FlowRunNotificationPolicyCreate.py", line 38, in get_block_document_and_create_policy
await client.create_flow_run_notification_policy(policy)
File "/opt/sportmultimedia/Venvs/prefect-sportmultimedia/lib/python3.10/site-packages/prefect/client/orchestration.py", line 2387, in create_flow_run_notification_policy
policy = FlowRunNotificationPolicyCreate(
File "/opt/sportmultimedia/Venvs/prefect-sportmultimedia/lib/python3.10/site-packages/pydantic/v1/main.py", line 341, in __init__
raise validation_error
pydantic.v1.error_wrappers.ValidationError: 1 validation error for FlowRunNotificationPolicyCreate
block_document_id
value is not a valid uuid (type=type_error.uuid)
But, passed value is indeed a UUID.
Question 1: Anyone, any thoughts?
Question 2: In message_template if i use {timestamp} i also get an error. Does anyone know where message templating is available in documentation?Alexander Azzam
08/02/2024, 11:07 AMblock_document_uuid=str(…)
, and then pass that back in its now a string object and not a UUID object (even though the str has a UUID format).João Moreira
08/02/2024, 11:18 AM# Ensure the block_document_id is a valid UUID string
block_document_uuid = str(block_document_id)
Alexander Azzam
08/02/2024, 11:20 AMJoão Moreira
08/02/2024, 11:23 AM# Ensure the block_document_id is a valid UUID
if isinstance(block_document_id, uuid.UUID):
block_document_uuid = block_document_id
else:
block_document_uuid = uuid.UUID(block_document_id)
print(f"Formatted block document UUID: {block_document_uuid}")
# Create the notification policy object
policy = FlowRunNotificationPolicyCreate(
tags=["daily-etl"], # Add relevant tags
is_active=True,
state_names=["FAILED"],
block_document_id=block_document_uuid,
message_template="Flow run {flow_run_name} with id {flow_run_id} entered state {flow_run_state_name}."
)
Alexander Azzam
08/02/2024, 11:31 AMJoão Moreira
08/02/2024, 11:35 AMprint(f"Block document ID: {block_document_id}, type: {type(block_document_id)}")
Block document ID: 338d998e-442e-46ce-a99d-55dbc1e7770d, type: <class 'uuid.UUID'>
Formatted block document UUID: 338d998e-442e-46ce-a99d-55dbc1e7770d
Error:
12:16:29.806 | DEBUG | prefect.profiles - Using profile 'production'
12:16:29.903 | DEBUG | prefect.client - Connecting to API at <http://127.0.0.1:4200/api/>
Block document ID: 338d998e-442e-46ce-a99d-55dbc1e7770d, type: <class 'uuid.UUID'>
Formatted block document UUID: 338d998e-442e-46ce-a99d-55dbc1e7770d
Traceback (most recent call last):
File "/opt/sportmultimedia/Repo/prefect-sportmultimedia/api-FlowRunNotificationPolicyCreate.py", line 46, in <module>
asyncio.run(get_block_document_and_create_policy())
File "/usr/lib/python3.10/asyncio/runners.py", line 44, in run
return loop.run_until_complete(main)
File "/usr/lib/python3.10/asyncio/base_events.py", line 649, in run_until_complete
return future.result()
File "/opt/sportmultimedia/Repo/prefect-sportmultimedia/api-FlowRunNotificationPolicyCreate.py", line 42, in get_block_document_and_create_policy
await client.create_flow_run_notification_policy(policy)
File "/opt/sportmultimedia/Venvs/prefect-sportmultimedia/lib/python3.10/site-packages/prefect/client/orchestration.py", line 2387, in create_flow_run_notification_policy
policy = FlowRunNotificationPolicyCreate(
File "/opt/sportmultimedia/Venvs/prefect-sportmultimedia/lib/python3.10/site-packages/pydantic/v1/main.py", line 341, in __init__
raise validation_error
pydantic.v1.error_wrappers.ValidationError: 1 validation error for FlowRunNotificationPolicyCreate
block_document_id
value is not a valid uuid (type=type_error.uuid)
Alexander Azzam
08/02/2024, 11:37 AMJoão Moreira
08/02/2024, 11:57 AMimport asyncio
import uuid
from prefect.client import get_client
from prefect.client.schemas.actions import FlowRunNotificationPolicyCreate
async def get_block_document_and_create_policy():
async with get_client() as client:
block_document = await client.read_block_document_by_name(
name="notification-block",
block_type_slug="slack-webhook"
)
# Check if block_document is retrieved successfully
if block_document is None:
print("Error: Could not retrieve block document. Aborting notification policy creation.")
return
# Access the ID of the block document
block_document_id = block_document.id
# Print the block document ID to check its type
print(f"Block document ID: {block_document_id}, type: {type(block_document_id)}")
# Ensure the block_document_id is a valid UUID
# if isinstance(block_document_id, uuid.UUID):
# block_document_uuid = block_document_id
# else:
# block_document_uuid = uuid.UUID(block_document_id)
# print(f"Formatted block document UUID: {block_document_uuid}")
# Create the notification policy object
policy = FlowRunNotificationPolicyCreate(
tags=["daily-etl"], # Add relevant tags
is_active=True,
state_names=["FAILED"],
# block_document_id=block_document_uuid,
block_document_id=block_document_id,
message_template="Flow run {flow_run_name} with id {flow_run_id} entered state {flow_run_state_name}."
)
# Create the notification policy
await client.create_flow_run_notification_policy(policy)
print("Flow run notification policy created successfully!")
if __name__ == "__main__":
asyncio.run(get_block_document_and_create_policy())
Error:
12:55:21.831 | DEBUG | prefect.profiles - Using profile 'production'
12:55:21.927 | DEBUG | prefect.client - Connecting to API at <http://127.0.0.1:4200/api/>
Block document ID: 338d998e-442e-46ce-a99d-55dbc1e7770d, type: <class 'uuid.UUID'>
Traceback (most recent call last):
File "/opt/sportmultimedia/Repo/prefect-sportmultimedia/api-FlowRunNotificationPolicyCreate.py", line 47, in <module>
asyncio.run(get_block_document_and_create_policy())
File "/usr/lib/python3.10/asyncio/runners.py", line 44, in run
return loop.run_until_complete(main)
File "/usr/lib/python3.10/asyncio/base_events.py", line 649, in run_until_complete
return future.result()
File "/opt/sportmultimedia/Repo/prefect-sportmultimedia/api-FlowRunNotificationPolicyCreate.py", line 43, in get_block_document_and_create_policy
await client.create_flow_run_notification_policy(policy)
File "/opt/sportmultimedia/Venvs/prefect-sportmultimedia/lib/python3.10/site-packages/prefect/client/orchestration.py", line 2387, in create_flow_run_notification_policy
policy = FlowRunNotificationPolicyCreate(
File "/opt/sportmultimedia/Venvs/prefect-sportmultimedia/lib/python3.10/site-packages/pydantic/v1/main.py", line 341, in __init__
raise validation_error
pydantic.v1.error_wrappers.ValidationError: 1 validation error for FlowRunNotificationPolicyCreate
block_document_id
value is not a valid uuid (type=type_error.uuid)