| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641 |
- #!/usr/bin/env python3
- import os
- import sys
- import logging
- import time
- import tempfile
- import threading
- import subprocess
- from flask import Flask, jsonify, request
- from werkzeug.security import check_password_hash
- from werkzeug.serving import WSGIRequestHandler
- from functools import wraps
- from config import Config
- from typing import List, Tuple
- import base64
- import signal
# Custom logging handler that mirrors log records to the ntfy health backends
class NtfyLogHandler(logging.Handler):
    """Logging handler that forwards log records to the ntfy health backends.

    The handler is driven by the configuration object passed to the
    constructor, from which it reads ``ntfy_backends_health``,
    ``send_all_logs`` and ``log_level``.

    Forwarding goes through ``send_ntfy_notification``, which itself writes
    log records. Without protection every forwarded record would therefore
    trigger further forwards; a per-thread re-entrancy flag breaks that loop.
    """

    def __init__(self, config_obj):
        """Store the configuration object and create the re-entrancy flag."""
        super().__init__()
        self.config = config_obj
        # Thread-local so that one thread forwarding a record does not mute
        # records emitted concurrently by other threads.
        self._local = threading.local()

    def emit(self, record):
        """Send ``record`` to the health backends if forwarding is enabled.

        Records below the configured ``log_level`` (WARNING when the name is
        not a ``logging`` attribute) are ignored. Delivery failures never
        propagate to the caller; they are routed to ``handleError``.
        """
        if not (hasattr(self.config, 'ntfy_backends_health') and self.config.send_all_logs):
            return

        # Records produced while this thread is already forwarding one come
        # from the notification code itself; forwarding them would recurse.
        if getattr(self._local, 'sending', False):
            return

        self._local.sending = True
        try:
            # Get configured log level or default to WARNING
            min_level = getattr(logging, self.config.log_level.upper(), logging.WARNING)
            if record.levelno < min_level:
                return

            # Pick an emoji matching the severity of the record
            if record.levelno >= logging.ERROR:
                emoji = "🚨"
            elif record.levelno >= logging.WARNING:
                emoji = "⚠️"
            else:
                emoji = "ℹ️"

            title = f"Emergency Access {record.levelname}"
            message = f"{emoji} {record.name}: {record.getMessage()}"
            send_ntfy_notification(
                self.config.ntfy_backends_health,
                message,
                title
            )
        except Exception:
            # Never fail the application because a log notification failed;
            # report through the standard logging error hook instead of
            # discarding the failure silently.
            self.handleError(record)
        finally:
            self._local.sending = False
# Root logger setup: INFO and above go to the service log file and to a
# StreamHandler created with its default stream.
logging.basicConfig(
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    level=logging.INFO,
    handlers=[
        logging.FileHandler('/var/log/emergency-access.log'),
        logging.StreamHandler(),
    ],
)
logger = logging.getLogger(__name__)

app = Flask(__name__)

# Shared Config instance; stays None until main() loads it
config = None

# Systemd watchdog state
watchdog_interval = None  # seconds between WATCHDOG=1 pings; None means disabled
last_watchdog_time = 0    # timestamp of the last ping sent; 0 means none sent yet
def setup_systemd_watchdog():
    """Enable systemd watchdog pings when the environment asks for them.

    Reads ``WATCHDOG_USEC`` from the environment and, when it holds a positive
    integer and the ``systemd-notify`` command is available, stores half of
    that timeout (in seconds) in the module-level ``watchdog_interval``.

    Returns:
        True when the watchdog was enabled, False otherwise. On False the
        module-level ``watchdog_interval`` is left unchanged.
    """
    global watchdog_interval
    import shutil

    # Check if we have systemd watchdog enabled
    watchdog_usec = os.environ.get('WATCHDOG_USEC')
    if not watchdog_usec:
        logger.info("Systemd watchdog not enabled")
        return False

    # Check if systemd-notify command exists
    if not shutil.which('systemd-notify'):
        logger.warning("systemd-notify command not found, watchdog disabled")
        return False

    try:
        timeout_usec = int(watchdog_usec)
    except ValueError as e:
        logger.warning(f"Failed to setup systemd watchdog: {e}")
        return False

    # A zero or negative timeout would produce a non-positive interval and
    # make every call to send_watchdog_if_needed() spawn systemd-notify.
    if timeout_usec <= 0:
        logger.warning(
            f"Failed to setup systemd watchdog: WATCHDOG_USEC must be positive, got {watchdog_usec!r}"
        )
        return False

    # Convert microseconds to seconds and ping at half the timeout
    watchdog_interval = timeout_usec / 2000000
    logger.info(f"Systemd watchdog enabled, will send notifications every {watchdog_interval:.1f}s")
    return True
def send_watchdog_if_needed():
    """Send ``WATCHDOG=1`` to systemd when the ping interval has elapsed.

    Does nothing while ``watchdog_interval`` is None. Elapsed time is
    measured on the monotonic clock so that wall-clock adjustments cannot
    postpone a ping. Failures to run ``systemd-notify`` are logged and never
    raised.
    """
    global last_watchdog_time

    if watchdog_interval is None:
        return

    now = time.monotonic()
    # last_watchdog_time starts at 0, meaning "no ping sent yet"; in that
    # state the first call always pings, whatever the monotonic clock reads.
    if last_watchdog_time and now - last_watchdog_time < watchdog_interval:
        return

    try:
        result = subprocess.run(['systemd-notify', 'WATCHDOG=1'],
                                check=False, stdout=subprocess.DEVNULL,
                                stderr=subprocess.DEVNULL, timeout=2)
        last_watchdog_time = now
        if result.returncode != 0:
            logger.debug(f"systemd-notify returned {result.returncode}")
    except (OSError, subprocess.SubprocessError) as e:
        # Missing binary, spawn failure or the 2 second timeout: best effort
        logger.warning(f"Failed to send watchdog notification: {e}")
def require_auth(key_config=None, is_health=False):
    """Build a decorator enforcing HTTP Basic Authentication on a Flask view.

    Args:
        key_config: Key configuration supplying ``username``,
            ``password_hash`` and ``key_id`` for key routes. Required unless
            ``is_health`` is true.
        is_health: When true, credentials are checked against the health
            username and password hash of the global ``config`` instead.

    Returns:
        A decorator. The wrapped view responds with 500 when the global
        configuration (or ``key_config`` for key routes) is missing, with 401
        plus a ``WWW-Authenticate`` header when credentials are absent or
        wrong, and otherwise returns whatever the view returns.
    """
    # Function-scope import keeps this block self-contained
    import hmac

    def decorator(f):
        @wraps(f)
        def decorated_function(*args, **kwargs):
            if config is None:
                logger.error("Configuration not loaded")
                return jsonify({'error': 'Server configuration error'}), 500

            auth = request.authorization
            if not auth:
                return jsonify({'error': 'Authentication required'}), 401, {
                    'WWW-Authenticate': 'Basic realm="Emergency Access"'
                }

            # Determine expected credentials
            if is_health:
                expected_username = config.health_username
                expected_password_hash = config.health_password_hash
                auth_type = "health check"
            else:
                if key_config is None:
                    logger.error("Key configuration not provided for authentication")
                    return jsonify({'error': 'Server configuration error'}), 500
                expected_username = key_config.username
                expected_password_hash = key_config.password_hash
                auth_type = f"key '{key_config.key_id}'"

            # Verify credentials. Username or password can be missing from
            # the Authorization header; such a request is rejected instead
            # of being handed to the password verifier.
            supplied_username = auth.username
            supplied_password = auth.password
            credentials_ok = False
            if (isinstance(supplied_username, str)
                    and isinstance(supplied_password, str)
                    and isinstance(expected_username, str)):
                # Constant-time username comparison, and the password is
                # always verified, so response time does not reveal whether
                # the username alone was correct.
                username_ok = hmac.compare_digest(
                    supplied_username.encode('utf-8'),
                    expected_username.encode('utf-8'),
                )
                password_ok = bool(Config.verify_password(supplied_password, expected_password_hash))
                credentials_ok = username_ok and password_ok

            if not credentials_ok:
                logger.warning(f"Authentication failed for {auth_type}: invalid credentials from {request.remote_addr}")
                return jsonify({'error': 'Invalid credentials'}), 401, {
                    'WWW-Authenticate': 'Basic realm="Emergency Access"'
                }

            logger.info(f"Authentication successful for {auth_type}")
            return f(*args, **kwargs)
        return decorated_function
    return decorator
def _notify_with_timeout(notify, ntfy_config, message, title, timeout):
    """Run ``notify`` in a daemon thread and return its result.

    Raises TimeoutError when the call has not finished after ``timeout``
    seconds, and re-raises any exception the call itself raised.
    """
    outcome = {}

    def worker():
        try:
            outcome['result'] = notify(message, title=title, config=ntfy_config)
        except Exception as e:
            outcome['error'] = e

    thread = threading.Thread(target=worker)
    thread.daemon = True  # a hung backend must not block interpreter exit
    thread.start()
    thread.join(timeout=timeout)

    if thread.is_alive():
        raise TimeoutError("Notification timeout")
    if 'error' in outcome:
        raise outcome['error']
    return outcome.get('result')


def send_ntfy_notification(backends: List[str], message: str, title: str = None) -> Tuple[bool, List[str]]:
    """Send a notification through dschep/ntfy using the dedicated config file.

    Each backend is tried up to three times with a linearly growing pause
    (1s, then 2s) between attempts, and every attempt is limited to 15
    seconds. A failing backend never aborts the remaining ones.

    Conditions that retrying cannot fix (no backends given, the ``ntfy``
    package missing, the global ``config`` not loaded) are reported once and
    return immediately.

    Args:
        backends: Names of the ntfy backends to notify; None or empty means
            nothing to send.
        message: Notification body.
        title: Notification title; "Note" is used when empty or None.

    Returns:
        ``(success, successful_backends)`` where ``success`` is true when at
        least one backend accepted the notification.
    """
    successful_backends = []
    max_retries = 3
    retry_delay = 1
    notify_timeout = 15  # seconds allowed per attempt

    if not backends:
        return False, successful_backends

    # Import ntfy here to avoid import issues during startup. A missing
    # package cannot be cured by retrying, so fail fast.
    try:
        import ntfy
        from ntfy.config import load_config
    except ImportError as e:
        logger.error(f"ntfy package not available. Please install with: pip install ntfy ({e})")
        return False, successful_backends

    if config is None:
        logger.error("Configuration not loaded")
        return False, successful_backends

    effective_title = title if title else "Note"

    for backend in backends:
        last_error = None

        for attempt in range(max_retries):
            try:
                # Load a fresh config per attempt and restrict it to the
                # backend currently being tried
                ntfy_config = load_config(config.ntfy_config_path)
                ntfy_config["backends"] = [backend]

                ret = _notify_with_timeout(
                    ntfy.notify, ntfy_config, message, effective_title, notify_timeout
                )
                if ret != 0:
                    raise RuntimeError(f"ntfy returned error code {ret}")
            except Exception as e:
                last_error = e
                if attempt < max_retries - 1:
                    logger.warning(f"Notification attempt {attempt + 1} failed for {backend}: {str(e)}, retrying...")
                    time.sleep(retry_delay * (attempt + 1))  # Linear backoff
            else:
                successful_backends.append(backend)
                logger.info(f"Notification sent successfully via {backend}")
                break
        else:
            # Every attempt failed: log once and continue with other backends
            logger.error(f"Failed to send notification to {backend} after {max_retries} attempts: {str(last_error)}")

    return bool(successful_backends), successful_backends
def read_file_safely(file_path: str) -> Tuple[bool, str]:
    """Read a text file and return its content with surrounding whitespace removed.

    The file is opened directly instead of being checked for existence first,
    so a file that disappears between a check and the read is still reported
    as "not found".

    Args:
        file_path: Path of the file to read.

    Returns:
        ``(True, content)`` on success, or ``(False, reason)`` when the file
        is missing, unreadable or empty after stripping. This function does
        not raise; every failure is logged.
    """
    try:
        with open(file_path, 'r') as f:
            content = f.read().strip()
    except FileNotFoundError:
        logger.error(f"File not found: {file_path}")
        return False, f"File not found: {file_path}"
    except PermissionError:
        logger.error(f"Permission denied reading file: {file_path}")
        return False, f"Permission denied: {file_path}"
    except Exception as e:
        logger.error(f"Failed to read file {file_path}: {str(e)}")
        return False, f"Failed to read file: {str(e)}"

    if not content:
        logger.error(f"File is empty: {file_path}")
        return False, f"File is empty: {file_path}"

    return True, content
def create_key_handler(key_config):
    """Return the Flask view serving the key part described by ``key_config``.

    The view is wrapped in ``require_auth`` with the same key configuration.
    """

    @require_auth(key_config=key_config)
    def get_key_part():
        """Notify the key's backends, then return the key part as JSON.

        The key file is read only after at least one notification went out;
        every failure is answered with a 500 response.
        """
        if key_config is None:
            logger.error("Key configuration is None in get_key_part")
            return jsonify({'error': 'Server configuration error'}), 500

        key_id = key_config.key_id
        logger.warning(f"EMERGENCY: Key access attempt detected for key '{key_id}'")

        # Keep the systemd watchdog fed while handling the request
        send_watchdog_if_needed()

        try:
            # Alert first: no key material leaves without a notification
            notified, reached_backends = send_ntfy_notification(
                key_config.backends,
                key_config.message,
                "EMERGENCY ACCESS ALERT"
            )
            if not notified:
                logger.error(f"CRITICAL: Failed to send notifications to any backend for key '{key_id}'")
                denial = {
                    'error': 'Notification system failure',
                    'message': 'Access denied for security reasons'
                }
                return jsonify(denial), 500

            logger.info(f"Notifications sent successfully to: {reached_backends} for key '{key_id}'")

            # Fetch the key part from disk
            read_ok, key_material = read_file_safely(key_config.file_path)
            if not read_ok:
                # On failure the second element carries the failure reason
                logger.error(f"CRITICAL: Failed to read key file for '{key_id}': {key_material}")
                failure = {
                    'error': 'File access failure',
                    'message': 'Unable to retrieve key part'
                }
                return jsonify(failure), 500

            logger.warning(f"EMERGENCY: Key part successfully retrieved and sent for key '{key_id}'")
            payload = {
                'success': True,
                'key_id': key_id,
                'key_part': key_material,
                'timestamp': time.time(),
                'notified_backends': reached_backends
            }
            return jsonify(payload)

        except Exception as exc:
            logger.error(f"CRITICAL: Unexpected error in key access for '{key_id}': {str(exc)}")
            return jsonify({
                'error': 'System error',
                'message': 'Internal server error'
            }), 500

    return get_key_part
@require_auth(is_health=True)
def health_check():
    """Report whether the health path and every configured key path are usable.

    Sends a real health notification, reads the dummy file, reads every key
    file (content is never returned) and checks that each key has at least
    one backend configured. Responds 200 with a summary when everything
    passes, otherwise 500 with per-component details.
    """
    logger.info("Health check requested")

    # Keep the systemd watchdog fed while handling the request
    send_watchdog_if_needed()

    if config is None:
        logger.error("Configuration not loaded during health check")
        return jsonify({'status': 'error', 'message': 'Configuration not loaded'}), 500

    try:
        # Health notification path (an actual notification is sent)
        notify_ok, notified_backends = send_ntfy_notification(
            config.ntfy_backends_health,
            config.ntfy_health_message,
            "Health Check"
        )

        # Dummy file access
        dummy_ok, dummy_result = read_file_safely(config.dummy_file_path)

        # Key file access, recording only success flags
        key_files_status = {
            key_id: read_file_safely(key_config.file_path)[0]
            for key_id, key_config in config.keys.items()
        }
        files_ok = all(key_files_status.values())

        # Key backends: only checks that some are configured, sends nothing
        key_backends_status = {}
        backends_ok = True
        for key_id, key_config in config.keys.items():
            try:
                has_backends = len(key_config.backends) > 0
                key_backends_status[key_id] = {
                    'backends': key_config.backends,
                    'success': has_backends
                }
            except Exception as exc:
                key_backends_status[key_id] = {
                    'backends': key_config.backends,
                    'success': False,
                    'error': str(exc)
                }
                has_backends = False
            backends_ok = backends_ok and has_backends

        if not (notify_ok and dummy_ok and files_ok and backends_ok):
            problems = []
            if not notify_ok:
                problems.append("health notifications failed")
            if not dummy_ok:
                problems.append(f"dummy file access failed: {dummy_result}")

            # Key-specific failures, files first and backends second
            problems.extend(
                f"key file access failed for '{key_id}'"
                for key_id, file_ok in key_files_status.items()
                if not file_ok
            )
            for key_id, status in key_backends_status.items():
                if status['success']:
                    continue
                detail = f"key backends failed for '{key_id}'"
                if 'error' in status:
                    detail += f": {status['error']}"
                problems.append(detail)

            logger.error(f"Health check failed: {', '.join(problems)}")
            return jsonify({
                'status': 'error',
                'message': 'System components failed',
                'details': problems,
                'health_notifications': notify_ok,
                'dummy_file_access': dummy_ok,
                'key_files_status': key_files_status,
                'key_backends_status': key_backends_status
            }), 500

        logger.info("Health check completed successfully - all systems operational")
        report = {
            'status': 'ok',
            'timestamp': time.time(),
            'health_backends_notified': notified_backends,
            'dummy_content_length': len(dummy_result),
            'keys_accessible': len(key_files_status),
            'key_files_status': key_files_status,
            'key_backends_status': key_backends_status,
            'emergency_system_ready': True
        }
        return jsonify(report)

    except Exception as exc:
        logger.error(f"Health check error: {str(exc)}")
        return jsonify({
            'status': 'error',
            'message': 'System error',
            'error': str(exc)
        }), 500
@app.errorhandler(404)
def not_found(error):
    """Log the 404 and answer with a generic JSON body."""
    logger.warning(f"404 attempt: {error}")
    body = jsonify({'error': 'Not found'})
    return body, 404
@app.errorhandler(500)
def internal_error(error):
    """Log the server error and answer with a generic JSON body."""
    logger.error(f"Internal server error: {error}")
    body = jsonify({'error': 'Internal server error'})
    return body, 500
def validate_setup():
    """Run the pre-flight checks and return True only when all of them pass.

    Checks, in order: the configuration is loaded; the dummy file exists and
    can be read; every key has an existing, readable file and at least one
    backend; a test notification reaches the first health backend and the
    first backend of every key. An exception raised during the notification
    test is logged as a warning and does not fail validation.
    """

    def read_probe_error(path):
        """Return the exception raised while reading one character, or None."""
        try:
            with open(path, 'r') as handle:
                handle.read(1)
        except Exception as exc:
            return exc
        return None

    logger.info("Validating system setup...")

    if config is None:
        logger.error("Configuration not loaded")
        return False

    # Dummy file: presence first, then readability
    if not os.path.exists(config.dummy_file_path):
        logger.error(f"Dummy file not found: {config.dummy_file_path}")
        return False
    dummy_error = read_probe_error(config.dummy_file_path)
    if dummy_error is not None:
        logger.error(f"Dummy file permission test failed: {str(dummy_error)}")
        return False

    # Per-key checks: file presence, readability, configured backends
    for key_id, key_config in config.keys.items():
        logger.info(f"Validating key '{key_id}'...")

        key_path = key_config.file_path
        if not os.path.exists(key_path):
            logger.error(f"Key file not found for '{key_id}': {key_path}")
            return False

        key_error = read_probe_error(key_path)
        if key_error is not None:
            logger.error(f"Key file permission test failed for '{key_id}': {str(key_error)}")
            return False

        if not key_config.backends:
            logger.error(f"No notification backends configured for key '{key_id}'")
            return False

    # Live notification test, limited to the first backend of each list
    logger.info("Testing notification system...")
    try:
        health_ok, _ = send_ntfy_notification(
            config.ntfy_backends_health[:1],
            "System startup test - health notifications",
            "Emergency Access Startup Test"
        )
        if not health_ok:
            logger.error("Health notification system test failed")
            return False

        for key_id, key_config in config.keys.items():
            key_ok, _ = send_ntfy_notification(
                key_config.backends[:1],
                f"System startup test - key '{key_id}' notifications",
                "Emergency Access Startup Test"
            )
            if not key_ok:
                logger.error(f"Key notification system test failed for '{key_id}'")
                return False
    except Exception as exc:
        logger.warning(f"Notification test failed, but continuing: {str(exc)}")

    logger.info("System validation completed successfully")
    return True
class ProductionRequestHandler(WSGIRequestHandler):
    """Request handler that routes the server's log output through ``logger``."""

    def log_error(self, format, *args):
        """Log a %-formatted server error message at ERROR level."""
        rendered = format % args
        logger.error(f"HTTP Error: {rendered}")

    def log_message(self, format, *args):
        """Log a %-formatted server message at INFO level."""
        rendered = format % args
        logger.info(f"HTTP: {rendered}")
def main():
    """Load the configuration, validate the setup, register routes and serve.

    Exits with status 1 when validation fails or when an unexpected exception
    occurs; in the latter case a notification to the health backends is
    attempted first. A keyboard interrupt ends the server without an error
    status.
    """
    global config

    try:
        # Configuration
        config = Config()
        logger.info("Configuration loaded successfully")

        # Mirror application logs to ntfy once the config is available
        if config.send_all_logs:
            forward_handler = NtfyLogHandler(config)
            forward_handler.setLevel(
                getattr(logging, config.log_level.upper(), logging.WARNING)
            )
            # Attached to the root logger so all application logs are seen
            logging.getLogger().addHandler(forward_handler)

        # Pre-flight checks
        if not validate_setup():
            logger.error("System validation failed, exiting")
            sys.exit(1)

        # One authenticated GET route per configured key
        for key_id, key_config in config.keys.items():
            view = create_key_handler(key_config)
            app.add_url_rule(key_config.route, f'get_key_part_{key_id}', view, methods=['GET'])
            logger.info(f"Registered authenticated key route for '{key_id}': {key_config.route}")

        # Health check route
        app.add_url_rule(config.health_route, 'health_check', health_check, methods=['GET'])

        # NOTE(review): apart from the single ping below, watchdog pings are
        # only issued while a request is handled; presumably an external
        # poller hits the service more often than the watchdog interval.
        # TODO confirm.
        @app.before_request
        def before_request():
            send_watchdog_if_needed()

        # Systemd watchdog
        watchdog_enabled = setup_systemd_watchdog()
        watchdog_state = 'enabled' if watchdog_enabled else 'disabled'

        logger.info(f"Starting emergency access server on {config.server_host}:{config.server_port}")
        logger.info(f"Health route: {config.health_route}")
        logger.info(f"Configured {len(config.keys)} key(s)")
        logger.info(f"Systemd watchdog: {watchdog_state}")

        # Tell systemd the service is ready
        try:
            import shutil
            if not shutil.which('systemd-notify'):
                logger.info("systemd-notify not available, skipping READY notification")
            else:
                ready = subprocess.run(['systemd-notify', 'READY=1'],
                                       check=False, stdout=subprocess.DEVNULL,
                                       stderr=subprocess.DEVNULL, timeout=10)
                if ready.returncode == 0:
                    logger.info("Successfully sent READY=1 to systemd")
                else:
                    logger.warning(f"systemd-notify returned {ready.returncode}")
        except subprocess.TimeoutExpired:
            logger.error("Timeout sending READY=1 to systemd")
        except Exception as exc:
            logger.warning(f"Failed to notify systemd ready: {exc}")

        # Initial watchdog ping
        send_watchdog_if_needed()

        # Prefer Waitress; fall back to the Flask development server
        try:
            from waitress import serve
            logger.info("Using Waitress WSGI server for production")
            serve(
                app,
                host=config.server_host,
                port=config.server_port,
                threads=6,
                connection_limit=100,
                cleanup_interval=30,
                channel_timeout=120
            )
        except ImportError:
            logger.warning("Waitress not available, falling back to Flask dev server")
            app.run(
                host=config.server_host,
                port=config.server_port,
                debug=False,
                threaded=True,
                request_handler=ProductionRequestHandler
            )

    except KeyboardInterrupt:
        logger.info("Received keyboard interrupt, shutting down gracefully")
    except Exception as exc:
        logger.error(f"Failed to start server: {str(exc)}")
        if config and hasattr(config, 'ntfy_backends_health'):
            # Best-effort alert; a failure here must not hide the exit below
            try:
                send_ntfy_notification(
                    config.ntfy_backends_health,
                    f"🚨 Emergency Access server failed to start: {str(exc)}",
                    "Emergency Access CRITICAL"
                )
            except Exception:
                pass
        sys.exit(1)
    finally:
        logger.info("Emergency Access server shutdown complete")


if __name__ == '__main__':
    main()
|