config.py 5.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147
import hashlib
import hmac
import json
import os
import secrets
from typing import Any, Dict, List, Optional

import yaml
  7. class KeyConfig:
  8. """Configuration for a single key"""
  9. def __init__(self, key_id: str, config_data: Dict[str, Any], global_config: Dict[str, Any]):
  10. self.key_id = key_id
  11. self.route = config_data.get('route', f'/emergency-key-{key_id}')
  12. self.file_path = config_data.get('file', '')
  13. self.backends = config_data.get('backends', [])
  14. self.message = config_data.get('message', f'🚨 EMERGENCY: Key {key_id} accessed from server')
  15. self.username = config_data.get('username', f'emergency_{key_id}')
  16. self.password_hash = config_data.get('password_hash', '')
  17. if not self.file_path:
  18. raise Exception(f"File path not configured for key '{key_id}'")
  19. if not self.backends:
  20. raise Exception(f"No notification backends configured for key '{key_id}'")
  21. if not self.password_hash:
  22. raise Exception(f"Password hash not configured for key '{key_id}'. Use generate_password_hash() to create one.")
  23. class Config:
  24. def __init__(self, config_path: str = None):
  25. if config_path is None:
  26. config_path = os.environ.get('EMERGENCY_CONFIG', 'config.json')
  27. self.config_path = config_path
  28. self.config = self._load_config()
  29. self._keys = self._load_keys()
  30. @staticmethod
  31. def generate_password_hash(password: str) -> str:
  32. """Generate a password hash for use in configuration"""
  33. salt = secrets.token_hex(16)
  34. password_hash = hashlib.pbkdf2_hmac('sha256', password.encode(), salt.encode(), 100000)
  35. return f"{salt}:{password_hash.hex()}"
  36. @staticmethod
  37. def verify_password(password: str, password_hash: str) -> bool:
  38. """Verify a password against its hash"""
  39. try:
  40. salt, stored_hash = password_hash.split(':')
  41. password_hash_computed = hashlib.pbkdf2_hmac('sha256', password.encode(), salt.encode(), 100000)
  42. return password_hash_computed.hex() == stored_hash
  43. except Exception:
  44. return False
  45. def _load_config(self) -> Dict[str, Any]:
  46. """Load configuration from file"""
  47. try:
  48. with open(self.config_path, 'r') as f:
  49. if self.config_path.endswith('.yaml') or self.config_path.endswith('.yml'):
  50. return yaml.safe_load(f)
  51. else:
  52. return json.load(f)
  53. except FileNotFoundError:
  54. raise Exception(f"Configuration file {self.config_path} not found")
  55. except Exception as e:
  56. raise Exception(f"Failed to load configuration: {str(e)}")
  57. def _load_keys(self) -> Dict[str, KeyConfig]:
  58. """Load key configurations"""
  59. keys = {}
  60. if 'keys' not in self.config:
  61. raise Exception("No keys configured. Configuration must include a 'keys' section")
  62. for key_id, key_config in self.config['keys'].items():
  63. keys[key_id] = KeyConfig(key_id, key_config, self.config)
  64. if not keys:
  65. raise Exception("No valid keys found in configuration")
  66. return keys
  67. @property
  68. def server_host(self) -> str:
  69. return self.config.get('server', {}).get('host', '127.0.0.1')
  70. @property
  71. def server_port(self) -> int:
  72. return self.config.get('server', {}).get('port', 1127)
  73. @property
  74. def health_route(self) -> str:
  75. return self.config.get('routes', {}).get('health_route', '/health-check')
  76. @property
  77. def health_username(self) -> str:
  78. return self.config.get('routes', {}).get('health_username', 'health_monitor')
  79. @property
  80. def health_password_hash(self) -> str:
  81. health_auth = self.config.get('routes', {}).get('health_password_hash', '')
  82. if not health_auth:
  83. raise Exception("Health endpoint password hash not configured")
  84. return health_auth
  85. @property
  86. def dummy_file_path(self) -> str:
  87. dummy_file = self.config.get('files', {}).get('dummy_file')
  88. if not dummy_file:
  89. raise Exception("dummy_file not configured")
  90. return dummy_file
  91. @property
  92. def keys(self) -> Dict[str, KeyConfig]:
  93. """Get all configured keys"""
  94. return self._keys
  95. def get_key_by_route(self, route: str) -> Optional[KeyConfig]:
  96. """Find a key configuration by its route"""
  97. for key_config in self._keys.values():
  98. if key_config.route == route:
  99. return key_config
  100. return None
  101. def get_key_by_id(self, key_id: str) -> Optional[KeyConfig]:
  102. """Get a key configuration by its ID"""
  103. return self._keys.get(key_id)
  104. @property
  105. def ntfy_backends_health(self) -> List[str]:
  106. backends = self.config.get('notifications', {}).get('health_backends', [])
  107. if not backends:
  108. raise Exception("No notification backends configured for health check")
  109. return backends
  110. @property
  111. def ntfy_config_path(self) -> str:
  112. return self.config.get('notifications', {}).get('config_path', '/etc/emergency-access/ntfy.yml')
  113. @property
  114. def log_level(self) -> str:
  115. return self.config.get('notifications', {}).get('log_level', 'WARNING')
  116. @property
  117. def send_all_logs(self) -> bool:
  118. return self.config.get('notifications', {}).get('send_all_logs', True)
  119. @property
  120. def ntfy_health_message(self) -> str:
  121. return self.config.get('notifications', {}).get('health_message', '✅ Emergency access server health check completed')