| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352 |
- #!/usr/bin/env python3
- import os
- import sys
- import json
- import tempfile
- import shutil
- from pathlib import Path
- # Add the current directory to Python path to import our modules
- sys.path.insert(0, os.path.dirname(os.path.abspath(__file__)))
- from config import Config, KeyConfig
def test_multikey_config():
    """Check that ``Config`` loads a configuration file declaring two keys.

    A JSON config with ``backup_key`` and ``master_key`` entries is written to
    a temporary file, loaded through ``Config``, and every server, route and
    per-key attribute is compared with the value that was written.  The
    route/id lookup helpers are exercised as well.

    Raises:
        AssertionError: if any loaded value differs from the written one.
    """
    print("Testing multi-key configuration...")

    # NOTE: this is a Python dict, so booleans must be True/False; the JSON
    # spellings true/false would raise NameError before the test even starts.
    config_data = {
        "server": {
            "host": "127.0.0.1",
            "port": 1127
        },
        "routes": {
            "health_route": "/health-check-test"
        },
        "files": {
            "dummy_file": "/tmp/dummy.txt"
        },
        "keys": {
            "backup_key": {
                "route": "/emergency-key-backup",
                "file": "/tmp/backup-key.txt",
                "username": "emergency_backup",
                "password_hash": "salt123:hash123",
                "backends": ["test_backend1", "test_backend2"],
                "message": "Backup key accessed"
            },
            "master_key": {
                "route": "/emergency-key-master",
                "file": "/tmp/master-key.txt",
                "username": "emergency_master",
                "password_hash": "salt456:hash456",
                "backends": ["test_backend1", "test_backend3"],
                "message": "Master key accessed"
            }
        },
        "notifications": {
            "health_backends": ["health_backend"],
            "config_path": "/tmp/ntfy.yml",
            "health_message": "Health check",
            "log_level": "WARNING",
            "send_all_logs": True
        }
    }

    # delete=False keeps the file on disk after the ``with`` block so that
    # Config can open it by name; it is removed in the ``finally`` below.
    with tempfile.NamedTemporaryFile(mode='w', suffix='.json', delete=False) as f:
        json.dump(config_data, f)
        config_file = f.name

    try:
        config = Config(config_file)

        # Top-level settings
        assert config.server_host == "127.0.0.1"
        assert config.server_port == 1127
        assert config.health_route == "/health-check-test"

        # Both keys must be present, and nothing else
        keys = config.keys
        assert len(keys) == 2
        assert "backup_key" in keys
        assert "master_key" in keys

        # Each loaded key must mirror exactly what was written to the file
        for key_id, expected in config_data["keys"].items():
            loaded = keys[key_id]
            assert loaded.route == expected["route"]
            assert loaded.username == expected["username"]
            assert loaded.password_hash == expected["password_hash"]
            assert loaded.file_path == expected["file"]
            assert loaded.backends == expected["backends"]
            assert loaded.message == expected["message"]

        # Lookup helpers
        found_key = config.get_key_by_route("/emergency-key-backup")
        assert found_key is not None
        assert found_key.key_id == "backup_key"

        found_key = config.get_key_by_id("master_key")
        assert found_key is not None
        assert found_key.route == "/emergency-key-master"

        print("✅ Multi-key configuration test passed")
    finally:
        os.unlink(config_file)
def test_invalid_config():
    """Check that ``Config`` rejects a configuration without a ``keys`` section.

    A config file lacking ``keys`` is written to a temporary file; loading it
    must raise an exception whose text contains ``"No keys configured"``.

    Raises:
        AssertionError: if no exception is raised, or if the exception text
            does not contain the expected message.
    """
    print("Testing invalid configuration handling...")

    # Deliberately has no "keys" section.  Python booleans (True), not JSON.
    config_data = {
        "server": {
            "host": "127.0.0.1",
            "port": 1127
        },
        "routes": {
            "health_route": "/health-check-test"
        },
        "files": {
            "dummy_file": "/tmp/dummy.txt"
        },
        "notifications": {
            "health_backends": ["health_backend"],
            "config_path": "/tmp/ntfy.yml",
            "health_message": "Health check",
            "log_level": "WARNING",
            "send_all_logs": True
        }
    }

    # delete=False so Config can reopen the file by name; removed in finally.
    with tempfile.NamedTemporaryFile(mode='w', suffix='.json', delete=False) as f:
        json.dump(config_data, f)
        config_file = f.name

    try:
        try:
            Config(config_file)
        except Exception as e:
            assert "No keys configured" in str(e), f"Unexpected error: {e}"
        else:
            # Raised in ``else`` so the handler above cannot swallow it and
            # replace the message with a confusing secondary failure.
            raise AssertionError("Should have raised exception for missing keys section")

        print("✅ Invalid configuration test passed")
    finally:
        os.unlink(config_file)
def test_key_config_validation():
    """Check ``KeyConfig`` validation of required fields and a valid entry.

    Three incomplete key definitions (missing file, missing password hash,
    missing backends) must each raise an exception containing the matching
    message; a complete definition must expose every field as an attribute.
    The third positional argument to ``KeyConfig`` is passed as an empty dict
    throughout.

    Raises:
        AssertionError: if an invalid definition is accepted, an error message
            differs from the expected one, or a valid field is not preserved.
    """
    print("Testing key configuration validation...")

    # (key data, text expected in the error, message if nothing is raised)
    invalid_cases = [
        (
            {"username": "test", "password_hash": "salt:hash", "backends": ["test"]},
            "File path not configured",
            "Should have raised exception for missing file",
        ),
        (
            {"file": "/tmp/test.txt", "backends": ["test"]},
            "Password hash not configured",
            "Should have raised exception for missing password hash",
        ),
        (
            {"file": "/tmp/test.txt", "password_hash": "salt:hash"},
            "No notification backends configured",
            "Should have raised exception for missing backends",
        ),
    ]

    for key_data, expected_error, not_raised_message in invalid_cases:
        try:
            KeyConfig("test", key_data, {})
        except Exception as e:
            assert expected_error in str(e), f"Unexpected error: {e}"
        else:
            # Raised outside the ``try`` body so the broad handler above
            # cannot catch it and hide the real reason for the failure.
            raise AssertionError(not_raised_message)

    # A complete definition must round-trip every field
    key_config = KeyConfig(
        "test",
        {
            "route": "/test",
            "file": "/tmp/test.txt",
            "username": "test_user",
            "password_hash": "salt:hash",
            "backends": ["backend1"],
            "message": "Test message"
        },
        {}
    )
    assert key_config.key_id == "test"
    assert key_config.route == "/test"
    assert key_config.file_path == "/tmp/test.txt"
    assert key_config.username == "test_user"
    assert key_config.password_hash == "salt:hash"
    assert key_config.backends == ["backend1"]
    assert key_config.message == "Test message"

    print("✅ Key configuration validation test passed")
def test_password_functions():
    """Exercise ``Config.generate_password_hash`` and ``Config.verify_password``.

    The generated hash must have the form ``salt:hash`` with a 32-character
    salt and a 64-character hash; verification must succeed for the original
    password and fail for a wrong password or a malformed stored hash.
    """
    print("Testing password generation and verification...")
    from config import Config

    plain_text = "test_password_123"
    stored = Config.generate_password_hash(plain_text)

    # Expected layout: "<salt>:<hash>"
    assert ':' in stored
    salt_hex, digest_hex = stored.split(':')
    assert len(salt_hex) == 32  # 16 bytes, hex-encoded
    assert len(digest_hex) == 64  # 32 bytes, hex-encoded

    # Matching password is accepted
    assert Config.verify_password(plain_text, stored) == True

    # A different password is rejected
    assert Config.verify_password("wrong_password", stored) == False

    # Stored values that are not a single "salt:hash" pair are rejected
    for malformed in ("invalid_hash", "no:colon:format:hash"):
        assert Config.verify_password(plain_text, malformed) == False

    print("✅ Password generation and verification test passed")
def test_app_integration():
    """Check that the application module works with a two-key configuration.

    Key files and a config file are created in temporary locations, the
    ``EMERGENCY_CONFIG`` environment variable is pointed at the config, and
    the project's ``main`` module is imported.  The test then verifies that
    ``read_file_safely`` returns the content of both key files and that
    ``create_key_handler`` produces a handler for the backup key.

    All temporary files are removed and ``EMERGENCY_CONFIG`` is restored to
    its previous state, whether or not the assertions pass.

    Raises:
        AssertionError: if a file cannot be read back or no handler is built.
    """
    print("Testing Flask app integration...")

    temp_dir = tempfile.mkdtemp()
    try:
        # Key material and health-check files the config will point at
        backup_key_file = os.path.join(temp_dir, "backup-key.txt")
        master_key_file = os.path.join(temp_dir, "master-key.txt")
        dummy_file = os.path.join(temp_dir, "dummy.txt")

        with open(backup_key_file, 'w') as f:
            f.write("backup_key_content_123")
        with open(master_key_file, 'w') as f:
            f.write("master_key_content_456")
        with open(dummy_file, 'w') as f:
            f.write("system_healthy")

        # Python dict, so booleans are True/False (JSON's false is a NameError).
        config_data = {
            "server": {
                "host": "127.0.0.1",
                "port": 1127
            },
            "routes": {
                "health_route": "/health-check-test"
            },
            "files": {
                "dummy_file": dummy_file
            },
            "keys": {
                "backup_key": {
                    "route": "/emergency-key-backup",
                    "file": backup_key_file,
                    "username": "emergency_backup",
                    "password_hash": "salt123:hash123",
                    "backends": ["test_backend1"],
                    "message": "Backup key accessed"
                },
                "master_key": {
                    "route": "/emergency-key-master",
                    "file": master_key_file,
                    "username": "emergency_master",
                    "password_hash": "salt456:hash456",
                    "backends": ["test_backend2"],
                    "message": "Master key accessed"
                }
            },
            "notifications": {
                "health_backends": ["health_backend"],
                "config_path": "/tmp/ntfy.yml",
                "health_message": "Health check",
                "log_level": "WARNING",
                "send_all_logs": False
            }
        }

        # delete=False so the file can be reopened by name; removed below.
        with tempfile.NamedTemporaryFile(mode='w', suffix='.json', delete=False) as f:
            json.dump(config_data, f)
            config_file = f.name

        # Remember any pre-existing value so the caller's environment is
        # restored rather than unconditionally cleared.
        previous_env = os.environ.get('EMERGENCY_CONFIG')
        try:
            os.environ['EMERGENCY_CONFIG'] = config_file

            # Importing the module exercises its configuration loading.
            # Aliased so it is not confused with this file's own main().
            import main as app_module

            app_module.config = Config(config_file)

            # File reading
            success, content = app_module.read_file_safely(backup_key_file)
            assert success == True
            assert content == "backup_key_content_123"

            success, content = app_module.read_file_safely(master_key_file)
            assert success == True
            assert content == "master_key_content_456"

            # Key handler creation
            backup_key_config = app_module.config.get_key_by_id("backup_key")
            handler = app_module.create_key_handler(backup_key_config)
            assert handler is not None

            print("✅ Flask app integration test passed")
        finally:
            os.unlink(config_file)
            if previous_env is None:
                os.environ.pop('EMERGENCY_CONFIG', None)
            else:
                os.environ['EMERGENCY_CONFIG'] = previous_env
    finally:
        shutil.rmtree(temp_dir)
def main():
    """Run every test in order and report the overall result.

    The tests run sequentially and the run stops at the first failure.  A
    traceback is printed for both assertion failures and unexpected errors,
    because most assertions in this file carry no message and would otherwise
    produce an empty diagnostic.

    Returns:
        bool: True if all tests passed, False if any test failed or raised.
    """
    import traceback

    print("Running multi-key functionality tests...\n")

    try:
        test_multikey_config()
        test_invalid_config()
        test_key_config_validation()
        test_password_functions()
        test_app_integration()

        print("\n🎉 All tests passed! Multi-key functionality is working correctly.")
        return True
    except AssertionError as e:
        print(f"\n❌ Test failed: {e}")
        # Shows which assertion failed even when it has no message.
        traceback.print_exc()
        return False
    except Exception as e:
        print(f"\n❌ Unexpected error: {e}")
        traceback.print_exc()
        return False
if __name__ == "__main__":
    # Map the boolean test outcome to a process exit status:
    # 0 when every test passed, 1 otherwise.
    success = main()
    if success:
        sys.exit(0)
    sys.exit(1)
|