#!/usr/bin/env python3
"""
Key Management Script for Emergency Access Server

Manages key files, tests access, and provides key operations.
"""

import os
import sys
import json
import argparse
import base64
import getpass
import secrets
import string
import tempfile
from datetime import datetime
from pathlib import Path
from typing import Dict, Any, Optional, List
import subprocess
import shutil

# PyYAML and requests are only needed for YAML configs and the HTTP access
# tests.  Guarding the imports keeps the file operations (list, validate,
# create, rotate, generate) usable on hosts where they are not installed.
try:
    import yaml
except ImportError:
    yaml = None

try:
    import requests
except ImportError:
    requests = None


class _SshKeygenError(RuntimeError):
    """Raised when ssh-keygen exits with a non-zero status (message is its stderr)."""


def load_config(config_path: str) -> Dict[str, Any]:
    """Load configuration from a JSON or YAML file.

    Files ending in ``.yaml``/``.yml`` are parsed with ``yaml.safe_load``;
    everything else is parsed as JSON.

    Args:
        config_path: Path of the configuration file.

    Returns:
        The parsed configuration mapping.  An empty document yields ``{}``.

    Raises:
        Exception: If the file is missing, cannot be parsed, is a YAML file
            while PyYAML is unavailable, or does not contain a mapping at the
            top level.
    """
    is_yaml = config_path.endswith(('.yaml', '.yml'))
    if is_yaml and yaml is None:
        raise Exception(
            "Failed to load configuration: PyYAML is required to read YAML files"
        )

    try:
        with open(config_path, 'r') as f:
            data = yaml.safe_load(f) if is_yaml else json.load(f)
    except FileNotFoundError as e:
        raise Exception(f"Configuration file {config_path} not found") from e
    except Exception as e:
        raise Exception(f"Failed to load configuration: {str(e)}") from e

    # An empty YAML file (or a JSON ``null``) parses to None; every caller
    # uses ``config.get(...)``, so normalise that to an empty mapping.
    if data is None:
        return {}
    if not isinstance(data, dict):
        raise Exception(
            "Failed to load configuration: top-level value must be a mapping, "
            f"got {type(data).__name__}"
        )
    return data


def get_server_url(config: Dict[str, Any]) -> str:
    """Build the server base URL from ``config['server']``.

    Args:
        config: Loaded configuration mapping.

    Returns:
        ``http://<host>:<port>``; host defaults to ``127.0.0.1`` and port to
        ``1127`` when the ``server`` section or either setting is absent.
    """
    # ``or {}`` also covers an explicit ``server: null`` in the config.
    server = config.get('server') or {}
    host = server.get('host', '127.0.0.1')
    port = server.get('port', 1127)
    return f"http://{host}:{port}"


def _authenticated_get(url: str, username: str, password: str) -> Dict[str, Any]:
    """Send a GET request with HTTP Basic auth and summarise the outcome.

    Returns a dict with ``success``, ``url`` and ``username``, plus either
    ``status_code``/``response_text`` (request completed) or ``error``
    (request could not be made).
    """
    if requests is None:
        return {
            'success': False,
            'error': "Request failed: the 'requests' package is not installed",
            'url': url,
            'username': username
        }

    # Create basic auth header
    auth_string = base64.b64encode(f"{username}:{password}".encode()).decode()
    headers = {
        'Authorization': f'Basic {auth_string}',
        'User-Agent': 'emergency-access-test/1.0'
    }

    try:
        # Make request with timeout
        response = requests.get(url, headers=headers, timeout=10)
    except requests.exceptions.RequestException as e:
        return {
            'success': False,
            'error': f"Request failed: {str(e)}",
            'url': url,
            'username': username
        }

    return {
        'success': response.status_code == 200,
        'status_code': response.status_code,
        'response_text': response.text[:500],  # Limit response size
        'url': url,
        'username': username
    }


def test_key_access(config: Dict[str, Any], key_id: str, password: str) -> Dict[str, Any]:
    """Test access to a specific key over HTTP.

    Args:
        config: Loaded configuration mapping.
        key_id: Identifier of the key under ``config['keys']``.
        password: Password sent with the key's configured username.

    Returns:
        A result dict whose ``success`` flag is True only for an HTTP 200
        response.  Unknown keys and keys lacking ``route``/``username`` yield
        ``{'success': False, 'error': ...}`` instead of raising.
    """
    keys = config.get('keys', {})

    if key_id not in keys:
        return {'success': False, 'error': f"Key '{key_id}' not found in configuration"}

    key_config = keys[key_id]
    # Report incomplete entries as a failed test rather than a KeyError, so
    # one bad entry cannot abort a --test-all run.
    missing = [name for name in ('route', 'username') if name not in key_config]
    if missing:
        return {
            'success': False,
            'error': f"Key '{key_id}' is missing required setting(s): {', '.join(missing)}"
        }

    url = f"{get_server_url(config)}{key_config['route']}"
    return _authenticated_get(url, key_config['username'], password)


def test_health_endpoint(config: Dict[str, Any], password: str) -> Dict[str, Any]:
    """Test health endpoint access over HTTP.

    The route and username come from ``config['routes']`` (``health_route``,
    default ``/health-check``; ``health_username``, default
    ``health_monitor``).

    Args:
        config: Loaded configuration mapping.
        password: Password for the health endpoint user.

    Returns:
        A result dict in the same shape as :func:`test_key_access`.
    """
    routes = config.get('routes', {})
    health_route = routes.get('health_route', '/health-check')
    username = routes.get('health_username', 'health_monitor')

    url = f"{get_server_url(config)}{health_route}"
    return _authenticated_get(url, username, password)


# These are CLI operations, not unit tests; stop pytest from collecting them
# if this module is ever star-imported into a test module.
test_key_access.__test__ = False
test_health_endpoint.__test__ = False


def _generate_ssh_private_key(bits: int) -> str:
    """Run ssh-keygen and return the generated RSA private key text.

    Raises:
        FileNotFoundError: If ssh-keygen is not installed.
        _SshKeygenError: If ssh-keygen exits with a non-zero status.
    """
    # A private temporary directory keeps the key material out of the shared
    # /tmp namespace and guarantees both key files are removed on every exit
    # path, including failures.
    with tempfile.TemporaryDirectory(prefix='emergency_key_') as work_dir:
        key_path = os.path.join(work_dir, 'key')
        result = subprocess.run([
            'ssh-keygen', '-t', 'rsa', '-b', str(bits),
            '-f', key_path, '-N', '', '-C', 'emergency-access-key'
        ], capture_output=True, text=True)

        if result.returncode != 0:
            raise _SshKeygenError(result.stderr)

        with open(key_path, 'r') as f:
            return f.read()


def generate_key_content(key_type: str = 'ssh', bits: int = 4096, *,
                         strict: bool = False) -> str:
    """Generate key content based on type.

    Args:
        key_type: ``ssh`` (RSA key via ssh-keygen), ``gpg`` (placeholder
            text), ``api`` (64-character token), ``password`` (32-character
            password); any other value produces a commented placeholder.
        bits: RSA key size, used for ``ssh`` only.
        strict: When False (default) an SSH generation failure is returned as
            ``#``-commented text.  When True the failure is raised instead,
            so callers never store an error message as if it were a key.

    Returns:
        The generated file content.

    Raises:
        RuntimeError: Only when ``strict`` is True and SSH generation fails.
    """
    if key_type == 'ssh':
        try:
            return _generate_ssh_private_key(bits)
        except _SshKeygenError as e:
            if strict:
                raise
            return f"# SSH Key Generation Failed\n# Error: {e}\n"
        except FileNotFoundError as e:
            if strict:
                raise RuntimeError(
                    "SSH key generation requires ssh-keygen to be installed"
                ) from e
            return "# SSH key generation requires ssh-keygen to be installed\n# Please install OpenSSH client tools\n"
        except Exception as e:
            if strict:
                raise RuntimeError(f"SSH key generation failed: {str(e)}") from e
            return f"# SSH key generation failed: {str(e)}\n"

    elif key_type == 'gpg':
        return """# GPG Private Key Placeholder
# Replace this with your actual GPG private key
# Export with: gpg --export-secret-keys --armor KEY_ID
"""

    elif key_type == 'api':
        # Generate a secure API key
        api_key = ''.join(secrets.choice(string.ascii_letters + string.digits) for _ in range(64))
        return f"API_KEY={api_key}\n"

    elif key_type == 'password':
        # Generate a secure password
        password = ''.join(secrets.choice(string.ascii_letters + string.digits + "!@#$%^&*") for _ in range(32))
        return f"PASSWORD={password}\n"

    else:
        # Formatted like the output of `date`, without spawning a shell.
        generated_on = datetime.now().astimezone().strftime('%a %b %d %H:%M:%S %Z %Y')
        return f"# {key_type.upper()} Key File\n# Generated on: {generated_on}\n# TODO: Replace with actual {key_type} content\n"


def create_key_file(file_path: str, content: str, permissions: int = 0o600) -> None:
    """Create key file with content and secure permissions.

    Parent directories are created as needed and an existing file is
    overwritten.

    Args:
        file_path: Destination path.
        content: Text to write.
        permissions: File mode applied to the file (default ``0o600``).
    """
    path = Path(file_path)

    # Create parent directory if needed
    path.parent.mkdir(parents=True, exist_ok=True)

    # Create the file with the target mode so the secret is never on disk
    # with umask-default permissions.
    fd = os.open(path, os.O_WRONLY | os.O_CREAT | os.O_TRUNC, permissions)
    with os.fdopen(fd, 'w') as f:
        # The mode passed to os.open is masked by the umask and ignored for a
        # pre-existing file, so set it explicitly before writing the content.
        os.chmod(path, permissions)
        f.write(content)

    print(f"๐Ÿ”‘ Created key file: {file_path} (permissions: {oct(permissions)})")


def backup_key_file(file_path: str) -> Optional[str]:
    """Create backup of key file.

    Args:
        file_path: Path of the file to back up.

    Returns:
        Path of the backup (``<file_path>.backup.<8 hex chars>``), or None
        when ``file_path`` does not exist.
    """
    if not os.path.exists(file_path):
        return None

    backup_path = f"{file_path}.backup.{secrets.token_hex(4)}"
    # copy2 also copies metadata, so the backup keeps the original's mode.
    shutil.copy2(file_path, backup_path)
    print(f"๐Ÿ“ Created backup: {backup_path}")
    return backup_path


def list_key_files(config: Dict[str, Any]) -> None:
    """Print the status of every key file listed under ``config['keys']``.

    For each key this reports whether the file exists, its size, its mode,
    whether group/other have any access, and a preview of its first line.
    """
    keys = config.get('keys', {})

    if not keys:
        print("No keys configured")
        return

    print("\n" + "=" * 80)
    print("KEY FILE STATUS")
    print("=" * 80)

    for key_id, key_config in keys.items():
        file_path = key_config.get('file', '')
        print(f"\n๐Ÿ“‹ Key: {key_id}")
        print(f" File: {file_path}")

        if not file_path:
            print(" Status: โŒ No file path configured")
            continue

        path = Path(file_path)
        if not path.exists():
            print(" Status: โŒ File does not exist")
            continue

        # Get file info
        stat = path.stat()
        size = stat.st_size
        permissions = oct(stat.st_mode)[-3:]

        print(f" Status: โœ… Exists")
        print(f" Size: {size} bytes")
        print(f" Permissions: {permissions}")

        # Check if permissions are secure
        if stat.st_mode & 0o077:  # Check if group/other have any permissions
            print(" Security: โš ๏ธ File permissions may be too permissive")
        else:
            print(" Security: โœ… Secure permissions")

        # Check if readable
        # NOTE(review): for single-line files such as API_KEY=/PASSWORD=
        # content, this preview prints secret material to the terminal.
        try:
            with open(path, 'r') as f:
                first_line = f.readline().strip()
            if first_line:
                print(f" Preview: {first_line[:50]}...")
            else:
                print(" Preview: Empty file")
        except Exception as e:
            print(f" Preview: โŒ Cannot read ({str(e)})")


def rotate_key(config: Dict[str, Any], key_id: str, key_type: str = 'ssh', backup: bool = True) -> bool:
    """Rotate a key by generating new content for its configured file.

    Args:
        config: Loaded configuration mapping.
        key_id: Identifier of the key under ``config['keys']``.
        key_type: Type of content to generate (see ``generate_key_content``).
        backup: Back up the existing file before it is overwritten.

    Returns:
        True when the new key was written; False when the key is unknown, has
        no file path, or generation/writing failed.
    """
    keys = config.get('keys', {})

    if key_id not in keys:
        print(f"โŒ Key '{key_id}' not found")
        return False

    key_config = keys[key_id]
    file_path = key_config.get('file', '')

    if not file_path:
        print(f"โŒ No file path configured for key '{key_id}'")
        return False

    # Create backup if requested and file exists
    backup_path = None
    if backup and os.path.exists(file_path):
        backup_path = backup_key_file(file_path)

    try:
        # Generate new key content
        print(f"๐Ÿ”„ Generating new {key_type} key...")
        # strict=True: a failed generation must raise here rather than hand
        # back "# ... failed" text that would replace the working key.
        new_content = generate_key_content(key_type, strict=True)

        # Write new content
        create_key_file(file_path, new_content)
    except Exception as e:
        print(f"โŒ Failed to rotate key: {str(e)}")

        # Restore backup if available
        if backup_path and os.path.exists(backup_path):
            shutil.copy2(backup_path, file_path)
            print(f"๐Ÿ”„ Restored from backup")
        return False

    print(f"โœ… Rotated key '{key_id}'")
    if backup_path:
        print(f" Backup saved: {backup_path}")
    return True


def validate_key_files(config: Dict[str, Any]) -> bool:
    """Validate all key files exist and are accessible.

    A key fails validation when it has no file path, the file does not exist,
    or the file cannot be read as text.  Empty files and group/other
    permissions only produce warnings.

    Returns:
        True when no key failed validation.
    """
    keys = config.get('keys', {})
    all_valid = True

    print("\n" + "=" * 60)
    print("KEY FILE VALIDATION")
    print("=" * 60)

    for key_id, key_config in keys.items():
        file_path = key_config.get('file', '')
        print(f"\n๐Ÿ” Validating {key_id}...")

        if not file_path:
            print(f" โŒ No file path configured")
            all_valid = False
            continue

        path = Path(file_path)

        # Check existence
        if not path.exists():
            print(f" โŒ File does not exist: {file_path}")
            all_valid = False
            continue

        # Check readability
        try:
            with open(path, 'r') as f:
                content = f.read()
            if not content.strip():
                print(f" โš ๏ธ File is empty")
            else:
                print(f" โœ… File exists and readable ({len(content)} bytes)")
        except Exception as e:
            print(f" โŒ Cannot read file: {str(e)}")
            all_valid = False
            continue

        # Check permissions
        stat = path.stat()
        if stat.st_mode & 0o077:
            print(f" โš ๏ธ Permissions may be too permissive: {oct(stat.st_mode)[-3:]}")
        else:
            print(f" โœ… Secure permissions: {oct(stat.st_mode)[-3:]}")

    if all_valid:
        print(f"\nโœ… All key files validated successfully")
    else:
        print(f"\nโŒ Some key files have issues")

    return all_valid


def _resolve_password(provided: Optional[str], prompt: str) -> str:
    """Return the --password value, prompting without echo when it is absent."""
    if provided:
        return provided
    return getpass.getpass(prompt)


def main():
    """Parse command-line arguments and run the selected key operation.

    Exits with status 1 when configuration loading, validation, rotation or
    an access test fails.
    """
    parser = argparse.ArgumentParser(
        description="Manage emergency access key files and test access",
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog="""
Examples:
  # List all key files and their status
  python manage_keys.py --list-files

  # Test access to a specific key
  python manage_keys.py --test-key backup --password "your_password"

  # Test health endpoint
  python manage_keys.py --test-health --password "health_password"

  # Create a new SSH key file
  python manage_keys.py --create-key-file /path/to/key.txt --key-type ssh

  # Rotate a key (generate new content)
  python manage_keys.py --rotate-key backup --key-type ssh --backup

  # Validate all key files
  python manage_keys.py --validate

  # Generate key content only (print to stdout)
  python manage_keys.py --generate-content ssh
"""
    )

    parser.add_argument(
        '--config', '-c',
        default='config.json',
        help='Configuration file path (default: config.json)'
    )

    # File operations
    parser.add_argument(
        '--list-files', '-l',
        action='store_true',
        help='List all key files and their status'
    )
    parser.add_argument(
        '--validate', '-v',
        action='store_true',
        help='Validate all key files'
    )
    parser.add_argument(
        '--create-key-file',
        help='Create a key file at the specified path'
    )
    parser.add_argument(
        '--key-type',
        choices=['ssh', 'gpg', 'api', 'password', 'generic'],
        default='ssh',
        help='Type of key to generate (default: ssh)'
    )
    parser.add_argument(
        '--rotate-key',
        help='Rotate (regenerate) a key by key ID'
    )
    parser.add_argument(
        '--backup',
        action='store_true',
        default=True,
        help='Create backup before rotation (default: true)'
    )
    parser.add_argument(
        '--no-backup',
        action='store_true',
        help='Skip backup creation'
    )
    parser.add_argument(
        '--generate-content',
        help='Generate key content and print to stdout'
    )

    # Testing operations
    parser.add_argument(
        '--test-key',
        help='Test access to a specific key by key ID'
    )
    parser.add_argument(
        '--test-health',
        action='store_true',
        help='Test health endpoint access'
    )
    parser.add_argument(
        '--password', '-p',
        help='Password for testing (will prompt if not provided)'
    )
    parser.add_argument(
        '--test-all',
        action='store_true',
        help='Test access to all configured keys'
    )

    args = parser.parse_args()

    # Handle backup flag
    if args.no_backup:
        args.backup = False

    # Handle generate content (needs no configuration file)
    if args.generate_content:
        content = generate_key_content(args.generate_content)
        print(content)
        return

    # Load configuration for other operations
    try:
        config = load_config(args.config)
    except Exception as e:
        print(f"โŒ {e}")
        sys.exit(1)

    try:
        if args.list_files:
            list_key_files(config)

        elif args.validate:
            success = validate_key_files(config)
            sys.exit(0 if success else 1)

        elif args.create_key_file:
            # strict=True so a failed generation is reported as an error
            # instead of being written to disk as the "key".
            content = generate_key_content(args.key_type, strict=True)
            create_key_file(args.create_key_file, content)

        elif args.rotate_key:
            if not rotate_key(config, args.rotate_key, args.key_type, args.backup):
                sys.exit(1)

        elif args.test_key:
            password = _resolve_password(
                args.password, f"Enter password for key '{args.test_key}': "
            )

            result = test_key_access(config, args.test_key, password)

            if result['success']:
                print(f"โœ… Key '{args.test_key}' access successful")
                print(f" URL: {result['url']}")
                print(f" Username: {result['username']}")
            else:
                print(f"โŒ Key '{args.test_key}' access failed")
                print(f" URL: {result.get('url', 'N/A')}")
                print(f" Error: {result.get('error', 'Unknown error')}")
                if 'status_code' in result:
                    print(f" Status: {result['status_code']}")
                sys.exit(1)

        elif args.test_health:
            password = _resolve_password(
                args.password, "Enter health endpoint password: "
            )

            result = test_health_endpoint(config, password)

            if result['success']:
                print(f"โœ… Health endpoint access successful")
            else:
                print(f"โŒ Health endpoint access failed")
                print(f" Error: {result.get('error', 'Unknown error')}")
                if 'status_code' in result:
                    print(f" Status: {result['status_code']}")
                sys.exit(1)

        elif args.test_all:
            keys = config.get('keys', {})
            if not keys:
                print("No keys configured to test")
                return

            password = _resolve_password(
                args.password, "Enter password for key testing: "
            )

            print(f"\n๐Ÿงช Testing access to {len(keys)} keys...")
            failures = 0
            for key_id in keys:
                result = test_key_access(config, key_id, password)
                status = "โœ…" if result['success'] else "โŒ"
                print(f" {status} {key_id}")
                if not result['success']:
                    failures += 1

            if failures:
                sys.exit(1)

        else:
            parser.print_help()

    except Exception as e:
        print(f"โŒ {e}")
        sys.exit(1)


if __name__ == '__main__':
    main()