#!/usr/bin/env python3 import os import sys import json import tempfile import shutil from pathlib import Path # Add the current directory to Python path to import our modules sys.path.insert(0, os.path.dirname(os.path.abspath(__file__))) from config import Config, KeyConfig def test_multikey_config(): """Test multi-key configuration loading""" print("Testing multi-key configuration...") # Create temporary config file config_data = { "server": { "host": "127.0.0.1", "port": 1127 }, "routes": { "health_route": "/health-check-test" }, "files": { "dummy_file": "/tmp/dummy.txt" }, "keys": { "backup_key": { "route": "/emergency-key-backup", "file": "/tmp/backup-key.txt", "username": "emergency_backup", "password_hash": "salt123:hash123", "backends": ["test_backend1", "test_backend2"], "message": "Backup key accessed" }, "master_key": { "route": "/emergency-key-master", "file": "/tmp/master-key.txt", "username": "emergency_master", "password_hash": "salt456:hash456", "backends": ["test_backend1", "test_backend3"], "message": "Master key accessed" } }, "notifications": { "health_backends": ["health_backend"], "config_path": "/tmp/ntfy.yml", "health_message": "Health check", "log_level": "WARNING", "send_all_logs": true } } # Write config to temporary file with tempfile.NamedTemporaryFile(mode='w', suffix='.json', delete=False) as f: json.dump(config_data, f) config_file = f.name try: # Load config config = Config(config_file) # Test basic properties assert config.server_host == "127.0.0.1" assert config.server_port == 1127 assert config.health_route == "/health-check-test" # Test keys keys = config.keys assert len(keys) == 2 assert "backup_key" in keys assert "master_key" in keys # Test backup key backup_key = keys["backup_key"] assert backup_key.route == "/emergency-key-backup" assert backup_key.username == "emergency_backup" assert backup_key.password_hash == "salt123:hash123" assert backup_key.file_path == "/tmp/backup-key.txt" assert backup_key.backends == ["test_backend1", "test_backend2"] assert backup_key.message == "Backup key accessed" # Test master key master_key = keys["master_key"] assert master_key.route == "/emergency-key-master" assert master_key.username == "emergency_master" assert master_key.password_hash == "salt456:hash456" assert master_key.file_path == "/tmp/master-key.txt" assert master_key.backends == ["test_backend1", "test_backend3"] assert master_key.message == "Master key accessed" # Test key lookup methods found_key = config.get_key_by_route("/emergency-key-backup") assert found_key is not None assert found_key.key_id == "backup_key" found_key = config.get_key_by_id("master_key") assert found_key is not None assert found_key.route == "/emergency-key-master" print("✅ Multi-key configuration test passed") finally: os.unlink(config_file) def test_invalid_config(): """Test invalid configuration handling""" print("Testing invalid configuration handling...") # Test config without keys section config_data = { "server": { "host": "127.0.0.1", "port": 1127 }, "routes": { "health_route": "/health-check-test" }, "files": { "dummy_file": "/tmp/dummy.txt" }, "notifications": { "health_backends": ["health_backend"], "config_path": "/tmp/ntfy.yml", "health_message": "Health check", "log_level": "WARNING", "send_all_logs": true } } # Write config to temporary file with tempfile.NamedTemporaryFile(mode='w', suffix='.json', delete=False) as f: json.dump(config_data, f) config_file = f.name try: # Should raise exception for missing keys try: config = Config(config_file) assert False, "Should have raised exception for missing keys section" except Exception as e: assert "No keys configured" in str(e) print("✅ Invalid configuration test passed") finally: os.unlink(config_file) def test_key_config_validation(): """Test key configuration validation""" print("Testing key configuration validation...") # Test missing file try: KeyConfig("test", {"username": "test", "password_hash": "salt:hash", "backends": ["test"]}, {}) assert False, "Should have raised exception for missing file" except Exception as e: assert "File path not configured" in str(e) # Test missing password hash try: KeyConfig("test", {"file": "/tmp/test.txt", "backends": ["test"]}, {}) assert False, "Should have raised exception for missing password hash" except Exception as e: assert "Password hash not configured" in str(e) # Test missing backends try: KeyConfig("test", {"file": "/tmp/test.txt", "password_hash": "salt:hash"}, {}) assert False, "Should have raised exception for missing backends" except Exception as e: assert "No notification backends configured" in str(e) # Test valid config key_config = KeyConfig( "test", { "route": "/test", "file": "/tmp/test.txt", "username": "test_user", "password_hash": "salt:hash", "backends": ["backend1"], "message": "Test message" }, {} ) assert key_config.key_id == "test" assert key_config.route == "/test" assert key_config.file_path == "/tmp/test.txt" assert key_config.username == "test_user" assert key_config.password_hash == "salt:hash" assert key_config.backends == ["backend1"] assert key_config.message == "Test message" print("✅ Key configuration validation test passed") def test_password_functions(): """Test password generation and verification functions""" print("Testing password generation and verification...") from config import Config # Test password hash generation password = "test_password_123" password_hash = Config.generate_password_hash(password) # Hash should be in format "salt:hash" assert ':' in password_hash salt, hash_part = password_hash.split(':') assert len(salt) == 32 # 16 bytes hex = 32 chars assert len(hash_part) == 64 # 32 bytes hex = 64 chars # Test password verification - correct password assert Config.verify_password(password, password_hash) == True # Test password verification - incorrect password assert Config.verify_password("wrong_password", password_hash) == False # Test password verification - malformed hash assert Config.verify_password(password, "invalid_hash") == False assert Config.verify_password(password, "no:colon:format:hash") == False print("✅ Password generation and verification test passed") def test_app_integration(): """Test Flask app integration with multiple keys""" print("Testing Flask app integration...") # Create test files temp_dir = tempfile.mkdtemp() try: # Create test key files backup_key_file = os.path.join(temp_dir, "backup-key.txt") master_key_file = os.path.join(temp_dir, "master-key.txt") dummy_file = os.path.join(temp_dir, "dummy.txt") with open(backup_key_file, 'w') as f: f.write("backup_key_content_123") with open(master_key_file, 'w') as f: f.write("master_key_content_456") with open(dummy_file, 'w') as f: f.write("system_healthy") # Create config config_data = { "server": { "host": "127.0.0.1", "port": 1127 }, "routes": { "health_route": "/health-check-test" }, "files": { "dummy_file": dummy_file }, "keys": { "backup_key": { "route": "/emergency-key-backup", "file": backup_key_file, "username": "emergency_backup", "password_hash": "salt123:hash123", "backends": ["test_backend1"], "message": "Backup key accessed" }, "master_key": { "route": "/emergency-key-master", "file": master_key_file, "username": "emergency_master", "password_hash": "salt456:hash456", "backends": ["test_backend2"], "message": "Master key accessed" } }, "notifications": { "health_backends": ["health_backend"], "config_path": "/tmp/ntfy.yml", "health_message": "Health check", "log_level": "WARNING", "send_all_logs": false } } # Write config with tempfile.NamedTemporaryFile(mode='w', suffix='.json', delete=False) as f: json.dump(config_data, f) config_file = f.name try: # Set environment variable os.environ['EMERGENCY_CONFIG'] = config_file # Import main module (this tests configuration loading) import main # Test config loading main.config = Config(config_file) # Test file reading success, content = main.read_file_safely(backup_key_file) assert success == True assert content == "backup_key_content_123" success, content = main.read_file_safely(master_key_file) assert success == True assert content == "master_key_content_456" # Test key handler creation backup_key_config = main.config.get_key_by_id("backup_key") handler = main.create_key_handler(backup_key_config) assert handler is not None print("✅ Flask app integration test passed") finally: os.unlink(config_file) if 'EMERGENCY_CONFIG' in os.environ: del os.environ['EMERGENCY_CONFIG'] finally: shutil.rmtree(temp_dir) def main(): """Run all tests""" print("Running multi-key functionality tests...\n") try: test_multikey_config() test_invalid_config() test_key_config_validation() test_password_functions() test_app_integration() print("\n🎉 All tests passed! Multi-key functionality is working correctly.") return True except AssertionError as e: print(f"\n❌ Test failed: {e}") return False except Exception as e: print(f"\n❌ Unexpected error: {e}") import traceback traceback.print_exc() return False if __name__ == "__main__": success = main() sys.exit(0 if success else 1)