| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544 |
- #!/usr/bin/env python3
- """
- Key Management Script for Emergency Access Server
- Manages key files, tests access, and provides key operations
- """
- import os
- import sys
- import json
- import yaml
- import argparse
- import requests
- import base64
- import secrets
- import string
- from pathlib import Path
- from typing import Dict, Any, Optional, List
- import subprocess
- import shutil
- def load_config(config_path: str) -> Dict[str, Any]:
- """Load configuration from file"""
- try:
- with open(config_path, 'r') as f:
- if config_path.endswith(('.yaml', '.yml')):
- return yaml.safe_load(f)
- else:
- return json.load(f)
- except FileNotFoundError:
- raise Exception(f"Configuration file {config_path} not found")
- except Exception as e:
- raise Exception(f"Failed to load configuration: {str(e)}")
- def get_server_url(config: Dict[str, Any]) -> str:
- """Get server URL from configuration"""
- host = config.get('server', {}).get('host', '127.0.0.1')
- port = config.get('server', {}).get('port', 1127)
- return f"http://{host}:{port}"
- def test_key_access(config: Dict[str, Any], key_id: str, password: str) -> Dict[str, Any]:
- """Test access to a specific key"""
- keys = config.get('keys', {})
- if key_id not in keys:
- return {'success': False, 'error': f"Key '{key_id}' not found in configuration"}
- key_config = keys[key_id]
- server_url = get_server_url(config)
- url = f"{server_url}{key_config['route']}"
- username = key_config['username']
- try:
- # Create basic auth header
- auth_string = base64.b64encode(f"{username}:{password}".encode()).decode()
- headers = {
- 'Authorization': f'Basic {auth_string}',
- 'User-Agent': 'emergency-access-test/1.0'
- }
- # Make request with timeout
- response = requests.get(url, headers=headers, timeout=10)
- return {
- 'success': response.status_code == 200,
- 'status_code': response.status_code,
- 'response_text': response.text[:500], # Limit response size
- 'url': url,
- 'username': username
- }
- except requests.exceptions.RequestException as e:
- return {
- 'success': False,
- 'error': f"Request failed: {str(e)}",
- 'url': url,
- 'username': username
- }
- def test_health_endpoint(config: Dict[str, Any], password: str) -> Dict[str, Any]:
- """Test health endpoint access"""
- server_url = get_server_url(config)
- health_route = config.get('routes', {}).get('health_route', '/health-check')
- username = config.get('routes', {}).get('health_username', 'health_monitor')
- url = f"{server_url}{health_route}"
- try:
- auth_string = base64.b64encode(f"{username}:{password}".encode()).decode()
- headers = {
- 'Authorization': f'Basic {auth_string}',
- 'User-Agent': 'emergency-access-test/1.0'
- }
- response = requests.get(url, headers=headers, timeout=10)
- return {
- 'success': response.status_code == 200,
- 'status_code': response.status_code,
- 'response_text': response.text[:500],
- 'url': url,
- 'username': username
- }
- except requests.exceptions.RequestException as e:
- return {
- 'success': False,
- 'error': f"Request failed: {str(e)}",
- 'url': url,
- 'username': username
- }
- def generate_key_content(key_type: str = 'ssh', bits: int = 4096) -> str:
- """Generate key content based on type"""
- if key_type == 'ssh':
- # Generate SSH key using ssh-keygen
- temp_key = f"/tmp/emergency_key_{secrets.token_hex(8)}"
- try:
- result = subprocess.run([
- 'ssh-keygen', '-t', 'rsa', '-b', str(bits),
- '-f', temp_key, '-N', '', '-C', 'emergency-access-key'
- ], capture_output=True, text=True)
- if result.returncode != 0:
- return f"# SSH Key Generation Failed\n# Error: {result.stderr}\n"
- # Read private key
- with open(temp_key, 'r') as f:
- private_key = f.read()
- # Clean up
- os.unlink(temp_key)
- if os.path.exists(f"{temp_key}.pub"):
- os.unlink(f"{temp_key}.pub")
- return private_key
- except FileNotFoundError:
- return "# SSH key generation requires ssh-keygen to be installed\n# Please install OpenSSH client tools\n"
- except Exception as e:
- return f"# SSH key generation failed: {str(e)}\n"
- elif key_type == 'gpg':
- return """# GPG Private Key Placeholder
- # Replace this with your actual GPG private key
- # Export with: gpg --export-secret-keys --armor KEY_ID
- """
- elif key_type == 'api':
- # Generate a secure API key
- api_key = ''.join(secrets.choice(string.ascii_letters + string.digits) for _ in range(64))
- return f"API_KEY={api_key}\n"
- elif key_type == 'password':
- # Generate a secure password
- password = ''.join(secrets.choice(string.ascii_letters + string.digits + "!@#$%^&*") for _ in range(32))
- return f"PASSWORD={password}\n"
- else:
- return f"# {key_type.upper()} Key File\n# Generated on: {os.popen('date').read().strip()}\n# TODO: Replace with actual {key_type} content\n"
- def create_key_file(file_path: str, content: str, permissions: int = 0o600) -> None:
- """Create key file with content and secure permissions"""
- path = Path(file_path)
- # Create parent directory if needed
- path.parent.mkdir(parents=True, exist_ok=True)
- # Write content
- with open(path, 'w') as f:
- f.write(content)
- # Set secure permissions
- os.chmod(path, permissions)
- print(f"🔑 Created key file: {file_path} (permissions: {oct(permissions)})")
- def backup_key_file(file_path: str) -> Optional[str]:
- """Create backup of key file"""
- if not os.path.exists(file_path):
- return None
- backup_path = f"{file_path}.backup.{secrets.token_hex(4)}"
- shutil.copy2(file_path, backup_path)
- print(f"📁 Created backup: {backup_path}")
- return backup_path
- def list_key_files(config: Dict[str, Any]) -> None:
- """List all key files and their status"""
- keys = config.get('keys', {})
- if not keys:
- print("No keys configured")
- return
- print("\n" + "=" * 80)
- print("KEY FILE STATUS")
- print("=" * 80)
- for key_id, key_config in keys.items():
- file_path = key_config.get('file', '')
- print(f"\n📋 Key: {key_id}")
- print(f" File: {file_path}")
- if not file_path:
- print(" Status: ❌ No file path configured")
- continue
- path = Path(file_path)
- if not path.exists():
- print(" Status: ❌ File does not exist")
- else:
- # Get file info
- stat = path.stat()
- size = stat.st_size
- permissions = oct(stat.st_mode)[-3:]
- print(f" Status: ✅ Exists")
- print(f" Size: {size} bytes")
- print(f" Permissions: {permissions}")
- # Check if permissions are secure
- if stat.st_mode & 0o077: # Check if group/other have any permissions
- print(" Security: ⚠️ File permissions may be too permissive")
- else:
- print(" Security: ✅ Secure permissions")
- # Check if readable
- try:
- with open(path, 'r') as f:
- first_line = f.readline().strip()
- if first_line:
- print(f" Preview: {first_line[:50]}...")
- else:
- print(" Preview: Empty file")
- except Exception as e:
- print(f" Preview: ❌ Cannot read ({str(e)})")
- def rotate_key(config: Dict[str, Any], key_id: str, key_type: str = 'ssh', backup: bool = True) -> None:
- """Rotate a key by generating new content"""
- keys = config.get('keys', {})
- if key_id not in keys:
- print(f"❌ Key '{key_id}' not found")
- return
- key_config = keys[key_id]
- file_path = key_config.get('file', '')
- if not file_path:
- print(f"❌ No file path configured for key '{key_id}'")
- return
- # Create backup if requested and file exists
- backup_path = None
- if backup and os.path.exists(file_path):
- backup_path = backup_key_file(file_path)
- try:
- # Generate new key content
- print(f"🔄 Generating new {key_type} key...")
- new_content = generate_key_content(key_type)
- # Write new content
- create_key_file(file_path, new_content)
- print(f"✅ Rotated key '{key_id}'")
- if backup_path:
- print(f" Backup saved: {backup_path}")
- except Exception as e:
- print(f"❌ Failed to rotate key: {str(e)}")
- # Restore backup if available
- if backup_path and os.path.exists(backup_path):
- shutil.copy2(backup_path, file_path)
- print(f"🔄 Restored from backup")
- def validate_key_files(config: Dict[str, Any]) -> bool:
- """Validate all key files exist and are accessible"""
- keys = config.get('keys', {})
- all_valid = True
- print("\n" + "=" * 60)
- print("KEY FILE VALIDATION")
- print("=" * 60)
- for key_id, key_config in keys.items():
- file_path = key_config.get('file', '')
- print(f"\n🔍 Validating {key_id}...")
- if not file_path:
- print(f" ❌ No file path configured")
- all_valid = False
- continue
- path = Path(file_path)
- # Check existence
- if not path.exists():
- print(f" ❌ File does not exist: {file_path}")
- all_valid = False
- continue
- # Check readability
- try:
- with open(path, 'r') as f:
- content = f.read()
- if not content.strip():
- print(f" ⚠️ File is empty")
- else:
- print(f" ✅ File exists and readable ({len(content)} bytes)")
- except Exception as e:
- print(f" ❌ Cannot read file: {str(e)}")
- all_valid = False
- continue
- # Check permissions
- stat = path.stat()
- if stat.st_mode & 0o077:
- print(f" ⚠️ Permissions may be too permissive: {oct(stat.st_mode)[-3:]}")
- else:
- print(f" ✅ Secure permissions: {oct(stat.st_mode)[-3:]}")
- if all_valid:
- print(f"\n✅ All key files validated successfully")
- else:
- print(f"\n❌ Some key files have issues")
- return all_valid
- def main():
- parser = argparse.ArgumentParser(
- description="Manage emergency access key files and test access",
- formatter_class=argparse.RawDescriptionHelpFormatter,
- epilog="""
- Examples:
- # List all key files and their status
- python manage_keys.py --list-files
- # Test access to a specific key
- python manage_keys.py --test-key backup --password "your_password"
- # Test health endpoint
- python manage_keys.py --test-health --password "health_password"
- # Create a new SSH key file
- python manage_keys.py --create-key-file /path/to/key.txt --key-type ssh
- # Rotate a key (generate new content)
- python manage_keys.py --rotate-key backup --key-type ssh --backup
- # Validate all key files
- python manage_keys.py --validate
- # Generate key content only (print to stdout)
- python manage_keys.py --generate-content ssh
- """
- )
- parser.add_argument(
- '--config', '-c',
- default='config.json',
- help='Configuration file path (default: config.json)'
- )
- # File operations
- parser.add_argument(
- '--list-files', '-l',
- action='store_true',
- help='List all key files and their status'
- )
- parser.add_argument(
- '--validate', '-v',
- action='store_true',
- help='Validate all key files'
- )
- parser.add_argument(
- '--create-key-file',
- help='Create a key file at the specified path'
- )
- parser.add_argument(
- '--key-type',
- choices=['ssh', 'gpg', 'api', 'password', 'generic'],
- default='ssh',
- help='Type of key to generate (default: ssh)'
- )
- parser.add_argument(
- '--rotate-key',
- help='Rotate (regenerate) a key by key ID'
- )
- parser.add_argument(
- '--backup',
- action='store_true',
- default=True,
- help='Create backup before rotation (default: true)'
- )
- parser.add_argument(
- '--no-backup',
- action='store_true',
- help='Skip backup creation'
- )
- parser.add_argument(
- '--generate-content',
- help='Generate key content and print to stdout'
- )
- # Testing operations
- parser.add_argument(
- '--test-key',
- help='Test access to a specific key by key ID'
- )
- parser.add_argument(
- '--test-health',
- action='store_true',
- help='Test health endpoint access'
- )
- parser.add_argument(
- '--password', '-p',
- help='Password for testing (will prompt if not provided)'
- )
- parser.add_argument(
- '--test-all',
- action='store_true',
- help='Test access to all configured keys'
- )
- args = parser.parse_args()
- # Handle backup flag
- if args.no_backup:
- args.backup = False
- # Handle generate content
- if args.generate_content:
- content = generate_key_content(args.generate_content)
- print(content)
- return
- # Load configuration for other operations
- try:
- config = load_config(args.config)
- except Exception as e:
- print(f"❌ {e}")
- sys.exit(1)
- try:
- if args.list_files:
- list_key_files(config)
- elif args.validate:
- success = validate_key_files(config)
- sys.exit(0 if success else 1)
- elif args.create_key_file:
- content = generate_key_content(args.key_type)
- create_key_file(args.create_key_file, content)
- elif args.rotate_key:
- rotate_key(config, args.rotate_key, args.key_type, args.backup)
- elif args.test_key:
- password = args.password
- if not password:
- import getpass
- password = getpass.getpass(f"Enter password for key '{args.test_key}': ")
- result = test_key_access(config, args.test_key, password)
- if result['success']:
- print(f"✅ Key '{args.test_key}' access successful")
- print(f" URL: {result['url']}")
- print(f" Username: {result['username']}")
- else:
- print(f"❌ Key '{args.test_key}' access failed")
- print(f" URL: {result.get('url', 'N/A')}")
- print(f" Error: {result.get('error', 'Unknown error')}")
- if 'status_code' in result:
- print(f" Status: {result['status_code']}")
- elif args.test_health:
- password = args.password
- if not password:
- import getpass
- password = getpass.getpass("Enter health endpoint password: ")
- result = test_health_endpoint(config, password)
- if result['success']:
- print(f"✅ Health endpoint access successful")
- else:
- print(f"❌ Health endpoint access failed")
- print(f" Error: {result.get('error', 'Unknown error')}")
- elif args.test_all:
- keys = config.get('keys', {})
- if not keys:
- print("No keys configured to test")
- return
- password = args.password
- if not password:
- import getpass
- password = getpass.getpass("Enter password for key testing: ")
- print(f"\n🧪 Testing access to {len(keys)} keys...")
- for key_id in keys:
- result = test_key_access(config, key_id, password)
- status = "✅" if result['success'] else "❌"
- print(f" {status} {key_id}")
- else:
- parser.print_help()
- except Exception as e:
- print(f"❌ {e}")
- sys.exit(1)
- if __name__ == '__main__':
- main()
|