Ritesh
06/29/2025, 4:55 PM
Marvin
06/29/2025, 4:55 PM
Marvin
06/29/2025, 4:56 PM
Ritesh
06/29/2025, 4:59 PM
Marvin
06/29/2025, 4:59 PM
Marvin
06/29/2025, 5:01 PM
```bash
prefect work-pool ls
```
This command lists your work pools; the workers currently connected to each pool (and their last heartbeat) are shown on that pool's page in the Prefect UI.
3. On Each Machine:
- When you start a worker, its startup logs show which work pool it is polling
- Give each worker an explicit name so you can tell machines apart in the UI:
```bash
prefect worker start -p "machine-a-pool" --name "machine-a-worker"
```
To ensure a clear mapping between machines and work pools:
1. Use Descriptive Work Pool Names:
- Name your work pools to reflect their machine assignments (e.g., "machine-a-pool", "machine-b-pool")
- When creating a work pool:
```bash
prefect work-pool create "machine-a-pool" --type process
```
2. Start Workers with Specific Work Pools:
- On Machine A:
```bash
prefect worker start -p "machine-a-pool"
```
- On Machine B:
```bash
prefect worker start -p "machine-b-pool"
```
This way, you can maintain a clear mapping of which workers are running on which machines and which work pools they're servicing.
Remember that a single work pool can have multiple workers from different machines connected to it, so if you need strict machine-specific execution, use separate work pools for each machine.
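Once the pools exist, you control which machine a flow runs on by deploying it to the matching pool. A minimal sketch, assuming a recent Prefect release that supports `flow.from_source(...).deploy(...)`; the repository URL, entrypoint, and deployment name are placeholders:
```python
from prefect import flow

if __name__ == "__main__":
    # Register a deployment whose runs will only be picked up by
    # workers polling "machine-a-pool" (i.e. the worker on Machine A).
    flow.from_source(
        source="https://github.com/your-org/your-repo",  # placeholder repo containing your flow code
        entrypoint="flows/etl.py:my_flow",               # placeholder path:function
    ).deploy(
        name="etl-on-machine-a",
        work_pool_name="machine-a-pool",
    )
```
Runs of this deployment will then execute on Machine A, as long as only that machine's worker polls the pool.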
Ritesh
06/29/2025, 5:04 PM
Marvin
06/29/2025, 5:04 PM
Marvin
06/29/2025, 5:05 PM
1. Using SSH (with Paramiko):
```python
import paramiko

def start_remote_worker(
    hostname,
    username,
    key_path,
    work_pool_name,
    prefect_api_url
):
    # Initialize SSH client
    ssh = paramiko.SSHClient()
    ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
    try:
        # Connect to remote machine
        ssh.connect(
            hostname=hostname,
            username=username,
            key_filename=key_path
        )
        # Command to start the worker
        # Note: using nohup to keep the worker running after SSH disconnects
        command = f"""
        export PREFECT_API_URL={prefect_api_url}
        nohup prefect worker start -p "{work_pool_name}" > worker.log 2>&1 &
        """
        # Execute the command
        stdin, stdout, stderr = ssh.exec_command(command)
        # Wait for command to complete
        stdout.channel.recv_exit_status()
    finally:
        ssh.close()

# Example usage
start_remote_worker(
    hostname="machine-b-ip",
    username="ec2-user",  # or your EC2 username
    key_path="/path/to/your/key.pem",
    work_pool_name="machine-b-pool",
    prefect_api_url="http://your-prefect-server:4200/api"
)
```
2. Using AWS Systems Manager (AWS-specific solution):
```python
import boto3

def start_remote_worker_ssm(
    instance_id,
    work_pool_name,
    prefect_api_url
):
    ssm = boto3.client('ssm')
    command = f"""
    export PREFECT_API_URL={prefect_api_url}
    nohup prefect worker start -p "{work_pool_name}" > worker.log 2>&1 &
    """
    response = ssm.send_command(
        InstanceIds=[instance_id],
        DocumentName="AWS-RunShellScript",
        Parameters={'commands': [command]}
    )
    return response['Command']['CommandId']

# Example usage
start_remote_worker_ssm(
    instance_id="i-1234567890abcdef0",  # Machine B's instance ID
    work_pool_name="machine-b-pool",
    prefect_api_url="http://your-prefect-server:4200/api"
)
```
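Because send_command returns immediately, it can be worth polling the command's status before assuming the worker came up. A possible follow-up sketch using boto3's get_command_invocation; wait_for_ssm_command is just an illustrative helper name, and the command ID would come from start_remote_worker_ssm above:
```python
import time

import boto3

def wait_for_ssm_command(command_id, instance_id, timeout=60):
    """Poll an SSM command until it leaves the in-progress states."""
    ssm = boto3.client('ssm')
    deadline = time.time() + timeout
    while time.time() < deadline:
        try:
            invocation = ssm.get_command_invocation(
                CommandId=command_id,
                InstanceId=instance_id,
            )
        except ssm.exceptions.InvocationDoesNotExist:
            # The invocation can take a moment to register after send_command
            time.sleep(2)
            continue
        if invocation['Status'] not in ('Pending', 'InProgress', 'Delayed'):
            return invocation['Status']  # e.g. 'Success' or 'Failed'
        time.sleep(2)
    return 'TimedOut'

# Example usage with the command ID returned by start_remote_worker_ssm
status = wait_for_ssm_command("command-id-from-send-command", "i-1234567890abcdef0")
print(status)
```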
Important considerations:
1. Security:
- Ensure proper security group rules allow communication between machines
- Use appropriate IAM roles if using AWS Systems Manager
- Keep SSH keys secure if using the SSH approach
2. Environment:
- The remote machine must have Prefect installed
- The PREFECT_API_URL must be set correctly to point to your Prefect server
- Any required dependencies for your flows must be installed on Machine B
3. Monitoring:
- The worker logs will be in worker.log on Machine B
- You can check worker status through the Prefect UI or CLI
- Consider implementing health checks to ensure the worker stays running
4. Best Practices:
- Consider using a process manager like Supervisor or systemd on Machine B
- Implement error handling and retry logic
- Monitor worker health and implement auto-restart if needed
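For the monitoring and auto-restart points, one lightweight option (short of systemd or Supervisor) is a small watchdog script run from cron on Machine B. This is only a sketch under those assumptions: the pool name and log file match the placeholders used earlier, and pgrep implies a Linux host:
```python
import subprocess

WORK_POOL = "machine-b-pool"  # placeholder pool name from the examples above

def ensure_worker_running(work_pool_name: str) -> None:
    """Restart the Prefect worker for a pool if no matching process is found."""
    # pgrep -f matches against the full command line of running processes
    check = subprocess.run(
        ["pgrep", "-f", f"prefect worker start -p {work_pool_name}"],
        capture_output=True,
    )
    if check.returncode != 0:  # no matching worker process found
        with open("worker.log", "ab") as log:
            subprocess.Popen(
                ["prefect", "worker", "start", "-p", work_pool_name],
                stdout=log,
                stderr=subprocess.STDOUT,
                start_new_session=True,  # detach so the worker survives this script exiting
            )

if __name__ == "__main__":
    ensure_worker_running(WORK_POOL)
```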
Choose the approach that best fits your security requirements and infrastructure setup. The AWS Systems Manager approach is recommended if you're fully in the AWS ecosystem, as it's more secure and manageable than direct SSH access.
Ritesh
06/30/2025, 10:05 AM
Marvin
06/30/2025, 10:05 AM
Marvin
06/30/2025, 10:06 AM
1. Print Statements:
```python
from prefect import flow

@flow(log_prints=True)
def my_flow():
    print("This will show up in the dashboard")  # Captured because log_prints=True is set
```
2. Standard Python Logging:
```python
import logging
from prefect import flow

logger = logging.getLogger("my_logger")

@flow
def my_flow():
    # Appears in the dashboard if "my_logger" is listed in PREFECT_LOGGING_EXTRA_LOGGERS
    logger.info("This also appears in the dashboard")
```
3. Log Prints Decorator:
```python
from prefect import flow, task

@task(log_prints=True)
def my_task():
    print("This print will be captured")

@flow(log_prints=True)
def my_flow():
    print("Flow-level prints are captured too")
```
4. The Prefect Run Logger:
```python
from prefect import flow
from prefect.logging import get_run_logger

@flow
def my_flow():
    logger = get_run_logger()  # must be called inside a flow or task run
    logger.info("Run logger message")
    logger.warning("Warning message")
    logger.error("Error message")
```
5. Temporarily Suppressing Logs:
```python
import logging
from prefect import flow
from prefect.logging import get_run_logger

@flow
def my_flow():
    logger = get_run_logger()
    logger.info("This is logged")
    logging.disable(logging.CRITICAL)  # temporarily silence all loggers
    logger.info("This won't be logged")
    logging.disable(logging.NOTSET)    # restore normal logging
```
6. Structured Logging with Extra Fields:
```python
from prefect import flow
from prefect.logging import get_run_logger

@flow
def my_flow():
    logger = get_run_logger()
    logger.info(
        "Processing data",
        extra={
            "count": 100,
            "status": "running",
            "metadata": {"source": "database"}
        }
    )
```
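All of the examples above log from the flow body; the same run logger works inside tasks, and task-level lines are grouped under the task run in the UI. A minimal sketch (the transform task and its values are made up for illustration):
```python
from prefect import flow, task
from prefect.logging import get_run_logger

@task
def transform(x: int) -> int:
    logger = get_run_logger()
    logger.info("Transforming value %s", x)  # appears under the task run in the UI
    return x * 2

@flow
def my_flow():
    logger = get_run_logger()
    logger.info("Starting flow")
    result = transform(21)
    logger.info("Finished with result %s", result)

if __name__ == "__main__":
    my_flow()
```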
Key features of these logging methods:
1. Automatic Capture:
- Logs from the run logger are captured and sent to the API automatically
- Print statements are captured when log_prints=True is set (or the PREFECT_LOGGING_LOG_PRINTS setting is enabled)
- Exceptions and tracebacks are automatically logged
2. Log Levels:
- DEBUG
- INFO
- WARNING
- ERROR
- CRITICAL
3. Benefits:
- Real-time visibility in the UI
- Structured logging support
- Automatic timestamp and context information
- Log persistence for historical review
4. Configuration Options:
You can configure logging behavior through environment variables:
```bash
export PREFECT_LOGGING_LEVEL="DEBUG"              # Set the logging level
export PREFECT_LOGGING_EXTRA_LOGGERS="my_logger"  # Forward additional Python loggers to Prefect
```
Remember that all of these logs are sent to the Prefect API and displayed in the Prefect UI, making it easy to debug and monitor your flows. The UI shows logs in chronological order and lets you filter by level and search through log content.