|
|
@@ -0,0 +1,430 @@
|
|
|
+#!/usr/bin/env python3
|
|
|
+
|
|
|
+"""
|
|
|
+Test script for Emergency Access Server
|
|
|
+Validates system configuration, file access, notifications, and endpoints
|
|
|
+"""
|
|
|
+
|
|
|
+import os
|
|
|
+import sys
|
|
|
+import json
|
|
|
+import time
|
|
|
+import yaml
|
|
|
+import tempfile
|
|
|
+import subprocess
|
|
|
+from typing import Dict, List, Tuple, Any
|
|
|
+from config import Config
|
|
|
+
|
|
|
+class EmergencyAccessTester:
|
|
|
+ def __init__(self, config_path: str = None):
|
|
|
+ """Initialize tester with configuration"""
|
|
|
+ self.config = Config(config_path)
|
|
|
+ self.test_results = []
|
|
|
+ self.server_process = None
|
|
|
+
|
|
|
+ def log_test(self, test_name: str, success: bool, message: str = ""):
|
|
|
+ """Log test result"""
|
|
|
+ status = "PASS" if success else "FAIL"
|
|
|
+ result = {
|
|
|
+ 'test': test_name,
|
|
|
+ 'status': status,
|
|
|
+ 'message': message,
|
|
|
+ 'timestamp': time.time()
|
|
|
+ }
|
|
|
+ self.test_results.append(result)
|
|
|
+
|
|
|
+ color = '\033[92m' if success else '\033[91m' # Green or Red
|
|
|
+ reset = '\033[0m'
|
|
|
+ print(f"{color}[{status}]{reset} {test_name}: {message}")
|
|
|
+
|
|
|
+ return success
|
|
|
+
|
|
|
+ def test_config_loading(self) -> bool:
|
|
|
+ """Test configuration file loading"""
|
|
|
+ try:
|
|
|
+ # Test all required config properties
|
|
|
+ self.config.server_host
|
|
|
+ self.config.server_port
|
|
|
+ self.config.key_route
|
|
|
+ self.config.health_route
|
|
|
+ self.config.key_file_path
|
|
|
+ self.config.dummy_file_path
|
|
|
+ self.config.ntfy_backends_key
|
|
|
+ self.config.ntfy_backends_health
|
|
|
+
|
|
|
+ return self.log_test("Config Loading", True, "All config properties accessible")
|
|
|
+ except Exception as e:
|
|
|
+ return self.log_test("Config Loading", False, f"Error: {str(e)}")
|
|
|
+
|
|
|
+ def test_file_access(self) -> bool:
|
|
|
+ """Test access to key and dummy files"""
|
|
|
+ success = True
|
|
|
+
|
|
|
+ # Test key file
|
|
|
+ try:
|
|
|
+ if not os.path.exists(self.config.key_file_path):
|
|
|
+ self.log_test("Key File Exists", False, f"File not found: {self.config.key_file_path}")
|
|
|
+ success = False
|
|
|
+ else:
|
|
|
+ with open(self.config.key_file_path, 'r') as f:
|
|
|
+ content = f.read().strip()
|
|
|
+ if not content:
|
|
|
+ self.log_test("Key File Content", False, "Key file is empty")
|
|
|
+ success = False
|
|
|
+ else:
|
|
|
+ self.log_test("Key File Access", True, f"Key file readable, length: {len(content)}")
|
|
|
+ except Exception as e:
|
|
|
+ self.log_test("Key File Access", False, f"Error: {str(e)}")
|
|
|
+ success = False
|
|
|
+
|
|
|
+ # Test dummy file
|
|
|
+ try:
|
|
|
+ if not os.path.exists(self.config.dummy_file_path):
|
|
|
+ self.log_test("Dummy File Exists", False, f"File not found: {self.config.dummy_file_path}")
|
|
|
+ success = False
|
|
|
+ else:
|
|
|
+ with open(self.config.dummy_file_path, 'r') as f:
|
|
|
+ content = f.read().strip()
|
|
|
+ if not content:
|
|
|
+ self.log_test("Dummy File Content", False, "Dummy file is empty")
|
|
|
+ success = False
|
|
|
+ else:
|
|
|
+ self.log_test("Dummy File Access", True, f"Dummy file readable, length: {len(content)}")
|
|
|
+ except Exception as e:
|
|
|
+ self.log_test("Dummy File Access", False, f"Error: {str(e)}")
|
|
|
+ success = False
|
|
|
+
|
|
|
+ return success
|
|
|
+
|
|
|
+ def test_ntfy_connectivity(self) -> bool:
|
|
|
+ """Test ntfy backend connectivity"""
|
|
|
+ success = True
|
|
|
+
|
|
|
+ # Test importing ntfy
|
|
|
+ try:
|
|
|
+ import ntfy
|
|
|
+ self.log_test("ntfy Import", True, "dschep/ntfy package available")
|
|
|
+ except ImportError:
|
|
|
+ self.log_test("ntfy Import", False, "dschep/ntfy package not installed")
|
|
|
+ return False
|
|
|
+
|
|
|
+ # Test key notification backends
|
|
|
+ for backend in self.config.ntfy_backends_key:
|
|
|
+ try:
|
|
|
+ # Just verify the backend name is valid (assumes global ntfy config is set up)
|
|
|
+ # We don't actually send notifications during testing
|
|
|
+ if backend and isinstance(backend, str) and len(backend.strip()) > 0:
|
|
|
+ self.log_test(f"Key Backend: {backend}", True, "Backend name valid (using global ntfy config)")
|
|
|
+ else:
|
|
|
+ self.log_test(f"Key Backend: {backend}", False, "Invalid backend name")
|
|
|
+ success = False
|
|
|
+
|
|
|
+ except Exception as e:
|
|
|
+ self.log_test(f"Key Backend: {backend}", False, f"Error: {str(e)}")
|
|
|
+ success = False
|
|
|
+
|
|
|
+ # Test health notification backends
|
|
|
+ for backend in self.config.ntfy_backends_health:
|
|
|
+ try:
|
|
|
+ # Similar validation for health backends
|
|
|
+ if backend and isinstance(backend, str) and len(backend.strip()) > 0:
|
|
|
+ self.log_test(f"Health Backend: {backend}", True, "Backend name valid (using global ntfy config)")
|
|
|
+ else:
|
|
|
+ self.log_test(f"Health Backend: {backend}", False, "Invalid backend name")
|
|
|
+ success = False
|
|
|
+
|
|
|
+ except Exception as e:
|
|
|
+ self.log_test(f"Health Backend: {backend}", False, f"Error: {str(e)}")
|
|
|
+ success = False
|
|
|
+
|
|
|
+ return success
|
|
|
+
|
|
|
+ def start_test_server(self) -> bool:
|
|
|
+ """Start the server for endpoint testing"""
|
|
|
+ try:
|
|
|
+ import subprocess
|
|
|
+ import time
|
|
|
+
|
|
|
+ # Start server in background
|
|
|
+ cmd = [sys.executable, "main.py"]
|
|
|
+ env = os.environ.copy()
|
|
|
+ env['EMERGENCY_CONFIG'] = self.config.config_path
|
|
|
+
|
|
|
+ self.server_process = subprocess.Popen(
|
|
|
+ cmd,
|
|
|
+ stdout=subprocess.PIPE,
|
|
|
+ stderr=subprocess.PIPE,
|
|
|
+ env=env
|
|
|
+ )
|
|
|
+
|
|
|
+ # Wait for server to start
|
|
|
+ time.sleep(3)
|
|
|
+
|
|
|
+ # Check if server is running
|
|
|
+ if self.server_process.poll() is None:
|
|
|
+ return self.log_test("Server Start", True, "Server started successfully")
|
|
|
+ else:
|
|
|
+ stdout, stderr = self.server_process.communicate()
|
|
|
+ error_msg = stderr.decode() if stderr else "Unknown error"
|
|
|
+ return self.log_test("Server Start", False, f"Server failed to start: {error_msg}")
|
|
|
+
|
|
|
+ except Exception as e:
|
|
|
+ return self.log_test("Server Start", False, f"Error: {str(e)}")
|
|
|
+
|
|
|
+ def stop_test_server(self):
|
|
|
+ """Stop the test server"""
|
|
|
+ if self.server_process and self.server_process.poll() is None:
|
|
|
+ self.server_process.terminate()
|
|
|
+ try:
|
|
|
+ self.server_process.wait(timeout=5)
|
|
|
+ except subprocess.TimeoutExpired:
|
|
|
+ self.server_process.kill()
|
|
|
+ self.server_process.wait()
|
|
|
+ self.log_test("Server Stop", True, "Server stopped successfully")
|
|
|
+
|
|
|
+ def test_endpoints(self) -> bool:
|
|
|
+ """Test server endpoints"""
|
|
|
+ success = True
|
|
|
+ base_url = f"http://{self.config.server_host}:{self.config.server_port}"
|
|
|
+
|
|
|
+ # Test health endpoint
|
|
|
+ try:
|
|
|
+ import requests
|
|
|
+ response = requests.get(
|
|
|
+ f"{base_url}{self.config.health_route}",
|
|
|
+ timeout=30
|
|
|
+ )
|
|
|
+
|
|
|
+ if response.status_code == 200:
|
|
|
+ data = response.json()
|
|
|
+ if data.get('status') == 'ok':
|
|
|
+ self.log_test("Health Endpoint", True, f"Response: {response.status_code}")
|
|
|
+ else:
|
|
|
+ self.log_test("Health Endpoint", False, f"Invalid response: {data}")
|
|
|
+ success = False
|
|
|
+ else:
|
|
|
+ self.log_test("Health Endpoint", False, f"HTTP {response.status_code}")
|
|
|
+ success = False
|
|
|
+
|
|
|
+ except Exception as e:
|
|
|
+ self.log_test("Health Endpoint", False, f"Error: {str(e)}")
|
|
|
+ success = False
|
|
|
+
|
|
|
+ # Test key endpoint
|
|
|
+ try:
|
|
|
+ import requests
|
|
|
+ response = requests.get(
|
|
|
+ f"{base_url}{self.config.key_route}",
|
|
|
+ timeout=30
|
|
|
+ )
|
|
|
+
|
|
|
+ if response.status_code == 200:
|
|
|
+ data = response.json()
|
|
|
+ if data.get('success') and 'key_part' in data:
|
|
|
+ self.log_test("Key Endpoint", True, f"Key retrieved successfully")
|
|
|
+ else:
|
|
|
+ self.log_test("Key Endpoint", False, f"Invalid response: {data}")
|
|
|
+ success = False
|
|
|
+ else:
|
|
|
+ self.log_test("Key Endpoint", False, f"HTTP {response.status_code}")
|
|
|
+ success = False
|
|
|
+
|
|
|
+ except Exception as e:
|
|
|
+ self.log_test("Key Endpoint", False, f"Error: {str(e)}")
|
|
|
+ success = False
|
|
|
+
|
|
|
+ # Test 404 handling
|
|
|
+ try:
|
|
|
+ import requests
|
|
|
+ response = requests.get(f"{base_url}/nonexistent-path", timeout=10)
|
|
|
+ if response.status_code == 404:
|
|
|
+ self.log_test("404 Handling", True, "Correctly returns 404 for invalid paths")
|
|
|
+ else:
|
|
|
+ self.log_test("404 Handling", False, f"Expected 404, got {response.status_code}")
|
|
|
+ success = False
|
|
|
+ except Exception as e:
|
|
|
+ self.log_test("404 Handling", False, f"Error: {str(e)}")
|
|
|
+ success = False
|
|
|
+
|
|
|
+ return success
|
|
|
+
|
|
|
+ def test_fail_safe_behavior(self) -> bool:
|
|
|
+ """Test fail-safe behavior with invalid backends"""
|
|
|
+ success = True
|
|
|
+
|
|
|
+ # Create temporary config with invalid backends
|
|
|
+ try:
|
|
|
+ with tempfile.NamedTemporaryFile(mode='w', suffix='.json', delete=False) as f:
|
|
|
+ invalid_config = {
|
|
|
+ "server": {
|
|
|
+ "host": self.config.server_host,
|
|
|
+ "port": self.config.server_port + 1 # Different port
|
|
|
+ },
|
|
|
+ "routes": {
|
|
|
+ "key_route": "/test-key",
|
|
|
+ "health_route": "/test-health"
|
|
|
+ },
|
|
|
+ "files": {
|
|
|
+ "key_file": self.config.key_file_path,
|
|
|
+ "dummy_file": self.config.dummy_file_path
|
|
|
+ },
|
|
|
+ "notifications": {
|
|
|
+ "key_backends": ["invalid_backend_test"],
|
|
|
+ "health_backends": ["invalid_backend_test"],
|
|
|
+ "key_message": "Test message",
|
|
|
+ "health_message": "Test health message"
|
|
|
+ }
|
|
|
+ }
|
|
|
+ json.dump(invalid_config, f)
|
|
|
+ temp_config_path = f.name
|
|
|
+
|
|
|
+ # Test with invalid config
|
|
|
+ test_config = Config(temp_config_path)
|
|
|
+
|
|
|
+ # Start server with invalid config
|
|
|
+ cmd = [sys.executable, "main.py"]
|
|
|
+ env = os.environ.copy()
|
|
|
+ env['EMERGENCY_CONFIG'] = temp_config_path
|
|
|
+
|
|
|
+ test_process = subprocess.Popen(
|
|
|
+ cmd,
|
|
|
+ stdout=subprocess.PIPE,
|
|
|
+ stderr=subprocess.PIPE,
|
|
|
+ env=env
|
|
|
+ )
|
|
|
+
|
|
|
+ time.sleep(3)
|
|
|
+
|
|
|
+ if test_process.poll() is None:
|
|
|
+ # Try to access endpoints - should fail due to notification failures
|
|
|
+ base_url = f"http://{test_config.server_host}:{test_config.server_port}"
|
|
|
+
|
|
|
+ try:
|
|
|
+ import requests
|
|
|
+ response = requests.get(f"{base_url}/test-key", timeout=15)
|
|
|
+ if response.status_code == 500:
|
|
|
+ self.log_test("Fail-Safe Key", True, "Correctly blocks access when notifications fail")
|
|
|
+ else:
|
|
|
+ self.log_test("Fail-Safe Key", False, f"Expected 500, got {response.status_code}")
|
|
|
+ success = False
|
|
|
+ except Exception as e:
|
|
|
+ self.log_test("Fail-Safe Key", False, f"Error testing fail-safe: {str(e)}")
|
|
|
+ success = False
|
|
|
+
|
|
|
+ # Clean up
|
|
|
+ test_process.terminate()
|
|
|
+ try:
|
|
|
+ test_process.wait(timeout=5)
|
|
|
+ except subprocess.TimeoutExpired:
|
|
|
+ test_process.kill()
|
|
|
+ else:
|
|
|
+ self.log_test("Fail-Safe Test", False, "Test server failed to start")
|
|
|
+ success = False
|
|
|
+
|
|
|
+ # Clean up temp file
|
|
|
+ os.unlink(temp_config_path)
|
|
|
+
|
|
|
+ except Exception as e:
|
|
|
+ self.log_test("Fail-Safe Test", False, f"Error: {str(e)}")
|
|
|
+ success = False
|
|
|
+
|
|
|
+ return success
|
|
|
+
|
|
|
+ def run_all_tests(self) -> bool:
|
|
|
+ """Run all tests and return overall success"""
|
|
|
+ print("=" * 60)
|
|
|
+ print("Emergency Access Server Test Suite")
|
|
|
+ print("=" * 60)
|
|
|
+
|
|
|
+ overall_success = True
|
|
|
+
|
|
|
+ # Configuration tests
|
|
|
+ print("\n--- Configuration Tests ---")
|
|
|
+ overall_success &= self.test_config_loading()
|
|
|
+
|
|
|
+ # File system tests
|
|
|
+ print("\n--- File System Tests ---")
|
|
|
+ overall_success &= self.test_file_access()
|
|
|
+
|
|
|
+ # Network tests
|
|
|
+ print("\n--- Network Tests ---")
|
|
|
+ overall_success &= self.test_ntfy_connectivity()
|
|
|
+
|
|
|
+ # Server tests
|
|
|
+ print("\n--- Server Tests ---")
|
|
|
+ if self.start_test_server():
|
|
|
+ time.sleep(2) # Give server time to fully start
|
|
|
+ overall_success &= self.test_endpoints()
|
|
|
+ self.stop_test_server()
|
|
|
+ else:
|
|
|
+ overall_success = False
|
|
|
+
|
|
|
+ # Fail-safe tests
|
|
|
+ print("\n--- Fail-Safe Tests ---")
|
|
|
+ overall_success &= self.test_fail_safe_behavior()
|
|
|
+
|
|
|
+ # Print summary
|
|
|
+ self.print_summary()
|
|
|
+
|
|
|
+ return overall_success
|
|
|
+
|
|
|
+ def print_summary(self):
|
|
|
+ """Print test summary"""
|
|
|
+ print("\n" + "=" * 60)
|
|
|
+ print("TEST SUMMARY")
|
|
|
+ print("=" * 60)
|
|
|
+
|
|
|
+ passed = sum(1 for r in self.test_results if r['status'] == 'PASS')
|
|
|
+ failed = sum(1 for r in self.test_results if r['status'] == 'FAIL')
|
|
|
+ total = len(self.test_results)
|
|
|
+
|
|
|
+ print(f"Total Tests: {total}")
|
|
|
+ print(f"Passed: {passed}")
|
|
|
+ print(f"Failed: {failed}")
|
|
|
+
|
|
|
+ if failed > 0:
|
|
|
+ print("\nFAILED TESTS:")
|
|
|
+ for result in self.test_results:
|
|
|
+ if result['status'] == 'FAIL':
|
|
|
+ print(f" - {result['test']}: {result['message']}")
|
|
|
+
|
|
|
+ overall_status = "PASS" if failed == 0 else "FAIL"
|
|
|
+ color = '\033[92m' if failed == 0 else '\033[91m'
|
|
|
+ reset = '\033[0m'
|
|
|
+ print(f"\n{color}Overall Status: {overall_status}{reset}")
|
|
|
+
|
|
|
+def main():
|
|
|
+ """Main function"""
|
|
|
+ import argparse
|
|
|
+
|
|
|
+ parser = argparse.ArgumentParser(description='Test Emergency Access Server')
|
|
|
+ parser.add_argument('--config', help='Configuration file path')
|
|
|
+ parser.add_argument('--quick', action='store_true', help='Run quick tests only (skip server startup)')
|
|
|
+
|
|
|
+ args = parser.parse_args()
|
|
|
+
|
|
|
+ try:
|
|
|
+ tester = EmergencyAccessTester(args.config)
|
|
|
+
|
|
|
+ if args.quick:
|
|
|
+ # Quick tests only
|
|
|
+ success = True
|
|
|
+ success &= tester.test_config_loading()
|
|
|
+ success &= tester.test_file_access()
|
|
|
+ success &= tester.test_ntfy_connectivity()
|
|
|
+ tester.print_summary()
|
|
|
+ else:
|
|
|
+ # Full test suite
|
|
|
+ success = tester.run_all_tests()
|
|
|
+
|
|
|
+ sys.exit(0 if success else 1)
|
|
|
+
|
|
|
+ except KeyboardInterrupt:
|
|
|
+ print("\nTest interrupted by user")
|
|
|
+ sys.exit(1)
|
|
|
+ except Exception as e:
|
|
|
+ print(f"Test suite failed: {str(e)}")
|
|
|
+ sys.exit(1)
|
|
|
+
|
|
|
+if __name__ == '__main__':
|
|
|
+ main()
|