add_key.py 15 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485
  1. #!/usr/bin/env python3
  2. """
  3. Add Key Script for Emergency Access Server
  4. Adds new emergency access keys to the configuration with proper validation
  5. """
  6. import json
  7. import yaml
  8. import os
  9. import sys
  10. import argparse
  11. import secrets
  12. import string
  13. import hashlib
  14. import shutil
  15. from pathlib import Path
  16. from typing import Dict, Any, Optional
  17. def generate_secure_password(length: int = 64) -> str:
  18. """Generate a cryptographically secure password"""
  19. alphabet = string.ascii_letters + string.digits + "!@#$%^&*"
  20. return ''.join(secrets.choice(alphabet) for _ in range(length))
  21. def generate_password_hash(password: str) -> str:
  22. """Generate a password hash using PBKDF2 with SHA-256"""
  23. salt = secrets.token_hex(16)
  24. password_hash = hashlib.pbkdf2_hmac('sha256', password.encode(), salt.encode(), 100000)
  25. return f"{salt}:{password_hash.hex()}"
  26. def load_config(config_path: str) -> Dict[str, Any]:
  27. """Load configuration from file"""
  28. try:
  29. with open(config_path, 'r') as f:
  30. if config_path.endswith(('.yaml', '.yml')):
  31. return yaml.safe_load(f)
  32. else:
  33. return json.load(f)
  34. except FileNotFoundError:
  35. raise Exception(f"Configuration file {config_path} not found")
  36. except Exception as e:
  37. raise Exception(f"Failed to load configuration: {str(e)}")
  38. def save_config(config: Dict[str, Any], config_path: str) -> None:
  39. """Save configuration to file"""
  40. # Create backup
  41. backup_path = f"{config_path}.backup"
  42. if os.path.exists(config_path):
  43. shutil.copy2(config_path, backup_path)
  44. print(f"📁 Created backup: {backup_path}")
  45. try:
  46. with open(config_path, 'w') as f:
  47. if config_path.endswith(('.yaml', '.yml')):
  48. yaml.safe_dump(config, f, indent=2, default_flow_style=False)
  49. else:
  50. json.dump(config, f, indent=2)
  51. print(f"💾 Configuration saved to: {config_path}")
  52. except Exception as e:
  53. # Restore backup if save failed
  54. if os.path.exists(backup_path):
  55. shutil.move(backup_path, config_path)
  56. raise Exception(f"Failed to save configuration: {str(e)}")
  57. def validate_key_id(key_id: str, existing_keys: Dict[str, Any]) -> None:
  58. """Validate key ID format and uniqueness"""
  59. if not key_id:
  60. raise ValueError("Key ID cannot be empty")
  61. if not key_id.replace('_', '').replace('-', '').isalnum():
  62. raise ValueError("Key ID must contain only letters, numbers, underscores, and hyphens")
  63. if key_id in existing_keys:
  64. raise ValueError(f"Key ID '{key_id}' already exists")
  65. def validate_file_path(file_path: str) -> None:
  66. """Validate file path and create directory if needed"""
  67. path = Path(file_path)
  68. # Create parent directory if it doesn't exist
  69. path.parent.mkdir(parents=True, exist_ok=True)
  70. # Check if we can write to the directory
  71. if not os.access(path.parent, os.W_OK):
  72. raise ValueError(f"Cannot write to directory: {path.parent}")
  73. def validate_backends(backends: list, available_backends: Optional[list] = None) -> None:
  74. """Validate notification backends"""
  75. if not backends:
  76. raise ValueError("At least one notification backend must be specified")
  77. if available_backends:
  78. invalid_backends = [b for b in backends if b not in available_backends]
  79. if invalid_backends:
  80. raise ValueError(f"Invalid backends: {invalid_backends}")
  81. def get_available_backends(config: Dict[str, Any]) -> list:
  82. """Extract available backends from ntfy config"""
  83. try:
  84. ntfy_config_path = config.get('notifications', {}).get('config_path', '/etc/emergency-access/ntfy.yml')
  85. if os.path.exists(ntfy_config_path):
  86. with open(ntfy_config_path, 'r') as f:
  87. ntfy_config = yaml.safe_load(f)
  88. if 'backends' in ntfy_config:
  89. return list(ntfy_config['backends'].keys())
  90. except Exception:
  91. pass
  92. return []
  93. def create_key_file(file_path: str, content: str = None) -> None:
  94. """Create the key file with content or placeholder"""
  95. path = Path(file_path)
  96. if content is None:
  97. content = f"# Emergency Access Key File\n# Generated on: {os.popen('date').read().strip()}\n# TODO: Replace this with your actual key content\n"
  98. with open(path, 'w') as f:
  99. f.write(content)
  100. # Set secure permissions (owner read/write only)
  101. os.chmod(path, 0o600)
  102. print(f"🔑 Created key file: {file_path} (permissions: 600)")
  103. def add_key_interactive(config: Dict[str, Any], config_path: str) -> None:
  104. """Interactive key addition"""
  105. print("\n" + "=" * 60)
  106. print("INTERACTIVE KEY ADDITION")
  107. print("=" * 60)
  108. existing_keys = config.get('keys', {})
  109. available_backends = get_available_backends(config)
  110. # Get key ID
  111. while True:
  112. key_id = input("\nEnter key ID (e.g., 'backup', 'master', 'recovery'): ").strip()
  113. try:
  114. validate_key_id(key_id, existing_keys)
  115. break
  116. except ValueError as e:
  117. print(f"❌ {e}")
  118. # Get route (optional, defaults to /emergency-key-{key_id})
  119. default_route = f"/emergency-key-{key_id}"
  120. route = input(f"Enter route (default: {default_route}): ").strip()
  121. if not route:
  122. route = default_route
  123. # Get file path
  124. while True:
  125. default_file = f"/etc/emergency-access/{key_id}-key.txt"
  126. file_path = input(f"Enter key file path (default: {default_file}): ").strip()
  127. if not file_path:
  128. file_path = default_file
  129. try:
  130. validate_file_path(file_path)
  131. break
  132. except ValueError as e:
  133. print(f"❌ {e}")
  134. # Get username (optional, defaults to emergency_{key_id})
  135. default_username = f"emergency_{key_id}"
  136. username = input(f"Enter username (default: {default_username}): ").strip()
  137. if not username:
  138. username = default_username
  139. # Get password
  140. import getpass
  141. while True:
  142. password = getpass.getpass("Enter password (leave empty to generate): ").strip()
  143. if not password:
  144. password = generate_secure_password(64)
  145. print(f"🔐 Generated password: {password}")
  146. break
  147. elif len(password) < 12:
  148. print("❌ Password must be at least 12 characters")
  149. else:
  150. break
  151. password_hash = generate_password_hash(password)
  152. # Get notification backends
  153. if available_backends:
  154. print(f"\nAvailable backends: {', '.join(available_backends)}")
  155. while True:
  156. backends_input = input("Enter notification backends (comma-separated): ").strip()
  157. backends = [b.strip() for b in backends_input.split(',') if b.strip()]
  158. try:
  159. validate_backends(backends, available_backends if available_backends else None)
  160. break
  161. except ValueError as e:
  162. print(f"❌ {e}")
  163. # Get custom message (optional)
  164. default_message = f"🚨 EMERGENCY: Key {key_id} accessed from server"
  165. message = input(f"Enter notification message (default: {default_message}): ").strip()
  166. if not message:
  167. message = default_message
  168. # Create key file
  169. create_file = input(f"\nCreate key file at {file_path}? (y/N): ").strip().lower()
  170. if create_file == 'y':
  171. key_content = input("Enter key content (leave empty for placeholder): ").strip()
  172. create_key_file(file_path, key_content if key_content else None)
  173. # Add key to config
  174. add_key_to_config(config, key_id, {
  175. 'route': route,
  176. 'file': file_path,
  177. 'username': username,
  178. 'password_hash': password_hash,
  179. 'backends': backends,
  180. 'message': message
  181. })
  182. save_config(config, config_path)
  183. print(f"\n✅ Successfully added key '{key_id}'")
  184. print(f" Route: {route}")
  185. print(f" File: {file_path}")
  186. print(f" Username: {username}")
  187. print(f" Password: {password}")
  188. print(f" Backends: {', '.join(backends)}")
  189. def add_key_to_config(config: Dict[str, Any], key_id: str, key_config: Dict[str, Any]) -> None:
  190. """Add key configuration to config"""
  191. if 'keys' not in config:
  192. config['keys'] = {}
  193. config['keys'][key_id] = key_config
  194. def add_key_programmatic(config: Dict[str, Any], config_path: str, args) -> None:
  195. """Add key programmatically using command line arguments"""
  196. existing_keys = config.get('keys', {})
  197. available_backends = get_available_backends(config)
  198. # Validate inputs
  199. validate_key_id(args.key_id, existing_keys)
  200. validate_file_path(args.file)
  201. validate_backends(args.backends, available_backends if available_backends else None)
  202. # Generate password if not provided
  203. if args.password:
  204. password = args.password
  205. else:
  206. password = generate_secure_password(64)
  207. print(f"🔐 Generated password: {password}")
  208. password_hash = generate_password_hash(password)
  209. # Set defaults
  210. route = args.route or f"/emergency-key-{args.key_id}"
  211. username = args.username or f"emergency_{args.key_id}"
  212. message = args.message or f"🚨 EMERGENCY: Key {args.key_id} accessed from server"
  213. # Create key file if requested
  214. if args.create_file:
  215. create_key_file(args.file, args.key_content)
  216. # Add key to config
  217. add_key_to_config(config, args.key_id, {
  218. 'route': route,
  219. 'file': args.file,
  220. 'username': username,
  221. 'password_hash': password_hash,
  222. 'backends': args.backends,
  223. 'message': message
  224. })
  225. save_config(config, config_path)
  226. print(f"\n✅ Successfully added key '{args.key_id}'")
  227. print(f" Route: {route}")
  228. print(f" File: {args.file}")
  229. print(f" Username: {username}")
  230. print(f" Password: {password}")
  231. print(f" Backends: {', '.join(args.backends)}")
  232. def list_keys(config: Dict[str, Any]) -> None:
  233. """List existing keys"""
  234. keys = config.get('keys', {})
  235. if not keys:
  236. print("No keys configured")
  237. return
  238. print("\n" + "=" * 60)
  239. print("CONFIGURED KEYS")
  240. print("=" * 60)
  241. for key_id, key_config in keys.items():
  242. print(f"\n📋 Key ID: {key_id}")
  243. print(f" Route: {key_config.get('route', 'N/A')}")
  244. print(f" File: {key_config.get('file', 'N/A')}")
  245. print(f" Username: {key_config.get('username', 'N/A')}")
  246. print(f" Backends: {', '.join(key_config.get('backends', []))}")
  247. print(f" Message: {key_config.get('message', 'N/A')}")
  248. def remove_key(config: Dict[str, Any], config_path: str, key_id: str, remove_file: bool = False) -> None:
  249. """Remove a key from configuration"""
  250. keys = config.get('keys', {})
  251. if key_id not in keys:
  252. print(f"❌ Key '{key_id}' not found")
  253. return
  254. key_config = keys[key_id]
  255. # Remove file if requested
  256. if remove_file and 'file' in key_config:
  257. file_path = key_config['file']
  258. if os.path.exists(file_path):
  259. os.remove(file_path)
  260. print(f"🗑️ Removed file: {file_path}")
  261. # Remove from config
  262. del keys[key_id]
  263. save_config(config, config_path)
  264. print(f"✅ Removed key '{key_id}' from configuration")
  265. def main():
  266. parser = argparse.ArgumentParser(
  267. description="Add, list, or remove emergency access keys",
  268. formatter_class=argparse.RawDescriptionHelpFormatter,
  269. epilog="""
  270. Examples:
  271. # Interactive key addition
  272. python add_key.py --interactive
  273. # Add key programmatically
  274. python add_key.py --key-id backup --file /etc/emergency-access/backup.txt --backends matrix_sec,email
  275. # List existing keys
  276. python add_key.py --list
  277. # Remove a key
  278. python add_key.py --remove backup --remove-file
  279. # Add key with all options
  280. python add_key.py --key-id master \\
  281. --file /etc/emergency-access/master.txt \\
  282. --route /emergency-master \\
  283. --username master_user \\
  284. --password "secure_password" \\
  285. --backends matrix_sec,pushover \\
  286. --message "Master key accessed!" \\
  287. --create-file \\
  288. --key-content "actual key content here"
  289. """
  290. )
  291. parser.add_argument(
  292. '--config', '-c',
  293. default='config.json',
  294. help='Configuration file path (default: config.json)'
  295. )
  296. # Add key options
  297. parser.add_argument(
  298. '--interactive', '-i',
  299. action='store_true',
  300. help='Interactive key addition'
  301. )
  302. parser.add_argument(
  303. '--key-id',
  304. help='Key identifier'
  305. )
  306. parser.add_argument(
  307. '--file', '-f',
  308. help='Path to key file'
  309. )
  310. parser.add_argument(
  311. '--route',
  312. help='HTTP route for key access (default: /emergency-key-{key_id})'
  313. )
  314. parser.add_argument(
  315. '--username', '-u',
  316. help='HTTP Basic Auth username (default: emergency_{key_id})'
  317. )
  318. parser.add_argument(
  319. '--password', '-p',
  320. help='HTTP Basic Auth password (generated if not provided)'
  321. )
  322. parser.add_argument(
  323. '--backends', '-b',
  324. nargs='+',
  325. help='Notification backends (space or comma separated)'
  326. )
  327. parser.add_argument(
  328. '--message', '-m',
  329. help='Notification message'
  330. )
  331. parser.add_argument(
  332. '--create-file',
  333. action='store_true',
  334. help='Create the key file'
  335. )
  336. parser.add_argument(
  337. '--key-content',
  338. help='Content for the key file'
  339. )
  340. # List/remove options
  341. parser.add_argument(
  342. '--list', '-l',
  343. action='store_true',
  344. help='List existing keys'
  345. )
  346. parser.add_argument(
  347. '--remove', '-r',
  348. help='Remove key by ID'
  349. )
  350. parser.add_argument(
  351. '--remove-file',
  352. action='store_true',
  353. help='Also remove the key file when removing a key'
  354. )
  355. args = parser.parse_args()
  356. # Handle backends input (support both space and comma separated)
  357. if args.backends:
  358. backends = []
  359. for backend_group in args.backends:
  360. backends.extend([b.strip() for b in backend_group.replace(',', ' ').split() if b.strip()])
  361. args.backends = backends
  362. # Load configuration
  363. try:
  364. config = load_config(args.config)
  365. except Exception as e:
  366. print(f"❌ {e}")
  367. sys.exit(1)
  368. try:
  369. if args.list:
  370. list_keys(config)
  371. elif args.remove:
  372. remove_key(config, args.config, args.remove, args.remove_file)
  373. elif args.interactive:
  374. add_key_interactive(config, args.config)
  375. elif args.key_id and args.file and args.backends:
  376. add_key_programmatic(config, args.config, args)
  377. else:
  378. parser.print_help()
  379. print("\n❌ Missing required arguments for key addition")
  380. print(" Use --interactive for guided setup")
  381. print(" Or provide --key-id, --file, and --backends")
  382. except Exception as e:
  383. print(f"❌ {e}")
  384. sys.exit(1)
  385. if __name__ == '__main__':
  386. main()