Browse Source

changed 404 to info

zehe 1 month ago
parent
commit
61df6b64bb
1 changed files with 258 additions and 178 deletions
  1. 258 178
      main.py

+ 258 - 178
main.py

@@ -16,6 +16,7 @@ from typing import List, Tuple
 import base64
 import signal
 
+
 # Configure logging with custom handler
 class NtfyLogHandler(logging.Handler):
     """Custom logging handler that sends logs to ntfy health backends"""
@@ -26,51 +27,55 @@ class NtfyLogHandler(logging.Handler):
 
     def emit(self, record):
         """Send log record to health backends"""
-        if hasattr(self.config, 'ntfy_backends_health') and self.config.send_all_logs:
+        if hasattr(self.config, "ntfy_backends_health") and self.config.send_all_logs:
             try:
                 log_message = self.format(record)
                 # Get configured log level or default to WARNING
-                min_level = getattr(logging, self.config.log_level.upper(), logging.WARNING)
+                min_level = getattr(
+                    logging, self.config.log_level.upper(), logging.WARNING
+                )
 
                 if record.levelno >= min_level:
                     # Format message with appropriate emoji based on log level
-                    emoji = "🚨" if record.levelno >= logging.ERROR else "⚠️" if record.levelno >= logging.WARNING else "ℹ️"
+                    emoji = (
+                        "🚨"
+                        if record.levelno >= logging.ERROR
+                        else "⚠️"
+                        if record.levelno >= logging.WARNING
+                        else "ℹ️"
+                    )
                     title = f"Emergency Access {record.levelname}"
                     message = f"{emoji} {record.name}: {record.getMessage()}"
 
                     send_ntfy_notification(
-                        self.config.ntfy_backends_health,
-                        message,
-                        title
+                        self.config.ntfy_backends_health, message, title
                     )
             except Exception:
                 # Don't fail the application if logging notification fails
                 pass
 
+
 # Configure logging with secure-file fallback
-log_path = '/var/log/emergency-access.log'
+log_path = "/var/log/emergency-access.log"
 try:
     os.makedirs(os.path.dirname(log_path), exist_ok=True)
     # Ensure the log file exists with restrictive permissions
     if not os.path.exists(log_path):
-        open(log_path, 'a').close()
+        open(log_path, "a").close()
         try:
             os.chmod(log_path, 0o600)
         except Exception:
             # Best-effort; if this fails we continue with stream logging
             pass
-    log_handlers = [
-        logging.FileHandler(log_path),
-        logging.StreamHandler()
-    ]
+    log_handlers = [logging.FileHandler(log_path), logging.StreamHandler()]
 except Exception:
     # If any issue creating the file (permissions, etc.), fall back to stream-only logging
     log_handlers = [logging.StreamHandler()]
 
 logging.basicConfig(
     level=logging.INFO,
-    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
-    handlers=log_handlers
+    format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
+    handlers=log_handlers,
 )
 logger = logging.getLogger(__name__)
 
@@ -80,7 +85,6 @@ app = Flask(__name__)
 config = None
 
 
-
 def require_auth(key_config=None, is_health=False):
     """Decorator for HTTP Basic Authentication with simple in-memory rate limiting.
 
@@ -100,24 +104,24 @@ def require_auth(key_config=None, is_health=False):
     # Initialize shared state on the require_auth function object (module-global alternative)
     if not hasattr(require_auth, "_state"):
         require_auth._state = {
-            'attempts': {},  # (ip, username) -> list[timestamps]
-            'blocked': {},   # (ip, username) -> blocked_until_timestamp
-            'lock': threading.Lock()
+            "attempts": {},  # (ip, username) -> list[timestamps]
+            "blocked": {},  # (ip, username) -> blocked_until_timestamp
+            "lock": threading.Lock(),
         }
 
     MAX_ATTEMPTS = 5
-    WINDOW = 300      # seconds
-    LOCKOUT = 900     # seconds
+    WINDOW = 300  # seconds
+    LOCKOUT = 900  # seconds
 
     def decorator(f):
         @wraps(f)
         def decorated_function(*args, **kwargs):
             if config is None:
                 logger.error("Configuration not loaded")
-                return jsonify({'error': 'Server configuration error'}), 500
+                return jsonify({"error": "Server configuration error"}), 500
 
             auth = request.authorization
-            client_ip = request.remote_addr or 'unknown'
+            client_ip = request.remote_addr or "unknown"
 
             # determine username for rate limit key; use placeholder if missing
             username_for_key = auth.username if auth and auth.username else "<no_auth>"
@@ -128,46 +132,64 @@ def require_auth(key_config=None, is_health=False):
             state = require_auth._state
 
             # Check block list
-            with state['lock']:
-                blocked_until = state['blocked'].get(key)
+            with state["lock"]:
+                blocked_until = state["blocked"].get(key)
                 if blocked_until and now_ts < blocked_until:
                     retry_after = int(blocked_until - now_ts)
-                    logger.warning(f"Blocked auth attempt from {client_ip} for user '{username_for_key}' (locked until {blocked_until})")
+                    logger.warning(
+                        f"Blocked auth attempt from {client_ip} for user '{username_for_key}' (locked until {blocked_until})"
+                    )
                     # Return 429 Too Many Requests
-                    return jsonify({'error': 'Too many authentication attempts, temporarily locked'}), 429, {
-                        'Retry-After': str(retry_after)
-                    }
+                    return (
+                        jsonify(
+                            {
+                                "error": "Too many authentication attempts, temporarily locked"
+                            }
+                        ),
+                        429,
+                        {"Retry-After": str(retry_after)},
+                    )
 
             if not auth:
                 # For missing auth, still account for attempts keyed by ip and '<no_auth>'
                 # Increment attempt count and possibly lock
-                with state['lock']:
-                    attempts = state['attempts'].setdefault(key, [])
+                with state["lock"]:
+                    attempts = state["attempts"].setdefault(key, [])
                     # prune old timestamps
                     attempts = [t for t in attempts if now_ts - t <= WINDOW]
                     attempts.append(now_ts)
-                    state['attempts'][key] = attempts
+                    state["attempts"][key] = attempts
                     if len(attempts) > MAX_ATTEMPTS:
                         # Lock out
-                        state['blocked'][key] = now_ts + LOCKOUT
-                        logger.error(f"Locking out {client_ip} for missing/invalid auth (no credentials) after repeated attempts")
+                        state["blocked"][key] = now_ts + LOCKOUT
+                        logger.error(
+                            f"Locking out {client_ip} for missing/invalid auth (no credentials) after repeated attempts"
+                        )
                         # Try to notify about brute force (best-effort)
                         try:
-                            if config and hasattr(config, 'ntfy_backends_health'):
+                            if config and hasattr(config, "ntfy_backends_health"):
                                 send_ntfy_notification(
                                     config.ntfy_backends_health,
                                     f"🚨 Brute-force lockout: {client_ip} (no credentials) on server",
-                                    "Emergency Access ALERT"
+                                    "Emergency Access ALERT",
                                 )
                         except Exception:
                             pass
-                        return jsonify({'error': 'Too many authentication attempts, temporarily locked'}), 429, {
-                            'Retry-After': str(LOCKOUT)
-                        }
-
-                return jsonify({'error': 'Authentication required'}), 401, {
-                    'WWW-Authenticate': 'Basic realm="Emergency Access"'
-                }
+                        return (
+                            jsonify(
+                                {
+                                    "error": "Too many authentication attempts, temporarily locked"
+                                }
+                            ),
+                            429,
+                            {"Retry-After": str(LOCKOUT)},
+                        )
+
+                return (
+                    jsonify({"error": "Authentication required"}),
+                    401,
+                    {"WWW-Authenticate": 'Basic realm="Emergency Access"'},
+                )
 
             # Determine expected credentials
             if is_health:
@@ -177,71 +199,90 @@ def require_auth(key_config=None, is_health=False):
             else:
                 if key_config is None:
                     logger.error("Key configuration not provided for authentication")
-                    return jsonify({'error': 'Server configuration error'}), 500
+                    return jsonify({"error": "Server configuration error"}), 500
                 expected_username = key_config.username
                 expected_password_hash = key_config.password_hash
                 auth_type = f"key '{key_config.key_id}'"
 
             # Verify credentials
-            credential_ok = (auth.username == expected_username and
-                             Config.verify_password(auth.password, expected_password_hash))
+            credential_ok = (
+                auth.username == expected_username
+                and Config.verify_password(auth.password, expected_password_hash)
+            )
 
             if not credential_ok:
                 # record failed attempt and check for lockout
-                with state['lock']:
-                    attempts = state['attempts'].setdefault(key, [])
+                with state["lock"]:
+                    attempts = state["attempts"].setdefault(key, [])
                     attempts = [t for t in attempts if now_ts - t <= WINDOW]
                     attempts.append(now_ts)
-                    state['attempts'][key] = attempts
+                    state["attempts"][key] = attempts
 
                     if len(attempts) > MAX_ATTEMPTS:
-                        state['blocked'][key] = now_ts + LOCKOUT
-                        logger.error(f"Locking out {client_ip} for user '{username_for_key}' after repeated failed attempts")
+                        state["blocked"][key] = now_ts + LOCKOUT
+                        logger.error(
+                            f"Locking out {client_ip} for user '{username_for_key}' after repeated failed attempts"
+                        )
                         # Notify about brute-force lockout (best-effort)
                         try:
                             notify_backends = []
-                            if config and hasattr(config, 'ntfy_backends_health'):
+                            if config and hasattr(config, "ntfy_backends_health"):
                                 notify_backends.extend(config.ntfy_backends_health)
                             # Include key-specific backends when available
-                            if key_config and getattr(key_config, 'backends', None):
+                            if key_config and getattr(key_config, "backends", None):
                                 notify_backends.extend(key_config.backends)
                             if notify_backends:
                                 send_ntfy_notification(
                                     list(dict.fromkeys(notify_backends)),  # deduplicate
                                     f"🚨 Brute-force lockout: {client_ip} user='{username_for_key}' on {auth_type}",
-                                    "Emergency Access ALERT"
+                                    "Emergency Access ALERT",
                                 )
                         except Exception:
                             pass
 
-                        return jsonify({'error': 'Too many authentication attempts, temporarily locked'}), 429, {
-                            'Retry-After': str(LOCKOUT)
-                        }
-
-                logger.warning(f"Authentication failed for {auth_type}: invalid credentials from {client_ip}")
-                return jsonify({'error': 'Invalid credentials'}), 401, {
-                    'WWW-Authenticate': 'Basic realm="Emergency Access"'
-                }
+                        return (
+                            jsonify(
+                                {
+                                    "error": "Too many authentication attempts, temporarily locked"
+                                }
+                            ),
+                            429,
+                            {"Retry-After": str(LOCKOUT)},
+                        )
+
+                logger.warning(
+                    f"Authentication failed for {auth_type}: invalid credentials from {client_ip}"
+                )
+                return (
+                    jsonify({"error": "Invalid credentials"}),
+                    401,
+                    {"WWW-Authenticate": 'Basic realm="Emergency Access"'},
+                )
 
             # On success, clear recorded failed attempts for this key
-            with state['lock']:
-                if key in state['attempts']:
+            with state["lock"]:
+                if key in state["attempts"]:
                     try:
-                        del state['attempts'][key]
+                        del state["attempts"][key]
                     except KeyError:
                         pass
-                if key in state['blocked']:
+                if key in state["blocked"]:
                     try:
-                        del state['blocked'][key]
+                        del state["blocked"][key]
                     except KeyError:
                         pass
 
             logger.info(f"Authentication successful for {auth_type}")
             return f(*args, **kwargs)
+
         return decorated_function
+
     return decorator
 
-def send_ntfy_notification(backends: List[str], message: str, title: str = None) -> Tuple[bool, List[str]]:
+
+def send_ntfy_notification(
+    backends: List[str], message: str, title: str = None
+) -> Tuple[bool, List[str]]:
     """
     Send notification using dschep/ntfy with dedicated config file
     Returns: (success, successful_backends)
@@ -261,7 +302,9 @@ def send_ntfy_notification(backends: List[str], message: str, title: str = None)
                     import ntfy
                     from ntfy.config import load_config
                 except ImportError:
-                    raise Exception("ntfy package not available. Please install with: pip install ntfy")
+                    raise Exception(
+                        "ntfy package not available. Please install with: pip install ntfy"
+                    )
 
                 if config is None:
                     raise Exception("Configuration not loaded")
@@ -279,9 +322,13 @@ def send_ntfy_notification(backends: List[str], message: str, title: str = None)
                     try:
                         # Send notification using the backend name from our config file
                         if title:
-                            result[0] = ntfy.notify(message, title=title, config=ntfy_config)
+                            result[0] = ntfy.notify(
+                                message, title=title, config=ntfy_config
+                            )
                         else:
-                            result[0] = ntfy.notify(message, config=ntfy_config, title="Note")
+                            result[0] = ntfy.notify(
+                                message, config=ntfy_config, title="Note"
+                            )
                     except Exception as e:
                         exception[0] = e
 
@@ -313,18 +360,25 @@ def send_ntfy_notification(backends: List[str], message: str, title: str = None)
             except Exception as e:
                 last_error = e
                 if attempt < max_retries - 1:
-                    logger.warning(f"Notification attempt {attempt + 1} failed for {backend}: {str(e)}, retrying...")
+                    logger.warning(
+                        f"Notification attempt {attempt + 1} failed for {backend}: {str(e)}, retrying..."
+                    )
                     time.sleep(retry_delay * (attempt + 1))  # Exponential backoff
                 else:
-                    logger.error(f"All notification attempts failed for {backend}: {str(e)}")
+                    logger.error(
+                        f"All notification attempts failed for {backend}: {str(e)}"
+                    )
 
         if not success and last_error:
             # Don't raise exception - just log failure and continue with other backends
-            logger.error(f"Failed to send notification to {backend} after {max_retries} attempts: {str(last_error)}")
+            logger.error(
+                f"Failed to send notification to {backend} after {max_retries} attempts: {str(last_error)}"
+            )
 
     success = len(successful_backends) > 0
     return success, successful_backends
 
+
 def read_file_safely(file_path: str) -> Tuple[bool, str]:
     """
     Safely read file content
@@ -335,7 +389,7 @@ def read_file_safely(file_path: str) -> Tuple[bool, str]:
             logger.error(f"File not found: {file_path}")
             return False, f"File not found: {file_path}"
 
-        with open(file_path, 'r') as f:
+        with open(file_path, "r") as f:
             content = f.read().strip()
 
         if not content:
@@ -351,69 +405,88 @@ def read_file_safely(file_path: str) -> Tuple[bool, str]:
         logger.error(f"Failed to read file {file_path}: {str(e)}")
         return False, f"Failed to read file: {str(e)}"
 
+
 def create_key_handler(key_config):
     """Create a key access handler for a specific key configuration"""
+
     @require_auth(key_config=key_config)
     def get_key_part():
         """Emergency key access endpoint"""
         if key_config is None:
             logger.error("Key configuration is None in get_key_part")
-            return jsonify({'error': 'Server configuration error'}), 500
-
-        logger.warning(f"EMERGENCY: Key access attempt detected for key '{key_config.key_id}'")
-
+            return jsonify({"error": "Server configuration error"}), 500
 
+        logger.warning(
+            f"EMERGENCY: Key access attempt detected for key '{key_config.key_id}'"
+        )
 
         try:
             # Send notification first - fail-safe approach
             notification_success, successful_backends = send_ntfy_notification(
-                key_config.backends,
-                key_config.message,
-                "EMERGENCY ACCESS ALERT"
+                key_config.backends, key_config.message, "EMERGENCY ACCESS ALERT"
             )
 
             if not notification_success:
-                logger.error(f"CRITICAL: Failed to send notifications to any backend for key '{key_config.key_id}'")
-                return jsonify({
-                    'error': 'Notification system failure',
-                    'message': 'Access denied for security reasons'
-                }), 500
+                logger.error(
+                    f"CRITICAL: Failed to send notifications to any backend for key '{key_config.key_id}'"
+                )
+                return jsonify(
+                    {
+                        "error": "Notification system failure",
+                        "message": "Access denied for security reasons",
+                    }
+                ), 500
 
-            logger.info(f"Notifications sent successfully to: {successful_backends} for key '{key_config.key_id}'")
+            logger.info(
+                f"Notifications sent successfully to: {successful_backends} for key '{key_config.key_id}'"
+            )
 
             # Read key file
             file_success, content = read_file_safely(key_config.file_path)
 
             if not file_success:
-                logger.error(f"CRITICAL: Failed to read key file for '{key_config.key_id}': {content}")
-                return jsonify({
-                    'error': 'File access failure',
-                    'message': 'Unable to retrieve key part'
-                }), 500
-
-            logger.warning(f"EMERGENCY: Key part successfully retrieved and sent for key '{key_config.key_id}'")
-            resp = jsonify({
-                'success': True,
-                'key_id': key_config.key_id,
-                'key_part': content,
-                'timestamp': time.time(),
-                'notified_backends': successful_backends
-            })
+                logger.error(
+                    f"CRITICAL: Failed to read key file for '{key_config.key_id}': {content}"
+                )
+                return jsonify(
+                    {
+                        "error": "File access failure",
+                        "message": "Unable to retrieve key part",
+                    }
+                ), 500
+
+            logger.warning(
+                f"EMERGENCY: Key part successfully retrieved and sent for key '{key_config.key_id}'"
+            )
+            resp = jsonify(
+                {
+                    "success": True,
+                    "key_id": key_config.key_id,
+                    "key_part": content,
+                    "timestamp": time.time(),
+                    "notified_backends": successful_backends,
+                }
+            )
             # Ensure responses with secret material are never cached by clients or intermediaries
-            resp.headers['Cache-Control'] = 'no-store, no-cache, must-revalidate, private'
-            resp.headers['Pragma'] = 'no-cache'
-            resp.headers['Expires'] = '0'
+            resp.headers["Cache-Control"] = (
+                "no-store, no-cache, must-revalidate, private"
+            )
+            resp.headers["Pragma"] = "no-cache"
+            resp.headers["Expires"] = "0"
             # Additional defensive headers
-            resp.headers['X-Content-Type-Options'] = 'nosniff'
-            resp.headers['Content-Security-Policy'] = "default-src 'none'; frame-ancestors 'none'; sandbox"
+            resp.headers["X-Content-Type-Options"] = "nosniff"
+            resp.headers["Content-Security-Policy"] = (
+                "default-src 'none'; frame-ancestors 'none'; sandbox"
+            )
             return resp
 
         except Exception as e:
-            logger.error(f"CRITICAL: Unexpected error in key access for '{key_config.key_id}': {str(e)}")
-            return jsonify({
-                'error': 'System error',
-                'message': 'Internal server error'
-            }), 500
+            logger.error(
+                f"CRITICAL: Unexpected error in key access for '{key_config.key_id}': {str(e)}"
+            )
+            return jsonify(
+                {"error": "System error", "message": "Internal server error"}
+            ), 500
 
     return get_key_part
 
@@ -423,18 +496,14 @@ def health_check():
     """Health check endpoint that verifies both health monitoring and all key request functionality"""
     logger.info("Health check requested")
 
-
-
     if config is None:
         logger.error("Configuration not loaded during health check")
-        return jsonify({'status': 'error', 'message': 'Configuration not loaded'}), 500
+        return jsonify({"status": "error", "message": "Configuration not loaded"}), 500
 
     try:
         # Test health notification system
         health_notification_success, health_backends = send_ntfy_notification(
-            config.ntfy_backends_health,
-            config.ntfy_health_message,
-            "Health Check"
+            config.ntfy_backends_health, config.ntfy_health_message, "Health Check"
         )
 
         # Test dummy file access
@@ -457,24 +526,26 @@ def health_check():
                 # Test notification without actually sending
                 backend_test_success = len(key_config.backends) > 0
                 key_backends_status[key_id] = {
-                    'backends': key_config.backends,
-                    'success': backend_test_success
+                    "backends": key_config.backends,
+                    "success": backend_test_success,
                 }
                 if not backend_test_success:
                     all_key_backends_ok = False
             except Exception as e:
                 key_backends_status[key_id] = {
-                    'backends': key_config.backends,
-                    'success': False,
-                    'error': str(e)
+                    "backends": key_config.backends,
+                    "success": False,
+                    "error": str(e),
                 }
                 all_key_backends_ok = False
 
         # Determine overall health status
-        all_systems_ok = (health_notification_success and
-                         dummy_file_success and
-                         all_key_files_ok and
-                         all_key_backends_ok)
+        all_systems_ok = (
+            health_notification_success
+            and dummy_file_success
+            and all_key_files_ok
+            and all_key_backends_ok
+        )
 
         if not all_systems_ok:
             error_details = []
@@ -489,57 +560,59 @@ def health_check():
                     error_details.append(f"key file access failed for '{key_id}'")
 
             for key_id, status in key_backends_status.items():
-                if not status['success']:
+                if not status["success"]:
                     error_msg = f"key backends failed for '{key_id}'"
-                    if 'error' in status:
+                    if "error" in status:
                         error_msg += f": {status['error']}"
                     error_details.append(error_msg)
 
             logger.error(f"Health check failed: {', '.join(error_details)}")
-            return jsonify({
-                'status': 'error',
-                'message': 'System components failed',
-                'details': error_details,
-                'health_notifications': health_notification_success,
-                'dummy_file_access': dummy_file_success,
-                'key_files_status': key_files_status,
-                'key_backends_status': key_backends_status
-            }), 500
+            return jsonify(
+                {
+                    "status": "error",
+                    "message": "System components failed",
+                    "details": error_details,
+                    "health_notifications": health_notification_success,
+                    "dummy_file_access": dummy_file_success,
+                    "key_files_status": key_files_status,
+                    "key_backends_status": key_backends_status,
+                }
+            ), 500
 
         logger.info("Health check completed successfully - all systems operational")
-        return jsonify({
-            'status': 'ok',
-            'timestamp': time.time(),
-            'health_backends_notified': health_backends,
-            'dummy_content_length': len(dummy_content),
-            'keys_accessible': len(key_files_status),
-            'key_files_status': key_files_status,
-            'key_backends_status': key_backends_status,
-            'emergency_system_ready': True
-        })
+        return jsonify(
+            {
+                "status": "ok",
+                "timestamp": time.time(),
+                "health_backends_notified": health_backends,
+                "dummy_content_length": len(dummy_content),
+                "keys_accessible": len(key_files_status),
+                "key_files_status": key_files_status,
+                "key_backends_status": key_backends_status,
+                "emergency_system_ready": True,
+            }
+        )
 
     except Exception as e:
         logger.error(f"Health check error: {str(e)}")
-        return jsonify({
-            'status': 'error',
-            'message': 'System error',
-            'error': str(e)
-        }), 500
-
-
+        return jsonify(
+            {"status": "error", "message": "System error", "error": str(e)}
+        ), 500
 
 
 @app.errorhandler(404)
 def not_found(error):
     """Handle 404 errors silently for security"""
-    logger.warning(f"404 attempt: {error}")
-    return jsonify({'error': 'Not found'}), 404
+    logger.info(f"404 attempt: {error}")
+    return jsonify({"error": "Not found"}), 404
+
 
 @app.errorhandler(500)
 def internal_error(error):
     """Handle internal server errors"""
     logger.error(f"Internal server error: {error}")
-    return jsonify({'error': 'Internal server error'}), 500
+    return jsonify({"error": "Internal server error"}), 500
+
 
 def validate_setup():
     """Validate system setup before starting"""
@@ -556,7 +629,7 @@ def validate_setup():
 
     # Test dummy file permissions
     try:
-        with open(config.dummy_file_path, 'r') as f:
+        with open(config.dummy_file_path, "r") as f:
             f.read(1)
     except Exception as e:
         logger.error(f"Dummy file permission test failed: {str(e)}")
@@ -573,7 +646,7 @@ def validate_setup():
 
         # Test key file permissions
         try:
-            with open(key_config.file_path, 'r') as f:
+            with open(key_config.file_path, "r") as f:
                 f.read(1)
         except Exception as e:
             logger.error(f"Key file permission test failed for '{key_id}': {str(e)}")
@@ -591,7 +664,7 @@ def validate_setup():
         health_success, _ = send_ntfy_notification(
             config.ntfy_backends_health[:1],  # Test only first backend
             "System startup test - health notifications",
-            "Emergency Access Startup Test"
+            "Emergency Access Startup Test",
         )
 
         if not health_success:
@@ -603,7 +676,7 @@ def validate_setup():
             key_success, _ = send_ntfy_notification(
                 key_config.backends[:1],  # Test only first backend
                 f"System startup test - key '{key_id}' notifications",
-                "Emergency Access Startup Test"
+                "Emergency Access Startup Test",
             )
 
             if not key_success:
@@ -616,6 +689,7 @@ def validate_setup():
     logger.info("System validation completed successfully")
     return True
 
+
 class ProductionRequestHandler(WSGIRequestHandler):
     """Custom request handler with improved error handling"""
 
@@ -627,6 +701,7 @@ class ProductionRequestHandler(WSGIRequestHandler):
         """Override to use our logger"""
         logger.info(f"HTTP: {format % args}")
 
+
 def main():
     global config
     try:
@@ -650,25 +725,27 @@ def main():
         # Add Flask routes dynamically for each key
         for key_id, key_config in config.keys.items():
             handler = create_key_handler(key_config)
-            endpoint_name = f'get_key_part_{key_id}'
-            app.add_url_rule(key_config.route, endpoint_name, handler, methods=['GET'])
-            logger.info(f"Registered authenticated key route for '{key_id}': {key_config.route}")
+            endpoint_name = f"get_key_part_{key_id}"
+            app.add_url_rule(key_config.route, endpoint_name, handler, methods=["GET"])
+            logger.info(
+                f"Registered authenticated key route for '{key_id}': {key_config.route}"
+            )
 
         # Add health check route
-        app.add_url_rule(config.health_route, 'health_check', health_check, methods=['GET'])
-
-
+        app.add_url_rule(
+            config.health_route, "health_check", health_check, methods=["GET"]
+        )
 
-        logger.info(f"Starting emergency access server on {config.server_host}:{config.server_port}")
+        logger.info(
+            f"Starting emergency access server on {config.server_host}:{config.server_port}"
+        )
         logger.info(f"Health route: {config.health_route}")
         logger.info(f"Configured {len(config.keys)} key(s)")
 
-
-
-
         # Use production-ready server with better error handling
         try:
             from waitress import serve
+
             logger.info("Using Waitress WSGI server for production")
             serve(
                 app,
@@ -677,17 +754,19 @@ def main():
                 threads=6,
                 connection_limit=100,
                 cleanup_interval=30,
-                channel_timeout=120
+                channel_timeout=120,
             )
         except ImportError:
-            logger.critical("Waitress WSGI server not installed. This application must run under a production WSGI server such as 'waitress' or 'gunicorn'. Aborting startup.")
+            logger.critical(
+                "Waitress WSGI server not installed. This application must run under a production WSGI server such as 'waitress' or 'gunicorn'. Aborting startup."
+            )
             # Attempt to notify configured health backends about startup failure (best-effort)
             try:
-                if config and hasattr(config, 'ntfy_backends_health'):
+                if config and hasattr(config, "ntfy_backends_health"):
                     send_ntfy_notification(
                         config.ntfy_backends_health,
                         "🚨 Emergency Access server failed to start: waitress not installed",
-                        "Emergency Access CRITICAL"
+                        "Emergency Access CRITICAL",
                     )
             except Exception:
                 # Do not raise further - we are already aborting startup
@@ -698,12 +777,12 @@ def main():
         logger.info("Received keyboard interrupt, shutting down gracefully")
     except Exception as e:
         logger.error(f"Failed to start server: {str(e)}")
-        if config and hasattr(config, 'ntfy_backends_health'):
+        if config and hasattr(config, "ntfy_backends_health"):
             try:
                 send_ntfy_notification(
                     config.ntfy_backends_health,
                     f"🚨 Emergency Access server failed to start: {str(e)}",
-                    "Emergency Access CRITICAL"
+                    "Emergency Access CRITICAL",
                 )
             except Exception:
                 pass
@@ -711,5 +790,6 @@ def main():
     finally:
         logger.info("Emergency Access server shutdown complete")
 
-if __name__ == '__main__':
+
+if __name__ == "__main__":
     main()