Ritesh
06/29/2025, 4:55 PM
Marvin
06/29/2025, 4:55 PM
Marvin
06/29/2025, 4:56 PM
Ritesh
06/29/2025, 4:59 PM
Marvin
06/29/2025, 4:59 PM
Marvin
06/29/2025, 5:01 PM
bash
   prefect worker ls
bash
   prefect worker ls --local
bash
   prefect work-pool create "machine-a-pool" --type process
bash
   prefect worker start -p "machine-a-pool"
bash
   prefect worker start -p "machine-b-pool"
Ritesh
06/29/2025, 5:04 PM
Marvin
06/29/2025, 5:04 PM
Marvin
06/29/2025, 5:05 PM
python
import paramiko
def start_remote_worker(
    hostname,
    username,
    key_path,
    work_pool_name,
    prefect_api_url
):
    """Start a Prefect worker on a remote machine over SSH.

    Connects with key-based authentication, exports ``PREFECT_API_URL``,
    and launches ``prefect worker start`` under ``nohup`` so the worker
    keeps running after the SSH session closes. Worker output is
    redirected to ``worker.log`` on the remote host.

    Args:
        hostname: Remote host to connect to (IP or DNS name).
        username: SSH login user on the remote host.
        key_path: Path to the private key file used for authentication.
        work_pool_name: Name of the Prefect work pool the worker polls.
        prefect_api_url: URL of the Prefect API the worker connects to.

    Raises:
        RuntimeError: If the remote launch command exits non-zero.
        paramiko.SSHException: If the SSH connection or command fails.
    """
    # Initialize SSH client
    ssh = paramiko.SSHClient()
    # NOTE(review): AutoAddPolicy silently trusts unknown host keys —
    # acceptable for a lab setup, but load a known_hosts file in production.
    ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())

    try:
        # Connect to remote machine
        ssh.connect(
            hostname=hostname,
            username=username,
            key_filename=key_path
        )

        # nohup + '&' detaches the worker so it survives SSH disconnect.
        # Values are double-quoted so URLs or pool names containing shell
        # metacharacters (e.g. '&', '?') don't break the command line.
        command = f"""
            export PREFECT_API_URL="{prefect_api_url}"
            nohup prefect worker start -p "{work_pool_name}" > worker.log 2>&1 &
        """

        # Execute the command; stdin/stderr are unused.
        _stdin, stdout, _stderr = ssh.exec_command(command)

        # The launch shell returns quickly (the worker itself is detached);
        # check its exit status instead of discarding it silently.
        exit_status = stdout.channel.recv_exit_status()
        if exit_status != 0:
            raise RuntimeError(
                f"Remote worker launch exited with status {exit_status}"
            )
    finally:
        # Always release the SSH connection, even on failure.
        ssh.close()
# Example usage
start_remote_worker(
    hostname="machine-b-ip",
    username="ec2-user",  # or your EC2 username
    key_path="/path/to/your/key.pem",
    work_pool_name="machine-b-pool",
    prefect_api_url="http://your-prefect-server:4200/api"
)
python
import boto3
def start_remote_worker_ssm(
    instance_id,
    work_pool_name,
    prefect_api_url
):
    """Start a Prefect worker on an EC2 instance via AWS SSM Run Command.

    Avoids SSH entirely: sends an ``AWS-RunShellScript`` document that
    exports ``PREFECT_API_URL`` and launches ``prefect worker start``
    under ``nohup`` with output redirected to ``worker.log`` on the
    instance.

    Args:
        instance_id: EC2 instance ID of the target machine. The instance
            must be running the SSM agent with an instance profile that
            permits SSM commands.
        work_pool_name: Name of the Prefect work pool the worker polls.
        prefect_api_url: URL of the Prefect API the worker connects to.

    Returns:
        The SSM command ID (str), usable with
        ``ssm.get_command_invocation`` to inspect the launch result.
    """
    ssm = boto3.client('ssm')

    # Double-quote values so URLs or pool names containing shell
    # metacharacters don't break the command line.
    # NOTE(review): SSM may reap background processes once the script
    # exits; for a long-lived worker a systemd unit is safer — confirm.
    command = f"""
        export PREFECT_API_URL="{prefect_api_url}"
        nohup prefect worker start -p "{work_pool_name}" > worker.log 2>&1 &
    """

    response = ssm.send_command(
        InstanceIds=[instance_id],
        DocumentName="AWS-RunShellScript",
        Parameters={'commands': [command]}
    )

    # send_command is asynchronous; return the ID so callers can poll.
    return response['Command']['CommandId']
# Example usage
start_remote_worker_ssm(
    instance_id="i-1234567890abcdef0",  # Machine B's instance ID
    work_pool_name="machine-b-pool",
    prefect_api_url="http://your-prefect-server:4200/api"
)
PREFECT_API_URL worker.log
Ritesh
06/30/2025, 10:05 AM
Marvin
06/30/2025, 10:05 AM
Marvin
06/30/2025, 10:06 AM
python
   from prefect import flow
   
   @flow
   def my_flow():
        print("This will show up in the dashboard")  # Automatically captured
python
   import logging
   from prefect import flow
   
   logger = logging.getLogger("my_logger")
   
   @flow
   def my_flow():
        logger.info("This also appears in the dashboard")
python
   from prefect import flow, task
   
   @task(log_prints=True)
   def my_task():
       print("This print will be captured")
   
   @flow(log_prints=True)
   def my_flow():
        print("Flow-level prints are captured too")
python
   from prefect.logging import get_logger
   
   logger = get_logger()
   
   @flow
   def my_flow():
        logger.info("Custom logger message")
       logger.warning("Warning message")
        logger.error("Error message")
python
   from prefect.logging import bypass_logging
   
   @flow
   def my_flow():
       # Normal logging
       print("This is logged")
       
       with bypass_logging():
            print("This won't be logged")
python
   from prefect import flow
   from prefect.logging import get_logger
   
   logger = get_logger()
   
   @flow
   def my_flow():
        logger.info(
           "Processing data",
           extra={
               "count": 100,
               "status": "running",
               "metadata": {"source": "database"}
           }
        )
bash
   export PREFECT_LOGGING_LEVEL="DEBUG"  # Set logging level
   export PREFECT_LOGGING_FORMAT="json"  # Use JSON format