#!/usr/bin/env python3 import os import sys import logging import time import tempfile import threading import subprocess from flask import Flask, jsonify, request from werkzeug.security import check_password_hash from werkzeug.serving import WSGIRequestHandler from functools import wraps from config import Config from typing import List, Tuple import base64 import signal # Configure logging with custom handler class NtfyLogHandler(logging.Handler): """Custom logging handler that sends logs to ntfy health backends""" def __init__(self, config_obj): super().__init__() self.config = config_obj def emit(self, record): """Send log record to health backends""" if hasattr(self.config, "ntfy_backends_health") and self.config.send_all_logs: try: log_message = self.format(record) # Get configured log level or default to WARNING min_level = getattr( logging, self.config.log_level.upper(), logging.WARNING ) if record.levelno >= min_level: # Format message with appropriate emoji based on log level emoji = ( "🚨" if record.levelno >= logging.ERROR else "âš ī¸" if record.levelno >= logging.WARNING else "â„šī¸" ) title = f"Emergency Access {record.levelname}" message = f"{emoji} {record.name}: {record.getMessage()}" send_ntfy_notification( self.config.ntfy_backends_health, message, title ) except Exception: # Don't fail the application if logging notification fails pass # Configure logging with secure-file fallback log_path = "/var/log/emergency-access.log" try: os.makedirs(os.path.dirname(log_path), exist_ok=True) # Ensure the log file exists with restrictive permissions if not os.path.exists(log_path): open(log_path, "a").close() try: os.chmod(log_path, 0o600) except Exception: # Best-effort; if this fails we continue with stream logging pass log_handlers = [logging.FileHandler(log_path), logging.StreamHandler()] except Exception: # If any issue creating the file (permissions, etc.), fall back to stream-only logging log_handlers = [logging.StreamHandler()] logging.basicConfig( level=logging.INFO, format="%(asctime)s - %(name)s - %(levelname)s - %(message)s", handlers=log_handlers, ) logger = logging.getLogger(__name__) app = Flask(__name__) # Global config instance - initialized during main execution config = None def require_auth(key_config=None, is_health=False): """Decorator for HTTP Basic Authentication with simple in-memory rate limiting. This implements a small, best-effort per-(IP,username) rate limiter and lockout to mitigate brute-force attempts. It is intentionally simple and in-memory: - MAX_ATTEMPTS: number of failed attempts within WINDOW before lockout - WINDOW: window (seconds) to count attempts - LOCKOUT: lockout duration (seconds) after exceeding MAX_ATTEMPTS Notes: - Because this is in-memory, it resets on process restart. - For multi-process deployments, offload rate limiting to the reverse proxy (Caddy) or a shared store (redis). - On repeated lockouts a notification is sent (best-effort) to configured health backends and, when applicable, to the key's backends. """ # Initialize shared state on the require_auth function object (module-global alternative) if not hasattr(require_auth, "_state"): require_auth._state = { "attempts": {}, # (ip, username) -> list[timestamps] "blocked": {}, # (ip, username) -> blocked_until_timestamp "lock": threading.Lock(), } MAX_ATTEMPTS = 5 WINDOW = 300 # seconds LOCKOUT = 900 # seconds def decorator(f): @wraps(f) def decorated_function(*args, **kwargs): if config is None: logger.error("Configuration not loaded") return jsonify({"error": "Server configuration error"}), 500 auth = request.authorization client_ip = request.remote_addr or "unknown" # determine username for rate limit key; use placeholder if missing username_for_key = auth.username if auth and auth.username else "" key = (client_ip, username_for_key) now_ts = time.time() state = require_auth._state # Check block list with state["lock"]: blocked_until = state["blocked"].get(key) if blocked_until and now_ts < blocked_until: retry_after = int(blocked_until - now_ts) logger.warning( f"Blocked auth attempt from {client_ip} for user '{username_for_key}' (locked until {blocked_until})" ) # Return 429 Too Many Requests return ( jsonify( { "error": "Too many authentication attempts, temporarily locked" } ), 429, {"Retry-After": str(retry_after)}, ) if not auth: # For missing auth, still account for attempts keyed by ip and '' # Increment attempt count and possibly lock with state["lock"]: attempts = state["attempts"].setdefault(key, []) # prune old timestamps attempts = [t for t in attempts if now_ts - t <= WINDOW] attempts.append(now_ts) state["attempts"][key] = attempts if len(attempts) > MAX_ATTEMPTS: # Lock out state["blocked"][key] = now_ts + LOCKOUT logger.error( f"Locking out {client_ip} for missing/invalid auth (no credentials) after repeated attempts" ) # Try to notify about brute force (best-effort) try: if config and hasattr(config, "ntfy_backends_health"): send_ntfy_notification( config.ntfy_backends_health, f"🚨 Brute-force lockout: {client_ip} (no credentials) on server", "Emergency Access ALERT", ) except Exception: pass return ( jsonify( { "error": "Too many authentication attempts, temporarily locked" } ), 429, {"Retry-After": str(LOCKOUT)}, ) return ( jsonify({"error": "Authentication required"}), 401, {"WWW-Authenticate": 'Basic realm="Emergency Access"'}, ) # Determine expected credentials if is_health: expected_username = config.health_username expected_password_hash = config.health_password_hash auth_type = "health check" else: if key_config is None: logger.error("Key configuration not provided for authentication") return jsonify({"error": "Server configuration error"}), 500 expected_username = key_config.username expected_password_hash = key_config.password_hash auth_type = f"key '{key_config.key_id}'" # Verify credentials credential_ok = ( auth.username == expected_username and Config.verify_password(auth.password, expected_password_hash) ) if not credential_ok: # record failed attempt and check for lockout with state["lock"]: attempts = state["attempts"].setdefault(key, []) attempts = [t for t in attempts if now_ts - t <= WINDOW] attempts.append(now_ts) state["attempts"][key] = attempts if len(attempts) > MAX_ATTEMPTS: state["blocked"][key] = now_ts + LOCKOUT logger.error( f"Locking out {client_ip} for user '{username_for_key}' after repeated failed attempts" ) # Notify about brute-force lockout (best-effort) try: notify_backends = [] if config and hasattr(config, "ntfy_backends_health"): notify_backends.extend(config.ntfy_backends_health) # Include key-specific backends when available if key_config and getattr(key_config, "backends", None): notify_backends.extend(key_config.backends) if notify_backends: send_ntfy_notification( list(dict.fromkeys(notify_backends)), # deduplicate f"🚨 Brute-force lockout: {client_ip} user='{username_for_key}' on {auth_type}", "Emergency Access ALERT", ) except Exception: pass return ( jsonify( { "error": "Too many authentication attempts, temporarily locked" } ), 429, {"Retry-After": str(LOCKOUT)}, ) logger.warning( f"Authentication failed for {auth_type}: invalid credentials from {client_ip}" ) return ( jsonify({"error": "Invalid credentials"}), 401, {"WWW-Authenticate": 'Basic realm="Emergency Access"'}, ) # On success, clear recorded failed attempts for this key with state["lock"]: if key in state["attempts"]: try: del state["attempts"][key] except KeyError: pass if key in state["blocked"]: try: del state["blocked"][key] except KeyError: pass logger.info(f"Authentication successful for {auth_type}") return f(*args, **kwargs) return decorated_function return decorator def send_ntfy_notification( backends: List[str], message: str, title: str = None ) -> Tuple[bool, List[str]]: """ Send notification using dschep/ntfy with dedicated config file Returns: (success, successful_backends) """ successful_backends = [] max_retries = 3 retry_delay = 1 for backend in backends: success = False last_error = None for attempt in range(max_retries): try: # Import ntfy here to avoid import issues during startup try: import ntfy from ntfy.config import load_config except ImportError: raise Exception( "ntfy package not available. Please install with: pip install ntfy" ) if config is None: raise Exception("Configuration not loaded") # Load the ntfy config file ntfy_config = load_config(config.ntfy_config_path) ntfy_config["backends"] = [backend] # Add timeout to prevent hanging using threading result = [None] exception = [None] def notify_with_timeout(): try: # Send notification using the backend name from our config file if title: result[0] = ntfy.notify( message, title=title, config=ntfy_config ) else: result[0] = ntfy.notify( message, config=ntfy_config, title="Note" ) except Exception as e: exception[0] = e thread = threading.Thread(target=notify_with_timeout) thread.daemon = True thread.start() thread.join(timeout=15) # 15 second timeout if thread.is_alive(): raise Exception("Notification timeout") if exception[0]: raise exception[0] ret = result[0] if ret == 0: successful_backends.append(backend) logger.info(f"Notification sent successfully via {backend}") success = True break else: raise Exception(f"ntfy returned error code {ret}") except ImportError as e: logger.error(f"ntfy package not available for backend {backend}: {e}") last_error = e break # Don't retry import errors except Exception as e: last_error = e if attempt < max_retries - 1: logger.warning( f"Notification attempt {attempt + 1} failed for {backend}: {str(e)}, retrying..." ) time.sleep(retry_delay * (attempt + 1)) # Exponential backoff else: logger.error( f"All notification attempts failed for {backend}: {str(e)}" ) if not success and last_error: # Don't raise exception - just log failure and continue with other backends logger.error( f"Failed to send notification to {backend} after {max_retries} attempts: {str(last_error)}" ) success = len(successful_backends) > 0 return success, successful_backends def read_file_safely(file_path: str) -> Tuple[bool, str]: """ Safely read file content Returns: (success, content) """ try: if not os.path.exists(file_path): logger.error(f"File not found: {file_path}") return False, f"File not found: {file_path}" with open(file_path, "r") as f: content = f.read().strip() if not content: logger.error(f"File is empty: {file_path}") return False, f"File is empty: {file_path}" return True, content except PermissionError: logger.error(f"Permission denied reading file: {file_path}") return False, f"Permission denied: {file_path}" except Exception as e: logger.error(f"Failed to read file {file_path}: {str(e)}") return False, f"Failed to read file: {str(e)}" def create_key_handler(key_config): """Create a key access handler for a specific key configuration""" @require_auth(key_config=key_config) def get_key_part(): """Emergency key access endpoint""" if key_config is None: logger.error("Key configuration is None in get_key_part") return jsonify({"error": "Server configuration error"}), 500 logger.warning( f"EMERGENCY: Key access attempt detected for key '{key_config.key_id}'" ) try: # Send notification first - fail-safe approach notification_success, successful_backends = send_ntfy_notification( key_config.backends, key_config.message, "EMERGENCY ACCESS ALERT" ) if not notification_success: logger.error( f"CRITICAL: Failed to send notifications to any backend for key '{key_config.key_id}'" ) return jsonify( { "error": "Notification system failure", "message": "Access denied for security reasons", } ), 500 logger.info( f"Notifications sent successfully to: {successful_backends} for key '{key_config.key_id}'" ) # Read key file file_success, content = read_file_safely(key_config.file_path) if not file_success: logger.error( f"CRITICAL: Failed to read key file for '{key_config.key_id}': {content}" ) return jsonify( { "error": "File access failure", "message": "Unable to retrieve key part", } ), 500 logger.warning( f"EMERGENCY: Key part successfully retrieved and sent for key '{key_config.key_id}'" ) resp = jsonify( { "success": True, "key_id": key_config.key_id, "key_part": content, "timestamp": time.time(), "notified_backends": successful_backends, } ) # Ensure responses with secret material are never cached by clients or intermediaries resp.headers["Cache-Control"] = ( "no-store, no-cache, must-revalidate, private" ) resp.headers["Pragma"] = "no-cache" resp.headers["Expires"] = "0" # Additional defensive headers resp.headers["X-Content-Type-Options"] = "nosniff" resp.headers["Content-Security-Policy"] = ( "default-src 'none'; frame-ancestors 'none'; sandbox" ) return resp except Exception as e: logger.error( f"CRITICAL: Unexpected error in key access for '{key_config.key_id}': {str(e)}" ) return jsonify( {"error": "System error", "message": "Internal server error"} ), 500 return get_key_part @require_auth(is_health=True) def health_check(): """Health check endpoint that verifies both health monitoring and all key request functionality""" logger.info("Health check requested") if config is None: logger.error("Configuration not loaded during health check") return jsonify({"status": "error", "message": "Configuration not loaded"}), 500 try: # Test health notification system health_notification_success, health_backends = send_ntfy_notification( config.ntfy_backends_health, config.ntfy_health_message, "Health Check" ) # Test dummy file access dummy_file_success, dummy_content = read_file_safely(config.dummy_file_path) # Test all key files access (without exposing content) key_files_status = {} all_key_files_ok = True for key_id, key_config in config.keys.items(): key_file_success, key_content = read_file_safely(key_config.file_path) key_files_status[key_id] = key_file_success if not key_file_success: all_key_files_ok = False # Test all key notification backends key_backends_status = {} all_key_backends_ok = True for key_id, key_config in config.keys.items(): try: # Test notification without actually sending backend_test_success = len(key_config.backends) > 0 key_backends_status[key_id] = { "backends": key_config.backends, "success": backend_test_success, } if not backend_test_success: all_key_backends_ok = False except Exception as e: key_backends_status[key_id] = { "backends": key_config.backends, "success": False, "error": str(e), } all_key_backends_ok = False # Determine overall health status all_systems_ok = ( health_notification_success and dummy_file_success and all_key_files_ok and all_key_backends_ok ) if not all_systems_ok: error_details = [] if not health_notification_success: error_details.append("health notifications failed") if not dummy_file_success: error_details.append(f"dummy file access failed: {dummy_content}") # Add key-specific errors for key_id, status in key_files_status.items(): if not status: error_details.append(f"key file access failed for '{key_id}'") for key_id, status in key_backends_status.items(): if not status["success"]: error_msg = f"key backends failed for '{key_id}'" if "error" in status: error_msg += f": {status['error']}" error_details.append(error_msg) logger.error(f"Health check failed: {', '.join(error_details)}") return jsonify( { "status": "error", "message": "System components failed", "details": error_details, "health_notifications": health_notification_success, "dummy_file_access": dummy_file_success, "key_files_status": key_files_status, "key_backends_status": key_backends_status, } ), 500 logger.info("Health check completed successfully - all systems operational") return jsonify( { "status": "ok", "timestamp": time.time(), "health_backends_notified": health_backends, "dummy_content_length": len(dummy_content), "keys_accessible": len(key_files_status), "key_files_status": key_files_status, "key_backends_status": key_backends_status, "emergency_system_ready": True, } ) except Exception as e: logger.error(f"Health check error: {str(e)}") return jsonify( {"status": "error", "message": "System error", "error": str(e)} ), 500 @app.errorhandler(404) def not_found(error): """Handle 404 errors silently for security""" logger.info(f"404 attempt: {error}") return jsonify({"error": "Not found"}), 404 @app.errorhandler(500) def internal_error(error): """Handle internal server errors""" logger.error(f"Internal server error: {error}") return jsonify({"error": "Internal server error"}), 500 def validate_setup(): """Validate system setup before starting""" logger.info("Validating system setup...") if config is None: logger.error("Configuration not loaded") return False # Check dummy file exists if not os.path.exists(config.dummy_file_path): logger.error(f"Dummy file not found: {config.dummy_file_path}") return False # Test dummy file permissions try: with open(config.dummy_file_path, "r") as f: f.read(1) except Exception as e: logger.error(f"Dummy file permission test failed: {str(e)}") return False # Validate all key configurations for key_id, key_config in config.keys.items(): logger.info(f"Validating key '{key_id}'...") # Check key file exists if not os.path.exists(key_config.file_path): logger.error(f"Key file not found for '{key_id}': {key_config.file_path}") return False # Test key file permissions try: with open(key_config.file_path, "r") as f: f.read(1) except Exception as e: logger.error(f"Key file permission test failed for '{key_id}': {str(e)}") return False # Validate backends are configured if not key_config.backends: logger.error(f"No notification backends configured for key '{key_id}'") return False # Test notification system logger.info("Testing notification system...") try: # Test health notification system health_success, _ = send_ntfy_notification( config.ntfy_backends_health[:1], # Test only first backend "System startup test - health notifications", "Emergency Access Startup Test", ) if not health_success: logger.error("Health notification system test failed") return False # Test each key's notification backends for key_id, key_config in config.keys.items(): key_success, _ = send_ntfy_notification( key_config.backends[:1], # Test only first backend f"System startup test - key '{key_id}' notifications", "Emergency Access Startup Test", ) if not key_success: logger.error(f"Key notification system test failed for '{key_id}'") return False except Exception as e: logger.warning(f"Notification test failed, but continuing: {str(e)}") logger.info("System validation completed successfully") return True class ProductionRequestHandler(WSGIRequestHandler): """Custom request handler with improved error handling""" def log_error(self, format, *args): """Override to use our logger""" logger.error(f"HTTP Error: {format % args}") def log_message(self, format, *args): """Override to use our logger""" logger.info(f"HTTP: {format % args}") def main(): global config try: # Load configuration config = Config() logger.info("Configuration loaded successfully") # Add ntfy log handler after config is loaded if config.send_all_logs: ntfy_handler = NtfyLogHandler(config) min_level = getattr(logging, config.log_level.upper(), logging.WARNING) ntfy_handler.setLevel(min_level) # Add to root logger to catch all application logs logging.getLogger().addHandler(ntfy_handler) # Validate system setup if not validate_setup(): logger.error("System validation failed, exiting") sys.exit(1) # Add Flask routes dynamically for each key for key_id, key_config in config.keys.items(): handler = create_key_handler(key_config) endpoint_name = f"get_key_part_{key_id}" app.add_url_rule(key_config.route, endpoint_name, handler, methods=["GET"]) logger.info( f"Registered authenticated key route for '{key_id}': {key_config.route}" ) # Add health check route app.add_url_rule( config.health_route, "health_check", health_check, methods=["GET"] ) logger.info( f"Starting emergency access server on {config.server_host}:{config.server_port}" ) logger.info(f"Health route: {config.health_route}") logger.info(f"Configured {len(config.keys)} key(s)") # Use production-ready server with better error handling try: from waitress import serve logger.info("Using Waitress WSGI server for production") serve( app, host=config.server_host, port=config.server_port, threads=6, connection_limit=100, cleanup_interval=30, channel_timeout=120, ) except ImportError: logger.critical( "Waitress WSGI server not installed. This application must run under a production WSGI server such as 'waitress' or 'gunicorn'. Aborting startup." ) # Attempt to notify configured health backends about startup failure (best-effort) try: if config and hasattr(config, "ntfy_backends_health"): send_ntfy_notification( config.ntfy_backends_health, "🚨 Emergency Access server failed to start: waitress not installed", "Emergency Access CRITICAL", ) except Exception: # Do not raise further - we are already aborting startup pass sys.exit(1) except KeyboardInterrupt: logger.info("Received keyboard interrupt, shutting down gracefully") except Exception as e: logger.error(f"Failed to start server: {str(e)}") if config and hasattr(config, "ntfy_backends_health"): try: send_ntfy_notification( config.ntfy_backends_health, f"🚨 Emergency Access server failed to start: {str(e)}", "Emergency Access CRITICAL", ) except Exception: pass sys.exit(1) finally: logger.info("Emergency Access server shutdown complete") if __name__ == "__main__": main()