|
@@ -0,0 +1,544 @@
|
|
|
|
|
+#!/usr/bin/env python3
|
|
|
|
|
+
|
|
|
|
|
+"""
|
|
|
|
|
+Key Management Script for Emergency Access Server
|
|
|
|
|
+Manages key files, tests access, and provides key operations
|
|
|
|
|
+"""
|
|
|
|
|
+
|
|
|
|
|
+import os
|
|
|
|
|
+import sys
|
|
|
|
|
+import json
|
|
|
|
|
+import yaml
|
|
|
|
|
+import argparse
|
|
|
|
|
+import requests
|
|
|
|
|
+import base64
|
|
|
|
|
+import secrets
|
|
|
|
|
+import string
|
|
|
|
|
+from pathlib import Path
|
|
|
|
|
+from typing import Dict, Any, Optional, List
|
|
|
|
|
+import subprocess
|
|
|
|
|
+import shutil
|
|
|
|
|
+
|
|
|
|
|
+
|
|
|
|
|
+def load_config(config_path: str) -> Dict[str, Any]:
|
|
|
|
|
+ """Load configuration from file"""
|
|
|
|
|
+ try:
|
|
|
|
|
+ with open(config_path, 'r') as f:
|
|
|
|
|
+ if config_path.endswith(('.yaml', '.yml')):
|
|
|
|
|
+ return yaml.safe_load(f)
|
|
|
|
|
+ else:
|
|
|
|
|
+ return json.load(f)
|
|
|
|
|
+ except FileNotFoundError:
|
|
|
|
|
+ raise Exception(f"Configuration file {config_path} not found")
|
|
|
|
|
+ except Exception as e:
|
|
|
|
|
+ raise Exception(f"Failed to load configuration: {str(e)}")
|
|
|
|
|
+
|
|
|
|
|
+
|
|
|
|
|
+def get_server_url(config: Dict[str, Any]) -> str:
|
|
|
|
|
+ """Get server URL from configuration"""
|
|
|
|
|
+ host = config.get('server', {}).get('host', '127.0.0.1')
|
|
|
|
|
+ port = config.get('server', {}).get('port', 1127)
|
|
|
|
|
+ return f"http://{host}:{port}"
|
|
|
|
|
+
|
|
|
|
|
+
|
|
|
|
|
+def test_key_access(config: Dict[str, Any], key_id: str, password: str) -> Dict[str, Any]:
|
|
|
|
|
+ """Test access to a specific key"""
|
|
|
|
|
+ keys = config.get('keys', {})
|
|
|
|
|
+
|
|
|
|
|
+ if key_id not in keys:
|
|
|
|
|
+ return {'success': False, 'error': f"Key '{key_id}' not found in configuration"}
|
|
|
|
|
+
|
|
|
|
|
+ key_config = keys[key_id]
|
|
|
|
|
+ server_url = get_server_url(config)
|
|
|
|
|
+ url = f"{server_url}{key_config['route']}"
|
|
|
|
|
+ username = key_config['username']
|
|
|
|
|
+
|
|
|
|
|
+ try:
|
|
|
|
|
+ # Create basic auth header
|
|
|
|
|
+ auth_string = base64.b64encode(f"{username}:{password}".encode()).decode()
|
|
|
|
|
+ headers = {
|
|
|
|
|
+ 'Authorization': f'Basic {auth_string}',
|
|
|
|
|
+ 'User-Agent': 'emergency-access-test/1.0'
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
|
|
+ # Make request with timeout
|
|
|
|
|
+ response = requests.get(url, headers=headers, timeout=10)
|
|
|
|
|
+
|
|
|
|
|
+ return {
|
|
|
|
|
+ 'success': response.status_code == 200,
|
|
|
|
|
+ 'status_code': response.status_code,
|
|
|
|
|
+ 'response_text': response.text[:500], # Limit response size
|
|
|
|
|
+ 'url': url,
|
|
|
|
|
+ 'username': username
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
|
|
+ except requests.exceptions.RequestException as e:
|
|
|
|
|
+ return {
|
|
|
|
|
+ 'success': False,
|
|
|
|
|
+ 'error': f"Request failed: {str(e)}",
|
|
|
|
|
+ 'url': url,
|
|
|
|
|
+ 'username': username
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
|
|
+
|
|
|
|
|
+def test_health_endpoint(config: Dict[str, Any], password: str) -> Dict[str, Any]:
|
|
|
|
|
+ """Test health endpoint access"""
|
|
|
|
|
+ server_url = get_server_url(config)
|
|
|
|
|
+ health_route = config.get('routes', {}).get('health_route', '/health-check')
|
|
|
|
|
+ username = config.get('routes', {}).get('health_username', 'health_monitor')
|
|
|
|
|
+ url = f"{server_url}{health_route}"
|
|
|
|
|
+
|
|
|
|
|
+ try:
|
|
|
|
|
+ auth_string = base64.b64encode(f"{username}:{password}".encode()).decode()
|
|
|
|
|
+ headers = {
|
|
|
|
|
+ 'Authorization': f'Basic {auth_string}',
|
|
|
|
|
+ 'User-Agent': 'emergency-access-test/1.0'
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
|
|
+ response = requests.get(url, headers=headers, timeout=10)
|
|
|
|
|
+
|
|
|
|
|
+ return {
|
|
|
|
|
+ 'success': response.status_code == 200,
|
|
|
|
|
+ 'status_code': response.status_code,
|
|
|
|
|
+ 'response_text': response.text[:500],
|
|
|
|
|
+ 'url': url,
|
|
|
|
|
+ 'username': username
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
|
|
+ except requests.exceptions.RequestException as e:
|
|
|
|
|
+ return {
|
|
|
|
|
+ 'success': False,
|
|
|
|
|
+ 'error': f"Request failed: {str(e)}",
|
|
|
|
|
+ 'url': url,
|
|
|
|
|
+ 'username': username
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
|
|
+
|
|
|
|
|
+def generate_key_content(key_type: str = 'ssh', bits: int = 4096) -> str:
|
|
|
|
|
+ """Generate key content based on type"""
|
|
|
|
|
+ if key_type == 'ssh':
|
|
|
|
|
+ # Generate SSH key using ssh-keygen
|
|
|
|
|
+ temp_key = f"/tmp/emergency_key_{secrets.token_hex(8)}"
|
|
|
|
|
+ try:
|
|
|
|
|
+ result = subprocess.run([
|
|
|
|
|
+ 'ssh-keygen', '-t', 'rsa', '-b', str(bits),
|
|
|
|
|
+ '-f', temp_key, '-N', '', '-C', 'emergency-access-key'
|
|
|
|
|
+ ], capture_output=True, text=True)
|
|
|
|
|
+
|
|
|
|
|
+ if result.returncode != 0:
|
|
|
|
|
+ return f"# SSH Key Generation Failed\n# Error: {result.stderr}\n"
|
|
|
|
|
+
|
|
|
|
|
+ # Read private key
|
|
|
|
|
+ with open(temp_key, 'r') as f:
|
|
|
|
|
+ private_key = f.read()
|
|
|
|
|
+
|
|
|
|
|
+ # Clean up
|
|
|
|
|
+ os.unlink(temp_key)
|
|
|
|
|
+ if os.path.exists(f"{temp_key}.pub"):
|
|
|
|
|
+ os.unlink(f"{temp_key}.pub")
|
|
|
|
|
+
|
|
|
|
|
+ return private_key
|
|
|
|
|
+
|
|
|
|
|
+ except FileNotFoundError:
|
|
|
|
|
+ return "# SSH key generation requires ssh-keygen to be installed\n# Please install OpenSSH client tools\n"
|
|
|
|
|
+ except Exception as e:
|
|
|
|
|
+ return f"# SSH key generation failed: {str(e)}\n"
|
|
|
|
|
+
|
|
|
|
|
+ elif key_type == 'gpg':
|
|
|
|
|
+ return """# GPG Private Key Placeholder
|
|
|
|
|
+# Replace this with your actual GPG private key
|
|
|
|
|
+# Export with: gpg --export-secret-keys --armor KEY_ID
|
|
|
|
|
+"""
|
|
|
|
|
+
|
|
|
|
|
+ elif key_type == 'api':
|
|
|
|
|
+ # Generate a secure API key
|
|
|
|
|
+ api_key = ''.join(secrets.choice(string.ascii_letters + string.digits) for _ in range(64))
|
|
|
|
|
+ return f"API_KEY={api_key}\n"
|
|
|
|
|
+
|
|
|
|
|
+ elif key_type == 'password':
|
|
|
|
|
+ # Generate a secure password
|
|
|
|
|
+ password = ''.join(secrets.choice(string.ascii_letters + string.digits + "!@#$%^&*") for _ in range(32))
|
|
|
|
|
+ return f"PASSWORD={password}\n"
|
|
|
|
|
+
|
|
|
|
|
+ else:
|
|
|
|
|
+ return f"# {key_type.upper()} Key File\n# Generated on: {os.popen('date').read().strip()}\n# TODO: Replace with actual {key_type} content\n"
|
|
|
|
|
+
|
|
|
|
|
+
|
|
|
|
|
+def create_key_file(file_path: str, content: str, permissions: int = 0o600) -> None:
|
|
|
|
|
+ """Create key file with content and secure permissions"""
|
|
|
|
|
+ path = Path(file_path)
|
|
|
|
|
+
|
|
|
|
|
+ # Create parent directory if needed
|
|
|
|
|
+ path.parent.mkdir(parents=True, exist_ok=True)
|
|
|
|
|
+
|
|
|
|
|
+ # Write content
|
|
|
|
|
+ with open(path, 'w') as f:
|
|
|
|
|
+ f.write(content)
|
|
|
|
|
+
|
|
|
|
|
+ # Set secure permissions
|
|
|
|
|
+ os.chmod(path, permissions)
|
|
|
|
|
+ print(f"🔑 Created key file: {file_path} (permissions: {oct(permissions)})")
|
|
|
|
|
+
|
|
|
|
|
+
|
|
|
|
|
+def backup_key_file(file_path: str) -> Optional[str]:
|
|
|
|
|
+ """Create backup of key file"""
|
|
|
|
|
+ if not os.path.exists(file_path):
|
|
|
|
|
+ return None
|
|
|
|
|
+
|
|
|
|
|
+ backup_path = f"{file_path}.backup.{secrets.token_hex(4)}"
|
|
|
|
|
+ shutil.copy2(file_path, backup_path)
|
|
|
|
|
+ print(f"📁 Created backup: {backup_path}")
|
|
|
|
|
+ return backup_path
|
|
|
|
|
+
|
|
|
|
|
+
|
|
|
|
|
+def list_key_files(config: Dict[str, Any]) -> None:
|
|
|
|
|
+ """List all key files and their status"""
|
|
|
|
|
+ keys = config.get('keys', {})
|
|
|
|
|
+
|
|
|
|
|
+ if not keys:
|
|
|
|
|
+ print("No keys configured")
|
|
|
|
|
+ return
|
|
|
|
|
+
|
|
|
|
|
+ print("\n" + "=" * 80)
|
|
|
|
|
+ print("KEY FILE STATUS")
|
|
|
|
|
+ print("=" * 80)
|
|
|
|
|
+
|
|
|
|
|
+ for key_id, key_config in keys.items():
|
|
|
|
|
+ file_path = key_config.get('file', '')
|
|
|
|
|
+
|
|
|
|
|
+ print(f"\n📋 Key: {key_id}")
|
|
|
|
|
+ print(f" File: {file_path}")
|
|
|
|
|
+
|
|
|
|
|
+ if not file_path:
|
|
|
|
|
+ print(" Status: ❌ No file path configured")
|
|
|
|
|
+ continue
|
|
|
|
|
+
|
|
|
|
|
+ path = Path(file_path)
|
|
|
|
|
+
|
|
|
|
|
+ if not path.exists():
|
|
|
|
|
+ print(" Status: ❌ File does not exist")
|
|
|
|
|
+ else:
|
|
|
|
|
+ # Get file info
|
|
|
|
|
+ stat = path.stat()
|
|
|
|
|
+ size = stat.st_size
|
|
|
|
|
+ permissions = oct(stat.st_mode)[-3:]
|
|
|
|
|
+
|
|
|
|
|
+ print(f" Status: ✅ Exists")
|
|
|
|
|
+ print(f" Size: {size} bytes")
|
|
|
|
|
+ print(f" Permissions: {permissions}")
|
|
|
|
|
+
|
|
|
|
|
+ # Check if permissions are secure
|
|
|
|
|
+ if stat.st_mode & 0o077: # Check if group/other have any permissions
|
|
|
|
|
+ print(" Security: ⚠️ File permissions may be too permissive")
|
|
|
|
|
+ else:
|
|
|
|
|
+ print(" Security: ✅ Secure permissions")
|
|
|
|
|
+
|
|
|
|
|
+ # Check if readable
|
|
|
|
|
+ try:
|
|
|
|
|
+ with open(path, 'r') as f:
|
|
|
|
|
+ first_line = f.readline().strip()
|
|
|
|
|
+ if first_line:
|
|
|
|
|
+ print(f" Preview: {first_line[:50]}...")
|
|
|
|
|
+ else:
|
|
|
|
|
+ print(" Preview: Empty file")
|
|
|
|
|
+ except Exception as e:
|
|
|
|
|
+ print(f" Preview: ❌ Cannot read ({str(e)})")
|
|
|
|
|
+
|
|
|
|
|
+
|
|
|
|
|
+def rotate_key(config: Dict[str, Any], key_id: str, key_type: str = 'ssh', backup: bool = True) -> None:
|
|
|
|
|
+ """Rotate a key by generating new content"""
|
|
|
|
|
+ keys = config.get('keys', {})
|
|
|
|
|
+
|
|
|
|
|
+ if key_id not in keys:
|
|
|
|
|
+ print(f"❌ Key '{key_id}' not found")
|
|
|
|
|
+ return
|
|
|
|
|
+
|
|
|
|
|
+ key_config = keys[key_id]
|
|
|
|
|
+ file_path = key_config.get('file', '')
|
|
|
|
|
+
|
|
|
|
|
+ if not file_path:
|
|
|
|
|
+ print(f"❌ No file path configured for key '{key_id}'")
|
|
|
|
|
+ return
|
|
|
|
|
+
|
|
|
|
|
+ # Create backup if requested and file exists
|
|
|
|
|
+ backup_path = None
|
|
|
|
|
+ if backup and os.path.exists(file_path):
|
|
|
|
|
+ backup_path = backup_key_file(file_path)
|
|
|
|
|
+
|
|
|
|
|
+ try:
|
|
|
|
|
+ # Generate new key content
|
|
|
|
|
+ print(f"🔄 Generating new {key_type} key...")
|
|
|
|
|
+ new_content = generate_key_content(key_type)
|
|
|
|
|
+
|
|
|
|
|
+ # Write new content
|
|
|
|
|
+ create_key_file(file_path, new_content)
|
|
|
|
|
+
|
|
|
|
|
+ print(f"✅ Rotated key '{key_id}'")
|
|
|
|
|
+ if backup_path:
|
|
|
|
|
+ print(f" Backup saved: {backup_path}")
|
|
|
|
|
+
|
|
|
|
|
+ except Exception as e:
|
|
|
|
|
+ print(f"❌ Failed to rotate key: {str(e)}")
|
|
|
|
|
+
|
|
|
|
|
+ # Restore backup if available
|
|
|
|
|
+ if backup_path and os.path.exists(backup_path):
|
|
|
|
|
+ shutil.copy2(backup_path, file_path)
|
|
|
|
|
+ print(f"🔄 Restored from backup")
|
|
|
|
|
+
|
|
|
|
|
+
|
|
|
|
|
+def validate_key_files(config: Dict[str, Any]) -> bool:
|
|
|
|
|
+ """Validate all key files exist and are accessible"""
|
|
|
|
|
+ keys = config.get('keys', {})
|
|
|
|
|
+ all_valid = True
|
|
|
|
|
+
|
|
|
|
|
+ print("\n" + "=" * 60)
|
|
|
|
|
+ print("KEY FILE VALIDATION")
|
|
|
|
|
+ print("=" * 60)
|
|
|
|
|
+
|
|
|
|
|
+ for key_id, key_config in keys.items():
|
|
|
|
|
+ file_path = key_config.get('file', '')
|
|
|
|
|
+
|
|
|
|
|
+ print(f"\n🔍 Validating {key_id}...")
|
|
|
|
|
+
|
|
|
|
|
+ if not file_path:
|
|
|
|
|
+ print(f" ❌ No file path configured")
|
|
|
|
|
+ all_valid = False
|
|
|
|
|
+ continue
|
|
|
|
|
+
|
|
|
|
|
+ path = Path(file_path)
|
|
|
|
|
+
|
|
|
|
|
+ # Check existence
|
|
|
|
|
+ if not path.exists():
|
|
|
|
|
+ print(f" ❌ File does not exist: {file_path}")
|
|
|
|
|
+ all_valid = False
|
|
|
|
|
+ continue
|
|
|
|
|
+
|
|
|
|
|
+ # Check readability
|
|
|
|
|
+ try:
|
|
|
|
|
+ with open(path, 'r') as f:
|
|
|
|
|
+ content = f.read()
|
|
|
|
|
+ if not content.strip():
|
|
|
|
|
+ print(f" ⚠️ File is empty")
|
|
|
|
|
+ else:
|
|
|
|
|
+ print(f" ✅ File exists and readable ({len(content)} bytes)")
|
|
|
|
|
+ except Exception as e:
|
|
|
|
|
+ print(f" ❌ Cannot read file: {str(e)}")
|
|
|
|
|
+ all_valid = False
|
|
|
|
|
+ continue
|
|
|
|
|
+
|
|
|
|
|
+ # Check permissions
|
|
|
|
|
+ stat = path.stat()
|
|
|
|
|
+ if stat.st_mode & 0o077:
|
|
|
|
|
+ print(f" ⚠️ Permissions may be too permissive: {oct(stat.st_mode)[-3:]}")
|
|
|
|
|
+ else:
|
|
|
|
|
+ print(f" ✅ Secure permissions: {oct(stat.st_mode)[-3:]}")
|
|
|
|
|
+
|
|
|
|
|
+ if all_valid:
|
|
|
|
|
+ print(f"\n✅ All key files validated successfully")
|
|
|
|
|
+ else:
|
|
|
|
|
+ print(f"\n❌ Some key files have issues")
|
|
|
|
|
+
|
|
|
|
|
+ return all_valid
|
|
|
|
|
+
|
|
|
|
|
+
|
|
|
|
|
+def main():
|
|
|
|
|
+ parser = argparse.ArgumentParser(
|
|
|
|
|
+ description="Manage emergency access key files and test access",
|
|
|
|
|
+ formatter_class=argparse.RawDescriptionHelpFormatter,
|
|
|
|
|
+ epilog="""
|
|
|
|
|
+Examples:
|
|
|
|
|
+ # List all key files and their status
|
|
|
|
|
+ python manage_keys.py --list-files
|
|
|
|
|
+
|
|
|
|
|
+ # Test access to a specific key
|
|
|
|
|
+ python manage_keys.py --test-key backup --password "your_password"
|
|
|
|
|
+
|
|
|
|
|
+ # Test health endpoint
|
|
|
|
|
+ python manage_keys.py --test-health --password "health_password"
|
|
|
|
|
+
|
|
|
|
|
+ # Create a new SSH key file
|
|
|
|
|
+ python manage_keys.py --create-key-file /path/to/key.txt --key-type ssh
|
|
|
|
|
+
|
|
|
|
|
+ # Rotate a key (generate new content)
|
|
|
|
|
+ python manage_keys.py --rotate-key backup --key-type ssh --backup
|
|
|
|
|
+
|
|
|
|
|
+ # Validate all key files
|
|
|
|
|
+ python manage_keys.py --validate
|
|
|
|
|
+
|
|
|
|
|
+ # Generate key content only (print to stdout)
|
|
|
|
|
+ python manage_keys.py --generate-content ssh
|
|
|
|
|
+ """
|
|
|
|
|
+ )
|
|
|
|
|
+
|
|
|
|
|
+ parser.add_argument(
|
|
|
|
|
+ '--config', '-c',
|
|
|
|
|
+ default='config.json',
|
|
|
|
|
+ help='Configuration file path (default: config.json)'
|
|
|
|
|
+ )
|
|
|
|
|
+
|
|
|
|
|
+ # File operations
|
|
|
|
|
+ parser.add_argument(
|
|
|
|
|
+ '--list-files', '-l',
|
|
|
|
|
+ action='store_true',
|
|
|
|
|
+ help='List all key files and their status'
|
|
|
|
|
+ )
|
|
|
|
|
+
|
|
|
|
|
+ parser.add_argument(
|
|
|
|
|
+ '--validate', '-v',
|
|
|
|
|
+ action='store_true',
|
|
|
|
|
+ help='Validate all key files'
|
|
|
|
|
+ )
|
|
|
|
|
+
|
|
|
|
|
+ parser.add_argument(
|
|
|
|
|
+ '--create-key-file',
|
|
|
|
|
+ help='Create a key file at the specified path'
|
|
|
|
|
+ )
|
|
|
|
|
+
|
|
|
|
|
+ parser.add_argument(
|
|
|
|
|
+ '--key-type',
|
|
|
|
|
+ choices=['ssh', 'gpg', 'api', 'password', 'generic'],
|
|
|
|
|
+ default='ssh',
|
|
|
|
|
+ help='Type of key to generate (default: ssh)'
|
|
|
|
|
+ )
|
|
|
|
|
+
|
|
|
|
|
+ parser.add_argument(
|
|
|
|
|
+ '--rotate-key',
|
|
|
|
|
+ help='Rotate (regenerate) a key by key ID'
|
|
|
|
|
+ )
|
|
|
|
|
+
|
|
|
|
|
+ parser.add_argument(
|
|
|
|
|
+ '--backup',
|
|
|
|
|
+ action='store_true',
|
|
|
|
|
+ default=True,
|
|
|
|
|
+ help='Create backup before rotation (default: true)'
|
|
|
|
|
+ )
|
|
|
|
|
+
|
|
|
|
|
+ parser.add_argument(
|
|
|
|
|
+ '--no-backup',
|
|
|
|
|
+ action='store_true',
|
|
|
|
|
+ help='Skip backup creation'
|
|
|
|
|
+ )
|
|
|
|
|
+
|
|
|
|
|
+ parser.add_argument(
|
|
|
|
|
+ '--generate-content',
|
|
|
|
|
+ help='Generate key content and print to stdout'
|
|
|
|
|
+ )
|
|
|
|
|
+
|
|
|
|
|
+ # Testing operations
|
|
|
|
|
+ parser.add_argument(
|
|
|
|
|
+ '--test-key',
|
|
|
|
|
+ help='Test access to a specific key by key ID'
|
|
|
|
|
+ )
|
|
|
|
|
+
|
|
|
|
|
+ parser.add_argument(
|
|
|
|
|
+ '--test-health',
|
|
|
|
|
+ action='store_true',
|
|
|
|
|
+ help='Test health endpoint access'
|
|
|
|
|
+ )
|
|
|
|
|
+
|
|
|
|
|
+ parser.add_argument(
|
|
|
|
|
+ '--password', '-p',
|
|
|
|
|
+ help='Password for testing (will prompt if not provided)'
|
|
|
|
|
+ )
|
|
|
|
|
+
|
|
|
|
|
+ parser.add_argument(
|
|
|
|
|
+ '--test-all',
|
|
|
|
|
+ action='store_true',
|
|
|
|
|
+ help='Test access to all configured keys'
|
|
|
|
|
+ )
|
|
|
|
|
+
|
|
|
|
|
+ args = parser.parse_args()
|
|
|
|
|
+
|
|
|
|
|
+ # Handle backup flag
|
|
|
|
|
+ if args.no_backup:
|
|
|
|
|
+ args.backup = False
|
|
|
|
|
+
|
|
|
|
|
+ # Handle generate content
|
|
|
|
|
+ if args.generate_content:
|
|
|
|
|
+ content = generate_key_content(args.generate_content)
|
|
|
|
|
+ print(content)
|
|
|
|
|
+ return
|
|
|
|
|
+
|
|
|
|
|
+ # Load configuration for other operations
|
|
|
|
|
+ try:
|
|
|
|
|
+ config = load_config(args.config)
|
|
|
|
|
+ except Exception as e:
|
|
|
|
|
+ print(f"❌ {e}")
|
|
|
|
|
+ sys.exit(1)
|
|
|
|
|
+
|
|
|
|
|
+ try:
|
|
|
|
|
+ if args.list_files:
|
|
|
|
|
+ list_key_files(config)
|
|
|
|
|
+
|
|
|
|
|
+ elif args.validate:
|
|
|
|
|
+ success = validate_key_files(config)
|
|
|
|
|
+ sys.exit(0 if success else 1)
|
|
|
|
|
+
|
|
|
|
|
+ elif args.create_key_file:
|
|
|
|
|
+ content = generate_key_content(args.key_type)
|
|
|
|
|
+ create_key_file(args.create_key_file, content)
|
|
|
|
|
+
|
|
|
|
|
+ elif args.rotate_key:
|
|
|
|
|
+ rotate_key(config, args.rotate_key, args.key_type, args.backup)
|
|
|
|
|
+
|
|
|
|
|
+ elif args.test_key:
|
|
|
|
|
+ password = args.password
|
|
|
|
|
+ if not password:
|
|
|
|
|
+ import getpass
|
|
|
|
|
+ password = getpass.getpass(f"Enter password for key '{args.test_key}': ")
|
|
|
|
|
+
|
|
|
|
|
+ result = test_key_access(config, args.test_key, password)
|
|
|
|
|
+
|
|
|
|
|
+ if result['success']:
|
|
|
|
|
+ print(f"✅ Key '{args.test_key}' access successful")
|
|
|
|
|
+ print(f" URL: {result['url']}")
|
|
|
|
|
+ print(f" Username: {result['username']}")
|
|
|
|
|
+ else:
|
|
|
|
|
+ print(f"❌ Key '{args.test_key}' access failed")
|
|
|
|
|
+ print(f" URL: {result.get('url', 'N/A')}")
|
|
|
|
|
+ print(f" Error: {result.get('error', 'Unknown error')}")
|
|
|
|
|
+ if 'status_code' in result:
|
|
|
|
|
+ print(f" Status: {result['status_code']}")
|
|
|
|
|
+
|
|
|
|
|
+ elif args.test_health:
|
|
|
|
|
+ password = args.password
|
|
|
|
|
+ if not password:
|
|
|
|
|
+ import getpass
|
|
|
|
|
+ password = getpass.getpass("Enter health endpoint password: ")
|
|
|
|
|
+
|
|
|
|
|
+ result = test_health_endpoint(config, password)
|
|
|
|
|
+
|
|
|
|
|
+ if result['success']:
|
|
|
|
|
+ print(f"✅ Health endpoint access successful")
|
|
|
|
|
+ else:
|
|
|
|
|
+ print(f"❌ Health endpoint access failed")
|
|
|
|
|
+ print(f" Error: {result.get('error', 'Unknown error')}")
|
|
|
|
|
+
|
|
|
|
|
+ elif args.test_all:
|
|
|
|
|
+ keys = config.get('keys', {})
|
|
|
|
|
+ if not keys:
|
|
|
|
|
+ print("No keys configured to test")
|
|
|
|
|
+ return
|
|
|
|
|
+
|
|
|
|
|
+ password = args.password
|
|
|
|
|
+ if not password:
|
|
|
|
|
+ import getpass
|
|
|
|
|
+ password = getpass.getpass("Enter password for key testing: ")
|
|
|
|
|
+
|
|
|
|
|
+ print(f"\n🧪 Testing access to {len(keys)} keys...")
|
|
|
|
|
+
|
|
|
|
|
+ for key_id in keys:
|
|
|
|
|
+ result = test_key_access(config, key_id, password)
|
|
|
|
|
+ status = "✅" if result['success'] else "❌"
|
|
|
|
|
+ print(f" {status} {key_id}")
|
|
|
|
|
+
|
|
|
|
|
+ else:
|
|
|
|
|
+ parser.print_help()
|
|
|
|
|
+
|
|
|
|
|
+ except Exception as e:
|
|
|
|
|
+ print(f"❌ {e}")
|
|
|
|
|
+ sys.exit(1)
|
|
|
|
|
+
|
|
|
|
|
+
|
|
|
|
|
+if __name__ == '__main__':
|
|
|
|
|
+ main()
|