main.py 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341
  1. #!/usr/bin/env python3
  2. import os
  3. import sys
  4. import logging
  5. import time
  6. import tempfile
  7. from flask import Flask, jsonify
  8. from config import Config
  9. from typing import List, Tuple
  10. # Configure logging with custom handler
  11. class NtfyLogHandler(logging.Handler):
  12. """Custom logging handler that sends logs to ntfy health backends"""
  13. def __init__(self, config_obj):
  14. super().__init__()
  15. self.config = config_obj
  16. def emit(self, record):
  17. """Send log record to health backends"""
  18. if hasattr(self.config, 'ntfy_backends_health') and self.config.send_all_logs:
  19. try:
  20. log_message = self.format(record)
  21. # Get configured log level or default to WARNING
  22. min_level = getattr(logging, self.config.log_level.upper(), logging.WARNING)
  23. if record.levelno >= min_level:
  24. # Format message with appropriate emoji based on log level
  25. emoji = "🚨" if record.levelno >= logging.ERROR else "⚠️" if record.levelno >= logging.WARNING else "ℹ️"
  26. title = f"Emergency Access {record.levelname}"
  27. message = f"{emoji} {record.name}: {record.getMessage()}"
  28. send_ntfy_notification(
  29. self.config.ntfy_backends_health,
  30. message,
  31. title
  32. )
  33. except Exception:
  34. # Don't fail the application if logging notification fails
  35. pass
  36. # Configure logging
  37. logging.basicConfig(
  38. level=logging.INFO,
  39. format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
  40. handlers=[
  41. logging.FileHandler('/var/log/emergency-access.log'),
  42. logging.StreamHandler()
  43. ]
  44. )
  45. logger = logging.getLogger(__name__)
  46. app = Flask(__name__)
  47. # Global config instance
  48. config = None
  49. def send_ntfy_notification(backends: List[str], message: str, title: str = None) -> Tuple[bool, List[str]]:
  50. """
  51. Send notification using dschep/ntfy with dedicated config file
  52. Returns: (success, successful_backends)
  53. """
  54. successful_backends = []
  55. for backend in backends:
  56. try:
  57. # Import ntfy here to avoid import issues during startup
  58. import ntfy
  59. from ntfy.config import load_config
  60. # Load the ntfy config file
  61. ntfy_config = load_config(config.ntfy_config_path)
  62. ntfy_config["backends"] = [backend]
  63. # Send notification using the backend name from our config file
  64. if title:
  65. ret = ntfy.notify(message, title=title, config=ntfy_config)
  66. else:
  67. ret = ntfy.notify(message, config=ntfy_config, title="Note")
  68. if ret == 0:
  69. successful_backends.append(backend)
  70. logger.info(f"Notification sent successfully via {backend}")
  71. else:
  72. logger.error(f"Failed to send notification via {backend}")
  73. raise Exception(f"Failed to send notification via {backend}")
  74. except ImportError:
  75. logger.error(f"ntfy package not available for backend {backend}")
  76. except Exception as e:
  77. logger.error(f"Failed to send notification to {backend}: {str(e)}")
  78. raise
  79. success = len(successful_backends) > 0
  80. return success, successful_backends
  81. def read_file_safely(file_path: str) -> Tuple[bool, str]:
  82. """
  83. Safely read file content
  84. Returns: (success, content)
  85. """
  86. try:
  87. if not os.path.exists(file_path):
  88. logger.error(f"File not found: {file_path}")
  89. return False, f"File not found: {file_path}"
  90. with open(file_path, 'r') as f:
  91. content = f.read().strip()
  92. if not content:
  93. logger.error(f"File is empty: {file_path}")
  94. return False, f"File is empty: {file_path}"
  95. return True, content
  96. except PermissionError:
  97. logger.error(f"Permission denied reading file: {file_path}")
  98. return False, f"Permission denied: {file_path}"
  99. except Exception as e:
  100. logger.error(f"Failed to read file {file_path}: {str(e)}")
  101. return False, f"Failed to read file: {str(e)}"
  102. def get_key_part():
  103. """Emergency key access endpoint"""
  104. logger.warning("EMERGENCY: Key access attempt detected")
  105. try:
  106. # Send notification first - fail-safe approach
  107. notification_success, successful_backends = send_ntfy_notification(
  108. config.ntfy_backends_key,
  109. config.ntfy_key_message,
  110. "EMERGENCY ACCESS ALERT"
  111. )
  112. if not notification_success:
  113. logger.error("CRITICAL: Failed to send notifications to any backend")
  114. return jsonify({
  115. 'error': 'Notification system failure',
  116. 'message': 'Access denied for security reasons'
  117. }), 500
  118. logger.info(f"Notifications sent successfully to: {successful_backends}")
  119. # Read key file
  120. file_success, content = read_file_safely(config.key_file_path)
  121. if not file_success:
  122. logger.error(f"CRITICAL: Failed to read key file: {content}")
  123. return jsonify({
  124. 'error': 'File access failure',
  125. 'message': 'Unable to retrieve key part'
  126. }), 500
  127. logger.warning("EMERGENCY: Key part successfully retrieved and sent")
  128. return jsonify({
  129. 'success': True,
  130. 'key_part': content,
  131. 'timestamp': time.time(),
  132. 'notified_backends': successful_backends
  133. })
  134. except Exception as e:
  135. logger.error(f"CRITICAL: Unexpected error in key access: {str(e)}")
  136. return jsonify({
  137. 'error': 'System error',
  138. 'message': 'Internal server error'
  139. }), 500
  140. def health_check():
  141. """Health check endpoint that verifies both health monitoring and key request functionality"""
  142. logger.info("Health check requested")
  143. try:
  144. # Test health notification system
  145. health_notification_success, health_backends = send_ntfy_notification(
  146. config.ntfy_backends_health,
  147. config.ntfy_health_message,
  148. "Health Check"
  149. )
  150. # Test key notification system (without triggering emergency alert)
  151. key_test_message = "🔧 Emergency access system health verification - key notification test"
  152. key_notification_success, key_backends = send_ntfy_notification(
  153. config.ntfy_backends_key,
  154. key_test_message,
  155. "System Health Check"
  156. )
  157. # Test dummy file access
  158. dummy_file_success, dummy_content = read_file_safely(config.dummy_file_path)
  159. # Test actual key file access (without exposing content)
  160. key_file_success, key_content = read_file_safely(config.key_file_path)
  161. # Determine overall health status
  162. all_systems_ok = (health_notification_success and key_notification_success and
  163. dummy_file_success and key_file_success)
  164. if not all_systems_ok:
  165. error_details = []
  166. if not health_notification_success:
  167. error_details.append("health notifications failed")
  168. if not key_notification_success:
  169. error_details.append("key notifications failed")
  170. if not dummy_file_success:
  171. error_details.append(f"dummy file access failed: {dummy_content}")
  172. if not key_file_success:
  173. error_details.append(f"key file access failed: {key_content}")
  174. logger.error(f"Health check failed: {', '.join(error_details)}")
  175. return jsonify({
  176. 'status': 'error',
  177. 'message': 'System components failed',
  178. 'details': error_details,
  179. 'health_notifications': health_notification_success,
  180. 'key_notifications': key_notification_success,
  181. 'dummy_file_access': dummy_file_success,
  182. 'key_file_access': key_file_success
  183. }), 500
  184. logger.info("Health check completed successfully - all systems operational")
  185. return jsonify({
  186. 'status': 'ok',
  187. 'timestamp': time.time(),
  188. 'health_backends_notified': health_backends,
  189. 'key_backends_tested': key_backends,
  190. 'dummy_content_length': len(dummy_content),
  191. 'key_file_accessible': True,
  192. 'emergency_system_ready': True
  193. })
  194. except Exception as e:
  195. logger.error(f"Health check error: {str(e)}")
  196. return jsonify({
  197. 'status': 'error',
  198. 'message': 'System error',
  199. 'error': str(e)
  200. }), 500
  201. @app.errorhandler(404)
  202. def not_found(error):
  203. """Handle 404 errors silently for security"""
  204. logger.warning(f"404 attempt: {error}")
  205. return jsonify({'error': 'Not found'}), 404
  206. @app.errorhandler(500)
  207. def internal_error(error):
  208. """Handle internal server errors"""
  209. logger.error(f"Internal server error: {error}")
  210. return jsonify({'error': 'Internal server error'}), 500
  211. def validate_setup():
  212. """Validate system setup before starting"""
  213. logger.info("Validating system setup...")
  214. # Check config files exist
  215. if not os.path.exists(config.key_file_path):
  216. logger.error(f"Key file not found: {config.key_file_path}")
  217. return False
  218. if not os.path.exists(config.dummy_file_path):
  219. logger.error(f"Dummy file not found: {config.dummy_file_path}")
  220. return False
  221. # Test file permissions
  222. try:
  223. with open(config.key_file_path, 'r') as f:
  224. f.read(1)
  225. with open(config.dummy_file_path, 'r') as f:
  226. f.read(1)
  227. except Exception as e:
  228. logger.error(f"File permission test failed: {str(e)}")
  229. return False
  230. # Test notification system
  231. logger.info("Testing notification system...")
  232. try:
  233. key_success, _ = send_ntfy_notification(
  234. config.ntfy_backends_key[:1], # Test only first backend
  235. "System startup test - key notifications",
  236. "Emergency Access Startup Test"
  237. )
  238. health_success, _ = send_ntfy_notification(
  239. config.ntfy_backends_health[:1], # Test only first backend
  240. "System startup test - health notifications",
  241. "Emergency Access Startup Test"
  242. )
  243. if not key_success:
  244. logger.error("Key notification system test failed")
  245. return False
  246. if not health_success:
  247. logger.error("Health notification system test failed")
  248. return False
  249. except Exception as e:
  250. logger.warning(f"Notification test failed, but continuing: {str(e)}")
  251. logger.info("System validation completed successfully")
  252. return True
  253. if __name__ == '__main__':
  254. try:
  255. # Load configuration
  256. config = Config()
  257. logger.info("Configuration loaded successfully")
  258. # Add ntfy log handler after config is loaded
  259. if config.send_all_logs:
  260. ntfy_handler = NtfyLogHandler(config)
  261. min_level = getattr(logging, config.log_level.upper(), logging.WARNING)
  262. ntfy_handler.setLevel(min_level)
  263. # Add to root logger to catch all application logs
  264. logging.getLogger().addHandler(ntfy_handler)
  265. # Validate system setup
  266. if not validate_setup():
  267. logger.error("System validation failed, exiting")
  268. sys.exit(1)
  269. # Add Flask routes with config values
  270. app.add_url_rule(config.key_route, 'get_key_part', get_key_part, methods=['GET'])
  271. app.add_url_rule(config.health_route, 'health_check', health_check, methods=['GET'])
  272. logger.info(f"Starting emergency access server on {config.server_host}:{config.server_port}")
  273. logger.info(f"Key route: {config.key_route}")
  274. logger.info(f"Health route: {config.health_route}")
  275. # Run the server on local port for Caddy reverse proxy
  276. app.run(
  277. host=config.server_host,
  278. port=config.server_port,
  279. debug=False,
  280. threaded=True
  281. )
  282. except Exception as e:
  283. logger.error(f"Failed to start server: {str(e)}")
  284. sys.exit(1)