Frank Colson
01/22/2024, 3:36 AMfrom prefect.blocks.core import Block
from prefect_aws import AwsCredentials
from pydantic import VERSION as PYDANTIC_VERSION
from prefect.utilities.asyncutils import run_sync_in_worker_thread, sync_compatible
if PYDANTIC_VERSION.startswith("2."):
    from pydantic.v1 import Field
else:
    from pydantic import Field
class CloudMapBlock(Block):
    namespace_name: str = Field(..., description="The namespace to use for the CloudMap service")
    discovery_name: str = Field(..., description="The name of the CloudMap service to discover")
    credentials: AwsCredentials = Field(default_factory=AwsCredentials, description="The AWS credentials to use for the CloudMap service")
    
    def _get_servicediscovery_client(self):
        return self.credentials.get_client("servicediscovery")
    
    @sync_compatible
    async def get_host(self, return_port: bool = False):
        client = self._get_servicediscovery_client()
        response = await run_sync_in_worker_thread(
            client.discover_instances,
            NamespaceName=self.namespace_name,
            ServiceName=self.discovery_name,
        )
        if not response["Instances"]:
            raise ValueError(f"No instances found for {self.namespace_name}/{self.discovery_name}")
        first_health_instance = next(filter(lambda x: x["HealthStatus"] in ["HEALTHY", "UNKNOWN"], response["Instances"]), None)
        if first_health_instance is None:
            raise ValueError(f"No healthy instances found for {self.namespace_name}/{self.discovery_name}")
        hostname = first_health_instance["Attributes"]["AWS_INSTANCE_IPV4"]
        port_number = first_health_instance["Attributes"].get("AWS_INSTANCE_PORT", None)
        if port_number is not None and return_port:
            return f"{hostname}:{port_number}"
        
        return hostname