12/14/2022, 12:25 PM
Would it be possible to include the workspace name on notifications?

Anna Geller

12/14/2022, 1:10 PM
you could do something similar to this to get the workspace name via code
import asyncio
from prefect.cli.cloud import get_current_workspace
from prefect.client.cloud import get_cloud_client, CloudUnauthorizedError

async def get_workspace() -> str:
    """
    Return the handle of the workspace the current profile points at.

    Reads the workspaces visible to the cloud client, asks
    ``get_current_workspace`` which one is active, and keeps only the part
    after the last ``/``.
    (Presumably the value looks like ``<account>/<workspace>`` - TODO confirm.)

    :return: the workspace handle, or ``"default"`` when the cloud client
        raises ``CloudUnauthorizedError``
    """
    try:
        async with get_cloud_client() as client:
            workspaces = await client.read_workspaces()
            current_workspace = get_current_workspace(workspaces)
            # Keep only the last path segment of the workspace identifier.
            return current_workspace.split("/")[-1]
    except CloudUnauthorizedError:
        return "default"  # means: local Orion instance and the default CLI Profile

def get_env() -> str:
    """
    Return the name of the environment the code is running in.

    This could be replaced by some other logic to return whether you run
    something in dev vs. prod.

    :return: string representing the environment, same as assigned to block names
    """
    # NOTE(review): the body of this function was missing from the snippet.
    # Running the async ``get_workspace`` helper to completion is the assumed
    # original behavior (``asyncio`` is imported but otherwise unused) - confirm.
    return asyncio.run(get_workspace())


12/14/2022, 1:14 PM
Sure - I meant the notifications into Slack

Anna Geller

12/14/2022, 4:51 PM
gotcha, I thought that's the easy part. Here is a more comprehensive example:
from prefect.blocks.core import Block
from prefect.blocks.notifications import SlackWebhook
from prefect.settings import PREFECT_API_URL
from pydantic import Field
from typing import Any
from uuid import UUID
from dataplatform.environment import get_env

class Workspace(Block):
    """
    Manage alerts and workspace metadata

    Args:
        name: environment name e.g. dev, staging, prod
        block_name: name of the blocks bound to this workspace; used to load
            the Slack webhook block that alerts are sent through
        metadata: key-value pairs representing workspace information

    Example:
        Load a stored JSON value:
        ```python
        from dataplatform.blocks import Workspace

        workspace = Workspace.load("BLOCK_NAME")
        workspace.send_alert("Alert from Prefect! :rocket:")
        ```
    """

    _block_type_name = "Workspace"
    _logo_url = ""
    _block_schema_capabilities = ["send_alert", "send_alert_on_failure"]

    name: str = Field(default="dev", description="The name of your workspace.")
    block_name: str = Field(
        default="default",
        description="Block name used for default blocks bound to this workspace",
    )
    metadata: Any = Field(
        default=dict(workspace_owner="Data Team"),
        description="A JSON-compatible field for storing additional workspace settings.",
    )

    def send_alert(self, message: str) -> None:
        """
        Send ``message`` through the Slack webhook block named ``self.block_name``.

        Args:
            message: the text handed to the webhook's ``notify`` method.
        """
        webhook = SlackWebhook.load(self.block_name)
        webhook.notify(message)

    @staticmethod
    def _get_task_run_page_url(task_run_id: UUID) -> str:
        """
        Returns a link to the task run page.

        Args:
            task_run_id: the task run id.
        """
        api_url = PREFECT_API_URL.value() or "http://ephemeral-orion/api"
        # Derive the UI address from the API address by plain string
        # substitution; note that every occurrence of "api" is replaced.
        ui_url = (
            api_url.replace("api", "app")
            .replace("app/accounts", "account")
            .replace("workspaces", "workspace")
        )
        return f"{ui_url}/flow-runs/task-run/{task_run_id}"

    def send_alert_on_failure(self, state, failure_reason: str = None) -> None:
        """
        Send a Slack alert when ``state`` is a failed state; otherwise do nothing.

        Args:
            state: the run state to inspect; its ``name`` and
                ``state_details`` (``task_run_id``, ``flow_run_id``) are read.
            failure_reason: optional text for the alert; when omitted the
                alert refers to the flow run id instead.
        """
        # NOTE(review): the left operand of this comparison was missing from
        # the snippet; ``state.name`` is the assumed original - confirm.
        if state.name != "Failed":
            return

        # Only gather details (including the workspace lookup) once we know
        # an alert will actually be sent.
        task_run_id = state.state_details.task_run_id
        flow_run_id = state.state_details.flow_run_id
        reason = failure_reason or f"Flow run {flow_run_id}"
        url = self._get_task_run_page_url(task_run_id)
        workspace = get_env()
        self.send_alert(f"`{reason}` failed in {workspace}. Details: {url}")


# Note from the thread: shared "as extension to this demo block".


12/14/2022, 5:00 PM
Thanks @Anna Geller
🙌 1