#!/usr/bin/env python3
"""
Add Key Script for Emergency Access Server

Adds, lists, and removes emergency access keys in the server configuration
(JSON or YAML), validating the key ID, key file path, and notification
backends before anything is written.
"""

import argparse
import getpass
import hashlib
import json
import os
import secrets
import shutil
import string
import sys
from datetime import datetime
from pathlib import Path
from typing import Any, Dict, List, Optional

try:
    import yaml
except ImportError:
    # PyYAML is only needed when a YAML file is actually read or written;
    # the default JSON configuration works without it.
    yaml = None

YAML_SUFFIXES = ('.yaml', '.yml')
DEFAULT_NTFY_CONFIG_PATH = '/etc/emergency-access/ntfy.yml'
PASSWORD_ALPHABET = string.ascii_letters + string.digits + "!@#$%^&*"
PBKDF2_ITERATIONS = 100000
MIN_PASSWORD_LENGTH = 12


class ConfigError(Exception):
    """Raised when the configuration file cannot be loaded or saved."""


def _is_yaml_path(config_path: str) -> bool:
    """Return True when *config_path* has a YAML file extension."""
    return config_path.endswith(YAML_SUFFIXES)


def _require_yaml() -> None:
    """Raise ConfigError when PyYAML is needed but not installed."""
    if yaml is None:
        raise ConfigError("PyYAML is required for YAML configuration files")


def _configured_keys(config: Dict[str, Any]) -> Dict[str, Any]:
    """Return the 'keys' mapping of *config*, or an empty dict if absent/null."""
    return config.get('keys') or {}


def generate_secure_password(length: int = 64) -> str:
    """Generate a cryptographically secure password.

    Args:
        length: Number of characters to generate.

    Returns:
        A random string drawn from letters, digits and ``!@#$%^&*`` using
        the ``secrets`` module.
    """
    return ''.join(secrets.choice(PASSWORD_ALPHABET) for _ in range(length))


def generate_password_hash(password: str) -> str:
    """Generate a password hash using PBKDF2 with SHA-256.

    Args:
        password: Plain-text password to hash.

    Returns:
        ``"<salt>:<hex digest>"`` where the salt is 32 hex characters.
    """
    salt = secrets.token_hex(16)
    # The hex *text* of the salt (not the decoded bytes) is fed to PBKDF2.
    # NOTE(review): whatever verifies these hashes presumably does the same;
    # do not change this without updating the verifier.
    digest = hashlib.pbkdf2_hmac(
        'sha256', password.encode(), salt.encode(), PBKDF2_ITERATIONS
    )
    return f"{salt}:{digest.hex()}"


def load_config(config_path: str) -> Dict[str, Any]:
    """Load configuration from a JSON or YAML file.

    The format is chosen by file extension: ``.yaml``/``.yml`` is parsed as
    YAML, anything else as JSON.

    Args:
        config_path: Path of the configuration file.

    Returns:
        The parsed configuration. An empty YAML document yields ``{}``.

    Raises:
        ConfigError: If the file is missing, unreadable, malformed, or its
            top-level value is not a mapping.
    """
    use_yaml = _is_yaml_path(config_path)
    if use_yaml:
        _require_yaml()

    try:
        with open(config_path, 'r', encoding='utf-8') as f:
            config = yaml.safe_load(f) if use_yaml else json.load(f)
    except FileNotFoundError:
        raise ConfigError(f"Configuration file {config_path} not found") from None
    except Exception as e:
        raise ConfigError(f"Failed to load configuration: {e}") from e

    # yaml.safe_load returns None for an empty document; every caller
    # expects a dict and would crash on ``config.get``.
    if config is None:
        return {}
    if not isinstance(config, dict):
        raise ConfigError(
            "Failed to load configuration: top-level value must be a mapping"
        )
    return config


def save_config(config: Dict[str, Any], config_path: str) -> None:
    """Save configuration to file, keeping a ``.backup`` of the previous one.

    Args:
        config: Configuration mapping to write.
        config_path: Destination path; the extension selects YAML or JSON.

    Raises:
        ConfigError: If the configuration cannot be serialized or written.
            The existing file is left untouched on serialization failure and
            restored from the fresh backup on write failure.
    """
    # Serialize before touching the file so that an unserializable value
    # cannot truncate a working configuration.
    try:
        if _is_yaml_path(config_path):
            _require_yaml()
            payload = yaml.safe_dump(config, indent=2, default_flow_style=False)
        else:
            payload = json.dumps(config, indent=2)
    except ConfigError:
        raise
    except Exception as e:
        raise ConfigError(f"Failed to save configuration: {e}") from e

    backup_path = f"{config_path}.backup"
    backup_created = os.path.exists(config_path)
    if backup_created:
        shutil.copy2(config_path, backup_path)
        print(f"📁 Created backup: {backup_path}")

    try:
        with open(config_path, 'w', encoding='utf-8') as f:
            f.write(payload)
    except Exception as e:
        # Only restore a backup made by this call; a leftover backup from an
        # earlier run does not describe the state we just overwrote.
        if backup_created:
            shutil.move(backup_path, config_path)
        raise ConfigError(f"Failed to save configuration: {e}") from e

    print(f"💾 Configuration saved to: {config_path}")


def validate_key_id(key_id: str, existing_keys: Dict[str, Any]) -> None:
    """Validate key ID format and uniqueness.

    Args:
        key_id: Candidate identifier.
        existing_keys: Mapping of already configured key IDs.

    Raises:
        ValueError: If the ID is empty, contains anything other than ASCII
            letters, digits, underscores and hyphens, consists only of
            separators, or already exists.
    """
    if not key_id:
        raise ValueError("Key ID cannot be empty")

    # The ID is interpolated into the default route and file path, so it is
    # restricted to ASCII; str.isalnum() alone also accepts other scripts.
    core = key_id.replace('_', '').replace('-', '')
    if not (core.isascii() and core.isalnum()):
        raise ValueError(
            "Key ID must contain only letters, numbers, underscores, and hyphens"
        )

    if key_id in existing_keys:
        raise ValueError(f"Key ID '{key_id}' already exists")


def validate_file_path(file_path: str) -> None:
    """Validate file path and create its parent directory if needed.

    Args:
        file_path: Intended location of the key file.

    Raises:
        ValueError: If the parent directory cannot be created or is not
            writable.
    """
    parent = Path(file_path).parent

    try:
        parent.mkdir(parents=True, exist_ok=True)
    except OSError as e:
        # Surface as ValueError so the interactive prompt can ask again
        # instead of aborting on a permission error.
        raise ValueError(f"Cannot create directory {parent}: {e}") from e

    if not os.access(parent, os.W_OK):
        raise ValueError(f"Cannot write to directory: {parent}")


def validate_backends(
    backends: List[str], available_backends: Optional[List[str]] = None
) -> None:
    """Validate notification backends.

    Args:
        backends: Backend names requested for the key.
        available_backends: Known backend names; when empty or ``None`` the
            names are not checked.

    Raises:
        ValueError: If no backend is given or a name is not available.
    """
    if not backends:
        raise ValueError("At least one notification backend must be specified")

    if available_backends:
        invalid_backends = [b for b in backends if b not in available_backends]
        if invalid_backends:
            raise ValueError(f"Invalid backends: {invalid_backends}")


def get_available_backends(config: Dict[str, Any]) -> List[str]:
    """Extract available backend names from the ntfy config, best-effort.

    The ntfy config path is read from ``config['notifications']['config_path']``
    and defaults to ``/etc/emergency-access/ntfy.yml``.

    Args:
        config: Loaded server configuration.

    Returns:
        The keys of the ntfy config's ``backends`` mapping, or ``[]`` when the
        file is missing, unreadable, malformed, or PyYAML is unavailable.
    """
    if yaml is None:
        return []

    notifications = config.get('notifications')
    if not isinstance(notifications, dict):
        notifications = {}
    ntfy_config_path = notifications.get('config_path') or DEFAULT_NTFY_CONFIG_PATH

    try:
        with open(ntfy_config_path, 'r', encoding='utf-8') as f:
            ntfy_config = yaml.safe_load(f)
    except FileNotFoundError:
        return []
    except (OSError, TypeError, yaml.YAMLError) as e:
        # Backend discovery is optional; report the problem but carry on
        # without backend-name validation.
        print(
            f"Warning: could not read backends from {ntfy_config_path}: {e}",
            file=sys.stderr,
        )
        return []

    backends = ntfy_config.get('backends') if isinstance(ntfy_config, dict) else None
    return list(backends) if isinstance(backends, dict) else []


def create_key_file(file_path: str, content: Optional[str] = None) -> None:
    """Create the key file with content or a placeholder.

    Args:
        file_path: Where to write the key file.
        content: Text to store; when ``None`` a commented placeholder with
            the current timestamp is written instead.
    """
    path = Path(file_path)

    if content is None:
        generated_on = datetime.now().astimezone().strftime('%a %b %d %H:%M:%S %Z %Y')
        content = (
            "# Emergency Access Key File\n"
            f"# Generated on: {generated_on}\n"
            "# TODO: Replace this with your actual key content\n"
        )

    # Create with owner-only permissions from the start so the secret is
    # never readable by others, not even briefly.
    fd = os.open(path, os.O_WRONLY | os.O_CREAT | os.O_TRUNC, 0o600)
    with os.fdopen(fd, 'w') as f:
        # A pre-existing file keeps its old mode through os.open; tighten it
        # before any content is written.
        os.chmod(path, 0o600)
        f.write(content)

    print(f"🔑 Created key file: {file_path} (permissions: 600)")


def add_key_to_config(
    config: Dict[str, Any], key_id: str, key_config: Dict[str, Any]
) -> None:
    """Add key configuration to config (mutates *config* in place).

    Args:
        config: Loaded server configuration.
        key_id: Identifier under which to store the key.
        key_config: Settings for the key.
    """
    # A missing or null 'keys' entry (e.g. a bare ``keys:`` in YAML) both
    # start from an empty mapping.
    if config.get('keys') is None:
        config['keys'] = {}

    config['keys'][key_id] = key_config


def _prompt_with_default(label: str, default: str) -> str:
    """Ask for a value, returning *default* when the answer is blank."""
    answer = input(f"{label} (default: {default}): ").strip()
    return answer or default


def _print_key_summary(
    key_id: str,
    route: str,
    file_path: str,
    username: str,
    password: str,
    backends: List[str],
) -> None:
    """Print the settings of a key that was just added."""
    print(f"\n✅ Successfully added key '{key_id}'")
    print(f" Route: {route}")
    print(f" File: {file_path}")
    print(f" Username: {username}")
    # Only the hash is stored in the configuration, so this is the one
    # place the plain-text password is shown.
    print(f" Password: {password}")
    print(f" Backends: {', '.join(backends)}")


def add_key_interactive(config: Dict[str, Any], config_path: str) -> None:
    """Add a key by prompting for each setting on the terminal.

    Invalid answers are reported and asked again. On completion the key is
    added to *config* and the configuration is saved to *config_path*.

    Args:
        config: Loaded server configuration (mutated in place).
        config_path: File the updated configuration is saved to.
    """
    print("\n" + "=" * 60)
    print("INTERACTIVE KEY ADDITION")
    print("=" * 60)

    existing_keys = _configured_keys(config)
    available_backends = get_available_backends(config)

    # Get key ID
    while True:
        key_id = input("\nEnter key ID (e.g., 'backup', 'master', 'recovery'): ").strip()
        try:
            validate_key_id(key_id, existing_keys)
            break
        except ValueError as e:
            print(f"❌ {e}")

    # Get route (optional, defaults to /emergency-key-{key_id})
    route = _prompt_with_default("Enter route", f"/emergency-key-{key_id}")

    # Get file path
    while True:
        file_path = _prompt_with_default(
            "Enter key file path", f"/etc/emergency-access/{key_id}-key.txt"
        )
        try:
            validate_file_path(file_path)
            break
        except ValueError as e:
            print(f"❌ {e}")

    # Get username (optional, defaults to emergency_{key_id})
    username = _prompt_with_default("Enter username", f"emergency_{key_id}")

    # Get password
    while True:
        password = getpass.getpass("Enter password (leave empty to generate): ").strip()
        if not password:
            password = generate_secure_password(64)
            print(f"🔐 Generated password: {password}")
            break
        if len(password) < MIN_PASSWORD_LENGTH:
            print("❌ Password must be at least 12 characters")
        else:
            break

    password_hash = generate_password_hash(password)

    # Get notification backends
    if available_backends:
        print(f"\nAvailable backends: {', '.join(available_backends)}")

    while True:
        backends_input = input("Enter notification backends (comma-separated): ").strip()
        backends = [b.strip() for b in backends_input.split(',') if b.strip()]
        try:
            validate_backends(backends, available_backends or None)
            break
        except ValueError as e:
            print(f"❌ {e}")

    # Get custom message (optional)
    message = _prompt_with_default(
        "Enter notification message",
        f"🚨 EMERGENCY: Key {key_id} accessed from server",
    )

    # Create key file
    create_file = input(f"\nCreate key file at {file_path}? (y/N): ").strip().lower()
    if create_file == 'y':
        key_content = input("Enter key content (leave empty for placeholder): ").strip()
        create_key_file(file_path, key_content if key_content else None)

    # Add key to config
    add_key_to_config(config, key_id, {
        'route': route,
        'file': file_path,
        'username': username,
        'password_hash': password_hash,
        'backends': backends,
        'message': message,
    })

    save_config(config, config_path)

    _print_key_summary(key_id, route, file_path, username, password, backends)


def add_key_programmatic(config: Dict[str, Any], config_path: str, args) -> None:
    """Add key programmatically using command line arguments.

    Args:
        config: Loaded server configuration (mutated in place).
        config_path: File the updated configuration is saved to.
        args: Parsed arguments providing ``key_id``, ``file``, ``backends``,
            ``password``, ``route``, ``username``, ``message``,
            ``create_file`` and ``key_content``.

    Raises:
        ValueError: If the key ID, file path, or backends are invalid.
        ConfigError: If the configuration cannot be saved.
    """
    existing_keys = _configured_keys(config)
    available_backends = get_available_backends(config)

    # Validate inputs
    validate_key_id(args.key_id, existing_keys)
    validate_file_path(args.file)
    validate_backends(args.backends, available_backends or None)

    # Generate password if not provided
    if args.password:
        password = args.password
    else:
        password = generate_secure_password(64)
        print(f"🔐 Generated password: {password}")

    password_hash = generate_password_hash(password)

    # Set defaults
    route = args.route or f"/emergency-key-{args.key_id}"
    username = args.username or f"emergency_{args.key_id}"
    message = args.message or f"🚨 EMERGENCY: Key {args.key_id} accessed from server"

    # Create key file if requested
    if args.create_file:
        create_key_file(args.file, args.key_content)

    # Add key to config
    add_key_to_config(config, args.key_id, {
        'route': route,
        'file': args.file,
        'username': username,
        'password_hash': password_hash,
        'backends': args.backends,
        'message': message,
    })

    save_config(config, config_path)

    _print_key_summary(
        args.key_id, route, args.file, username, password, args.backends
    )


def list_keys(config: Dict[str, Any]) -> None:
    """Print every configured key with its route, file, username, backends
    and message (password hashes are not shown).

    Args:
        config: Loaded server configuration.
    """
    keys = _configured_keys(config)

    if not keys:
        print("No keys configured")
        return

    print("\n" + "=" * 60)
    print("CONFIGURED KEYS")
    print("=" * 60)

    for key_id, key_config in keys.items():
        print(f"\n📋 Key ID: {key_id}")
        print(f" Route: {key_config.get('route', 'N/A')}")
        print(f" File: {key_config.get('file', 'N/A')}")
        print(f" Username: {key_config.get('username', 'N/A')}")
        print(f" Backends: {', '.join(key_config.get('backends') or [])}")
        print(f" Message: {key_config.get('message', 'N/A')}")


def remove_key(
    config: Dict[str, Any], config_path: str, key_id: str, remove_file: bool = False
) -> None:
    """Remove a key from configuration.

    Args:
        config: Loaded server configuration (mutated in place).
        config_path: File the updated configuration is saved to.
        key_id: Identifier of the key to remove.
        remove_file: Also delete the key's file from disk.

    Raises:
        ConfigError: If the configuration cannot be saved.
    """
    keys = _configured_keys(config)

    if key_id not in keys:
        print(f"❌ Key '{key_id}' not found")
        return

    key_config = keys.pop(key_id)

    # Persist the configuration first: if saving fails, the key file must
    # still exist for the key that is still configured on disk.
    save_config(config, config_path)

    if remove_file and 'file' in key_config:
        file_path = key_config['file']
        if os.path.exists(file_path):
            os.remove(file_path)
            print(f"🗑️ Removed file: {file_path}")

    print(f"✅ Removed key '{key_id}' from configuration")


def main():
    """Parse the command line and list, remove, or add a key.

    Exits with status 1 on any error or when the arguments for adding a key
    are incomplete, and with status 130 when interrupted with Ctrl-C.
    """
    parser = argparse.ArgumentParser(
        description="Add, list, or remove emergency access keys",
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog="""
Examples:
  # Interactive key addition
  python add_key.py --interactive

  # Add key programmatically
  python add_key.py --key-id backup --file /etc/emergency-access/backup.txt --backends matrix_sec,email

  # List existing keys
  python add_key.py --list

  # Remove a key
  python add_key.py --remove backup --remove-file

  # Add key with all options
  python add_key.py --key-id master \\
    --file /etc/emergency-access/master.txt \\
    --route /emergency-master \\
    --username master_user \\
    --password "secure_password" \\
    --backends matrix_sec,pushover \\
    --message "Master key accessed!" \\
    --create-file \\
    --key-content "actual key content here"
        """
    )

    parser.add_argument(
        '--config', '-c',
        default='config.json',
        help='Configuration file path (default: config.json)'
    )

    # Add key options
    parser.add_argument(
        '--interactive', '-i',
        action='store_true',
        help='Interactive key addition'
    )
    parser.add_argument(
        '--key-id',
        help='Key identifier'
    )
    parser.add_argument(
        '--file', '-f',
        help='Path to key file'
    )
    parser.add_argument(
        '--route',
        help='HTTP route for key access (default: /emergency-key-{key_id})'
    )
    parser.add_argument(
        '--username', '-u',
        help='HTTP Basic Auth username (default: emergency_{key_id})'
    )
    parser.add_argument(
        '--password', '-p',
        help='HTTP Basic Auth password (generated if not provided)'
    )
    parser.add_argument(
        '--backends', '-b',
        nargs='+',
        help='Notification backends (space or comma separated)'
    )
    parser.add_argument(
        '--message', '-m',
        help='Notification message'
    )
    parser.add_argument(
        '--create-file',
        action='store_true',
        help='Create the key file'
    )
    parser.add_argument(
        '--key-content',
        help='Content for the key file'
    )

    # List/remove options
    parser.add_argument(
        '--list', '-l',
        action='store_true',
        help='List existing keys'
    )
    parser.add_argument(
        '--remove', '-r',
        help='Remove key by ID'
    )
    parser.add_argument(
        '--remove-file',
        action='store_true',
        help='Also remove the key file when removing a key'
    )

    args = parser.parse_args()

    # Handle backends input (support both space and comma separated)
    if args.backends:
        args.backends = [
            name
            for group in args.backends
            for name in group.replace(',', ' ').split()
        ]

    # Load configuration
    try:
        config = load_config(args.config)
    except Exception as e:
        print(f"❌ {e}")
        sys.exit(1)

    try:
        if args.list:
            list_keys(config)
        elif args.remove:
            remove_key(config, args.config, args.remove, args.remove_file)
        elif args.interactive:
            add_key_interactive(config, args.config)
        elif args.key_id and args.file and args.backends:
            add_key_programmatic(config, args.config, args)
        else:
            parser.print_help()
            print("\n❌ Missing required arguments for key addition")
            print(" Use --interactive for guided setup")
            print(" Or provide --key-id, --file, and --backends")
            # Incomplete usage is an error; report it through the exit status.
            sys.exit(1)
    except KeyboardInterrupt:
        print("\n❌ Aborted")
        sys.exit(130)
    except Exception as e:
        print(f"❌ {e}")
        sys.exit(1)


if __name__ == '__main__':
    main()