#!/usr/bin/env python3 import os import sys import logging import time import tempfile from flask import Flask, jsonify from config import Config from typing import List, Tuple # Configure logging with custom handler class NtfyLogHandler(logging.Handler): """Custom logging handler that sends logs to ntfy health backends""" def __init__(self, config_obj): super().__init__() self.config = config_obj def emit(self, record): """Send log record to health backends""" if hasattr(self.config, 'ntfy_backends_health') and self.config.send_all_logs: try: log_message = self.format(record) # Get configured log level or default to WARNING min_level = getattr(logging, self.config.log_level.upper(), logging.WARNING) if record.levelno >= min_level: # Format message with appropriate emoji based on log level emoji = "🚨" if record.levelno >= logging.ERROR else "⚠️" if record.levelno >= logging.WARNING else "ℹ️" title = f"Emergency Access {record.levelname}" message = f"{emoji} {record.name}: {record.getMessage()}" send_ntfy_notification( self.config.ntfy_backends_health, message, title ) except Exception: # Don't fail the application if logging notification fails pass # Configure logging logging.basicConfig( level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s', handlers=[ logging.FileHandler('/var/log/emergency-access.log'), logging.StreamHandler() ] ) logger = logging.getLogger(__name__) app = Flask(__name__) # Global config instance config = None def send_ntfy_notification(backends: List[str], message: str, title: str = None) -> Tuple[bool, List[str]]: """ Send notification using dschep/ntfy with dedicated config file Returns: (success, successful_backends) """ successful_backends = [] for backend in backends: try: # Import ntfy here to avoid import issues during startup import ntfy # Load the ntfy config file ntfy_config = ntfy.load_config(config.ntfy_config_path) # Send notification using the backend name from our config file if title: ntfy.notify(message, title=title, backend=backend, config=ntfy_config) else: ntfy.notify(message, backend=backend, config=ntfy_config) successful_backends.append(backend) logger.info(f"Notification sent successfully via {backend}") except ImportError: logger.error(f"ntfy package not available for backend {backend}") except Exception as e: logger.error(f"Failed to send notification to {backend}: {str(e)}") success = len(successful_backends) > 0 return success, successful_backends def read_file_safely(file_path: str) -> Tuple[bool, str]: """ Safely read file content Returns: (success, content) """ try: if not os.path.exists(file_path): logger.error(f"File not found: {file_path}") return False, f"File not found: {file_path}" with open(file_path, 'r') as f: content = f.read().strip() if not content: logger.error(f"File is empty: {file_path}") return False, f"File is empty: {file_path}" return True, content except PermissionError: logger.error(f"Permission denied reading file: {file_path}") return False, f"Permission denied: {file_path}" except Exception as e: logger.error(f"Failed to read file {file_path}: {str(e)}") return False, f"Failed to read file: {str(e)}" def get_key_part(): """Emergency key access endpoint""" logger.warning("EMERGENCY: Key access attempt detected") try: # Send notification first - fail-safe approach notification_success, successful_backends = send_ntfy_notification( config.ntfy_backends_key, config.ntfy_key_message, "EMERGENCY ACCESS ALERT" ) if not notification_success: logger.error("CRITICAL: Failed to send notifications to any backend") return jsonify({ 'error': 'Notification system failure', 'message': 'Access denied for security reasons' }), 500 logger.info(f"Notifications sent successfully to: {successful_backends}") # Read key file file_success, content = read_file_safely(config.key_file_path) if not file_success: logger.error(f"CRITICAL: Failed to read key file: {content}") return jsonify({ 'error': 'File access failure', 'message': 'Unable to retrieve key part' }), 500 logger.warning("EMERGENCY: Key part successfully retrieved and sent") return jsonify({ 'success': True, 'key_part': content, 'timestamp': time.time(), 'notified_backends': successful_backends }) except Exception as e: logger.error(f"CRITICAL: Unexpected error in key access: {str(e)}") return jsonify({ 'error': 'System error', 'message': 'Internal server error' }), 500 def health_check(): """Health check endpoint with dummy file access""" logger.info("Health check requested") try: # Send notification notification_success, successful_backends = send_ntfy_notification( config.ntfy_backends_health, config.ntfy_health_message, "Health Check" ) if not notification_success: logger.error("Health check notification failed") return jsonify({ 'status': 'error', 'message': 'Notification system failure' }), 500 # Read dummy file file_success, content = read_file_safely(config.dummy_file_path) if not file_success: logger.error(f"Health check file read failed: {content}") return jsonify({ 'status': 'error', 'message': 'File system failure' }), 500 logger.info("Health check completed successfully") return jsonify({ 'status': 'ok', 'timestamp': time.time(), 'notified_backends': successful_backends, 'dummy_content_length': len(content) }) except Exception as e: logger.error(f"Health check error: {str(e)}") return jsonify({ 'status': 'error', 'message': 'System error' }), 500 @app.errorhandler(404) def not_found(error): """Handle 404 errors silently for security""" logger.warning(f"404 attempt: {error}") return jsonify({'error': 'Not found'}), 404 @app.errorhandler(500) def internal_error(error): """Handle internal server errors""" logger.error(f"Internal server error: {error}") return jsonify({'error': 'Internal server error'}), 500 def validate_setup(): """Validate system setup before starting""" logger.info("Validating system setup...") # Check config files exist if not os.path.exists(config.key_file_path): logger.error(f"Key file not found: {config.key_file_path}") return False if not os.path.exists(config.dummy_file_path): logger.error(f"Dummy file not found: {config.dummy_file_path}") return False # Test file permissions try: with open(config.key_file_path, 'r') as f: f.read(1) with open(config.dummy_file_path, 'r') as f: f.read(1) except Exception as e: logger.error(f"File permission test failed: {str(e)}") return False # Test notification system logger.info("Testing notification system...") try: key_success, _ = send_ntfy_notification( config.ntfy_backends_key[:1], # Test only first backend "System startup test - key notifications", "Emergency Access Startup Test" ) health_success, _ = send_ntfy_notification( config.ntfy_backends_health[:1], # Test only first backend "System startup test - health notifications", "Emergency Access Startup Test" ) if not key_success: logger.error("Key notification system test failed") return False if not health_success: logger.error("Health notification system test failed") return False except Exception as e: logger.warning(f"Notification test failed, but continuing: {str(e)}") logger.info("System validation completed successfully") return True if __name__ == '__main__': try: # Load configuration config = Config() logger.info("Configuration loaded successfully") # Add ntfy log handler after config is loaded if config.send_all_logs: ntfy_handler = NtfyLogHandler(config) min_level = getattr(logging, config.log_level.upper(), logging.WARNING) ntfy_handler.setLevel(min_level) # Add to root logger to catch all application logs logging.getLogger().addHandler(ntfy_handler) # Validate system setup if not validate_setup(): logger.error("System validation failed, exiting") sys.exit(1) # Add Flask routes with config values app.add_url_rule(config.key_route, 'get_key_part', get_key_part, methods=['GET']) app.add_url_rule(config.health_route, 'health_check', health_check, methods=['GET']) logger.info(f"Starting emergency access server on {config.server_host}:{config.server_port}") logger.info(f"Key route: {config.key_route}") logger.info(f"Health route: {config.health_route}") # Run the server on local port for Caddy reverse proxy app.run( host=config.server_host, port=config.server_port, debug=False, threaded=True ) except Exception as e: logger.error(f"Failed to start server: {str(e)}") sys.exit(1)