#!/usr/bin/env python3
"""
Key Management Script for Emergency Access Server (manage_keys.py)

Manages key files, tests access, and provides key operations.
"""
import argparse
import base64
import getpass
import json
import os
import secrets
import shutil
import string
import subprocess
import sys
import tempfile
from datetime import datetime
from pathlib import Path
from typing import Any, Dict, List, Optional

import requests
import yaml
  19. def load_config(config_path: str) -> Dict[str, Any]:
  20. """Load configuration from file"""
  21. try:
  22. with open(config_path, 'r') as f:
  23. if config_path.endswith(('.yaml', '.yml')):
  24. return yaml.safe_load(f)
  25. else:
  26. return json.load(f)
  27. except FileNotFoundError:
  28. raise Exception(f"Configuration file {config_path} not found")
  29. except Exception as e:
  30. raise Exception(f"Failed to load configuration: {str(e)}")
  31. def get_server_url(config: Dict[str, Any]) -> str:
  32. """Get server URL from configuration"""
  33. host = config.get('server', {}).get('host', '127.0.0.1')
  34. port = config.get('server', {}).get('port', 1127)
  35. return f"http://{host}:{port}"
  36. def test_key_access(config: Dict[str, Any], key_id: str, password: str) -> Dict[str, Any]:
  37. """Test access to a specific key"""
  38. keys = config.get('keys', {})
  39. if key_id not in keys:
  40. return {'success': False, 'error': f"Key '{key_id}' not found in configuration"}
  41. key_config = keys[key_id]
  42. server_url = get_server_url(config)
  43. url = f"{server_url}{key_config['route']}"
  44. username = key_config['username']
  45. try:
  46. # Create basic auth header
  47. auth_string = base64.b64encode(f"{username}:{password}".encode()).decode()
  48. headers = {
  49. 'Authorization': f'Basic {auth_string}',
  50. 'User-Agent': 'emergency-access-test/1.0'
  51. }
  52. # Make request with timeout
  53. response = requests.get(url, headers=headers, timeout=10)
  54. return {
  55. 'success': response.status_code == 200,
  56. 'status_code': response.status_code,
  57. 'response_text': response.text[:500], # Limit response size
  58. 'url': url,
  59. 'username': username
  60. }
  61. except requests.exceptions.RequestException as e:
  62. return {
  63. 'success': False,
  64. 'error': f"Request failed: {str(e)}",
  65. 'url': url,
  66. 'username': username
  67. }
  68. def test_health_endpoint(config: Dict[str, Any], password: str) -> Dict[str, Any]:
  69. """Test health endpoint access"""
  70. server_url = get_server_url(config)
  71. health_route = config.get('routes', {}).get('health_route', '/health-check')
  72. username = config.get('routes', {}).get('health_username', 'health_monitor')
  73. url = f"{server_url}{health_route}"
  74. try:
  75. auth_string = base64.b64encode(f"{username}:{password}".encode()).decode()
  76. headers = {
  77. 'Authorization': f'Basic {auth_string}',
  78. 'User-Agent': 'emergency-access-test/1.0'
  79. }
  80. response = requests.get(url, headers=headers, timeout=10)
  81. return {
  82. 'success': response.status_code == 200,
  83. 'status_code': response.status_code,
  84. 'response_text': response.text[:500],
  85. 'url': url,
  86. 'username': username
  87. }
  88. except requests.exceptions.RequestException as e:
  89. return {
  90. 'success': False,
  91. 'error': f"Request failed: {str(e)}",
  92. 'url': url,
  93. 'username': username
  94. }
  95. def generate_key_content(key_type: str = 'ssh', bits: int = 4096) -> str:
  96. """Generate key content based on type"""
  97. if key_type == 'ssh':
  98. # Generate SSH key using ssh-keygen
  99. temp_key = f"/tmp/emergency_key_{secrets.token_hex(8)}"
  100. try:
  101. result = subprocess.run([
  102. 'ssh-keygen', '-t', 'rsa', '-b', str(bits),
  103. '-f', temp_key, '-N', '', '-C', 'emergency-access-key'
  104. ], capture_output=True, text=True)
  105. if result.returncode != 0:
  106. return f"# SSH Key Generation Failed\n# Error: {result.stderr}\n"
  107. # Read private key
  108. with open(temp_key, 'r') as f:
  109. private_key = f.read()
  110. # Clean up
  111. os.unlink(temp_key)
  112. if os.path.exists(f"{temp_key}.pub"):
  113. os.unlink(f"{temp_key}.pub")
  114. return private_key
  115. except FileNotFoundError:
  116. return "# SSH key generation requires ssh-keygen to be installed\n# Please install OpenSSH client tools\n"
  117. except Exception as e:
  118. return f"# SSH key generation failed: {str(e)}\n"
  119. elif key_type == 'gpg':
  120. return """# GPG Private Key Placeholder
  121. # Replace this with your actual GPG private key
  122. # Export with: gpg --export-secret-keys --armor KEY_ID
  123. """
  124. elif key_type == 'api':
  125. # Generate a secure API key
  126. api_key = ''.join(secrets.choice(string.ascii_letters + string.digits) for _ in range(64))
  127. return f"API_KEY={api_key}\n"
  128. elif key_type == 'password':
  129. # Generate a secure password
  130. password = ''.join(secrets.choice(string.ascii_letters + string.digits + "!@#$%^&*") for _ in range(32))
  131. return f"PASSWORD={password}\n"
  132. else:
  133. return f"# {key_type.upper()} Key File\n# Generated on: {os.popen('date').read().strip()}\n# TODO: Replace with actual {key_type} content\n"
  134. def create_key_file(file_path: str, content: str, permissions: int = 0o600) -> None:
  135. """Create key file with content and secure permissions"""
  136. path = Path(file_path)
  137. # Create parent directory if needed
  138. path.parent.mkdir(parents=True, exist_ok=True)
  139. # Write content
  140. with open(path, 'w') as f:
  141. f.write(content)
  142. # Set secure permissions
  143. os.chmod(path, permissions)
  144. print(f"🔑 Created key file: {file_path} (permissions: {oct(permissions)})")
  145. def backup_key_file(file_path: str) -> Optional[str]:
  146. """Create backup of key file"""
  147. if not os.path.exists(file_path):
  148. return None
  149. backup_path = f"{file_path}.backup.{secrets.token_hex(4)}"
  150. shutil.copy2(file_path, backup_path)
  151. print(f"📁 Created backup: {backup_path}")
  152. return backup_path
  153. def list_key_files(config: Dict[str, Any]) -> None:
  154. """List all key files and their status"""
  155. keys = config.get('keys', {})
  156. if not keys:
  157. print("No keys configured")
  158. return
  159. print("\n" + "=" * 80)
  160. print("KEY FILE STATUS")
  161. print("=" * 80)
  162. for key_id, key_config in keys.items():
  163. file_path = key_config.get('file', '')
  164. print(f"\n📋 Key: {key_id}")
  165. print(f" File: {file_path}")
  166. if not file_path:
  167. print(" Status: ❌ No file path configured")
  168. continue
  169. path = Path(file_path)
  170. if not path.exists():
  171. print(" Status: ❌ File does not exist")
  172. else:
  173. # Get file info
  174. stat = path.stat()
  175. size = stat.st_size
  176. permissions = oct(stat.st_mode)[-3:]
  177. print(f" Status: ✅ Exists")
  178. print(f" Size: {size} bytes")
  179. print(f" Permissions: {permissions}")
  180. # Check if permissions are secure
  181. if stat.st_mode & 0o077: # Check if group/other have any permissions
  182. print(" Security: ⚠️ File permissions may be too permissive")
  183. else:
  184. print(" Security: ✅ Secure permissions")
  185. # Check if readable
  186. try:
  187. with open(path, 'r') as f:
  188. first_line = f.readline().strip()
  189. if first_line:
  190. print(f" Preview: {first_line[:50]}...")
  191. else:
  192. print(" Preview: Empty file")
  193. except Exception as e:
  194. print(f" Preview: ❌ Cannot read ({str(e)})")
  195. def rotate_key(config: Dict[str, Any], key_id: str, key_type: str = 'ssh', backup: bool = True) -> None:
  196. """Rotate a key by generating new content"""
  197. keys = config.get('keys', {})
  198. if key_id not in keys:
  199. print(f"❌ Key '{key_id}' not found")
  200. return
  201. key_config = keys[key_id]
  202. file_path = key_config.get('file', '')
  203. if not file_path:
  204. print(f"❌ No file path configured for key '{key_id}'")
  205. return
  206. # Create backup if requested and file exists
  207. backup_path = None
  208. if backup and os.path.exists(file_path):
  209. backup_path = backup_key_file(file_path)
  210. try:
  211. # Generate new key content
  212. print(f"🔄 Generating new {key_type} key...")
  213. new_content = generate_key_content(key_type)
  214. # Write new content
  215. create_key_file(file_path, new_content)
  216. print(f"✅ Rotated key '{key_id}'")
  217. if backup_path:
  218. print(f" Backup saved: {backup_path}")
  219. except Exception as e:
  220. print(f"❌ Failed to rotate key: {str(e)}")
  221. # Restore backup if available
  222. if backup_path and os.path.exists(backup_path):
  223. shutil.copy2(backup_path, file_path)
  224. print(f"🔄 Restored from backup")
  225. def validate_key_files(config: Dict[str, Any]) -> bool:
  226. """Validate all key files exist and are accessible"""
  227. keys = config.get('keys', {})
  228. all_valid = True
  229. print("\n" + "=" * 60)
  230. print("KEY FILE VALIDATION")
  231. print("=" * 60)
  232. for key_id, key_config in keys.items():
  233. file_path = key_config.get('file', '')
  234. print(f"\n🔍 Validating {key_id}...")
  235. if not file_path:
  236. print(f" ❌ No file path configured")
  237. all_valid = False
  238. continue
  239. path = Path(file_path)
  240. # Check existence
  241. if not path.exists():
  242. print(f" ❌ File does not exist: {file_path}")
  243. all_valid = False
  244. continue
  245. # Check readability
  246. try:
  247. with open(path, 'r') as f:
  248. content = f.read()
  249. if not content.strip():
  250. print(f" ⚠️ File is empty")
  251. else:
  252. print(f" ✅ File exists and readable ({len(content)} bytes)")
  253. except Exception as e:
  254. print(f" ❌ Cannot read file: {str(e)}")
  255. all_valid = False
  256. continue
  257. # Check permissions
  258. stat = path.stat()
  259. if stat.st_mode & 0o077:
  260. print(f" ⚠️ Permissions may be too permissive: {oct(stat.st_mode)[-3:]}")
  261. else:
  262. print(f" ✅ Secure permissions: {oct(stat.st_mode)[-3:]}")
  263. if all_valid:
  264. print(f"\n✅ All key files validated successfully")
  265. else:
  266. print(f"\n❌ Some key files have issues")
  267. return all_valid
  268. def main():
  269. parser = argparse.ArgumentParser(
  270. description="Manage emergency access key files and test access",
  271. formatter_class=argparse.RawDescriptionHelpFormatter,
  272. epilog="""
  273. Examples:
  274. # List all key files and their status
  275. python manage_keys.py --list-files
  276. # Test access to a specific key
  277. python manage_keys.py --test-key backup --password "your_password"
  278. # Test health endpoint
  279. python manage_keys.py --test-health --password "health_password"
  280. # Create a new SSH key file
  281. python manage_keys.py --create-key-file /path/to/key.txt --key-type ssh
  282. # Rotate a key (generate new content)
  283. python manage_keys.py --rotate-key backup --key-type ssh --backup
  284. # Validate all key files
  285. python manage_keys.py --validate
  286. # Generate key content only (print to stdout)
  287. python manage_keys.py --generate-content ssh
  288. """
  289. )
  290. parser.add_argument(
  291. '--config', '-c',
  292. default='config.json',
  293. help='Configuration file path (default: config.json)'
  294. )
  295. # File operations
  296. parser.add_argument(
  297. '--list-files', '-l',
  298. action='store_true',
  299. help='List all key files and their status'
  300. )
  301. parser.add_argument(
  302. '--validate', '-v',
  303. action='store_true',
  304. help='Validate all key files'
  305. )
  306. parser.add_argument(
  307. '--create-key-file',
  308. help='Create a key file at the specified path'
  309. )
  310. parser.add_argument(
  311. '--key-type',
  312. choices=['ssh', 'gpg', 'api', 'password', 'generic'],
  313. default='ssh',
  314. help='Type of key to generate (default: ssh)'
  315. )
  316. parser.add_argument(
  317. '--rotate-key',
  318. help='Rotate (regenerate) a key by key ID'
  319. )
  320. parser.add_argument(
  321. '--backup',
  322. action='store_true',
  323. default=True,
  324. help='Create backup before rotation (default: true)'
  325. )
  326. parser.add_argument(
  327. '--no-backup',
  328. action='store_true',
  329. help='Skip backup creation'
  330. )
  331. parser.add_argument(
  332. '--generate-content',
  333. help='Generate key content and print to stdout'
  334. )
  335. # Testing operations
  336. parser.add_argument(
  337. '--test-key',
  338. help='Test access to a specific key by key ID'
  339. )
  340. parser.add_argument(
  341. '--test-health',
  342. action='store_true',
  343. help='Test health endpoint access'
  344. )
  345. parser.add_argument(
  346. '--password', '-p',
  347. help='Password for testing (will prompt if not provided)'
  348. )
  349. parser.add_argument(
  350. '--test-all',
  351. action='store_true',
  352. help='Test access to all configured keys'
  353. )
  354. args = parser.parse_args()
  355. # Handle backup flag
  356. if args.no_backup:
  357. args.backup = False
  358. # Handle generate content
  359. if args.generate_content:
  360. content = generate_key_content(args.generate_content)
  361. print(content)
  362. return
  363. # Load configuration for other operations
  364. try:
  365. config = load_config(args.config)
  366. except Exception as e:
  367. print(f"❌ {e}")
  368. sys.exit(1)
  369. try:
  370. if args.list_files:
  371. list_key_files(config)
  372. elif args.validate:
  373. success = validate_key_files(config)
  374. sys.exit(0 if success else 1)
  375. elif args.create_key_file:
  376. content = generate_key_content(args.key_type)
  377. create_key_file(args.create_key_file, content)
  378. elif args.rotate_key:
  379. rotate_key(config, args.rotate_key, args.key_type, args.backup)
  380. elif args.test_key:
  381. password = args.password
  382. if not password:
  383. import getpass
  384. password = getpass.getpass(f"Enter password for key '{args.test_key}': ")
  385. result = test_key_access(config, args.test_key, password)
  386. if result['success']:
  387. print(f"✅ Key '{args.test_key}' access successful")
  388. print(f" URL: {result['url']}")
  389. print(f" Username: {result['username']}")
  390. else:
  391. print(f"❌ Key '{args.test_key}' access failed")
  392. print(f" URL: {result.get('url', 'N/A')}")
  393. print(f" Error: {result.get('error', 'Unknown error')}")
  394. if 'status_code' in result:
  395. print(f" Status: {result['status_code']}")
  396. elif args.test_health:
  397. password = args.password
  398. if not password:
  399. import getpass
  400. password = getpass.getpass("Enter health endpoint password: ")
  401. result = test_health_endpoint(config, password)
  402. if result['success']:
  403. print(f"✅ Health endpoint access successful")
  404. else:
  405. print(f"❌ Health endpoint access failed")
  406. print(f" Error: {result.get('error', 'Unknown error')}")
  407. elif args.test_all:
  408. keys = config.get('keys', {})
  409. if not keys:
  410. print("No keys configured to test")
  411. return
  412. password = args.password
  413. if not password:
  414. import getpass
  415. password = getpass.getpass("Enter password for key testing: ")
  416. print(f"\n🧪 Testing access to {len(keys)} keys...")
  417. for key_id in keys:
  418. result = test_key_access(config, key_id, password)
  419. status = "✅" if result['success'] else "❌"
  420. print(f" {status} {key_id}")
  421. else:
  422. parser.print_help()
  423. except Exception as e:
  424. print(f"❌ {e}")
  425. sys.exit(1)
  426. if __name__ == '__main__':
  427. main()