#!/usr/bin/env python3
"""Emergency access HTTP service.

Serves key parts from files behind HTTP Basic authentication. Every key
request first sends an ntfy notification and is refused if no backend could
be notified. A separate authenticated health endpoint exercises the health
notification backends and checks that all configured files are readable.
"""
import base64
import hmac
import logging
import os
import signal
import sys
import tempfile
import threading
import time
from functools import wraps
from typing import List, Optional, Tuple

from flask import Flask, jsonify, request
from werkzeug.security import check_password_hash
from werkzeug.serving import WSGIRequestHandler

from config import Config

# Per-thread flag that is True while the current thread is busy sending an
# ntfy notification. NtfyLogHandler consults it so that log records produced
# *by* the notification code are not forwarded as yet another notification.
_notify_state = threading.local()


# Configure logging with custom handler
class NtfyLogHandler(logging.Handler):
    """Custom logging handler that sends logs to ntfy health backends."""

    def __init__(self, config_obj):
        """Store the configuration object the handler reads its settings from."""
        super().__init__()
        self.config = config_obj

    def emit(self, record):
        """Forward a log record to the health backends.

        Records are forwarded only when ``send_all_logs`` is enabled and the
        record level is at least the configured ``log_level`` (WARNING when
        the configured name is not a ``logging`` attribute). Any failure is
        swallowed so logging can never break the application.
        """
        # send_ntfy_notification() logs through the same loggers this handler
        # is attached to; without this guard each forwarded record would
        # trigger another forward, recursing without bound.
        if getattr(_notify_state, 'active', False):
            return
        if not (hasattr(self.config, 'ntfy_backends_health') and self.config.send_all_logs):
            return

        _notify_state.active = True
        try:
            # Get configured log level or default to WARNING
            min_level = getattr(logging, self.config.log_level.upper(), logging.WARNING)
            if record.levelno < min_level:
                return

            # Pick an emoji matching the severity of the record
            if record.levelno >= logging.ERROR:
                emoji = "🚨"
            elif record.levelno >= logging.WARNING:
                emoji = "⚠️"
            else:
                emoji = "ℹ️"

            title = f"Emergency Access {record.levelname}"
            message = f"{emoji} {record.name}: {record.getMessage()}"
            send_ntfy_notification(
                self.config.ntfy_backends_health,
                message,
                title
            )
        except Exception:
            # Don't fail the application if logging notification fails
            pass
        finally:
            _notify_state.active = False


# Configure logging
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    handlers=[
        logging.FileHandler('/var/log/emergency-access.log'),
        logging.StreamHandler()
    ]
)
logger = logging.getLogger(__name__)

app = Flask(__name__)

# Global config instance will be initialized in main
config = None


def _unauthorized(error_message: str):
    """Build the 401 response (body, status, headers) asking for Basic auth."""
    return jsonify({'error': error_message}), 401, {
        'WWW-Authenticate': 'Basic realm="Emergency Access"'
    }


def _credentials_match(auth, expected_username, expected_password_hash) -> bool:
    """Return True when the supplied credentials match the expected ones.

    The username is compared in constant time, and the password is always
    verified even when the username is wrong, so the response time does not
    reveal which of the two was incorrect.
    """
    # An Authorization header that is not a complete Basic credential can
    # leave these as None; treat that as a plain authentication failure.
    if auth.username is None or auth.password is None or expected_username is None:
        return False

    username_ok = hmac.compare_digest(
        auth.username.encode('utf-8'),
        str(expected_username).encode('utf-8')
    )
    password_ok = bool(Config.verify_password(auth.password, expected_password_hash))
    return username_ok and password_ok


def require_auth(key_config=None, is_health=False):
    """Decorator for HTTP Basic Authentication.

    Args:
        key_config: Key configuration supplying ``username``,
            ``password_hash`` and ``key_id``; used when ``is_health`` is False.
        is_health: When True, authenticate against the health credentials of
            the global ``config`` instead of ``key_config``.

    Returns:
        A decorator that answers 401 (with a ``WWW-Authenticate`` header)
        when credentials are missing or invalid, and otherwise calls the
        wrapped view.
    """
    def decorator(f):
        @wraps(f)
        def decorated_function(*args, **kwargs):
            auth = request.authorization
            if not auth:
                return _unauthorized('Authentication required')

            # Determine expected credentials
            if is_health:
                expected_username = config.health_username
                expected_password_hash = config.health_password_hash
                auth_type = "health check"
            else:
                expected_username = key_config.username
                expected_password_hash = key_config.password_hash
                auth_type = f"key '{key_config.key_id}'"

            # Verify credentials
            if not _credentials_match(auth, expected_username, expected_password_hash):
                logger.warning(f"Authentication failed for {auth_type}: invalid credentials from {request.remote_addr}")
                return _unauthorized('Invalid credentials')

            logger.info(f"Authentication successful for {auth_type}")
            return f(*args, **kwargs)
        return decorated_function
    return decorator


def _notify_backend_once(ntfy, load_config, backend: str, message: str,
                         title: Optional[str], timeout: float = 15) -> None:
    """Make a single notification attempt via one backend; raise on failure.

    The call to ``ntfy.notify`` runs in a daemon thread so that a hanging
    backend cannot block the caller for longer than ``timeout`` seconds.
    """
    # Load the ntfy config file and restrict it to the requested backend
    ntfy_config = load_config(config.ntfy_config_path)
    ntfy_config["backends"] = [backend]

    result = [None]
    exception = [None]

    def notify_with_timeout():
        # Mark this worker thread as "sending" so records logged while the
        # notification is in flight are not forwarded by NtfyLogHandler.
        _notify_state.active = True
        try:
            # Fall back to the title "Note" when no title was given
            result[0] = ntfy.notify(message, title=title or "Note", config=ntfy_config)
        except Exception as e:
            exception[0] = e

    thread = threading.Thread(target=notify_with_timeout, daemon=True)
    thread.start()
    thread.join(timeout=timeout)

    if thread.is_alive():
        raise TimeoutError("Notification timeout")
    if exception[0]:
        raise exception[0]

    ret = result[0]
    if ret != 0:
        raise RuntimeError(f"ntfy returned error code {ret}")


def send_ntfy_notification(backends: List[str], message: str, title: Optional[str] = None) -> Tuple[bool, List[str]]:
    """Send notification using dschep/ntfy with dedicated config file.

    Each backend is tried up to three times with a growing delay between
    attempts. A failing backend never raises; it is logged and the next
    backend is tried.

    Args:
        backends: Names of the backends to notify.
        message: Notification body.
        title: Notification title; "Note" is used when empty or None.

    Returns:
        (success, successful_backends) where ``success`` is True when at
        least one backend accepted the notification.
    """
    successful_backends = []
    max_retries = 3
    retry_delay = 1

    # Import ntfy here to avoid import issues during startup. A missing
    # package cannot be cured by retrying, so give up straight away instead
    # of sleeping through the retry loop for every backend.
    try:
        import ntfy
        from ntfy.config import load_config
    except ImportError:
        install_hint = "ntfy package not available. Please install with: pip install ntfy"
        for backend in backends:
            logger.error(f"ntfy package not available for backend {backend}: {install_hint}")
        return False, successful_backends

    for backend in backends:
        last_error = None
        for attempt in range(max_retries):
            try:
                _notify_backend_once(ntfy, load_config, backend, message, title)
            except Exception as e:
                last_error = e
                if attempt < max_retries - 1:
                    logger.warning(f"Notification attempt {attempt + 1} failed for {backend}: {str(e)}, retrying...")
                    # Exponential backoff: 1s, 2s, 4s, ...
                    time.sleep(retry_delay * 2 ** attempt)
                else:
                    logger.error(f"All notification attempts failed for {backend}: {str(e)}")
            else:
                successful_backends.append(backend)
                logger.info(f"Notification sent successfully via {backend}")
                break
        else:
            # Every attempt failed. Don't raise - just log the failure and
            # continue with the other backends.
            logger.error(f"Failed to send notification to {backend} after {max_retries} attempts: {str(last_error)}")

    return len(successful_backends) > 0, successful_backends


def read_file_safely(file_path: str) -> Tuple[bool, str]:
    """Safely read file content.

    Args:
        file_path: Path of the text file to read.

    Returns:
        (success, content). On success ``content`` is the file text with
        surrounding whitespace stripped; on failure (missing, unreadable or
        empty file) it is a description of the problem.
    """
    try:
        # Open directly rather than testing for existence first: the file
        # could disappear between the check and the open.
        with open(file_path, 'r') as f:
            content = f.read().strip()
    except FileNotFoundError:
        logger.error(f"File not found: {file_path}")
        return False, f"File not found: {file_path}"
    except PermissionError:
        logger.error(f"Permission denied reading file: {file_path}")
        return False, f"Permission denied: {file_path}"
    except Exception as e:
        logger.error(f"Failed to read file {file_path}: {str(e)}")
        return False, f"Failed to read file: {str(e)}"

    if not content:
        logger.error(f"File is empty: {file_path}")
        return False, f"File is empty: {file_path}"

    return True, content


def create_key_handler(key_config):
    """Create a key access handler for a specific key configuration.

    Args:
        key_config: Key configuration; the handler reads its ``key_id``,
            ``backends``, ``message`` and ``file_path`` attributes.

    Returns:
        A view function protected by Basic authentication for that key.
    """
    key_id = key_config.key_id

    @require_auth(key_config=key_config)
    def get_key_part():
        """Emergency key access endpoint."""
        logger.warning(f"EMERGENCY: Key access attempt detected for key '{key_id}'")

        try:
            # Send notification first - fail-safe approach: without a
            # delivered notification the key part is never released.
            notified, notified_backends = send_ntfy_notification(
                key_config.backends,
                key_config.message,
                "EMERGENCY ACCESS ALERT"
            )
            if not notified:
                logger.error(f"CRITICAL: Failed to send notifications to any backend for key '{key_id}'")
                return jsonify({
                    'error': 'Notification system failure',
                    'message': 'Access denied for security reasons'
                }), 500

            logger.info(f"Notifications sent successfully to: {notified_backends} for key '{key_id}'")

            # Read key file
            file_ok, content = read_file_safely(key_config.file_path)
            if not file_ok:
                logger.error(f"CRITICAL: Failed to read key file for '{key_id}': {content}")
                return jsonify({
                    'error': 'File access failure',
                    'message': 'Unable to retrieve key part'
                }), 500

            logger.warning(f"EMERGENCY: Key part successfully retrieved and sent for key '{key_id}'")
            return jsonify({
                'success': True,
                'key_id': key_id,
                'key_part': content,
                'timestamp': time.time(),
                'notified_backends': notified_backends
            })
        except Exception as e:
            logger.error(f"CRITICAL: Unexpected error in key access for '{key_id}': {str(e)}")
            return jsonify({
                'error': 'System error',
                'message': 'Internal server error'
            }), 500

    return get_key_part


@require_auth(is_health=True)
def health_check():
    """Health check endpoint that verifies both health monitoring and all key request functionality.

    Sends a real notification to the health backends, reads the dummy file
    and every key file (content is never returned), and checks that each key
    has at least one notification backend configured. Responds 200 with
    status 'ok' when everything passed, otherwise 500 with details.
    """
    logger.info("Health check requested")

    try:
        # Test health notification system
        health_notification_success, health_backends = send_ntfy_notification(
            config.ntfy_backends_health,
            config.ntfy_health_message,
            "Health Check"
        )

        # Test dummy file access
        dummy_file_success, dummy_content = read_file_safely(config.dummy_file_path)

        # Test all key files access (without exposing content)
        key_files_status = {}
        for key_id, key_config in config.keys.items():
            key_file_success, _ = read_file_safely(key_config.file_path)
            key_files_status[key_id] = key_file_success
        all_key_files_ok = all(key_files_status.values())

        # Test all key notification backends
        key_backends_status = {}
        for key_id, key_config in config.keys.items():
            try:
                # Test notification without actually sending: only checks
                # that at least one backend is configured for the key.
                key_backends_status[key_id] = {
                    'backends': key_config.backends,
                    'success': len(key_config.backends) > 0
                }
            except Exception as e:
                key_backends_status[key_id] = {
                    'backends': getattr(key_config, 'backends', None),
                    'success': False,
                    'error': str(e)
                }
        all_key_backends_ok = all(status['success'] for status in key_backends_status.values())

        # Determine overall health status
        all_systems_ok = (health_notification_success and dummy_file_success
                          and all_key_files_ok and all_key_backends_ok)

        if not all_systems_ok:
            error_details = []
            if not health_notification_success:
                error_details.append("health notifications failed")
            if not dummy_file_success:
                error_details.append(f"dummy file access failed: {dummy_content}")

            # Add key-specific errors
            for key_id, file_ok in key_files_status.items():
                if not file_ok:
                    error_details.append(f"key file access failed for '{key_id}'")

            for key_id, status in key_backends_status.items():
                if not status['success']:
                    error_msg = f"key backends failed for '{key_id}'"
                    if 'error' in status:
                        error_msg += f": {status['error']}"
                    error_details.append(error_msg)

            logger.error(f"Health check failed: {', '.join(error_details)}")
            return jsonify({
                'status': 'error',
                'message': 'System components failed',
                'details': error_details,
                'health_notifications': health_notification_success,
                'dummy_file_access': dummy_file_success,
                'key_files_status': key_files_status,
                'key_backends_status': key_backends_status
            }), 500

        logger.info("Health check completed successfully - all systems operational")
        return jsonify({
            'status': 'ok',
            'timestamp': time.time(),
            'health_backends_notified': health_backends,
            'dummy_content_length': len(dummy_content),
            'keys_accessible': len(key_files_status),
            'key_files_status': key_files_status,
            'key_backends_status': key_backends_status,
            'emergency_system_ready': True
        })

    except Exception as e:
        logger.error(f"Health check error: {str(e)}")
        return jsonify({
            'status': 'error',
            'message': 'System error',
            'error': str(e)
        }), 500


@app.errorhandler(404)
def not_found(error):
    """Log unknown routes and answer with a minimal JSON 404."""
    logger.warning(f"404 attempt: {error}")
    return jsonify({'error': 'Not found'}), 404


@app.errorhandler(500)
def internal_error(error):
    """Log internal server errors and answer with a minimal JSON 500."""
    logger.error(f"Internal server error: {error}")
    return jsonify({'error': 'Internal server error'}), 500


def validate_setup():
    """Validate system setup before starting.

    Checks that the dummy file and every key file exist and are readable,
    that each key has at least one notification backend, and that a test
    notification can be delivered via the first health backend and the first
    backend of each key.

    Returns:
        True when validation passed, False otherwise. An unexpected
        exception during the notification test is only logged as a warning
        and does not fail validation.
    """
    logger.info("Validating system setup...")

    # Check dummy file exists
    if not os.path.exists(config.dummy_file_path):
        logger.error(f"Dummy file not found: {config.dummy_file_path}")
        return False

    # Test dummy file permissions
    try:
        with open(config.dummy_file_path, 'r') as f:
            f.read(1)
    except Exception as e:
        logger.error(f"Dummy file permission test failed: {str(e)}")
        return False

    # Validate all key configurations
    for key_id, key_config in config.keys.items():
        logger.info(f"Validating key '{key_id}'...")

        # Check key file exists
        if not os.path.exists(key_config.file_path):
            logger.error(f"Key file not found for '{key_id}': {key_config.file_path}")
            return False

        # Test key file permissions
        try:
            with open(key_config.file_path, 'r') as f:
                f.read(1)
        except Exception as e:
            logger.error(f"Key file permission test failed for '{key_id}': {str(e)}")
            return False

        # Validate backends are configured
        if not key_config.backends:
            logger.error(f"No notification backends configured for key '{key_id}'")
            return False

    # Test notification system
    logger.info("Testing notification system...")
    try:
        # Test health notification system
        health_success, _ = send_ntfy_notification(
            config.ntfy_backends_health[:1],  # Test only first backend
            "System startup test - health notifications",
            "Emergency Access Startup Test"
        )
        if not health_success:
            logger.error("Health notification system test failed")
            return False

        # Test each key's notification backends
        for key_id, key_config in config.keys.items():
            key_success, _ = send_ntfy_notification(
                key_config.backends[:1],  # Test only first backend
                f"System startup test - key '{key_id}' notifications",
                "Emergency Access Startup Test"
            )
            if not key_success:
                logger.error(f"Key notification system test failed for '{key_id}'")
                return False

    except Exception as e:
        logger.warning(f"Notification test failed, but continuing: {str(e)}")

    logger.info("System validation completed successfully")
    return True


class ProductionRequestHandler(WSGIRequestHandler):
    """Custom request handler that routes HTTP server logs to the module logger."""

    def log_error(self, format, *args):
        """Override to use our logger."""
        logger.error(f"HTTP Error: {format % args}")

    def log_message(self, format, *args):
        """Override to use our logger."""
        logger.info(f"HTTP: {format % args}")


if __name__ == '__main__':
    config = None
    try:
        # Load configuration
        config = Config()
        logger.info("Configuration loaded successfully")

        # Add ntfy log handler after config is loaded
        if config.send_all_logs:
            ntfy_handler = NtfyLogHandler(config)
            min_level = getattr(logging, config.log_level.upper(), logging.WARNING)
            ntfy_handler.setLevel(min_level)
            # Add to root logger to catch all application logs
            logging.getLogger().addHandler(ntfy_handler)

        # Validate system setup
        if not validate_setup():
            logger.error("System validation failed, exiting")
            sys.exit(1)

        # Add Flask routes dynamically for each key
        for key_id, key_config in config.keys.items():
            handler = create_key_handler(key_config)
            endpoint_name = f'get_key_part_{key_id}'
            app.add_url_rule(key_config.route, endpoint_name, handler, methods=['GET'])
            logger.info(f"Registered authenticated key route for '{key_id}': {key_config.route}")

        # Add health check route
        app.add_url_rule(config.health_route, 'health_check', health_check, methods=['GET'])

        logger.info(f"Starting emergency access server on {config.server_host}:{config.server_port}")
        logger.info(f"Health route: {config.health_route}")
        logger.info(f"Configured {len(config.keys)} key(s)")

        # Use production-ready server with better error handling. Only the
        # import is guarded, so an ImportError raised while Waitress is
        # already serving is not mistaken for "Waitress not installed".
        try:
            from waitress import serve
        except ImportError:
            logger.warning("Waitress not available, falling back to Flask dev server")
            app.run(
                host=config.server_host,
                port=config.server_port,
                debug=False,
                threaded=True,
                request_handler=ProductionRequestHandler
            )
        else:
            logger.info("Using Waitress WSGI server for production")
            serve(
                app,
                host=config.server_host,
                port=config.server_port,
                threads=6,
                connection_limit=100,
                cleanup_interval=30,
                channel_timeout=120
            )

    except KeyboardInterrupt:
        logger.info("Received keyboard interrupt, shutting down gracefully")
    except Exception as e:
        logger.error(f"Failed to start server: {str(e)}")
        if config and hasattr(config, 'ntfy_backends_health'):
            try:
                send_ntfy_notification(
                    config.ntfy_backends_health,
                    f"🚨 Emergency Access server failed to start: {str(e)}",
                    "Emergency Access CRITICAL"
                )
            except Exception:
                # Best effort only: the process is exiting with an error anyway
                pass
        sys.exit(1)
    finally:
        logger.info("Emergency Access server shutdown complete")