main.py 8.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264
  1. #!/usr/bin/env python3
  2. import os
  3. import sys
  4. import logging
  5. import time
  6. import tempfile
  7. from flask import Flask, jsonify
  8. from config import Config
  9. from typing import List, Tuple
  10. # Configure logging
  11. logging.basicConfig(
  12. level=logging.INFO,
  13. format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
  14. handlers=[
  15. logging.FileHandler('/var/log/emergency-access.log'),
  16. logging.StreamHandler()
  17. ]
  18. )
  19. logger = logging.getLogger(__name__)
  20. app = Flask(__name__)
  21. # Global config instance
  22. config = None
  23. def send_ntfy_notification(backends: List[str], message: str, title: str = None) -> Tuple[bool, List[str]]:
  24. """
  25. Send notification using dschep/ntfy with global config
  26. Returns: (success, successful_backends)
  27. """
  28. successful_backends = []
  29. for backend in backends:
  30. try:
  31. # Import ntfy here to avoid import issues during startup
  32. import ntfy
  33. # Send notification using the backend name from global ntfy config
  34. if title:
  35. ntfy.notify(message, title=title, backend=backend)
  36. else:
  37. ntfy.notify(message, backend=backend)
  38. successful_backends.append(backend)
  39. logger.info(f"Notification sent successfully via {backend}")
  40. except ImportError:
  41. logger.error(f"ntfy package not available for backend {backend}")
  42. except Exception as e:
  43. logger.error(f"Failed to send notification to {backend}: {str(e)}")
  44. success = len(successful_backends) > 0
  45. return success, successful_backends
  46. def read_file_safely(file_path: str) -> Tuple[bool, str]:
  47. """
  48. Safely read file content
  49. Returns: (success, content)
  50. """
  51. try:
  52. if not os.path.exists(file_path):
  53. logger.error(f"File not found: {file_path}")
  54. return False, f"File not found: {file_path}"
  55. with open(file_path, 'r') as f:
  56. content = f.read().strip()
  57. if not content:
  58. logger.error(f"File is empty: {file_path}")
  59. return False, f"File is empty: {file_path}"
  60. return True, content
  61. except PermissionError:
  62. logger.error(f"Permission denied reading file: {file_path}")
  63. return False, f"Permission denied: {file_path}"
  64. except Exception as e:
  65. logger.error(f"Failed to read file {file_path}: {str(e)}")
  66. return False, f"Failed to read file: {str(e)}"
  67. def get_key_part():
  68. """Emergency key access endpoint"""
  69. logger.warning("EMERGENCY: Key access attempt detected")
  70. try:
  71. # Send notification first - fail-safe approach
  72. notification_success, successful_backends = send_ntfy_notification(
  73. config.ntfy_backends_key,
  74. config.ntfy_key_message,
  75. "EMERGENCY ACCESS ALERT"
  76. )
  77. if not notification_success:
  78. logger.error("CRITICAL: Failed to send notifications to any backend")
  79. return jsonify({
  80. 'error': 'Notification system failure',
  81. 'message': 'Access denied for security reasons'
  82. }), 500
  83. logger.info(f"Notifications sent successfully to: {successful_backends}")
  84. # Read key file
  85. file_success, content = read_file_safely(config.key_file_path)
  86. if not file_success:
  87. logger.error(f"CRITICAL: Failed to read key file: {content}")
  88. return jsonify({
  89. 'error': 'File access failure',
  90. 'message': 'Unable to retrieve key part'
  91. }), 500
  92. logger.warning("EMERGENCY: Key part successfully retrieved and sent")
  93. return jsonify({
  94. 'success': True,
  95. 'key_part': content,
  96. 'timestamp': time.time(),
  97. 'notified_backends': successful_backends
  98. })
  99. except Exception as e:
  100. logger.error(f"CRITICAL: Unexpected error in key access: {str(e)}")
  101. return jsonify({
  102. 'error': 'System error',
  103. 'message': 'Internal server error'
  104. }), 500
  105. def health_check():
  106. """Health check endpoint with dummy file access"""
  107. logger.info("Health check requested")
  108. try:
  109. # Send notification
  110. notification_success, successful_backends = send_ntfy_notification(
  111. config.ntfy_backends_health,
  112. config.ntfy_health_message,
  113. "Health Check"
  114. )
  115. if not notification_success:
  116. logger.error("Health check notification failed")
  117. return jsonify({
  118. 'status': 'error',
  119. 'message': 'Notification system failure'
  120. }), 500
  121. # Read dummy file
  122. file_success, content = read_file_safely(config.dummy_file_path)
  123. if not file_success:
  124. logger.error(f"Health check file read failed: {content}")
  125. return jsonify({
  126. 'status': 'error',
  127. 'message': 'File system failure'
  128. }), 500
  129. logger.info("Health check completed successfully")
  130. return jsonify({
  131. 'status': 'ok',
  132. 'timestamp': time.time(),
  133. 'notified_backends': successful_backends,
  134. 'dummy_content_length': len(content)
  135. })
  136. except Exception as e:
  137. logger.error(f"Health check error: {str(e)}")
  138. return jsonify({
  139. 'status': 'error',
  140. 'message': 'System error'
  141. }), 500
  142. @app.errorhandler(404)
  143. def not_found(error):
  144. """Handle 404 errors silently for security"""
  145. logger.warning(f"404 attempt: {error}")
  146. return jsonify({'error': 'Not found'}), 404
  147. @app.errorhandler(500)
  148. def internal_error(error):
  149. """Handle internal server errors"""
  150. logger.error(f"Internal server error: {error}")
  151. return jsonify({'error': 'Internal server error'}), 500
  152. def validate_setup():
  153. """Validate system setup before starting"""
  154. logger.info("Validating system setup...")
  155. # Check config files exist
  156. if not os.path.exists(config.key_file_path):
  157. logger.error(f"Key file not found: {config.key_file_path}")
  158. return False
  159. if not os.path.exists(config.dummy_file_path):
  160. logger.error(f"Dummy file not found: {config.dummy_file_path}")
  161. return False
  162. # Test file permissions
  163. try:
  164. with open(config.key_file_path, 'r') as f:
  165. f.read(1)
  166. with open(config.dummy_file_path, 'r') as f:
  167. f.read(1)
  168. except Exception as e:
  169. logger.error(f"File permission test failed: {str(e)}")
  170. return False
  171. # Test notification system
  172. logger.info("Testing notification system...")
  173. try:
  174. key_success, _ = send_ntfy_notification(
  175. config.ntfy_backends_key[:1], # Test only first backend
  176. "System startup test - key notifications",
  177. "Emergency Access Startup Test"
  178. )
  179. health_success, _ = send_ntfy_notification(
  180. config.ntfy_backends_health[:1], # Test only first backend
  181. "System startup test - health notifications",
  182. "Emergency Access Startup Test"
  183. )
  184. if not key_success:
  185. logger.error("Key notification system test failed")
  186. return False
  187. if not health_success:
  188. logger.error("Health notification system test failed")
  189. return False
  190. except Exception as e:
  191. logger.warning(f"Notification test failed, but continuing: {str(e)}")
  192. logger.info("System validation completed successfully")
  193. return True
  194. if __name__ == '__main__':
  195. try:
  196. # Load configuration
  197. config = Config()
  198. logger.info("Configuration loaded successfully")
  199. # Validate system setup
  200. if not validate_setup():
  201. logger.error("System validation failed, exiting")
  202. sys.exit(1)
  203. # Add Flask routes with config values
  204. app.add_url_rule(config.key_route, 'get_key_part', get_key_part, methods=['GET'])
  205. app.add_url_rule(config.health_route, 'health_check', health_check, methods=['GET'])
  206. logger.info(f"Starting emergency access server on {config.server_host}:{config.server_port}")
  207. logger.info(f"Key route: {config.key_route}")
  208. logger.info(f"Health route: {config.health_route}")
  209. # Run the server on local port for Caddy reverse proxy
  210. app.run(
  211. host=config.server_host,
  212. port=config.server_port,
  213. debug=False,
  214. threaded=True
  215. )
  216. except Exception as e:
  217. logger.error(f"Failed to start server: {str(e)}")
  218. sys.exit(1)