import json import os import yaml import hashlib import hmac import secrets from typing import Dict, List, Any, Optional class KeyConfig: """Configuration for a single key""" def __init__(self, key_id: str, config_data: Dict[str, Any], global_config: Dict[str, Any]): self.key_id = key_id self.route = config_data.get('route', f'/emergency-key-{key_id}') self.file_path = config_data.get('file', '') self.backends = config_data.get('backends', []) self.message = config_data.get('message', f'🚨 EMERGENCY: Key {key_id} accessed from server') self.username = config_data.get('username', f'emergency_{key_id}') self.password_hash = config_data.get('password_hash', '') if not self.file_path: raise Exception(f"File path not configured for key '{key_id}'") if not self.backends: raise Exception(f"No notification backends configured for key '{key_id}'") if not self.password_hash: raise Exception(f"Password hash not configured for key '{key_id}'. Use generate_password_hash() to create one.") class Config: def __init__(self, config_path: str = None): if config_path is None: config_path = os.environ.get('EMERGENCY_CONFIG', 'config.json') self.config_path = config_path self.config = self._load_config() self._keys = self._load_keys() @staticmethod def generate_password_hash(password: str, iterations: int = 100000) -> str: """Generate a password hash for use in configuration. Stored canonical format: pbkdf2_sha256$$$ Legacy format : remains supported for compatibility. """ salt = secrets.token_hex(16) derived = hashlib.pbkdf2_hmac('sha256', password.encode(), salt.encode(), iterations) # Return canonical format including algorithm and iteration count return f"pbkdf2_sha256${iterations}${salt}${derived.hex()}" @staticmethod def verify_password(password: str, password_hash: str) -> bool: """Verify a password against its hash using constant-time comparison. Supported formats: - New canonical format: pbkdf2_sha256$$$ - Legacy format: : Returns True only if the password matches the stored hash. """ try: # New canonical format uses '$' as separator if '$' in password_hash: parts = password_hash.split('$') # Expecting exactly 4 parts: algo, iterations, salt, hexhash if len(parts) != 4 or not parts[0].startswith('pbkdf2_sha256'): return False _, iterations_s, salt, stored_hex = parts iterations = int(iterations_s) derived = hashlib.pbkdf2_hmac('sha256', password.encode(), salt.encode(), iterations) try: stored_bytes = bytes.fromhex(stored_hex) except ValueError: return False # Use constant-time comparison return hmac.compare_digest(derived, stored_bytes) else: # Legacy format: salt:hex salt, stored_hex = password_hash.split(':') derived = hashlib.pbkdf2_hmac('sha256', password.encode(), salt.encode(), 100000) try: stored_bytes = bytes.fromhex(stored_hex) except ValueError: return False return hmac.compare_digest(derived, stored_bytes) except Exception: return False def _load_config(self) -> Dict[str, Any]: """Load configuration from file""" try: with open(self.config_path, 'r') as f: if self.config_path.endswith('.yaml') or self.config_path.endswith('.yml'): return yaml.safe_load(f) else: return json.load(f) except FileNotFoundError: raise Exception(f"Configuration file {self.config_path} not found") except Exception as e: raise Exception(f"Failed to load configuration: {str(e)}") def _load_keys(self) -> Dict[str, KeyConfig]: """Load key configurations""" keys = {} if 'keys' not in self.config: raise Exception("No keys configured. Configuration must include a 'keys' section") for key_id, key_config in self.config['keys'].items(): keys[key_id] = KeyConfig(key_id, key_config, self.config) if not keys: raise Exception("No valid keys found in configuration") return keys @property def server_host(self) -> str: return self.config.get('server', {}).get('host', '127.0.0.1') @property def server_port(self) -> int: return self.config.get('server', {}).get('port', 1127) @property def health_route(self) -> str: return self.config.get('routes', {}).get('health_route', '/health-check') @property def health_username(self) -> str: return self.config.get('routes', {}).get('health_username', 'health_monitor') @property def health_password_hash(self) -> str: health_auth = self.config.get('routes', {}).get('health_password_hash', '') if not health_auth: raise Exception("Health endpoint password hash not configured") return health_auth @property def dummy_file_path(self) -> str: dummy_file = self.config.get('files', {}).get('dummy_file') if not dummy_file: raise Exception("dummy_file not configured") return dummy_file @property def keys(self) -> Dict[str, KeyConfig]: """Get all configured keys""" return self._keys def get_key_by_route(self, route: str) -> Optional[KeyConfig]: """Find a key configuration by its route""" for key_config in self._keys.values(): if key_config.route == route: return key_config return None def get_key_by_id(self, key_id: str) -> Optional[KeyConfig]: """Get a key configuration by its ID""" return self._keys.get(key_id) @property def ntfy_backends_health(self) -> List[str]: backends = self.config.get('notifications', {}).get('health_backends', []) if not backends: raise Exception("No notification backends configured for health check") return backends @property def ntfy_config_path(self) -> str: return self.config.get('notifications', {}).get('config_path', '/etc/emergency-access/ntfy.yml') @property def log_level(self) -> str: return self.config.get('notifications', {}).get('log_level', 'WARNING') @property def send_all_logs(self) -> bool: return self.config.get('notifications', {}).get('send_all_logs', True) @property def ntfy_health_message(self) -> str: return self.config.get('notifications', {}).get('health_message', '✅ Emergency access server health check completed')