#!/usr/bin/env python3
"""Emergency access server.

Exposes one HTTP route per configured key. A request to a key route first
sends an alert through the key's ntfy backends and only then returns the
content of the key file. A separate health route checks the notification
path and the readability of all configured files.
"""
import logging
import os
import sys
import tempfile
import threading
import time
from typing import List, Optional, Tuple

from flask import Flask, jsonify

from config import Config


# Configure logging with custom handler
class NtfyLogHandler(logging.Handler):
    """Custom logging handler that sends logs to ntfy health backends.

    Records at or above the configured ``log_level`` (default WARNING) are
    forwarded via ``send_ntfy_notification`` to ``config.ntfy_backends_health``
    when ``config.send_all_logs`` is truthy.
    """

    def __init__(self, config_obj):
        super().__init__()
        self.config = config_obj
        # Per-thread re-entrancy flag. send_ntfy_notification() itself logs
        # (success and failure), and those records come straight back to this
        # handler; without the guard every notification would trigger another.
        self._emit_state = threading.local()

    def emit(self, record):
        """Send log record to health backends (best effort, never raises on send errors)."""
        if getattr(self._emit_state, 'active', False):
            # Record was produced while we were already forwarding one.
            return
        if not (hasattr(self.config, 'ntfy_backends_health') and self.config.send_all_logs):
            return

        self._emit_state.active = True
        try:
            # Get configured log level or default to WARNING
            min_level = getattr(logging, self.config.log_level.upper(), logging.WARNING)
            if record.levelno < min_level:
                return

            # Pick an emoji matching the severity of the record
            if record.levelno >= logging.ERROR:
                emoji = "🚨"
            elif record.levelno >= logging.WARNING:
                emoji = "⚠️"
            else:
                emoji = "ℹ️"

            title = f"Emergency Access {record.levelname}"
            message = f"{emoji} {record.name}: {record.getMessage()}"
            send_ntfy_notification(
                self.config.ntfy_backends_health,
                message,
                title
            )
        except Exception:
            # Deliberate best effort: a failing notification must never take
            # the application down. The record still reaches the file and
            # stream handlers.
            pass
        finally:
            self._emit_state.active = False


# Configure logging
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    handlers=[
        logging.FileHandler('/var/log/emergency-access.log'),
        logging.StreamHandler()
    ]
)
logger = logging.getLogger(__name__)

app = Flask(__name__)

# Global config instance will be initialized in main
config = None


def send_ntfy_notification(backends: List[str], message: str, title: Optional[str] = None) -> Tuple[bool, List[str]]:
    """
    Send notification using dschep/ntfy with dedicated config file.

    Backends are tried in order. The first backend that fails (ntfy missing,
    config not loadable, non-zero return from ``ntfy.notify``) is logged and
    its exception is re-raised, so later backends are not attempted. Callers
    rely on this to deny access when a notification cannot be delivered.

    Args:
        backends: Backend names as defined in the ntfy config file.
        message: Notification body.
        title: Notification title; "Note" is used when empty or None.

    Returns: (success, successful_backends) - success is False only when
        ``backends`` is empty.

    Raises:
        Exception: when sending to any backend fails.
    """
    successful_backends: List[str] = []

    for backend in backends:
        try:
            # Import ntfy here to avoid import issues during startup
            try:
                import ntfy
                from ntfy.config import load_config
            except ImportError as err:
                raise Exception("ntfy package not available. Please install with: pip install ntfy") from err

            # Load the ntfy config file and restrict it to this one backend
            ntfy_config = load_config(config.ntfy_config_path)
            ntfy_config["backends"] = [backend]

            # Send notification using the backend name from our config file
            ret = ntfy.notify(message, title=title or "Note", config=ntfy_config)
            if ret != 0:
                logger.error(f"Failed to send notification via {backend}")
                raise Exception(f"Failed to send notification via {backend}")

            successful_backends.append(backend)
            logger.info(f"Notification sent successfully via {backend}")
        except Exception as e:
            logger.error(f"Failed to send notification to {backend}: {str(e)}")
            raise

    return bool(successful_backends), successful_backends


def read_file_safely(file_path: str) -> Tuple[bool, str]:
    """
    Safely read file content.

    Never raises: every failure is logged and reported through the return
    value. Surrounding whitespace is stripped; an empty result is a failure.

    Returns: (success, content) - on failure ``content`` is an error message.
    """
    try:
        # Open directly instead of checking existence first, so there is no
        # window between the check and the read.
        with open(file_path, 'r') as f:
            content = f.read().strip()
    except FileNotFoundError:
        logger.error(f"File not found: {file_path}")
        return False, f"File not found: {file_path}"
    except PermissionError:
        logger.error(f"Permission denied reading file: {file_path}")
        return False, f"Permission denied: {file_path}"
    except Exception as e:
        logger.error(f"Failed to read file {file_path}: {str(e)}")
        return False, f"Failed to read file: {str(e)}"

    if not content:
        logger.error(f"File is empty: {file_path}")
        return False, f"File is empty: {file_path}"

    return True, content


def create_key_handler(key_config):
    """Create a key access handler for a specific key configuration.

    The returned view function reads ``key_id``, ``backends``, ``message`` and
    ``file_path`` from ``key_config``.
    """
    def get_key_part():
        """Emergency key access endpoint.

        Notifies first and releases the key part only after the notification
        succeeded; any failure yields a 500 response without the key.
        """
        logger.warning(f"EMERGENCY: Key access attempt detected for key '{key_config.key_id}'")

        try:
            # Send notification first - fail-safe approach
            notification_success, successful_backends = send_ntfy_notification(
                key_config.backends,
                key_config.message,
                "EMERGENCY ACCESS ALERT"
            )
            if not notification_success:
                logger.error(f"CRITICAL: Failed to send notifications to any backend for key '{key_config.key_id}'")
                return jsonify({
                    'error': 'Notification system failure',
                    'message': 'Access denied for security reasons'
                }), 500

            logger.info(f"Notifications sent successfully to: {successful_backends} for key '{key_config.key_id}'")

            # Read key file
            file_success, content = read_file_safely(key_config.file_path)
            if not file_success:
                logger.error(f"CRITICAL: Failed to read key file for '{key_config.key_id}': {content}")
                return jsonify({
                    'error': 'File access failure',
                    'message': 'Unable to retrieve key part'
                }), 500

            logger.warning(f"EMERGENCY: Key part successfully retrieved and sent for key '{key_config.key_id}'")
            return jsonify({
                'success': True,
                'key_id': key_config.key_id,
                'key_part': content,
                'timestamp': time.time(),
                'notified_backends': successful_backends
            })
        except Exception as e:
            # Includes exceptions raised by send_ntfy_notification on a
            # failed backend: the key is not released in that case.
            logger.error(f"CRITICAL: Unexpected error in key access for '{key_config.key_id}': {str(e)}")
            return jsonify({
                'error': 'System error',
                'message': 'Internal server error'
            }), 500

    return get_key_part


def _collect_health_errors(health_notification_success, dummy_file_success, dummy_content,
                           key_files_status, key_backends_status):
    """Build the list of human-readable failure descriptions for health_check."""
    error_details = []
    if not health_notification_success:
        error_details.append("health notifications failed")
    if not dummy_file_success:
        # On failure dummy_content holds the error message from read_file_safely
        error_details.append(f"dummy file access failed: {dummy_content}")

    # Add key-specific errors
    for key_id, status in key_files_status.items():
        if not status:
            error_details.append(f"key file access failed for '{key_id}'")

    for key_id, status in key_backends_status.items():
        if not status['success']:
            error_msg = f"key backends failed for '{key_id}'"
            if 'error' in status:
                error_msg += f": {status['error']}"
            error_details.append(error_msg)

    return error_details


def health_check():
    """Health check endpoint that verifies both health monitoring and all key request functionality.

    Sends a real notification to the health backends, reads the dummy file and
    every key file (content is never returned), and checks that each key has
    at least one backend configured. Key backends are not contacted.
    """
    logger.info("Health check requested")

    try:
        # Test health notification system
        health_notification_success, health_backends = send_ntfy_notification(
            config.ntfy_backends_health,
            config.ntfy_health_message,
            "Health Check"
        )

        # Test dummy file access
        dummy_file_success, dummy_content = read_file_safely(config.dummy_file_path)

        key_files_status = {}
        key_backends_status = {}
        for key_id, key_config in config.keys.items():
            # Test key file access (without exposing content)
            key_file_success, _ = read_file_safely(key_config.file_path)
            key_files_status[key_id] = key_file_success

            # Test notification backends without actually sending
            try:
                backend_test_success = len(key_config.backends) > 0
                key_backends_status[key_id] = {
                    'backends': key_config.backends,
                    'success': backend_test_success
                }
            except Exception as e:
                key_backends_status[key_id] = {
                    'backends': key_config.backends,
                    'success': False,
                    'error': str(e)
                }

        all_key_files_ok = all(key_files_status.values())
        all_key_backends_ok = all(status['success'] for status in key_backends_status.values())

        # Determine overall health status
        all_systems_ok = (health_notification_success and
                          dummy_file_success and
                          all_key_files_ok and
                          all_key_backends_ok)

        if not all_systems_ok:
            error_details = _collect_health_errors(
                health_notification_success,
                dummy_file_success,
                dummy_content,
                key_files_status,
                key_backends_status
            )
            logger.error(f"Health check failed: {', '.join(error_details)}")
            return jsonify({
                'status': 'error',
                'message': 'System components failed',
                'details': error_details,
                'health_notifications': health_notification_success,
                'dummy_file_access': dummy_file_success,
                'key_files_status': key_files_status,
                'key_backends_status': key_backends_status
            }), 500

        logger.info("Health check completed successfully - all systems operational")
        return jsonify({
            'status': 'ok',
            'timestamp': time.time(),
            'health_backends_notified': health_backends,
            'dummy_content_length': len(dummy_content),
            'keys_accessible': len(key_files_status),
            'key_files_status': key_files_status,
            'key_backends_status': key_backends_status,
            'emergency_system_ready': True
        })

    except Exception as e:
        logger.error(f"Health check error: {str(e)}")
        return jsonify({
            'status': 'error',
            'message': 'System error',
            'error': str(e)
        }), 500


@app.errorhandler(404)
def not_found(error):
    """Handle 404 errors with a generic JSON body; the attempt is logged as a warning."""
    logger.warning(f"404 attempt: {error}")
    return jsonify({'error': 'Not found'}), 404


@app.errorhandler(500)
def internal_error(error):
    """Handle internal server errors with a generic JSON body; the error is logged."""
    logger.error(f"Internal server error: {error}")
    return jsonify({'error': 'Internal server error'}), 500


def _file_is_readable(path, kind, suffix=""):
    """Return True if ``path`` exists and one character can be read from it; log the reason otherwise."""
    if not os.path.exists(path):
        logger.error(f"{kind} not found{suffix}: {path}")
        return False

    try:
        with open(path, 'r') as f:
            f.read(1)
    except Exception as e:
        logger.error(f"{kind} permission test failed{suffix}: {str(e)}")
        return False

    return True


def validate_setup():
    """Validate system setup before starting.

    Checks that the dummy file and every key file are readable and that every
    key has backends configured, then sends a startup test notification to
    the first health backend and the first backend of each key.

    Returns:
        True when validation passed, False otherwise. An exception raised
        while sending a test notification is only logged as a warning and
        does not fail validation.
    """
    logger.info("Validating system setup...")

    # Check dummy file exists and is readable
    if not _file_is_readable(config.dummy_file_path, "Dummy file"):
        return False

    # Validate all key configurations
    for key_id, key_config in config.keys.items():
        logger.info(f"Validating key '{key_id}'...")

        # Check key file exists and is readable
        if not _file_is_readable(key_config.file_path, "Key file", f" for '{key_id}'"):
            return False

        # Validate backends are configured
        if not key_config.backends:
            logger.error(f"No notification backends configured for key '{key_id}'")
            return False

    # Test notification system
    logger.info("Testing notification system...")
    try:
        # Test health notification system
        health_success, _ = send_ntfy_notification(
            config.ntfy_backends_health[:1],  # Test only first backend
            "System startup test - health notifications",
            "Emergency Access Startup Test"
        )
        if not health_success:
            logger.error("Health notification system test failed")
            return False

        # Test each key's notification backends
        for key_id, key_config in config.keys.items():
            key_success, _ = send_ntfy_notification(
                key_config.backends[:1],  # Test only first backend
                f"System startup test - key '{key_id}' notifications",
                "Emergency Access Startup Test"
            )
            if not key_success:
                logger.error(f"Key notification system test failed for '{key_id}'")
                return False

    except Exception as e:
        logger.warning(f"Notification test failed, but continuing: {str(e)}")

    logger.info("System validation completed successfully")
    return True


def main():
    """Load configuration, validate the setup, register routes and run the server."""
    global config

    try:
        # Load configuration
        config = Config()
        logger.info("Configuration loaded successfully")

        # Add ntfy log handler after config is loaded
        if config.send_all_logs:
            ntfy_handler = NtfyLogHandler(config)
            min_level = getattr(logging, config.log_level.upper(), logging.WARNING)
            ntfy_handler.setLevel(min_level)
            # Add to root logger to catch all application logs
            logging.getLogger().addHandler(ntfy_handler)

        # Validate system setup
        if not validate_setup():
            logger.error("System validation failed, exiting")
            sys.exit(1)

        # Add Flask routes dynamically for each key
        for key_id, key_config in config.keys.items():
            handler = create_key_handler(key_config)
            # Endpoint names must be unique per route, hence the key id suffix
            endpoint_name = f'get_key_part_{key_id}'
            app.add_url_rule(key_config.route, endpoint_name, handler, methods=['GET'])
            logger.info(f"Registered key route for '{key_id}': {key_config.route}")

        # Add health check route
        app.add_url_rule(config.health_route, 'health_check', health_check, methods=['GET'])

        logger.info(f"Starting emergency access server on {config.server_host}:{config.server_port}")
        logger.info(f"Health route: {config.health_route}")
        logger.info(f"Configured {len(config.keys)} key(s)")

        # Run the server on local port for Caddy reverse proxy
        app.run(
            host=config.server_host,
            port=config.server_port,
            debug=False,
            threaded=True
        )

    except Exception as e:
        # sys.exit() above raises SystemExit, which is not caught here
        logger.error(f"Failed to start server: {str(e)}")
        sys.exit(1)


if __name__ == '__main__':
    main()