test.py 16 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437
  1. #!/usr/bin/env python3
  2. """
  3. Test script for Emergency Access Server
  4. Validates system configuration, file access, notifications, and endpoints
  5. """
  6. import os
  7. import sys
  8. import json
  9. import time
  10. import yaml
  11. import tempfile
  12. import subprocess
  13. from typing import Dict, List, Tuple, Any
  14. from config import Config
  15. class EmergencyAccessTester:
  16. def __init__(self, config_path: str = None):
  17. """Initialize tester with configuration"""
  18. self.config = Config(config_path)
  19. self.test_results = []
  20. self.server_process = None
  21. def log_test(self, test_name: str, success: bool, message: str = ""):
  22. """Log test result"""
  23. status = "PASS" if success else "FAIL"
  24. result = {
  25. 'test': test_name,
  26. 'status': status,
  27. 'message': message,
  28. 'timestamp': time.time()
  29. }
  30. self.test_results.append(result)
  31. color = '\033[92m' if success else '\033[91m' # Green or Red
  32. reset = '\033[0m'
  33. print(f"{color}[{status}]{reset} {test_name}: {message}")
  34. return success
  35. def test_config_loading(self) -> bool:
  36. """Test configuration file loading"""
  37. try:
  38. # Test all required config properties
  39. self.config.server_host
  40. self.config.server_port
  41. self.config.key_route
  42. self.config.health_route
  43. self.config.key_file_path
  44. self.config.dummy_file_path
  45. self.config.ntfy_backends_key
  46. self.config.ntfy_backends_health
  47. return self.log_test("Config Loading", True, "All config properties accessible")
  48. except Exception as e:
  49. return self.log_test("Config Loading", False, f"Error: {str(e)}")
  50. def test_file_access(self) -> bool:
  51. """Test access to key and dummy files"""
  52. success = True
  53. # Test key file
  54. try:
  55. if not os.path.exists(self.config.key_file_path):
  56. self.log_test("Key File Exists", False, f"File not found: {self.config.key_file_path}")
  57. success = False
  58. else:
  59. with open(self.config.key_file_path, 'r') as f:
  60. content = f.read().strip()
  61. if not content:
  62. self.log_test("Key File Content", False, "Key file is empty")
  63. success = False
  64. else:
  65. self.log_test("Key File Access", True, f"Key file readable, length: {len(content)}")
  66. except Exception as e:
  67. self.log_test("Key File Access", False, f"Error: {str(e)}")
  68. success = False
  69. # Test dummy file
  70. try:
  71. if not os.path.exists(self.config.dummy_file_path):
  72. self.log_test("Dummy File Exists", False, f"File not found: {self.config.dummy_file_path}")
  73. success = False
  74. else:
  75. with open(self.config.dummy_file_path, 'r') as f:
  76. content = f.read().strip()
  77. if not content:
  78. self.log_test("Dummy File Content", False, "Dummy file is empty")
  79. success = False
  80. else:
  81. self.log_test("Dummy File Access", True, f"Dummy file readable, length: {len(content)}")
  82. except Exception as e:
  83. self.log_test("Dummy File Access", False, f"Error: {str(e)}")
  84. success = False
  85. return success
  86. def test_ntfy_connectivity(self) -> bool:
  87. """Test ntfy backend connectivity"""
  88. success = True
  89. # Test importing ntfy and loading config
  90. try:
  91. import ntfy
  92. self.log_test("ntfy Import", True, "dschep/ntfy package available")
  93. # Test loading the ntfy config file
  94. ntfy_config = ntfy.load_config(self.config.ntfy_config_path)
  95. self.log_test("ntfy Config Load", True, f"Config loaded from {self.config.ntfy_config_path}")
  96. except ImportError:
  97. self.log_test("ntfy Import", False, "dschep/ntfy package not installed")
  98. return False
  99. except Exception as e:
  100. self.log_test("ntfy Config Load", False, f"Failed to load config: {str(e)}")
  101. return False
  102. # Test key notification backends
  103. for backend in self.config.ntfy_backends_key:
  104. try:
  105. # Just verify the backend name is valid (assumes global ntfy config is set up)
  106. # We don't actually send notifications during testing
  107. if backend and isinstance(backend, str) and len(backend.strip()) > 0:
  108. self.log_test(f"Key Backend: {backend}", True, "Backend name valid (using global ntfy config)")
  109. else:
  110. self.log_test(f"Key Backend: {backend}", False, "Invalid backend name")
  111. success = False
  112. except Exception as e:
  113. self.log_test(f"Key Backend: {backend}", False, f"Error: {str(e)}")
  114. success = False
  115. # Test health notification backends
  116. for backend in self.config.ntfy_backends_health:
  117. try:
  118. # Similar validation for health backends
  119. if backend and isinstance(backend, str) and len(backend.strip()) > 0:
  120. self.log_test(f"Health Backend: {backend}", True, "Backend name valid (using global ntfy config)")
  121. else:
  122. self.log_test(f"Health Backend: {backend}", False, "Invalid backend name")
  123. success = False
  124. except Exception as e:
  125. self.log_test(f"Health Backend: {backend}", False, f"Error: {str(e)}")
  126. success = False
  127. return success
  128. def start_test_server(self) -> bool:
  129. """Start the server for endpoint testing"""
  130. try:
  131. import subprocess
  132. import time
  133. # Start server in background
  134. cmd = [sys.executable, "main.py"]
  135. env = os.environ.copy()
  136. env['EMERGENCY_CONFIG'] = self.config.config_path
  137. self.server_process = subprocess.Popen(
  138. cmd,
  139. stdout=subprocess.PIPE,
  140. stderr=subprocess.PIPE,
  141. env=env
  142. )
  143. # Wait for server to start
  144. time.sleep(3)
  145. # Check if server is running
  146. if self.server_process.poll() is None:
  147. return self.log_test("Server Start", True, "Server started successfully")
  148. else:
  149. stdout, stderr = self.server_process.communicate()
  150. error_msg = stderr.decode() if stderr else "Unknown error"
  151. return self.log_test("Server Start", False, f"Server failed to start: {error_msg}")
  152. except Exception as e:
  153. return self.log_test("Server Start", False, f"Error: {str(e)}")
  154. def stop_test_server(self):
  155. """Stop the test server"""
  156. if self.server_process and self.server_process.poll() is None:
  157. self.server_process.terminate()
  158. try:
  159. self.server_process.wait(timeout=5)
  160. except subprocess.TimeoutExpired:
  161. self.server_process.kill()
  162. self.server_process.wait()
  163. self.log_test("Server Stop", True, "Server stopped successfully")
  164. def test_endpoints(self) -> bool:
  165. """Test server endpoints"""
  166. success = True
  167. base_url = f"http://{self.config.server_host}:{self.config.server_port}"
  168. # Test health endpoint
  169. try:
  170. import requests
  171. response = requests.get(
  172. f"{base_url}{self.config.health_route}",
  173. timeout=30
  174. )
  175. if response.status_code == 200:
  176. data = response.json()
  177. if data.get('status') == 'ok':
  178. self.log_test("Health Endpoint", True, f"Response: {response.status_code}")
  179. else:
  180. self.log_test("Health Endpoint", False, f"Invalid response: {data}")
  181. success = False
  182. else:
  183. self.log_test("Health Endpoint", False, f"HTTP {response.status_code}")
  184. success = False
  185. except Exception as e:
  186. self.log_test("Health Endpoint", False, f"Error: {str(e)}")
  187. success = False
  188. # Test key endpoint
  189. try:
  190. import requests
  191. response = requests.get(
  192. f"{base_url}{self.config.key_route}",
  193. timeout=30
  194. )
  195. if response.status_code == 200:
  196. data = response.json()
  197. if data.get('success') and 'key_part' in data:
  198. self.log_test("Key Endpoint", True, f"Key retrieved successfully")
  199. else:
  200. self.log_test("Key Endpoint", False, f"Invalid response: {data}")
  201. success = False
  202. else:
  203. self.log_test("Key Endpoint", False, f"HTTP {response.status_code}")
  204. success = False
  205. except Exception as e:
  206. self.log_test("Key Endpoint", False, f"Error: {str(e)}")
  207. success = False
  208. # Test 404 handling
  209. try:
  210. import requests
  211. response = requests.get(f"{base_url}/nonexistent-path", timeout=10)
  212. if response.status_code == 404:
  213. self.log_test("404 Handling", True, "Correctly returns 404 for invalid paths")
  214. else:
  215. self.log_test("404 Handling", False, f"Expected 404, got {response.status_code}")
  216. success = False
  217. except Exception as e:
  218. self.log_test("404 Handling", False, f"Error: {str(e)}")
  219. success = False
  220. return success
  221. def test_fail_safe_behavior(self) -> bool:
  222. """Test fail-safe behavior with invalid backends"""
  223. success = True
  224. # Create temporary config with invalid backends
  225. try:
  226. with tempfile.NamedTemporaryFile(mode='w', suffix='.json', delete=False) as f:
  227. invalid_config = {
  228. "server": {
  229. "host": self.config.server_host,
  230. "port": self.config.server_port + 1 # Different port
  231. },
  232. "routes": {
  233. "key_route": "/test-key",
  234. "health_route": "/test-health"
  235. },
  236. "files": {
  237. "key_file": self.config.key_file_path,
  238. "dummy_file": self.config.dummy_file_path
  239. },
  240. "notifications": {
  241. "key_backends": ["invalid_backend_test"],
  242. "health_backends": ["invalid_backend_test"],
  243. "key_message": "Test message",
  244. "health_message": "Test health message"
  245. }
  246. }
  247. json.dump(invalid_config, f)
  248. temp_config_path = f.name
  249. # Test with invalid config
  250. test_config = Config(temp_config_path)
  251. # Start server with invalid config
  252. cmd = [sys.executable, "main.py"]
  253. env = os.environ.copy()
  254. env['EMERGENCY_CONFIG'] = temp_config_path
  255. test_process = subprocess.Popen(
  256. cmd,
  257. stdout=subprocess.PIPE,
  258. stderr=subprocess.PIPE,
  259. env=env
  260. )
  261. time.sleep(3)
  262. if test_process.poll() is None:
  263. # Try to access endpoints - should fail due to notification failures
  264. base_url = f"http://{test_config.server_host}:{test_config.server_port}"
  265. try:
  266. import requests
  267. response = requests.get(f"{base_url}/test-key", timeout=15)
  268. if response.status_code == 500:
  269. self.log_test("Fail-Safe Key", True, "Correctly blocks access when notifications fail")
  270. else:
  271. self.log_test("Fail-Safe Key", False, f"Expected 500, got {response.status_code}")
  272. success = False
  273. except Exception as e:
  274. self.log_test("Fail-Safe Key", False, f"Error testing fail-safe: {str(e)}")
  275. success = False
  276. # Clean up
  277. test_process.terminate()
  278. try:
  279. test_process.wait(timeout=5)
  280. except subprocess.TimeoutExpired:
  281. test_process.kill()
  282. else:
  283. self.log_test("Fail-Safe Test", False, "Test server failed to start")
  284. success = False
  285. # Clean up temp file
  286. os.unlink(temp_config_path)
  287. except Exception as e:
  288. self.log_test("Fail-Safe Test", False, f"Error: {str(e)}")
  289. success = False
  290. return success
  291. def run_all_tests(self) -> bool:
  292. """Run all tests and return overall success"""
  293. print("=" * 60)
  294. print("Emergency Access Server Test Suite")
  295. print("=" * 60)
  296. overall_success = True
  297. # Configuration tests
  298. print("\n--- Configuration Tests ---")
  299. overall_success &= self.test_config_loading()
  300. # File system tests
  301. print("\n--- File System Tests ---")
  302. overall_success &= self.test_file_access()
  303. # Network tests
  304. print("\n--- Network Tests ---")
  305. overall_success &= self.test_ntfy_connectivity()
  306. # Server tests
  307. print("\n--- Server Tests ---")
  308. if self.start_test_server():
  309. time.sleep(2) # Give server time to fully start
  310. overall_success &= self.test_endpoints()
  311. self.stop_test_server()
  312. else:
  313. overall_success = False
  314. # Fail-safe tests
  315. print("\n--- Fail-Safe Tests ---")
  316. overall_success &= self.test_fail_safe_behavior()
  317. # Print summary
  318. self.print_summary()
  319. return overall_success
  320. def print_summary(self):
  321. """Print test summary"""
  322. print("\n" + "=" * 60)
  323. print("TEST SUMMARY")
  324. print("=" * 60)
  325. passed = sum(1 for r in self.test_results if r['status'] == 'PASS')
  326. failed = sum(1 for r in self.test_results if r['status'] == 'FAIL')
  327. total = len(self.test_results)
  328. print(f"Total Tests: {total}")
  329. print(f"Passed: {passed}")
  330. print(f"Failed: {failed}")
  331. if failed > 0:
  332. print("\nFAILED TESTS:")
  333. for result in self.test_results:
  334. if result['status'] == 'FAIL':
  335. print(f" - {result['test']}: {result['message']}")
  336. overall_status = "PASS" if failed == 0 else "FAIL"
  337. color = '\033[92m' if failed == 0 else '\033[91m'
  338. reset = '\033[0m'
  339. print(f"\n{color}Overall Status: {overall_status}{reset}")
  340. def main():
  341. """Main function"""
  342. import argparse
  343. parser = argparse.ArgumentParser(description='Test Emergency Access Server')
  344. parser.add_argument('--config', help='Configuration file path')
  345. parser.add_argument('--quick', action='store_true', help='Run quick tests only (skip server startup)')
  346. args = parser.parse_args()
  347. try:
  348. tester = EmergencyAccessTester(args.config)
  349. if args.quick:
  350. # Quick tests only
  351. success = True
  352. success &= tester.test_config_loading()
  353. success &= tester.test_file_access()
  354. success &= tester.test_ntfy_connectivity()
  355. tester.print_summary()
  356. else:
  357. # Full test suite
  358. success = tester.run_all_tests()
  359. sys.exit(0 if success else 1)
  360. except KeyboardInterrupt:
  361. print("\nTest interrupted by user")
  362. sys.exit(1)
  363. except Exception as e:
  364. print(f"Test suite failed: {str(e)}")
  365. sys.exit(1)
  366. if __name__ == '__main__':
  367. main()