| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341 |
- #!/usr/bin/env python3
- import os
- import sys
- import logging
- import time
- import tempfile
- from flask import Flask, jsonify
- from config import Config
- from typing import List, Tuple
- # Configure logging with custom handler
class NtfyLogHandler(logging.Handler):
    """Logging handler that forwards log records to the ntfy health backends.

    A record is forwarded only when the config object has an
    ``ntfy_backends_health`` attribute, its ``send_all_logs`` flag is truthy
    and the record is at or above the level named by ``config.log_level``
    (``WARNING`` when that name is missing or not a valid logging level).
    """

    def __init__(self, config_obj):
        """Remember the configuration object consulted on every emit.

        Args:
            config_obj: Object exposing ``ntfy_backends_health``,
                ``send_all_logs`` and ``log_level`` attributes.
        """
        super().__init__()
        self.config = config_obj
        # True while a notification is in flight. Sending a notification
        # itself produces log records; without this flag those records would
        # re-enter emit() and trigger further notifications. A plain flag is
        # enough because logging.Handler.handle() serialises emit() calls
        # with the handler lock.
        self._sending = False

    def _min_level(self):
        """Return the numeric threshold named by ``config.log_level``."""
        name = str(getattr(self.config, 'log_level', 'WARNING')).upper()
        level = getattr(logging, name, logging.WARNING)
        # Names such as "SHUTDOWN" resolve to non-integer attributes of the
        # logging module; fall back to WARNING instead of failing later.
        return level if isinstance(level, int) else logging.WARNING

    def emit(self, record):
        """Send the log record to the health backends.

        Failures while sending are reported through ``handleError`` and never
        propagate to the code that issued the log call.
        """
        if self._sending:
            return
        if not hasattr(self.config, 'ntfy_backends_health'):
            return
        if not getattr(self.config, 'send_all_logs', False):
            return

        self._sending = True
        try:
            if record.levelno < self._min_level():
                return

            # Pick an emoji that reflects the severity of the record
            if record.levelno >= logging.ERROR:
                emoji = "🚨"
            elif record.levelno >= logging.WARNING:
                emoji = "⚠️"
            else:
                emoji = "ℹ️"

            title = f"Emergency Access {record.levelname}"
            message = f"{emoji} {record.name}: {record.getMessage()}"
            send_ntfy_notification(
                self.config.ntfy_backends_health,
                message,
                title
            )
        except Exception:
            # Don't fail the application if the logging notification fails
            self.handleError(record)
        finally:
            self._sending = False
# Configure logging: records go to the service log file and to the console
logging.basicConfig(
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    level=logging.INFO,
    handlers=[
        logging.FileHandler('/var/log/emergency-access.log'),
        logging.StreamHandler(),
    ],
)
logger = logging.getLogger(__name__)

app = Flask(__name__)

# Global config instance; None until the __main__ block assigns Config()
config = None
def send_ntfy_notification(backends: List[str], message: str, title: str = None) -> Tuple[bool, List[str]]:
    """Send a notification through each backend using dschep/ntfy.

    The ntfy configuration is read from ``config.ntfy_config_path`` and
    restricted to one backend per send.

    Args:
        backends: Backend names to notify, tried in order.
        message: Notification body.
        title: Notification title; "Note" is used when it is empty or None.

    Returns:
        ``(success, successful_backends)`` where ``success`` is True when at
        least one backend was notified. An empty ``backends`` list yields
        ``(False, [])``.

    Raises:
        Exception: Re-raised as soon as a backend fails (including a
            non-zero ntfy status); the remaining backends are not tried.
            Only an ImportError is logged and skipped instead of raised.
    """
    successful_backends = []

    for backend in backends:
        try:
            # Import ntfy here to avoid import issues during startup
            import ntfy
            from ntfy.config import load_config

            # A fresh config is loaded for every backend because the dict is
            # modified right below.
            ntfy_config = load_config(config.ntfy_config_path)
            ntfy_config["backends"] = [backend]

            ret = ntfy.notify(message, title=title or "Note", config=ntfy_config)
            if ret != 0:
                # Raised inside the try so the failure is logged exactly once
                # by the generic handler below.
                raise RuntimeError(
                    f"Failed to send notification via {backend} (ntfy exit status {ret})"
                )
        except ImportError:
            logger.error(f"ntfy package not available for backend {backend}")
        except Exception as e:
            logger.error(f"Failed to send notification to {backend}: {str(e)}")
            raise
        else:
            successful_backends.append(backend)
            logger.info(f"Notification sent successfully via {backend}")

    return bool(successful_backends), successful_backends
def read_file_safely(file_path: str) -> Tuple[bool, str]:
    """Read a text file and return its stripped content without raising.

    Args:
        file_path: Path of the file to read.

    Returns:
        ``(True, content)`` when the file could be read and is not empty
        after stripping surrounding whitespace, otherwise
        ``(False, reason)`` with a human-readable failure description.
    """
    try:
        # Open directly instead of checking for existence first: a file that
        # disappears between the check and the open would otherwise be
        # reported as a generic read failure.
        with open(file_path, 'r') as f:
            content = f.read().strip()
    except FileNotFoundError:
        logger.error(f"File not found: {file_path}")
        return False, f"File not found: {file_path}"
    except PermissionError:
        logger.error(f"Permission denied reading file: {file_path}")
        return False, f"Permission denied: {file_path}"
    except Exception as e:
        logger.error(f"Failed to read file {file_path}: {str(e)}")
        return False, f"Failed to read file: {str(e)}"

    if not content:
        logger.error(f"File is empty: {file_path}")
        return False, f"File is empty: {file_path}"

    return True, content
def get_key_part():
    """Emergency key access endpoint.

    Sends the key-access alert first and only then reads the key file, so
    the key part is never returned unless a notification went out. Every
    failure is answered with a JSON error body and HTTP status 500.
    """
    logger.warning("EMERGENCY: Key access attempt detected")

    try:
        # Notify before touching the key file - fail-safe approach
        notified, delivered_to = send_ntfy_notification(
            config.ntfy_backends_key,
            config.ntfy_key_message,
            "EMERGENCY ACCESS ALERT"
        )
        if not notified:
            logger.error("CRITICAL: Failed to send notifications to any backend")
            denied = {
                'error': 'Notification system failure',
                'message': 'Access denied for security reasons'
            }
            return jsonify(denied), 500

        logger.info(f"Notifications sent successfully to: {delivered_to}")

        # Read key file; on failure the second value is the failure reason
        readable, key_text = read_file_safely(config.key_file_path)
        if not readable:
            logger.error(f"CRITICAL: Failed to read key file: {key_text}")
            unreadable = {
                'error': 'File access failure',
                'message': 'Unable to retrieve key part'
            }
            return jsonify(unreadable), 500

        logger.warning("EMERGENCY: Key part successfully retrieved and sent")
        payload = {
            'success': True,
            'key_part': key_text,
            'timestamp': time.time(),
            'notified_backends': delivered_to
        }
        return jsonify(payload)

    except Exception as exc:
        logger.error(f"CRITICAL: Unexpected error in key access: {str(exc)}")
        failure = {
            'error': 'System error',
            'message': 'Internal server error'
        }
        return jsonify(failure), 500
def health_check():
    """Health check endpoint covering notifications and file access.

    Sends a test notification to the health backends and to the key
    backends, then reads the dummy file and the key file. Responds with a
    JSON status of ``ok`` when all four checks pass, otherwise with a JSON
    error body and HTTP status 500.
    """
    logger.info("Health check requested")

    try:
        # Health notification channel
        health_ok, health_backends = send_ntfy_notification(
            config.ntfy_backends_health,
            config.ntfy_health_message,
            "Health Check"
        )

        # Key notification channel, using a test message rather than the
        # emergency alert text
        key_test_message = "🔧 Emergency access system health verification - key notification test"
        key_ok, key_backends = send_ntfy_notification(
            config.ntfy_backends_key,
            key_test_message,
            "System Health Check"
        )

        # File access: the dummy file first, then the real key file (its
        # content is never placed in a successful response)
        dummy_ok, dummy_content = read_file_safely(config.dummy_file_path)
        key_file_ok, key_content = read_file_safely(config.key_file_path)

        if all((health_ok, key_ok, dummy_ok, key_file_ok)):
            logger.info("Health check completed successfully - all systems operational")
            report = {
                'status': 'ok',
                'timestamp': time.time(),
                'health_backends_notified': health_backends,
                'key_backends_tested': key_backends,
                'dummy_content_length': len(dummy_content),
                'key_file_accessible': True,
                'emergency_system_ready': True
            }
            return jsonify(report)

        # At least one check failed: list every failing component
        error_details = []
        if not health_ok:
            error_details.append("health notifications failed")
        if not key_ok:
            error_details.append("key notifications failed")
        if not dummy_ok:
            error_details.append(f"dummy file access failed: {dummy_content}")
        if not key_file_ok:
            error_details.append(f"key file access failed: {key_content}")

        logger.error(f"Health check failed: {', '.join(error_details)}")
        failure = {
            'status': 'error',
            'message': 'System components failed',
            'details': error_details,
            'health_notifications': health_ok,
            'key_notifications': key_ok,
            'dummy_file_access': dummy_ok,
            'key_file_access': key_file_ok
        }
        return jsonify(failure), 500

    except Exception as exc:
        logger.error(f"Health check error: {str(exc)}")
        crashed = {
            'status': 'error',
            'message': 'System error',
            'error': str(exc)
        }
        return jsonify(crashed), 500
@app.errorhandler(404)
def not_found(error):
    """Log the 404 and answer with a generic JSON body that reveals nothing."""
    logger.warning(f"404 attempt: {error}")
    body = {'error': 'Not found'}
    return jsonify(body), 404
@app.errorhandler(500)
def internal_error(error):
    """Log the internal server error and answer with a generic JSON body."""
    logger.error(f"Internal server error: {error}")
    body = {'error': 'Internal server error'}
    return jsonify(body), 500
def validate_setup():
    """Validate the system setup before the server starts.

    Checks that the key file and the dummy file exist and can be opened for
    reading, then sends one test notification to the first key backend and
    to the first health backend.

    Returns:
        True when validation passes, False when a file check fails or a
        notification test reports no successful backend. An exception
        raised during the notification test is logged as a warning and does
        not fail validation.
    """
    logger.info("Validating system setup...")

    # Both files must exist
    key_missing = not os.path.exists(config.key_file_path)
    if key_missing:
        logger.error(f"Key file not found: {config.key_file_path}")
        return False

    dummy_missing = not os.path.exists(config.dummy_file_path)
    if dummy_missing:
        logger.error(f"Dummy file not found: {config.dummy_file_path}")
        return False

    # Both files must be readable by this process
    try:
        with open(config.key_file_path, 'r') as key_handle:
            key_handle.read(1)
        with open(config.dummy_file_path, 'r') as dummy_handle:
            dummy_handle.read(1)
    except Exception as exc:
        logger.error(f"File permission test failed: {str(exc)}")
        return False

    # Notification system: only the first backend of each group is tested
    logger.info("Testing notification system...")
    try:
        key_result = send_ntfy_notification(
            config.ntfy_backends_key[:1],
            "System startup test - key notifications",
            "Emergency Access Startup Test"
        )
        health_result = send_ntfy_notification(
            config.ntfy_backends_health[:1],
            "System startup test - health notifications",
            "Emergency Access Startup Test"
        )

        key_success = key_result[0]
        health_success = health_result[0]

        if not key_success:
            logger.error("Key notification system test failed")
            return False
        if not health_success:
            logger.error("Health notification system test failed")
            return False
    except Exception as exc:
        logger.warning(f"Notification test failed, but continuing: {str(exc)}")

    logger.info("System validation completed successfully")
    return True
if __name__ == '__main__':
    try:
        # Load configuration
        config = Config()
        logger.info("Configuration loaded successfully")

        # The ntfy log handler needs the config, so it is attached only now
        if config.send_all_logs:
            forwarder = NtfyLogHandler(config)
            forwarder.setLevel(
                getattr(logging, config.log_level.upper(), logging.WARNING)
            )
            # Attached to the root logger to catch all application logs
            logging.getLogger().addHandler(forwarder)

        # Refuse to start when the setup is not valid
        if not validate_setup():
            logger.error("System validation failed, exiting")
            sys.exit(1)

        # Register the Flask routes under the paths taken from the config
        routes = (
            (config.key_route, 'get_key_part', get_key_part),
            (config.health_route, 'health_check', health_check),
        )
        for rule, endpoint, view in routes:
            app.add_url_rule(rule, endpoint, view, methods=['GET'])

        logger.info(f"Starting emergency access server on {config.server_host}:{config.server_port}")
        logger.info(f"Key route: {config.key_route}")
        logger.info(f"Health route: {config.health_route}")

        # Run the server on local port for Caddy reverse proxy
        app.run(
            host=config.server_host,
            port=config.server_port,
            threaded=True,
            debug=False
        )

    except Exception as exc:
        logger.error(f"Failed to start server: {str(exc)}")
        sys.exit(1)
|