redsquare
12/14/2022, 12:25 PM
Anna Geller
12/14/2022, 1:10 PM
import asyncio
from prefect.cli.cloud import get_current_workspace
from prefect.client.cloud import get_cloud_client, CloudUnauthorizedError
async def get_workspace() -> str:
    """
    Resolve the handle of the workspace the current profile points at.

    Reads the workspaces visible to the cloud client, asks
    ``get_current_workspace`` which one is active, and keeps only the part
    after the last ``/`` of that value.

    :return: the workspace handle, or ``"default"`` when the cloud client
        raises ``CloudUnauthorizedError``
    """
    try:
        async with get_cloud_client() as cloud:
            available = await cloud.read_workspaces()
            active = get_current_workspace(available)
            # Only the trailing segment after the final "/" is wanted.
            return active.rsplit("/", 1)[-1]
    except CloudUnauthorizedError:
        # means: local Orion instance and the default CLI Profile
        return "default"
def get_env() -> str:
    """
    Synchronous entry point that reports the current environment name.

    Runs the ``get_workspace`` coroutine to completion with ``asyncio.run``.
    Swap this out for any other logic that tells dev apart from prod.

    :return: string representing the environment, same as assigned to block names
    """
    environment = asyncio.run(get_workspace())
    return environment
redsquare
12/14/2022, 1:14 PM
Anna Geller
12/14/2022, 4:51 PM
from prefect.blocks.core import Block
from prefect.blocks.notifications import SlackWebhook
from prefect.settings import PREFECT_API_URL
from pydantic import Field
from typing import Any
from uuid import UUID
from dataplatform.environment import get_env
class Workspace(Block):
    """
    Manage alerts and workspace metadata.

    Args:
        name: environment name e.g. dev, staging, prod
        block_name: name under which the default blocks bound to this
            workspace are stored; ``send_alert`` loads the ``SlackWebhook``
            block saved under this name
        metadata: key-value pairs representing workspace information

    Example:
        Load a stored Workspace block and send an alert:
        ```python
        from dataplatform.blocks import Workspace

        workspace = Workspace.load("BLOCK_NAME")
        workspace.send_alert("Alert from Prefect! :rocket:")
        ```
    """

    _block_type_name = "Workspace"
    _logo_url = "https://images.ctfassets.net/gm98wzqotmnx/19W3Di10hhb4oma2Qer0x6/764d1e7b4b9974cd268c775a488b9d26/image16.png?h=250"
    _block_schema_capabilities = ["send_alert", "send_alert_on_failure"]

    name: str = Field(default="dev", description="The name of your workspace.")
    block_name: str = Field(
        default="default",
        description="Block name used for default blocks bound to this workspace",
    )
    metadata: Any = Field(
        default=dict(workspace_owner="Data Team"),
        description="A JSON-compatible field for storing additional workspace settings.",
    )

    def send_alert(self, message: str) -> None:
        """
        Send ``message`` through the Slack webhook bound to this workspace.

        Args:
            message: the text passed to the webhook's ``notify`` method.
        """
        webhook = SlackWebhook.load(self.block_name)
        webhook.notify(message)

    @staticmethod
    def _get_task_run_page_url(task_run_id: UUID) -> str:
        """
        Returns a link to the task run page.

        Args:
            task_run_id: the task run id.
        """
        api_url = PREFECT_API_URL.value() or "http://ephemeral-orion/api"
        # The UI address is derived from the API address by plain substring
        # replacement, so every occurrence of each fragment is rewritten.
        ui_url = (
            api_url.replace("api", "app")
            .replace("app/accounts", "account")
            .replace("workspaces", "workspace")
        )
        return f"{ui_url}/flow-runs/task-run/{task_run_id}"

    def send_alert_on_failure(self, state, failure_reason: str = None):
        """
        Send an alert when ``state`` is a failed state; otherwise do nothing.

        Args:
            state: a run state exposing ``name`` and ``state_details`` (with
                ``task_run_id`` and ``flow_run_id``).
            failure_reason: label used in the alert text. When empty or
                ``None``, ``"Flow run <flow_run_id>"`` is used instead.
        """
        # Check the state first so that the URL construction and the
        # get_env() lookup only happen when an alert is actually sent.
        if state.name != "Failed":
            return

        details = state.state_details
        reason = failure_reason or f"Flow run {details.flow_run_id}"
        url = self._get_task_run_page_url(details.task_run_id)
        workspace = get_env()
        self.send_alert(f"`{reason}` failed in {workspace}. Details: {url}")
as an extension to this demo block: https://github.com/anna-geller/prefect-dataplatform/blob/main/dataplatform/blocks/workspace.py
redsquare
12/14/2022, 5:00 PM