| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437 |
- #!/usr/bin/env python3
- """
- Test script for Emergency Access Server
- Validates system configuration, file access, notifications, and endpoints
- """
- import os
- import sys
- import json
- import time
- import yaml
- import tempfile
- import subprocess
- from typing import Dict, List, Tuple, Any
- from config import Config
- class EmergencyAccessTester:
- def __init__(self, config_path: str = None):
- """Initialize tester with configuration"""
- self.config = Config(config_path)
- self.test_results = []
- self.server_process = None
- def log_test(self, test_name: str, success: bool, message: str = ""):
- """Log test result"""
- status = "PASS" if success else "FAIL"
- result = {
- 'test': test_name,
- 'status': status,
- 'message': message,
- 'timestamp': time.time()
- }
- self.test_results.append(result)
- color = '\033[92m' if success else '\033[91m' # Green or Red
- reset = '\033[0m'
- print(f"{color}[{status}]{reset} {test_name}: {message}")
- return success
- def test_config_loading(self) -> bool:
- """Test configuration file loading"""
- try:
- # Test all required config properties
- self.config.server_host
- self.config.server_port
- self.config.key_route
- self.config.health_route
- self.config.key_file_path
- self.config.dummy_file_path
- self.config.ntfy_backends_key
- self.config.ntfy_backends_health
- return self.log_test("Config Loading", True, "All config properties accessible")
- except Exception as e:
- return self.log_test("Config Loading", False, f"Error: {str(e)}")
- def test_file_access(self) -> bool:
- """Test access to key and dummy files"""
- success = True
- # Test key file
- try:
- if not os.path.exists(self.config.key_file_path):
- self.log_test("Key File Exists", False, f"File not found: {self.config.key_file_path}")
- success = False
- else:
- with open(self.config.key_file_path, 'r') as f:
- content = f.read().strip()
- if not content:
- self.log_test("Key File Content", False, "Key file is empty")
- success = False
- else:
- self.log_test("Key File Access", True, f"Key file readable, length: {len(content)}")
- except Exception as e:
- self.log_test("Key File Access", False, f"Error: {str(e)}")
- success = False
- # Test dummy file
- try:
- if not os.path.exists(self.config.dummy_file_path):
- self.log_test("Dummy File Exists", False, f"File not found: {self.config.dummy_file_path}")
- success = False
- else:
- with open(self.config.dummy_file_path, 'r') as f:
- content = f.read().strip()
- if not content:
- self.log_test("Dummy File Content", False, "Dummy file is empty")
- success = False
- else:
- self.log_test("Dummy File Access", True, f"Dummy file readable, length: {len(content)}")
- except Exception as e:
- self.log_test("Dummy File Access", False, f"Error: {str(e)}")
- success = False
- return success
- def test_ntfy_connectivity(self) -> bool:
- """Test ntfy backend connectivity"""
- success = True
- # Test importing ntfy and loading config
- try:
- import ntfy
- self.log_test("ntfy Import", True, "dschep/ntfy package available")
- # Test loading the ntfy config file
- ntfy_config = ntfy.load_config(self.config.ntfy_config_path)
- self.log_test("ntfy Config Load", True, f"Config loaded from {self.config.ntfy_config_path}")
- except ImportError:
- self.log_test("ntfy Import", False, "dschep/ntfy package not installed")
- return False
- except Exception as e:
- self.log_test("ntfy Config Load", False, f"Failed to load config: {str(e)}")
- return False
- # Test key notification backends
- for backend in self.config.ntfy_backends_key:
- try:
- # Just verify the backend name is valid (assumes global ntfy config is set up)
- # We don't actually send notifications during testing
- if backend and isinstance(backend, str) and len(backend.strip()) > 0:
- self.log_test(f"Key Backend: {backend}", True, "Backend name valid (using global ntfy config)")
- else:
- self.log_test(f"Key Backend: {backend}", False, "Invalid backend name")
- success = False
- except Exception as e:
- self.log_test(f"Key Backend: {backend}", False, f"Error: {str(e)}")
- success = False
- # Test health notification backends
- for backend in self.config.ntfy_backends_health:
- try:
- # Similar validation for health backends
- if backend and isinstance(backend, str) and len(backend.strip()) > 0:
- self.log_test(f"Health Backend: {backend}", True, "Backend name valid (using global ntfy config)")
- else:
- self.log_test(f"Health Backend: {backend}", False, "Invalid backend name")
- success = False
- except Exception as e:
- self.log_test(f"Health Backend: {backend}", False, f"Error: {str(e)}")
- success = False
- return success
- def start_test_server(self) -> bool:
- """Start the server for endpoint testing"""
- try:
- import subprocess
- import time
- # Start server in background
- cmd = [sys.executable, "main.py"]
- env = os.environ.copy()
- env['EMERGENCY_CONFIG'] = self.config.config_path
- self.server_process = subprocess.Popen(
- cmd,
- stdout=subprocess.PIPE,
- stderr=subprocess.PIPE,
- env=env
- )
- # Wait for server to start
- time.sleep(3)
- # Check if server is running
- if self.server_process.poll() is None:
- return self.log_test("Server Start", True, "Server started successfully")
- else:
- stdout, stderr = self.server_process.communicate()
- error_msg = stderr.decode() if stderr else "Unknown error"
- return self.log_test("Server Start", False, f"Server failed to start: {error_msg}")
- except Exception as e:
- return self.log_test("Server Start", False, f"Error: {str(e)}")
- def stop_test_server(self):
- """Stop the test server"""
- if self.server_process and self.server_process.poll() is None:
- self.server_process.terminate()
- try:
- self.server_process.wait(timeout=5)
- except subprocess.TimeoutExpired:
- self.server_process.kill()
- self.server_process.wait()
- self.log_test("Server Stop", True, "Server stopped successfully")
- def test_endpoints(self) -> bool:
- """Test server endpoints"""
- success = True
- base_url = f"http://{self.config.server_host}:{self.config.server_port}"
- # Test health endpoint
- try:
- import requests
- response = requests.get(
- f"{base_url}{self.config.health_route}",
- timeout=30
- )
- if response.status_code == 200:
- data = response.json()
- if data.get('status') == 'ok':
- self.log_test("Health Endpoint", True, f"Response: {response.status_code}")
- else:
- self.log_test("Health Endpoint", False, f"Invalid response: {data}")
- success = False
- else:
- self.log_test("Health Endpoint", False, f"HTTP {response.status_code}")
- success = False
- except Exception as e:
- self.log_test("Health Endpoint", False, f"Error: {str(e)}")
- success = False
- # Test key endpoint
- try:
- import requests
- response = requests.get(
- f"{base_url}{self.config.key_route}",
- timeout=30
- )
- if response.status_code == 200:
- data = response.json()
- if data.get('success') and 'key_part' in data:
- self.log_test("Key Endpoint", True, f"Key retrieved successfully")
- else:
- self.log_test("Key Endpoint", False, f"Invalid response: {data}")
- success = False
- else:
- self.log_test("Key Endpoint", False, f"HTTP {response.status_code}")
- success = False
- except Exception as e:
- self.log_test("Key Endpoint", False, f"Error: {str(e)}")
- success = False
- # Test 404 handling
- try:
- import requests
- response = requests.get(f"{base_url}/nonexistent-path", timeout=10)
- if response.status_code == 404:
- self.log_test("404 Handling", True, "Correctly returns 404 for invalid paths")
- else:
- self.log_test("404 Handling", False, f"Expected 404, got {response.status_code}")
- success = False
- except Exception as e:
- self.log_test("404 Handling", False, f"Error: {str(e)}")
- success = False
- return success
- def test_fail_safe_behavior(self) -> bool:
- """Test fail-safe behavior with invalid backends"""
- success = True
- # Create temporary config with invalid backends
- try:
- with tempfile.NamedTemporaryFile(mode='w', suffix='.json', delete=False) as f:
- invalid_config = {
- "server": {
- "host": self.config.server_host,
- "port": self.config.server_port + 1 # Different port
- },
- "routes": {
- "key_route": "/test-key",
- "health_route": "/test-health"
- },
- "files": {
- "key_file": self.config.key_file_path,
- "dummy_file": self.config.dummy_file_path
- },
- "notifications": {
- "key_backends": ["invalid_backend_test"],
- "health_backends": ["invalid_backend_test"],
- "key_message": "Test message",
- "health_message": "Test health message"
- }
- }
- json.dump(invalid_config, f)
- temp_config_path = f.name
- # Test with invalid config
- test_config = Config(temp_config_path)
- # Start server with invalid config
- cmd = [sys.executable, "main.py"]
- env = os.environ.copy()
- env['EMERGENCY_CONFIG'] = temp_config_path
- test_process = subprocess.Popen(
- cmd,
- stdout=subprocess.PIPE,
- stderr=subprocess.PIPE,
- env=env
- )
- time.sleep(3)
- if test_process.poll() is None:
- # Try to access endpoints - should fail due to notification failures
- base_url = f"http://{test_config.server_host}:{test_config.server_port}"
- try:
- import requests
- response = requests.get(f"{base_url}/test-key", timeout=15)
- if response.status_code == 500:
- self.log_test("Fail-Safe Key", True, "Correctly blocks access when notifications fail")
- else:
- self.log_test("Fail-Safe Key", False, f"Expected 500, got {response.status_code}")
- success = False
- except Exception as e:
- self.log_test("Fail-Safe Key", False, f"Error testing fail-safe: {str(e)}")
- success = False
- # Clean up
- test_process.terminate()
- try:
- test_process.wait(timeout=5)
- except subprocess.TimeoutExpired:
- test_process.kill()
- else:
- self.log_test("Fail-Safe Test", False, "Test server failed to start")
- success = False
- # Clean up temp file
- os.unlink(temp_config_path)
- except Exception as e:
- self.log_test("Fail-Safe Test", False, f"Error: {str(e)}")
- success = False
- return success
- def run_all_tests(self) -> bool:
- """Run all tests and return overall success"""
- print("=" * 60)
- print("Emergency Access Server Test Suite")
- print("=" * 60)
- overall_success = True
- # Configuration tests
- print("\n--- Configuration Tests ---")
- overall_success &= self.test_config_loading()
- # File system tests
- print("\n--- File System Tests ---")
- overall_success &= self.test_file_access()
- # Network tests
- print("\n--- Network Tests ---")
- overall_success &= self.test_ntfy_connectivity()
- # Server tests
- print("\n--- Server Tests ---")
- if self.start_test_server():
- time.sleep(2) # Give server time to fully start
- overall_success &= self.test_endpoints()
- self.stop_test_server()
- else:
- overall_success = False
- # Fail-safe tests
- print("\n--- Fail-Safe Tests ---")
- overall_success &= self.test_fail_safe_behavior()
- # Print summary
- self.print_summary()
- return overall_success
- def print_summary(self):
- """Print test summary"""
- print("\n" + "=" * 60)
- print("TEST SUMMARY")
- print("=" * 60)
- passed = sum(1 for r in self.test_results if r['status'] == 'PASS')
- failed = sum(1 for r in self.test_results if r['status'] == 'FAIL')
- total = len(self.test_results)
- print(f"Total Tests: {total}")
- print(f"Passed: {passed}")
- print(f"Failed: {failed}")
- if failed > 0:
- print("\nFAILED TESTS:")
- for result in self.test_results:
- if result['status'] == 'FAIL':
- print(f" - {result['test']}: {result['message']}")
- overall_status = "PASS" if failed == 0 else "FAIL"
- color = '\033[92m' if failed == 0 else '\033[91m'
- reset = '\033[0m'
- print(f"\n{color}Overall Status: {overall_status}{reset}")
- def main():
- """Main function"""
- import argparse
- parser = argparse.ArgumentParser(description='Test Emergency Access Server')
- parser.add_argument('--config', help='Configuration file path')
- parser.add_argument('--quick', action='store_true', help='Run quick tests only (skip server startup)')
- args = parser.parse_args()
- try:
- tester = EmergencyAccessTester(args.config)
- if args.quick:
- # Quick tests only
- success = True
- success &= tester.test_config_loading()
- success &= tester.test_file_access()
- success &= tester.test_ntfy_connectivity()
- tester.print_summary()
- else:
- # Full test suite
- success = tester.run_all_tests()
- sys.exit(0 if success else 1)
- except KeyboardInterrupt:
- print("\nTest interrupted by user")
- sys.exit(1)
- except Exception as e:
- print(f"Test suite failed: {str(e)}")
- sys.exit(1)
- if __name__ == '__main__':
- main()
|