Saad80
07/24/2025, 5:37 PMMarvin
07/24/2025, 5:37 PMMarvin
07/24/2025, 5:38 PMSaad80
07/24/2025, 5:49 PMMarvin
07/24/2025, 5:49 PMMarvin
07/24/2025, 5:50 PM{service-name}.{namespace-name}.localpython
import boto3
def update_ray_dns(new_ip, zone_id, record_name):
    route53 = boto3.client('route53')
    response = route53.change_resource_record_sets(
        HostedZoneId=zone_id,
        ChangeBatch={
            'Changes': [
                {
                    'Action': 'UPSERT',
                    'ResourceRecordSet': {
                        'Name': record_name,
                        'Type': 'A',
                        'TTL': 60,
                        'ResourceRecords': [{'Value': new_ip}]
                    }
                }
            ]
        }
    )
    return responsepython
from prefect import flow
import boto3
def get_ray_head_ip():
    ssm = boto3.client('ssm')
    response = ssm.get_parameter(Name='/ray/head-node-ip')
    return response['Parameter']['Value']
@flow
def your_flow():
    ray_ip = get_ray_head_ip()
    # Use ray_ip to connect to Ray clusterpython
import boto3
def get_ray_head_ip_by_tag():
    ec2 = boto3.client('ec2')
    response = ec2.describe_instances(
        Filters=[
            {
                'Name': 'tag:Role',
                'Values': ['ray-head-node']
            },
            {
                'Name': 'instance-state-name',
                'Values': ['running']
            }
        ]
    )
    # Get the private IP of the first matching instance
    return response['Reservations'][0]['Instances'][0]['PrivateIpAddress']Saad80
07/24/2025, 6:03 PMMarvin
07/24/2025, 6:03 PMMarvin
07/24/2025, 6:03 PMpython
from prefect import flow, task
import boto3
from typing import Optional
class RayClusterConfig:
    def __init__(self, region_name: str = "us-east-1"):
        self.ssm = boto3.client('ssm', region_name=region_name)
        self.parameter_name = "/ray/head-node-ip"
    
    def get_head_ip(self) -> str:
        try:
            response = self.ssm.get_parameter(Name=self.parameter_name)
            return response['Parameter']['Value']
        except self.ssm.exceptions.ParameterNotFound:
            raise ValueError(f"Ray head node IP not found in Parameter Store at {self.parameter_name}")
    
    def update_head_ip(self, new_ip: str) -> None:
        self.ssm.put_parameter(
            Name=self.parameter_name,
            Value=new_ip,
            Type='String',
            Overwrite=True
        )
@task
def get_ray_connection() -> str:
    config = RayClusterConfig()
    return config.get_head_ip()
@flow
def your_ray_flow():
    ray_ip = get_ray_connection()
    # Use ray_ip to connect to Ray cluster
    print(f"Connected to Ray cluster at {ray_ip}")python
import boto3
import requests
from time import sleep
def get_current_private_ip():
    # Get instance metadata
    response = requests.get('<http://169.254.169.254/latest/meta-data/local-ipv4>')
    return response.text
def update_loop():
    config = RayClusterConfig()
    while True:
        current_ip = get_current_private_ip()
        try:
            config.update_head_ip(current_ip)
        except Exception as e:
            print(f"Failed to update IP: {e}")
        sleep(60)  # Check every minute
if __name__ == "__main__":
    update_loop()Marvin
07/24/2025, 6:03 PMTo set up the EC2 instance tags, make sure your Ray head node is tagged appropriately:
```bash
aws ec2 create-tags --resources i-1234567890abcdef0 --tags Key=RayClusterRole,Value=head-nodejson
{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Action": [
                "ssm:GetParameter",
                "ssm:PutParameter"
            ],
            "Resource": "arn:aws:ssm:*:*:parameter/ray/*"
        }
    ]
}json
{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Action": [
                "ec2:DescribeInstances"
            ],
            "Resource": "*"
        }
    ]
}Saad80
07/25/2025, 6:31 PMMarvin
07/25/2025, 6:32 PMMarvin
07/25/2025, 6:33 PMpython
from prefect import flow
from prefect.blocks.system import Secret
@flow
def your_ray_flow():
    # This gets evaluated at runtime, not deployment time
    ray_ip_secret = Secret.load("ray-head-ip")
    ray_ip = ray_ip_secret.get()
    # Use ray_ip to connect to Ray clusterbash
prefect block register -m prefect.blocks.systemyaml
deployments:
  ray-deployment:
    name: ray-deployment
    work_pool:
      name: your-work-pool
      work_queue: default
    flow:
      path: ./your_flow.py:your_ray_flowpython
from prefect import flow, task
from prefect.blocks.system import Secret
@task
def get_ray_connection():
    # This gets evaluated at runtime
    ray_ip_secret = Secret.load("ray-head-ip")
    return ray_ip_secret.get()
@flow
def your_ray_flow():
    ray_ip = get_ray_connection()
    # Use ray_ip to connect to Ray clusterpython
# This would run on your Ray head node or as a Lambda
from prefect.client.cloud import get_client
from prefect.blocks.system import Secret
async def update_ray_ip(new_ip: str):
    async with get_client() as client:
        secret_block = await Secret.load("ray-head-ip")
        secret_block.value = new_ip
        await secret_block.save()Saad80
07/25/2025, 6:52 PMMarvin
07/25/2025, 6:52 PMMarvin
07/25/2025, 6:52 PMpython
from prefect import flow
import os
@flow
def your_ray_flow():
    # This will be evaluated when the flow actually runs
    ray_ip = os.getenv("RAY_HEAD_IP")
    if not ray_ip:
        raise ValueError("RAY_HEAD_IP environment variable not set")
    # Use ray_ip to connect to Ray clusterSaad80
07/25/2025, 6:52 PMMarvin
07/25/2025, 6:52 PMMarvin
07/25/2025, 6:52 PMpython
from prefect import flow
import boto3
@flow
def your_ray_flow():
    # This entire block executes at runtime
    ec2 = boto3.client('ec2')
    response = ec2.describe_instances(
        Filters=[
            {
                'Name': 'tag:RayClusterRole',
                'Values': ['head-node']
            },
            {
                'Name': 'instance-state-name',
                'Values': ['running']
            }
        ]
    )
    
    if not response['Reservations'] or not response['Reservations'][0]['Instances']:
        raise ValueError("No running Ray head node found")
    
    ray_ip = response['Reservations'][0]['Instances'][0]['PrivateIpAddress']
    # Use ray_ip to connect to Ray clusterSaad80
07/25/2025, 6:53 PMMarvin
07/25/2025, 6:53 PMMarvin
07/25/2025, 6:54 PMpython
from prefect import flow, task
@task
def get_ray_ip():
    ec2 = boto3.client('ec2')
    response = ec2.describe_instances(...)
    return response['Reservations'][0]['Instances'][0]['PrivateIpAddress']
@flow
def your_ray_flow():
    ray_ip = get_ray_ip()
    # Use ray_ippython
from prefect_aws import AwsCredentials
from prefect import flow
@flow
def your_ray_flow():
    aws_credentials = AwsCredentials.load("default")
    ec2 = aws_credentials.get_boto3_client("ec2")
    # Rest of the codepython
from prefect import flow
from contextlib import contextmanager
@contextmanager
def get_ec2_client():
    import boto3
    yield boto3.client('ec2')
@flow
def your_ray_flow():
    with get_ec2_client() as ec2:
        response = ec2.describe_instances(...)