| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795 |
- #!/usr/bin/env python3
- import os
- import sys
- import logging
- import time
- import tempfile
- import threading
- import subprocess
- from flask import Flask, jsonify, request
- from werkzeug.security import check_password_hash
- from werkzeug.serving import WSGIRequestHandler
- from functools import wraps
- from config import Config
- from typing import List, Tuple
- import base64
- import signal
- # Configure logging with custom handler
- class NtfyLogHandler(logging.Handler):
- """Custom logging handler that sends logs to ntfy health backends"""
- def __init__(self, config_obj):
- super().__init__()
- self.config = config_obj
- def emit(self, record):
- """Send log record to health backends"""
- if hasattr(self.config, "ntfy_backends_health") and self.config.send_all_logs:
- try:
- log_message = self.format(record)
- # Get configured log level or default to WARNING
- min_level = getattr(
- logging, self.config.log_level.upper(), logging.WARNING
- )
- if record.levelno >= min_level:
- # Format message with appropriate emoji based on log level
- emoji = (
- "🚨"
- if record.levelno >= logging.ERROR
- else "⚠️"
- if record.levelno >= logging.WARNING
- else "ℹ️"
- )
- title = f"Emergency Access {record.levelname}"
- message = f"{emoji} {record.name}: {record.getMessage()}"
- send_ntfy_notification(
- self.config.ntfy_backends_health, message, title
- )
- except Exception:
- # Don't fail the application if logging notification fails
- pass
- # Configure logging with secure-file fallback
- log_path = "/var/log/emergency-access.log"
- try:
- os.makedirs(os.path.dirname(log_path), exist_ok=True)
- # Ensure the log file exists with restrictive permissions
- if not os.path.exists(log_path):
- open(log_path, "a").close()
- try:
- os.chmod(log_path, 0o600)
- except Exception:
- # Best-effort; if this fails we continue with stream logging
- pass
- log_handlers = [logging.FileHandler(log_path), logging.StreamHandler()]
- except Exception:
- # If any issue creating the file (permissions, etc.), fall back to stream-only logging
- log_handlers = [logging.StreamHandler()]
- logging.basicConfig(
- level=logging.INFO,
- format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
- handlers=log_handlers,
- )
- logger = logging.getLogger(__name__)
- app = Flask(__name__)
- # Global config instance - initialized during main execution
- config = None
- def require_auth(key_config=None, is_health=False):
- """Decorator for HTTP Basic Authentication with simple in-memory rate limiting.
- This implements a small, best-effort per-(IP,username) rate limiter and lockout
- to mitigate brute-force attempts. It is intentionally simple and in-memory:
- - MAX_ATTEMPTS: number of failed attempts within WINDOW before lockout
- - WINDOW: window (seconds) to count attempts
- - LOCKOUT: lockout duration (seconds) after exceeding MAX_ATTEMPTS
- Notes:
- - Because this is in-memory, it resets on process restart.
- - For multi-process deployments, offload rate limiting to the reverse proxy (Caddy)
- or a shared store (redis).
- - On repeated lockouts a notification is sent (best-effort) to configured health backends
- and, when applicable, to the key's backends.
- """
- # Initialize shared state on the require_auth function object (module-global alternative)
- if not hasattr(require_auth, "_state"):
- require_auth._state = {
- "attempts": {}, # (ip, username) -> list[timestamps]
- "blocked": {}, # (ip, username) -> blocked_until_timestamp
- "lock": threading.Lock(),
- }
- MAX_ATTEMPTS = 5
- WINDOW = 300 # seconds
- LOCKOUT = 900 # seconds
- def decorator(f):
- @wraps(f)
- def decorated_function(*args, **kwargs):
- if config is None:
- logger.error("Configuration not loaded")
- return jsonify({"error": "Server configuration error"}), 500
- auth = request.authorization
- client_ip = request.remote_addr or "unknown"
- # determine username for rate limit key; use placeholder if missing
- username_for_key = auth.username if auth and auth.username else "<no_auth>"
- key = (client_ip, username_for_key)
- now_ts = time.time()
- state = require_auth._state
- # Check block list
- with state["lock"]:
- blocked_until = state["blocked"].get(key)
- if blocked_until and now_ts < blocked_until:
- retry_after = int(blocked_until - now_ts)
- logger.warning(
- f"Blocked auth attempt from {client_ip} for user '{username_for_key}' (locked until {blocked_until})"
- )
- # Return 429 Too Many Requests
- return (
- jsonify(
- {
- "error": "Too many authentication attempts, temporarily locked"
- }
- ),
- 429,
- {"Retry-After": str(retry_after)},
- )
- if not auth:
- # For missing auth, still account for attempts keyed by ip and '<no_auth>'
- # Increment attempt count and possibly lock
- with state["lock"]:
- attempts = state["attempts"].setdefault(key, [])
- # prune old timestamps
- attempts = [t for t in attempts if now_ts - t <= WINDOW]
- attempts.append(now_ts)
- state["attempts"][key] = attempts
- if len(attempts) > MAX_ATTEMPTS:
- # Lock out
- state["blocked"][key] = now_ts + LOCKOUT
- logger.error(
- f"Locking out {client_ip} for missing/invalid auth (no credentials) after repeated attempts"
- )
- # Try to notify about brute force (best-effort)
- try:
- if config and hasattr(config, "ntfy_backends_health"):
- send_ntfy_notification(
- config.ntfy_backends_health,
- f"🚨 Brute-force lockout: {client_ip} (no credentials) on server",
- "Emergency Access ALERT",
- )
- except Exception:
- pass
- return (
- jsonify(
- {
- "error": "Too many authentication attempts, temporarily locked"
- }
- ),
- 429,
- {"Retry-After": str(LOCKOUT)},
- )
- return (
- jsonify({"error": "Authentication required"}),
- 401,
- {"WWW-Authenticate": 'Basic realm="Emergency Access"'},
- )
- # Determine expected credentials
- if is_health:
- expected_username = config.health_username
- expected_password_hash = config.health_password_hash
- auth_type = "health check"
- else:
- if key_config is None:
- logger.error("Key configuration not provided for authentication")
- return jsonify({"error": "Server configuration error"}), 500
- expected_username = key_config.username
- expected_password_hash = key_config.password_hash
- auth_type = f"key '{key_config.key_id}'"
- # Verify credentials
- credential_ok = (
- auth.username == expected_username
- and Config.verify_password(auth.password, expected_password_hash)
- )
- if not credential_ok:
- # record failed attempt and check for lockout
- with state["lock"]:
- attempts = state["attempts"].setdefault(key, [])
- attempts = [t for t in attempts if now_ts - t <= WINDOW]
- attempts.append(now_ts)
- state["attempts"][key] = attempts
- if len(attempts) > MAX_ATTEMPTS:
- state["blocked"][key] = now_ts + LOCKOUT
- logger.error(
- f"Locking out {client_ip} for user '{username_for_key}' after repeated failed attempts"
- )
- # Notify about brute-force lockout (best-effort)
- try:
- notify_backends = []
- if config and hasattr(config, "ntfy_backends_health"):
- notify_backends.extend(config.ntfy_backends_health)
- # Include key-specific backends when available
- if key_config and getattr(key_config, "backends", None):
- notify_backends.extend(key_config.backends)
- if notify_backends:
- send_ntfy_notification(
- list(dict.fromkeys(notify_backends)), # deduplicate
- f"🚨 Brute-force lockout: {client_ip} user='{username_for_key}' on {auth_type}",
- "Emergency Access ALERT",
- )
- except Exception:
- pass
- return (
- jsonify(
- {
- "error": "Too many authentication attempts, temporarily locked"
- }
- ),
- 429,
- {"Retry-After": str(LOCKOUT)},
- )
- logger.warning(
- f"Authentication failed for {auth_type}: invalid credentials from {client_ip}"
- )
- return (
- jsonify({"error": "Invalid credentials"}),
- 401,
- {"WWW-Authenticate": 'Basic realm="Emergency Access"'},
- )
- # On success, clear recorded failed attempts for this key
- with state["lock"]:
- if key in state["attempts"]:
- try:
- del state["attempts"][key]
- except KeyError:
- pass
- if key in state["blocked"]:
- try:
- del state["blocked"][key]
- except KeyError:
- pass
- logger.info(f"Authentication successful for {auth_type}")
- return f(*args, **kwargs)
- return decorated_function
- return decorator
- def send_ntfy_notification(
- backends: List[str], message: str, title: str = None
- ) -> Tuple[bool, List[str]]:
- """
- Send notification using dschep/ntfy with dedicated config file
- Returns: (success, successful_backends)
- """
- successful_backends = []
- max_retries = 3
- retry_delay = 1
- for backend in backends:
- success = False
- last_error = None
- for attempt in range(max_retries):
- try:
- # Import ntfy here to avoid import issues during startup
- try:
- import ntfy
- from ntfy.config import load_config
- except ImportError:
- raise Exception(
- "ntfy package not available. Please install with: pip install ntfy"
- )
- if config is None:
- raise Exception("Configuration not loaded")
- # Load the ntfy config file
- ntfy_config = load_config(config.ntfy_config_path)
- ntfy_config["backends"] = [backend]
- # Add timeout to prevent hanging using threading
- result = [None]
- exception = [None]
- def notify_with_timeout():
- try:
- # Send notification using the backend name from our config file
- if title:
- result[0] = ntfy.notify(
- message, title=title, config=ntfy_config
- )
- else:
- result[0] = ntfy.notify(
- message, config=ntfy_config, title="Note"
- )
- except Exception as e:
- exception[0] = e
- thread = threading.Thread(target=notify_with_timeout)
- thread.daemon = True
- thread.start()
- thread.join(timeout=15) # 15 second timeout
- if thread.is_alive():
- raise Exception("Notification timeout")
- if exception[0]:
- raise exception[0]
- ret = result[0]
- if ret == 0:
- successful_backends.append(backend)
- logger.info(f"Notification sent successfully via {backend}")
- success = True
- break
- else:
- raise Exception(f"ntfy returned error code {ret}")
- except ImportError as e:
- logger.error(f"ntfy package not available for backend {backend}: {e}")
- last_error = e
- break # Don't retry import errors
- except Exception as e:
- last_error = e
- if attempt < max_retries - 1:
- logger.warning(
- f"Notification attempt {attempt + 1} failed for {backend}: {str(e)}, retrying..."
- )
- time.sleep(retry_delay * (attempt + 1)) # Exponential backoff
- else:
- logger.error(
- f"All notification attempts failed for {backend}: {str(e)}"
- )
- if not success and last_error:
- # Don't raise exception - just log failure and continue with other backends
- logger.error(
- f"Failed to send notification to {backend} after {max_retries} attempts: {str(last_error)}"
- )
- success = len(successful_backends) > 0
- return success, successful_backends
- def read_file_safely(file_path: str) -> Tuple[bool, str]:
- """
- Safely read file content
- Returns: (success, content)
- """
- try:
- if not os.path.exists(file_path):
- logger.error(f"File not found: {file_path}")
- return False, f"File not found: {file_path}"
- with open(file_path, "r") as f:
- content = f.read().strip()
- if not content:
- logger.error(f"File is empty: {file_path}")
- return False, f"File is empty: {file_path}"
- return True, content
- except PermissionError:
- logger.error(f"Permission denied reading file: {file_path}")
- return False, f"Permission denied: {file_path}"
- except Exception as e:
- logger.error(f"Failed to read file {file_path}: {str(e)}")
- return False, f"Failed to read file: {str(e)}"
- def create_key_handler(key_config):
- """Create a key access handler for a specific key configuration"""
- @require_auth(key_config=key_config)
- def get_key_part():
- """Emergency key access endpoint"""
- if key_config is None:
- logger.error("Key configuration is None in get_key_part")
- return jsonify({"error": "Server configuration error"}), 500
- logger.warning(
- f"EMERGENCY: Key access attempt detected for key '{key_config.key_id}'"
- )
- try:
- # Send notification first - fail-safe approach
- notification_success, successful_backends = send_ntfy_notification(
- key_config.backends, key_config.message, "EMERGENCY ACCESS ALERT"
- )
- if not notification_success:
- logger.error(
- f"CRITICAL: Failed to send notifications to any backend for key '{key_config.key_id}'"
- )
- return jsonify(
- {
- "error": "Notification system failure",
- "message": "Access denied for security reasons",
- }
- ), 500
- logger.info(
- f"Notifications sent successfully to: {successful_backends} for key '{key_config.key_id}'"
- )
- # Read key file
- file_success, content = read_file_safely(key_config.file_path)
- if not file_success:
- logger.error(
- f"CRITICAL: Failed to read key file for '{key_config.key_id}': {content}"
- )
- return jsonify(
- {
- "error": "File access failure",
- "message": "Unable to retrieve key part",
- }
- ), 500
- logger.warning(
- f"EMERGENCY: Key part successfully retrieved and sent for key '{key_config.key_id}'"
- )
- resp = jsonify(
- {
- "success": True,
- "key_id": key_config.key_id,
- "key_part": content,
- "timestamp": time.time(),
- "notified_backends": successful_backends,
- }
- )
- # Ensure responses with secret material are never cached by clients or intermediaries
- resp.headers["Cache-Control"] = (
- "no-store, no-cache, must-revalidate, private"
- )
- resp.headers["Pragma"] = "no-cache"
- resp.headers["Expires"] = "0"
- # Additional defensive headers
- resp.headers["X-Content-Type-Options"] = "nosniff"
- resp.headers["Content-Security-Policy"] = (
- "default-src 'none'; frame-ancestors 'none'; sandbox"
- )
- return resp
- except Exception as e:
- logger.error(
- f"CRITICAL: Unexpected error in key access for '{key_config.key_id}': {str(e)}"
- )
- return jsonify(
- {"error": "System error", "message": "Internal server error"}
- ), 500
- return get_key_part
- @require_auth(is_health=True)
- def health_check():
- """Health check endpoint that verifies both health monitoring and all key request functionality"""
- logger.info("Health check requested")
- if config is None:
- logger.error("Configuration not loaded during health check")
- return jsonify({"status": "error", "message": "Configuration not loaded"}), 500
- try:
- # Test health notification system
- health_notification_success, health_backends = send_ntfy_notification(
- config.ntfy_backends_health, config.ntfy_health_message, "Health Check"
- )
- # Test dummy file access
- dummy_file_success, dummy_content = read_file_safely(config.dummy_file_path)
- # Test all key files access (without exposing content)
- key_files_status = {}
- all_key_files_ok = True
- for key_id, key_config in config.keys.items():
- key_file_success, key_content = read_file_safely(key_config.file_path)
- key_files_status[key_id] = key_file_success
- if not key_file_success:
- all_key_files_ok = False
- # Test all key notification backends
- key_backends_status = {}
- all_key_backends_ok = True
- for key_id, key_config in config.keys.items():
- try:
- # Test notification without actually sending
- backend_test_success = len(key_config.backends) > 0
- key_backends_status[key_id] = {
- "backends": key_config.backends,
- "success": backend_test_success,
- }
- if not backend_test_success:
- all_key_backends_ok = False
- except Exception as e:
- key_backends_status[key_id] = {
- "backends": key_config.backends,
- "success": False,
- "error": str(e),
- }
- all_key_backends_ok = False
- # Determine overall health status
- all_systems_ok = (
- health_notification_success
- and dummy_file_success
- and all_key_files_ok
- and all_key_backends_ok
- )
- if not all_systems_ok:
- error_details = []
- if not health_notification_success:
- error_details.append("health notifications failed")
- if not dummy_file_success:
- error_details.append(f"dummy file access failed: {dummy_content}")
- # Add key-specific errors
- for key_id, status in key_files_status.items():
- if not status:
- error_details.append(f"key file access failed for '{key_id}'")
- for key_id, status in key_backends_status.items():
- if not status["success"]:
- error_msg = f"key backends failed for '{key_id}'"
- if "error" in status:
- error_msg += f": {status['error']}"
- error_details.append(error_msg)
- logger.error(f"Health check failed: {', '.join(error_details)}")
- return jsonify(
- {
- "status": "error",
- "message": "System components failed",
- "details": error_details,
- "health_notifications": health_notification_success,
- "dummy_file_access": dummy_file_success,
- "key_files_status": key_files_status,
- "key_backends_status": key_backends_status,
- }
- ), 500
- logger.info("Health check completed successfully - all systems operational")
- return jsonify(
- {
- "status": "ok",
- "timestamp": time.time(),
- "health_backends_notified": health_backends,
- "dummy_content_length": len(dummy_content),
- "keys_accessible": len(key_files_status),
- "key_files_status": key_files_status,
- "key_backends_status": key_backends_status,
- "emergency_system_ready": True,
- }
- )
- except Exception as e:
- logger.error(f"Health check error: {str(e)}")
- return jsonify(
- {"status": "error", "message": "System error", "error": str(e)}
- ), 500
- @app.errorhandler(404)
- def not_found(error):
- """Handle 404 errors silently for security"""
- logger.info(f"404 attempt: {error}")
- return jsonify({"error": "Not found"}), 404
- @app.errorhandler(500)
- def internal_error(error):
- """Handle internal server errors"""
- logger.error(f"Internal server error: {error}")
- return jsonify({"error": "Internal server error"}), 500
- def validate_setup():
- """Validate system setup before starting"""
- logger.info("Validating system setup...")
- if config is None:
- logger.error("Configuration not loaded")
- return False
- # Check dummy file exists
- if not os.path.exists(config.dummy_file_path):
- logger.error(f"Dummy file not found: {config.dummy_file_path}")
- return False
- # Test dummy file permissions
- try:
- with open(config.dummy_file_path, "r") as f:
- f.read(1)
- except Exception as e:
- logger.error(f"Dummy file permission test failed: {str(e)}")
- return False
- # Validate all key configurations
- for key_id, key_config in config.keys.items():
- logger.info(f"Validating key '{key_id}'...")
- # Check key file exists
- if not os.path.exists(key_config.file_path):
- logger.error(f"Key file not found for '{key_id}': {key_config.file_path}")
- return False
- # Test key file permissions
- try:
- with open(key_config.file_path, "r") as f:
- f.read(1)
- except Exception as e:
- logger.error(f"Key file permission test failed for '{key_id}': {str(e)}")
- return False
- # Validate backends are configured
- if not key_config.backends:
- logger.error(f"No notification backends configured for key '{key_id}'")
- return False
- # Test notification system
- logger.info("Testing notification system...")
- try:
- # Test health notification system
- health_success, _ = send_ntfy_notification(
- config.ntfy_backends_health[:1], # Test only first backend
- "System startup test - health notifications",
- "Emergency Access Startup Test",
- )
- if not health_success:
- logger.error("Health notification system test failed")
- return False
- # Test each key's notification backends
- for key_id, key_config in config.keys.items():
- key_success, _ = send_ntfy_notification(
- key_config.backends[:1], # Test only first backend
- f"System startup test - key '{key_id}' notifications",
- "Emergency Access Startup Test",
- )
- if not key_success:
- logger.error(f"Key notification system test failed for '{key_id}'")
- return False
- except Exception as e:
- logger.warning(f"Notification test failed, but continuing: {str(e)}")
- logger.info("System validation completed successfully")
- return True
- class ProductionRequestHandler(WSGIRequestHandler):
- """Custom request handler with improved error handling"""
- def log_error(self, format, *args):
- """Override to use our logger"""
- logger.error(f"HTTP Error: {format % args}")
- def log_message(self, format, *args):
- """Override to use our logger"""
- logger.info(f"HTTP: {format % args}")
- def main():
- global config
- try:
- # Load configuration
- config = Config()
- logger.info("Configuration loaded successfully")
- # Add ntfy log handler after config is loaded
- if config.send_all_logs:
- ntfy_handler = NtfyLogHandler(config)
- min_level = getattr(logging, config.log_level.upper(), logging.WARNING)
- ntfy_handler.setLevel(min_level)
- # Add to root logger to catch all application logs
- logging.getLogger().addHandler(ntfy_handler)
- # Validate system setup
- if not validate_setup():
- logger.error("System validation failed, exiting")
- sys.exit(1)
- # Add Flask routes dynamically for each key
- for key_id, key_config in config.keys.items():
- handler = create_key_handler(key_config)
- endpoint_name = f"get_key_part_{key_id}"
- app.add_url_rule(key_config.route, endpoint_name, handler, methods=["GET"])
- logger.info(
- f"Registered authenticated key route for '{key_id}': {key_config.route}"
- )
- # Add health check route
- app.add_url_rule(
- config.health_route, "health_check", health_check, methods=["GET"]
- )
- logger.info(
- f"Starting emergency access server on {config.server_host}:{config.server_port}"
- )
- logger.info(f"Health route: {config.health_route}")
- logger.info(f"Configured {len(config.keys)} key(s)")
- # Use production-ready server with better error handling
- try:
- from waitress import serve
- logger.info("Using Waitress WSGI server for production")
- serve(
- app,
- host=config.server_host,
- port=config.server_port,
- threads=6,
- connection_limit=100,
- cleanup_interval=30,
- channel_timeout=120,
- )
- except ImportError:
- logger.critical(
- "Waitress WSGI server not installed. This application must run under a production WSGI server such as 'waitress' or 'gunicorn'. Aborting startup."
- )
- # Attempt to notify configured health backends about startup failure (best-effort)
- try:
- if config and hasattr(config, "ntfy_backends_health"):
- send_ntfy_notification(
- config.ntfy_backends_health,
- "🚨 Emergency Access server failed to start: waitress not installed",
- "Emergency Access CRITICAL",
- )
- except Exception:
- # Do not raise further - we are already aborting startup
- pass
- sys.exit(1)
- except KeyboardInterrupt:
- logger.info("Received keyboard interrupt, shutting down gracefully")
- except Exception as e:
- logger.error(f"Failed to start server: {str(e)}")
- if config and hasattr(config, "ntfy_backends_health"):
- try:
- send_ntfy_notification(
- config.ntfy_backends_health,
- f"🚨 Emergency Access server failed to start: {str(e)}",
- "Emergency Access CRITICAL",
- )
- except Exception:
- pass
- sys.exit(1)
- finally:
- logger.info("Emergency Access server shutdown complete")
- if __name__ == "__main__":
- main()
|