test.py 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352
  1. #!/usr/bin/env python3
  2. import os
  3. import sys
  4. import json
  5. import tempfile
  6. import shutil
  7. from pathlib import Path
  8. # Add the current directory to Python path to import our modules
  9. sys.path.insert(0, os.path.dirname(os.path.abspath(__file__)))
  10. from config import Config, KeyConfig
  11. def test_multikey_config():
  12. """Test multi-key configuration loading"""
  13. print("Testing multi-key configuration...")
  14. # Create temporary config file
  15. config_data = {
  16. "server": {
  17. "host": "127.0.0.1",
  18. "port": 1127
  19. },
  20. "routes": {
  21. "health_route": "/health-check-test"
  22. },
  23. "files": {
  24. "dummy_file": "/tmp/dummy.txt"
  25. },
  26. "keys": {
  27. "backup_key": {
  28. "route": "/emergency-key-backup",
  29. "file": "/tmp/backup-key.txt",
  30. "username": "emergency_backup",
  31. "password_hash": "salt123:hash123",
  32. "backends": ["test_backend1", "test_backend2"],
  33. "message": "Backup key accessed"
  34. },
  35. "master_key": {
  36. "route": "/emergency-key-master",
  37. "file": "/tmp/master-key.txt",
  38. "username": "emergency_master",
  39. "password_hash": "salt456:hash456",
  40. "backends": ["test_backend1", "test_backend3"],
  41. "message": "Master key accessed"
  42. }
  43. },
  44. "notifications": {
  45. "health_backends": ["health_backend"],
  46. "config_path": "/tmp/ntfy.yml",
  47. "health_message": "Health check",
  48. "log_level": "WARNING",
  49. "send_all_logs": true
  50. }
  51. }
  52. # Write config to temporary file
  53. with tempfile.NamedTemporaryFile(mode='w', suffix='.json', delete=False) as f:
  54. json.dump(config_data, f)
  55. config_file = f.name
  56. try:
  57. # Load config
  58. config = Config(config_file)
  59. # Test basic properties
  60. assert config.server_host == "127.0.0.1"
  61. assert config.server_port == 1127
  62. assert config.health_route == "/health-check-test"
  63. # Test keys
  64. keys = config.keys
  65. assert len(keys) == 2
  66. assert "backup_key" in keys
  67. assert "master_key" in keys
  68. # Test backup key
  69. backup_key = keys["backup_key"]
  70. assert backup_key.route == "/emergency-key-backup"
  71. assert backup_key.username == "emergency_backup"
  72. assert backup_key.password_hash == "salt123:hash123"
  73. assert backup_key.file_path == "/tmp/backup-key.txt"
  74. assert backup_key.backends == ["test_backend1", "test_backend2"]
  75. assert backup_key.message == "Backup key accessed"
  76. # Test master key
  77. master_key = keys["master_key"]
  78. assert master_key.route == "/emergency-key-master"
  79. assert master_key.username == "emergency_master"
  80. assert master_key.password_hash == "salt456:hash456"
  81. assert master_key.file_path == "/tmp/master-key.txt"
  82. assert master_key.backends == ["test_backend1", "test_backend3"]
  83. assert master_key.message == "Master key accessed"
  84. # Test key lookup methods
  85. found_key = config.get_key_by_route("/emergency-key-backup")
  86. assert found_key is not None
  87. assert found_key.key_id == "backup_key"
  88. found_key = config.get_key_by_id("master_key")
  89. assert found_key is not None
  90. assert found_key.route == "/emergency-key-master"
  91. print("✅ Multi-key configuration test passed")
  92. finally:
  93. os.unlink(config_file)
  94. def test_invalid_config():
  95. """Test invalid configuration handling"""
  96. print("Testing invalid configuration handling...")
  97. # Test config without keys section
  98. config_data = {
  99. "server": {
  100. "host": "127.0.0.1",
  101. "port": 1127
  102. },
  103. "routes": {
  104. "health_route": "/health-check-test"
  105. },
  106. "files": {
  107. "dummy_file": "/tmp/dummy.txt"
  108. },
  109. "notifications": {
  110. "health_backends": ["health_backend"],
  111. "config_path": "/tmp/ntfy.yml",
  112. "health_message": "Health check",
  113. "log_level": "WARNING",
  114. "send_all_logs": true
  115. }
  116. }
  117. # Write config to temporary file
  118. with tempfile.NamedTemporaryFile(mode='w', suffix='.json', delete=False) as f:
  119. json.dump(config_data, f)
  120. config_file = f.name
  121. try:
  122. # Should raise exception for missing keys
  123. try:
  124. config = Config(config_file)
  125. assert False, "Should have raised exception for missing keys section"
  126. except Exception as e:
  127. assert "No keys configured" in str(e)
  128. print("✅ Invalid configuration test passed")
  129. finally:
  130. os.unlink(config_file)
  131. def test_key_config_validation():
  132. """Test key configuration validation"""
  133. print("Testing key configuration validation...")
  134. # Test missing file
  135. try:
  136. KeyConfig("test", {"username": "test", "password_hash": "salt:hash", "backends": ["test"]}, {})
  137. assert False, "Should have raised exception for missing file"
  138. except Exception as e:
  139. assert "File path not configured" in str(e)
  140. # Test missing password hash
  141. try:
  142. KeyConfig("test", {"file": "/tmp/test.txt", "backends": ["test"]}, {})
  143. assert False, "Should have raised exception for missing password hash"
  144. except Exception as e:
  145. assert "Password hash not configured" in str(e)
  146. # Test missing backends
  147. try:
  148. KeyConfig("test", {"file": "/tmp/test.txt", "password_hash": "salt:hash"}, {})
  149. assert False, "Should have raised exception for missing backends"
  150. except Exception as e:
  151. assert "No notification backends configured" in str(e)
  152. # Test valid config
  153. key_config = KeyConfig(
  154. "test",
  155. {
  156. "route": "/test",
  157. "file": "/tmp/test.txt",
  158. "username": "test_user",
  159. "password_hash": "salt:hash",
  160. "backends": ["backend1"],
  161. "message": "Test message"
  162. },
  163. {}
  164. )
  165. assert key_config.key_id == "test"
  166. assert key_config.route == "/test"
  167. assert key_config.file_path == "/tmp/test.txt"
  168. assert key_config.username == "test_user"
  169. assert key_config.password_hash == "salt:hash"
  170. assert key_config.backends == ["backend1"]
  171. assert key_config.message == "Test message"
  172. print("✅ Key configuration validation test passed")
  173. def test_password_functions():
  174. """Test password generation and verification functions"""
  175. print("Testing password generation and verification...")
  176. from config import Config
  177. # Test password hash generation
  178. password = "test_password_123"
  179. password_hash = Config.generate_password_hash(password)
  180. # Hash should be in format "salt:hash"
  181. assert ':' in password_hash
  182. salt, hash_part = password_hash.split(':')
  183. assert len(salt) == 32 # 16 bytes hex = 32 chars
  184. assert len(hash_part) == 64 # 32 bytes hex = 64 chars
  185. # Test password verification - correct password
  186. assert Config.verify_password(password, password_hash) == True
  187. # Test password verification - incorrect password
  188. assert Config.verify_password("wrong_password", password_hash) == False
  189. # Test password verification - malformed hash
  190. assert Config.verify_password(password, "invalid_hash") == False
  191. assert Config.verify_password(password, "no:colon:format:hash") == False
  192. print("✅ Password generation and verification test passed")
  193. def test_app_integration():
  194. """Test Flask app integration with multiple keys"""
  195. print("Testing Flask app integration...")
  196. # Create test files
  197. temp_dir = tempfile.mkdtemp()
  198. try:
  199. # Create test key files
  200. backup_key_file = os.path.join(temp_dir, "backup-key.txt")
  201. master_key_file = os.path.join(temp_dir, "master-key.txt")
  202. dummy_file = os.path.join(temp_dir, "dummy.txt")
  203. with open(backup_key_file, 'w') as f:
  204. f.write("backup_key_content_123")
  205. with open(master_key_file, 'w') as f:
  206. f.write("master_key_content_456")
  207. with open(dummy_file, 'w') as f:
  208. f.write("system_healthy")
  209. # Create config
  210. config_data = {
  211. "server": {
  212. "host": "127.0.0.1",
  213. "port": 1127
  214. },
  215. "routes": {
  216. "health_route": "/health-check-test"
  217. },
  218. "files": {
  219. "dummy_file": dummy_file
  220. },
  221. "keys": {
  222. "backup_key": {
  223. "route": "/emergency-key-backup",
  224. "file": backup_key_file,
  225. "username": "emergency_backup",
  226. "password_hash": "salt123:hash123",
  227. "backends": ["test_backend1"],
  228. "message": "Backup key accessed"
  229. },
  230. "master_key": {
  231. "route": "/emergency-key-master",
  232. "file": master_key_file,
  233. "username": "emergency_master",
  234. "password_hash": "salt456:hash456",
  235. "backends": ["test_backend2"],
  236. "message": "Master key accessed"
  237. }
  238. },
  239. "notifications": {
  240. "health_backends": ["health_backend"],
  241. "config_path": "/tmp/ntfy.yml",
  242. "health_message": "Health check",
  243. "log_level": "WARNING",
  244. "send_all_logs": false
  245. }
  246. }
  247. # Write config
  248. with tempfile.NamedTemporaryFile(mode='w', suffix='.json', delete=False) as f:
  249. json.dump(config_data, f)
  250. config_file = f.name
  251. try:
  252. # Set environment variable
  253. os.environ['EMERGENCY_CONFIG'] = config_file
  254. # Import main module (this tests configuration loading)
  255. import main
  256. # Test config loading
  257. main.config = Config(config_file)
  258. # Test file reading
  259. success, content = main.read_file_safely(backup_key_file)
  260. assert success == True
  261. assert content == "backup_key_content_123"
  262. success, content = main.read_file_safely(master_key_file)
  263. assert success == True
  264. assert content == "master_key_content_456"
  265. # Test key handler creation
  266. backup_key_config = main.config.get_key_by_id("backup_key")
  267. handler = main.create_key_handler(backup_key_config)
  268. assert handler is not None
  269. print("✅ Flask app integration test passed")
  270. finally:
  271. os.unlink(config_file)
  272. if 'EMERGENCY_CONFIG' in os.environ:
  273. del os.environ['EMERGENCY_CONFIG']
  274. finally:
  275. shutil.rmtree(temp_dir)
  276. def main():
  277. """Run all tests"""
  278. print("Running multi-key functionality tests...\n")
  279. try:
  280. test_multikey_config()
  281. test_invalid_config()
  282. test_key_config_validation()
  283. test_password_functions()
  284. test_app_integration()
  285. print("\n🎉 All tests passed! Multi-key functionality is working correctly.")
  286. return True
  287. except AssertionError as e:
  288. print(f"\n❌ Test failed: {e}")
  289. return False
  290. except Exception as e:
  291. print(f"\n❌ Unexpected error: {e}")
  292. import traceback
  293. traceback.print_exc()
  294. return False
  295. if __name__ == "__main__":
  296. success = main()
  297. sys.exit(0 if success else 1)