config.py 7.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183
  1. import json
  2. import os
  3. import yaml
  4. import hashlib
  5. import hmac
  6. import secrets
  7. from typing import Dict, List, Any, Optional
  8. class KeyConfig:
  9. """Configuration for a single key"""
  10. def __init__(self, key_id: str, config_data: Dict[str, Any], global_config: Dict[str, Any]):
  11. self.key_id = key_id
  12. self.route = config_data.get('route', f'/emergency-key-{key_id}')
  13. self.file_path = config_data.get('file', '')
  14. self.backends = config_data.get('backends', [])
  15. self.message = config_data.get('message', f'🚨 EMERGENCY: Key {key_id} accessed from server')
  16. self.username = config_data.get('username', f'emergency_{key_id}')
  17. self.password_hash = config_data.get('password_hash', '')
  18. if not self.file_path:
  19. raise Exception(f"File path not configured for key '{key_id}'")
  20. if not self.backends:
  21. raise Exception(f"No notification backends configured for key '{key_id}'")
  22. if not self.password_hash:
  23. raise Exception(f"Password hash not configured for key '{key_id}'. Use generate_password_hash() to create one.")
  24. class Config:
  25. def __init__(self, config_path: str = None):
  26. if config_path is None:
  27. config_path = os.environ.get('EMERGENCY_CONFIG', 'config.json')
  28. self.config_path = config_path
  29. self.config = self._load_config()
  30. self._keys = self._load_keys()
  31. @staticmethod
  32. def generate_password_hash(password: str, iterations: int = 100000) -> str:
  33. """Generate a password hash for use in configuration.
  34. Stored canonical format:
  35. pbkdf2_sha256$<iterations>$<salt>$<hexhash>
  36. Legacy format <salt>:<hex> remains supported for compatibility.
  37. """
  38. salt = secrets.token_hex(16)
  39. derived = hashlib.pbkdf2_hmac('sha256', password.encode(), salt.encode(), iterations)
  40. # Return canonical format including algorithm and iteration count
  41. return f"pbkdf2_sha256${iterations}${salt}${derived.hex()}"
  42. @staticmethod
  43. def verify_password(password: str, password_hash: str) -> bool:
  44. """Verify a password against its hash using constant-time comparison.
  45. Supported formats:
  46. - New canonical format: pbkdf2_sha256$<iterations>$<salt>$<hexhash>
  47. - Legacy format: <salt>:<hexhash>
  48. Returns True only if the password matches the stored hash.
  49. """
  50. try:
  51. # New canonical format uses '$' as separator
  52. if '$' in password_hash:
  53. parts = password_hash.split('$')
  54. # Expecting exactly 4 parts: algo, iterations, salt, hexhash
  55. if len(parts) != 4 or not parts[0].startswith('pbkdf2_sha256'):
  56. return False
  57. _, iterations_s, salt, stored_hex = parts
  58. iterations = int(iterations_s)
  59. derived = hashlib.pbkdf2_hmac('sha256', password.encode(), salt.encode(), iterations)
  60. try:
  61. stored_bytes = bytes.fromhex(stored_hex)
  62. except ValueError:
  63. return False
  64. # Use constant-time comparison
  65. return hmac.compare_digest(derived, stored_bytes)
  66. else:
  67. # Legacy format: salt:hex
  68. salt, stored_hex = password_hash.split(':')
  69. derived = hashlib.pbkdf2_hmac('sha256', password.encode(), salt.encode(), 100000)
  70. try:
  71. stored_bytes = bytes.fromhex(stored_hex)
  72. except ValueError:
  73. return False
  74. return hmac.compare_digest(derived, stored_bytes)
  75. except Exception:
  76. return False
  77. def _load_config(self) -> Dict[str, Any]:
  78. """Load configuration from file"""
  79. try:
  80. with open(self.config_path, 'r') as f:
  81. if self.config_path.endswith('.yaml') or self.config_path.endswith('.yml'):
  82. return yaml.safe_load(f)
  83. else:
  84. return json.load(f)
  85. except FileNotFoundError:
  86. raise Exception(f"Configuration file {self.config_path} not found")
  87. except Exception as e:
  88. raise Exception(f"Failed to load configuration: {str(e)}")
  89. def _load_keys(self) -> Dict[str, KeyConfig]:
  90. """Load key configurations"""
  91. keys = {}
  92. if 'keys' not in self.config:
  93. raise Exception("No keys configured. Configuration must include a 'keys' section")
  94. for key_id, key_config in self.config['keys'].items():
  95. keys[key_id] = KeyConfig(key_id, key_config, self.config)
  96. if not keys:
  97. raise Exception("No valid keys found in configuration")
  98. return keys
  99. @property
  100. def server_host(self) -> str:
  101. return self.config.get('server', {}).get('host', '127.0.0.1')
  102. @property
  103. def server_port(self) -> int:
  104. return self.config.get('server', {}).get('port', 1127)
  105. @property
  106. def health_route(self) -> str:
  107. return self.config.get('routes', {}).get('health_route', '/health-check')
  108. @property
  109. def health_username(self) -> str:
  110. return self.config.get('routes', {}).get('health_username', 'health_monitor')
  111. @property
  112. def health_password_hash(self) -> str:
  113. health_auth = self.config.get('routes', {}).get('health_password_hash', '')
  114. if not health_auth:
  115. raise Exception("Health endpoint password hash not configured")
  116. return health_auth
  117. @property
  118. def dummy_file_path(self) -> str:
  119. dummy_file = self.config.get('files', {}).get('dummy_file')
  120. if not dummy_file:
  121. raise Exception("dummy_file not configured")
  122. return dummy_file
  123. @property
  124. def keys(self) -> Dict[str, KeyConfig]:
  125. """Get all configured keys"""
  126. return self._keys
  127. def get_key_by_route(self, route: str) -> Optional[KeyConfig]:
  128. """Find a key configuration by its route"""
  129. for key_config in self._keys.values():
  130. if key_config.route == route:
  131. return key_config
  132. return None
  133. def get_key_by_id(self, key_id: str) -> Optional[KeyConfig]:
  134. """Get a key configuration by its ID"""
  135. return self._keys.get(key_id)
  136. @property
  137. def ntfy_backends_health(self) -> List[str]:
  138. backends = self.config.get('notifications', {}).get('health_backends', [])
  139. if not backends:
  140. raise Exception("No notification backends configured for health check")
  141. return backends
  142. @property
  143. def ntfy_config_path(self) -> str:
  144. return self.config.get('notifications', {}).get('config_path', '/etc/emergency-access/ntfy.yml')
  145. @property
  146. def log_level(self) -> str:
  147. return self.config.get('notifications', {}).get('log_level', 'WARNING')
  148. @property
  149. def send_all_logs(self) -> bool:
  150. return self.config.get('notifications', {}).get('send_all_logs', True)
  151. @property
  152. def ntfy_health_message(self) -> str:
  153. return self.config.get('notifications', {}).get('health_message', '✅ Emergency access server health check completed')