"""Configuration loading for the emergency access server.

Reads a JSON or YAML file, validates the per-key settings and exposes the
remaining settings through read-only properties on :class:`Config`.
"""

import hashlib
import json
import os
import secrets
from typing import Any, Dict, List, Optional

try:
    import yaml
except ImportError:  # PyYAML is only needed when the config file is YAML.
    yaml = None

# Work factor baked into every stored hash. The stored format ("salt:hex")
# does not record it, so changing this value invalidates existing hashes.
_PBKDF2_ITERATIONS = 100000


class ConfigError(Exception):
    """Raised for missing, unreadable or invalid configuration.

    Subclasses ``Exception`` so callers that catch ``Exception`` keep working.
    """


def _pbkdf2_hex(password: str, salt: str) -> str:
    """Return the hex PBKDF2-HMAC-SHA256 digest of *password* under *salt*.

    The salt is used as its text encoding (not decoded from hex), which is
    what existing stored hashes were generated with.
    """
    digest = hashlib.pbkdf2_hmac(
        'sha256', password.encode(), salt.encode(), _PBKDF2_ITERATIONS
    )
    return digest.hex()


class KeyConfig:
    """Configuration for a single key.

    Attributes:
        key_id: Identifier of the key (its name in the ``keys`` section).
        route: URL route for the key; defaults to ``/emergency-key-<key_id>``.
        file_path: Value of the ``file`` setting (required).
        backends: Notification backends for this key (required, non-empty).
        message: Notification text; defaults to an emergency message naming
            the key.
        username: Username for the key; defaults to ``emergency_<key_id>``.
        password_hash: Stored ``salt:hash`` string (required).
    """

    def __init__(self, key_id: str, config_data: Dict[str, Any],
                 global_config: Dict[str, Any]):
        """Read one key's settings from *config_data* and validate them.

        Args:
            key_id: Identifier of the key.
            config_data: The mapping of settings for this key.
            global_config: The whole configuration mapping. Accepted for
                interface compatibility; not read by this constructor.

        Raises:
            ConfigError: If ``file``, ``backends`` or ``password_hash`` is
                missing or empty.
        """
        self.key_id = key_id
        self.route = config_data.get('route', f'/emergency-key-{key_id}')
        self.file_path = config_data.get('file', '')
        self.backends = config_data.get('backends', [])
        self.message = config_data.get(
            'message', f'🚨 EMERGENCY: Key {key_id} accessed from server'
        )
        self.username = config_data.get('username', f'emergency_{key_id}')
        self.password_hash = config_data.get('password_hash', '')

        if not self.file_path:
            raise ConfigError(f"File path not configured for key '{key_id}'")
        if not self.backends:
            raise ConfigError(
                f"No notification backends configured for key '{key_id}'"
            )
        if not self.password_hash:
            raise ConfigError(
                f"Password hash not configured for key '{key_id}'. "
                "Use generate_password_hash() to create one."
            )


class Config:
    """Loaded configuration file plus typed accessors for its settings."""

    def __init__(self, config_path: Optional[str] = None):
        """Load and validate the configuration.

        Args:
            config_path: Path of the JSON/YAML file. When ``None``, the
                ``EMERGENCY_CONFIG`` environment variable is used, falling
                back to ``config.json``.

        Raises:
            ConfigError: If the file cannot be loaded or the ``keys`` section
                is missing, empty or invalid.
        """
        if config_path is None:
            config_path = os.environ.get('EMERGENCY_CONFIG', 'config.json')
        self.config_path = config_path
        self.config = self._load_config()
        self._keys = self._load_keys()

    @staticmethod
    def generate_password_hash(password: str) -> str:
        """Generate a password hash for use in configuration.

        Returns:
            ``"<salt>:<hex digest>"`` where the salt is 32 random hex
            characters.
        """
        salt = secrets.token_hex(16)
        return f"{salt}:{_pbkdf2_hex(password, salt)}"

    @staticmethod
    def verify_password(password: str, password_hash: str) -> bool:
        """Verify a password against its stored ``salt:hash`` string.

        Returns ``False`` (never raises) for a wrong password or for
        malformed input such as a hash without exactly one ``:`` or a
        non-string argument.
        """
        try:
            salt, stored_hash = password_hash.split(':')
            computed = _pbkdf2_hex(password, salt)
            # Constant-time comparison so response timing does not reveal how
            # many leading characters matched. Comparing the UTF-8 bytes keeps
            # the exact equality semantics of comparing the two strings.
            return secrets.compare_digest(
                computed.encode(), stored_hash.encode()
            )
        except (AttributeError, TypeError, ValueError):
            # Malformed hash or non-string input: treat as a failed check.
            return False

    def _load_config(self) -> Dict[str, Any]:
        """Load configuration from file.

        Files ending in ``.yaml``/``.yml`` (case-insensitive) are parsed as
        YAML, everything else as JSON. An empty document yields ``{}``.

        Raises:
            ConfigError: If the file is missing, cannot be read or parsed,
                or its top level is not a mapping.
        """
        path = os.fspath(self.config_path)
        is_yaml = path.lower().endswith(('.yaml', '.yml'))
        if is_yaml and yaml is None:
            raise ConfigError(
                "PyYAML is required to load YAML configuration files"
            )

        try:
            # Explicit UTF-8: the platform default encoding may not be able
            # to decode non-ASCII text in the file.
            with open(path, 'r', encoding='utf-8') as f:
                loaded = yaml.safe_load(f) if is_yaml else json.load(f)
        except FileNotFoundError as e:
            raise ConfigError(
                f"Configuration file {self.config_path} not found"
            ) from e
        except Exception as e:
            # Boundary handler: wrap any read/parse failure in one error type
            # while keeping the original cause attached.
            raise ConfigError(f"Failed to load configuration: {str(e)}") from e

        if loaded is None:
            return {}
        if not isinstance(loaded, dict):
            raise ConfigError(
                "Configuration root must be a mapping, got "
                f"{type(loaded).__name__}"
            )
        return loaded

    def _load_keys(self) -> Dict[str, KeyConfig]:
        """Build a :class:`KeyConfig` for every entry of the ``keys`` section.

        Raises:
            ConfigError: If the section is missing, not a mapping, empty, or
                any entry is invalid.
        """
        if 'keys' not in self.config:
            raise ConfigError(
                "No keys configured. Configuration must include a 'keys' section"
            )

        raw_keys = self.config['keys']
        if not isinstance(raw_keys, dict):
            raise ConfigError(
                "Configuration section 'keys' must be a mapping"
            )

        keys = {}
        for key_id, key_config in raw_keys.items():
            if not isinstance(key_config, dict):
                raise ConfigError(
                    f"Configuration for key '{key_id}' must be a mapping"
                )
            keys[key_id] = KeyConfig(key_id, key_config, self.config)

        if not keys:
            raise ConfigError("No valid keys found in configuration")
        return keys

    def _section(self, name: str) -> Dict[str, Any]:
        """Return top-level section *name*, or ``{}`` if absent or null.

        Raises:
            ConfigError: If the section exists but is not a mapping.
        """
        section = self.config.get(name)
        if section is None:
            return {}
        if not isinstance(section, dict):
            raise ConfigError(
                f"Configuration section '{name}' must be a mapping"
            )
        return section

    @property
    def server_host(self) -> str:
        """``server.host``; defaults to ``127.0.0.1``."""
        return self._section('server').get('host', '127.0.0.1')

    @property
    def server_port(self) -> int:
        """``server.port``; defaults to ``1127``."""
        return self._section('server').get('port', 1127)

    @property
    def health_route(self) -> str:
        """``routes.health_route``; defaults to ``/health-check``."""
        return self._section('routes').get('health_route', '/health-check')

    @property
    def health_username(self) -> str:
        """``routes.health_username``; defaults to ``health_monitor``."""
        return self._section('routes').get('health_username', 'health_monitor')

    @property
    def health_password_hash(self) -> str:
        """``routes.health_password_hash``.

        Raises:
            ConfigError: If the value is missing or empty.
        """
        health_auth = self._section('routes').get('health_password_hash', '')
        if not health_auth:
            raise ConfigError("Health endpoint password hash not configured")
        return health_auth

    @property
    def dummy_file_path(self) -> str:
        """``files.dummy_file``.

        Raises:
            ConfigError: If the value is missing or empty.
        """
        dummy_file = self._section('files').get('dummy_file')
        if not dummy_file:
            raise ConfigError("dummy_file not configured")
        return dummy_file

    @property
    def keys(self) -> Dict[str, KeyConfig]:
        """Get all configured keys, indexed by key ID."""
        return self._keys

    def get_key_by_route(self, route: str) -> Optional[KeyConfig]:
        """Find a key configuration by its route, or ``None`` if no key matches."""
        return next(
            (key for key in self._keys.values() if key.route == route),
            None,
        )

    def get_key_by_id(self, key_id: str) -> Optional[KeyConfig]:
        """Get a key configuration by its ID, or ``None`` if unknown."""
        return self._keys.get(key_id)

    @property
    def ntfy_backends_health(self) -> List[str]:
        """``notifications.health_backends``.

        Raises:
            ConfigError: If the list is missing or empty.
        """
        backends = self._section('notifications').get('health_backends', [])
        if not backends:
            raise ConfigError(
                "No notification backends configured for health check"
            )
        return backends

    @property
    def ntfy_config_path(self) -> str:
        """``notifications.config_path``; defaults to ``/etc/emergency-access/ntfy.yml``."""
        return self._section('notifications').get(
            'config_path', '/etc/emergency-access/ntfy.yml'
        )

    @property
    def log_level(self) -> str:
        """``notifications.log_level``; defaults to ``WARNING``."""
        return self._section('notifications').get('log_level', 'WARNING')

    @property
    def send_all_logs(self) -> bool:
        """``notifications.send_all_logs``; defaults to ``True``."""
        return self._section('notifications').get('send_all_logs', True)

    @property
    def ntfy_health_message(self) -> str:
        """``notifications.health_message``; defaults to a health-check notice."""
        return self._section('notifications').get(
            'health_message',
            '✅ Emergency access server health check completed',
        )