# Frank Colson
# 01/22/2024, 3:36 AM
from prefect.blocks.core import Block
from prefect_aws import AwsCredentials
from pydantic import VERSION as PYDANTIC_VERSION
from prefect.utilities.asyncutils import run_sync_in_worker_thread, sync_compatible
# Import ``Field`` from the top-level pydantic package unless pydantic 2.x is
# installed, in which case take it from the ``pydantic.v1`` namespace instead.
if not PYDANTIC_VERSION.startswith("2."):
    from pydantic import Field
else:
    from pydantic.v1 import Field
class CloudMapBlock(Block):
    # Block that looks up a service through the AWS "servicediscovery" client
    # and returns the address of one of its registered instances.
    # NOTE(review): documented with comments rather than a class docstring
    # because Prefect may surface a block's class docstring as its
    # description -- confirm before converting.

    namespace_name: str = Field(
        ...,
        description="The namespace to use for the CloudMap service",
    )
    discovery_name: str = Field(
        ...,
        description="The name of the CloudMap service to discover",
    )
    credentials: AwsCredentials = Field(
        default_factory=AwsCredentials,
        description="The AWS credentials to use for the CloudMap service",
    )

    def _get_servicediscovery_client(self):
        """Return a ``servicediscovery`` client built from ``self.credentials``."""
        creds = self.credentials
        return creds.get_client("servicediscovery")

    @sync_compatible
    async def get_host(self, return_port: bool = False):
        """Resolve the configured service to the address of one instance.

        Calls ``discover_instances`` for ``namespace_name`` /
        ``discovery_name`` in a worker thread and picks the first returned
        instance whose ``HealthStatus`` is ``HEALTHY`` or ``UNKNOWN``.

        Args:
            return_port: When true and the chosen instance has an
                ``AWS_INSTANCE_PORT`` attribute, return ``"<ip>:<port>"``
                instead of the bare IP.

        Returns:
            The instance's ``AWS_INSTANCE_IPV4`` attribute, optionally
            suffixed with ``:<port>``.

        Raises:
            ValueError: If no instances are returned, or none of them is
                ``HEALTHY`` / ``UNKNOWN``.
            KeyError: If the chosen instance has no ``AWS_INSTANCE_IPV4``
                attribute.
        """
        discover = self._get_servicediscovery_client().discover_instances
        # The client call is blocking, so it is pushed onto a worker thread.
        result = await run_sync_in_worker_thread(
            discover,
            NamespaceName=self.namespace_name,
            ServiceName=self.discovery_name,
        )

        instances = result["Instances"]
        if not instances:
            raise ValueError(f"No instances found for {self.namespace_name}/{self.discovery_name}")

        # Lazily scan for the first acceptable instance; UNKNOWN counts as usable.
        chosen = next(
            (
                instance
                for instance in instances
                if instance["HealthStatus"] in ("HEALTHY", "UNKNOWN")
            ),
            None,
        )
        if chosen is None:
            raise ValueError(f"No healthy instances found for {self.namespace_name}/{self.discovery_name}")

        attributes = chosen["Attributes"]
        address = attributes["AWS_INSTANCE_IPV4"]
        port = attributes.get("AWS_INSTANCE_PORT")

        # The port is appended only when requested *and* present; otherwise
        # the bare address is returned.
        if return_port and port is not None:
            return f"{address}:{port}"
        return address