| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183 |
- import json
- import os
- import yaml
- import hashlib
- import hmac
- import secrets
- from typing import Dict, List, Any, Optional
class KeyConfig:
    """Configuration for a single key.

    Values are read from ``config_data``; missing optional entries fall back
    to defaults derived from ``key_id``.

    Attributes:
        key_id: Identifier of the key, as given by the caller.
        route: ``config_data['route']`` or ``/emergency-key-<key_id>``.
        file_path: ``config_data['file']`` (required, must be non-empty).
        backends: ``config_data['backends']`` (required, must be non-empty).
        message: ``config_data['message']`` or a default emergency message.
        username: ``config_data['username']`` or ``emergency_<key_id>``.
        password_hash: ``config_data['password_hash']`` (required).
    """

    def __init__(self, key_id: str, config_data: Dict[str, Any], global_config: Dict[str, Any]):
        """Build the key configuration and validate its required fields.

        Args:
            key_id: Identifier used in default values and error messages.
            config_data: Mapping holding this key's settings.
            global_config: Accepted for interface compatibility; this
                constructor does not read it.

        Raises:
            Exception: If ``config_data`` is not a mapping, or if ``file``,
                ``backends`` or ``password_hash`` is missing or empty.
        """
        # An entry such as ``keys: {primary: null}`` parses to None (or some
        # other scalar); fail with a message naming the key instead of an
        # AttributeError from ``.get``.
        if not isinstance(config_data, dict):
            raise Exception(
                f"Configuration for key '{key_id}' must be a mapping, "
                f"got {type(config_data).__name__}"
            )

        self.key_id = key_id
        self.route = config_data.get('route', f'/emergency-key-{key_id}')
        self.file_path = config_data.get('file', '')
        self.backends = config_data.get('backends', [])
        self.message = config_data.get('message', f'🚨 EMERGENCY: Key {key_id} accessed from server')
        self.username = config_data.get('username', f'emergency_{key_id}')
        self.password_hash = config_data.get('password_hash', '')

        # Required settings: falsy values (missing, '', [], None) are rejected.
        if not self.file_path:
            raise Exception(f"File path not configured for key '{key_id}'")
        if not self.backends:
            raise Exception(f"No notification backends configured for key '{key_id}'")
        if not self.password_hash:
            raise Exception(f"Password hash not configured for key '{key_id}'. Use generate_password_hash() to create one.")
class Config:
    """Application configuration loaded from a JSON or YAML file.

    The file is parsed once in ``__init__``; the parsed mapping is kept in
    ``self.config`` and the per-key settings in ``self._keys``. Properties
    read individual settings from ``self.config`` on each access.

    All configuration errors are raised as plain ``Exception`` with a
    descriptive message.
    """

    # Iteration count used for the legacy ``<salt>:<hexhash>`` format, which
    # does not record one itself.
    _LEGACY_ITERATIONS = 100000

    def __init__(self, config_path: Optional[str] = None):
        """Load the configuration file and its key definitions.

        Args:
            config_path: Path to the configuration file. When ``None``, the
                ``EMERGENCY_CONFIG`` environment variable is used, falling
                back to ``config.json``.

        Raises:
            Exception: If the file is missing, cannot be parsed, is not a
                mapping, or does not define at least one key.
        """
        if config_path is None:
            config_path = os.environ.get('EMERGENCY_CONFIG', 'config.json')
        self.config_path = config_path
        self.config = self._load_config()
        self._keys = self._load_keys()

    @staticmethod
    def generate_password_hash(password: str, iterations: int = 100000) -> str:
        """Generate a password hash for use in configuration.

        Stored canonical format:
            pbkdf2_sha256$<iterations>$<salt>$<hexhash>

        Legacy format <salt>:<hex> remains supported by verify_password().

        Args:
            password: Plain-text password to hash.
            iterations: PBKDF2 iteration count recorded in the result.

        Returns:
            The hash string in canonical format, with a fresh random salt.
        """
        salt = secrets.token_hex(16)
        derived = hashlib.pbkdf2_hmac('sha256', password.encode(), salt.encode(), iterations)
        # Canonical format records the algorithm and iteration count so that
        # verification does not depend on a hard-coded value.
        return f"pbkdf2_sha256${iterations}${salt}${derived.hex()}"

    @staticmethod
    def _pbkdf2_matches(password: str, salt: str, stored_hex: str, iterations: int) -> bool:
        """Return True if PBKDF2-HMAC-SHA256 of ``password`` equals ``stored_hex``."""
        # Decode the stored digest first so a malformed hash is rejected
        # without paying for the key derivation.
        try:
            stored_bytes = bytes.fromhex(stored_hex)
        except ValueError:
            return False
        derived = hashlib.pbkdf2_hmac('sha256', password.encode(), salt.encode(), iterations)
        # Constant-time comparison.
        return hmac.compare_digest(derived, stored_bytes)

    @staticmethod
    def verify_password(password: str, password_hash: str) -> bool:
        """Verify a password against its hash using constant-time comparison.

        Supported formats:
        - Canonical format: pbkdf2_sha256$<iterations>$<salt>$<hexhash>
        - Legacy format: <salt>:<hexhash> (100000 iterations)

        Args:
            password: Plain-text password supplied by the client.
            password_hash: Stored hash in one of the supported formats.

        Returns:
            True only if the password matches the stored hash. Any malformed
            or unsupported hash yields False.
        """
        try:
            if '$' in password_hash:
                parts = password_hash.split('$')
                # Exactly four fields, and the algorithm tag must match
                # exactly (a prefix match would accept unknown algorithms).
                if len(parts) != 4 or parts[0] != 'pbkdf2_sha256':
                    return False
                _, iterations_s, salt, stored_hex = parts
                return Config._pbkdf2_matches(password, salt, stored_hex, int(iterations_s))
            # Legacy format: salt:hex
            salt, stored_hex = password_hash.split(':')
            return Config._pbkdf2_matches(password, salt, stored_hex, Config._LEGACY_ITERATIONS)
        except Exception:
            # Deliberately fail closed: any unexpected input is a non-match.
            return False

    def _load_config(self) -> Dict[str, Any]:
        """Load configuration from file.

        Files ending in ``.yaml``/``.yml`` are parsed with ``yaml.safe_load``;
        everything else is parsed as JSON. The file is read as UTF-8.

        Returns:
            The parsed configuration mapping.

        Raises:
            Exception: If the file does not exist, cannot be parsed, or its
                top-level value is not a mapping (e.g. an empty YAML file).
        """
        try:
            with open(self.config_path, 'r', encoding='utf-8') as f:
                if self.config_path.endswith(('.yaml', '.yml')):
                    loaded = yaml.safe_load(f)
                else:
                    loaded = json.load(f)
            # An empty YAML document parses to None and a JSON file may hold
            # a list or scalar; every accessor below needs a mapping.
            if not isinstance(loaded, dict):
                raise ValueError(
                    f"Configuration root must be a mapping, got {type(loaded).__name__}"
                )
            return loaded
        except FileNotFoundError as e:
            raise Exception(f"Configuration file {self.config_path} not found") from e
        except Exception as e:
            raise Exception(f"Failed to load configuration: {str(e)}") from e

    def _load_keys(self) -> Dict[str, "KeyConfig"]:
        """Load key configurations.

        Returns:
            Mapping of key id to its ``KeyConfig``.

        Raises:
            Exception: If the ``keys`` section is missing, is not a mapping,
                or is empty; also propagates errors raised by ``KeyConfig``.
        """
        if 'keys' not in self.config:
            raise Exception("No keys configured. Configuration must include a 'keys' section")

        raw_keys = self.config['keys']
        # ``keys:`` with nothing under it parses to None; treat it as empty so
        # the caller gets the "no valid keys" error rather than AttributeError.
        if raw_keys is None:
            raw_keys = {}
        if not isinstance(raw_keys, dict):
            raise Exception(
                f"'keys' section must be a mapping of key id to settings, got {type(raw_keys).__name__}"
            )

        keys = {
            key_id: KeyConfig(key_id, key_config, self.config)
            for key_id, key_config in raw_keys.items()
        }
        if not keys:
            raise Exception("No valid keys found in configuration")
        return keys

    def _section(self, name: str) -> Dict[str, Any]:
        """Return the top-level section ``name``, or ``{}`` if absent or null.

        Raises:
            Exception: If the section is present but is not a mapping.
        """
        section = self.config.get(name)
        if section is None:
            return {}
        if not isinstance(section, dict):
            raise Exception(
                f"Configuration section '{name}' must be a mapping, got {type(section).__name__}"
            )
        return section

    @property
    def server_host(self) -> str:
        """``server.host``, defaulting to ``127.0.0.1``."""
        return self._section('server').get('host', '127.0.0.1')

    @property
    def server_port(self) -> int:
        """``server.port``, defaulting to ``1127``."""
        return self._section('server').get('port', 1127)

    @property
    def health_route(self) -> str:
        """``routes.health_route``, defaulting to ``/health-check``."""
        return self._section('routes').get('health_route', '/health-check')

    @property
    def health_username(self) -> str:
        """``routes.health_username``, defaulting to ``health_monitor``."""
        return self._section('routes').get('health_username', 'health_monitor')

    @property
    def health_password_hash(self) -> str:
        """``routes.health_password_hash``; raises if missing or empty."""
        health_auth = self._section('routes').get('health_password_hash', '')
        if not health_auth:
            raise Exception("Health endpoint password hash not configured")
        return health_auth

    @property
    def dummy_file_path(self) -> str:
        """``files.dummy_file``; raises if missing or empty."""
        dummy_file = self._section('files').get('dummy_file')
        if not dummy_file:
            raise Exception("dummy_file not configured")
        return dummy_file

    @property
    def keys(self) -> Dict[str, "KeyConfig"]:
        """Get all configured keys"""
        return self._keys

    def get_key_by_route(self, route: str) -> Optional["KeyConfig"]:
        """Find a key configuration by its route.

        Returns the first key whose ``route`` equals ``route``, or ``None``.
        """
        return next((key for key in self._keys.values() if key.route == route), None)

    def get_key_by_id(self, key_id: str) -> Optional["KeyConfig"]:
        """Get a key configuration by its ID, or ``None`` if unknown."""
        return self._keys.get(key_id)

    @property
    def ntfy_backends_health(self) -> List[str]:
        """``notifications.health_backends``; raises if missing or empty."""
        backends = self._section('notifications').get('health_backends', [])
        if not backends:
            raise Exception("No notification backends configured for health check")
        return backends

    @property
    def ntfy_config_path(self) -> str:
        """``notifications.config_path``, defaulting to the system ntfy.yml."""
        return self._section('notifications').get('config_path', '/etc/emergency-access/ntfy.yml')

    @property
    def log_level(self) -> str:
        """``notifications.log_level``, defaulting to ``WARNING``."""
        return self._section('notifications').get('log_level', 'WARNING')

    @property
    def send_all_logs(self) -> bool:
        """``notifications.send_all_logs``, defaulting to ``True``."""
        return self._section('notifications').get('send_all_logs', True)

    @property
    def ntfy_health_message(self) -> str:
        """``notifications.health_message``, with a default success message."""
        return self._section('notifications').get('health_message', '✅ Emergency access server health check completed')
|