| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485 |
- #!/usr/bin/env python3
- """
- Add Key Script for Emergency Access Server
- Adds new emergency access keys to the configuration with proper validation
- """
- import json
- import yaml
- import os
- import sys
- import argparse
- import secrets
- import string
- import hashlib
- import shutil
- from pathlib import Path
- from typing import Dict, Any, Optional
- def generate_secure_password(length: int = 64) -> str:
- """Generate a cryptographically secure password"""
- alphabet = string.ascii_letters + string.digits + "!@#$%^&*"
- return ''.join(secrets.choice(alphabet) for _ in range(length))
- def generate_password_hash(password: str) -> str:
- """Generate a password hash using PBKDF2 with SHA-256"""
- salt = secrets.token_hex(16)
- password_hash = hashlib.pbkdf2_hmac('sha256', password.encode(), salt.encode(), 100000)
- return f"{salt}:{password_hash.hex()}"
- def load_config(config_path: str) -> Dict[str, Any]:
- """Load configuration from file"""
- try:
- with open(config_path, 'r') as f:
- if config_path.endswith(('.yaml', '.yml')):
- return yaml.safe_load(f)
- else:
- return json.load(f)
- except FileNotFoundError:
- raise Exception(f"Configuration file {config_path} not found")
- except Exception as e:
- raise Exception(f"Failed to load configuration: {str(e)}")
- def save_config(config: Dict[str, Any], config_path: str) -> None:
- """Save configuration to file"""
- # Create backup
- backup_path = f"{config_path}.backup"
- if os.path.exists(config_path):
- shutil.copy2(config_path, backup_path)
- print(f"📁 Created backup: {backup_path}")
- try:
- with open(config_path, 'w') as f:
- if config_path.endswith(('.yaml', '.yml')):
- yaml.safe_dump(config, f, indent=2, default_flow_style=False)
- else:
- json.dump(config, f, indent=2)
- print(f"💾 Configuration saved to: {config_path}")
- except Exception as e:
- # Restore backup if save failed
- if os.path.exists(backup_path):
- shutil.move(backup_path, config_path)
- raise Exception(f"Failed to save configuration: {str(e)}")
- def validate_key_id(key_id: str, existing_keys: Dict[str, Any]) -> None:
- """Validate key ID format and uniqueness"""
- if not key_id:
- raise ValueError("Key ID cannot be empty")
- if not key_id.replace('_', '').replace('-', '').isalnum():
- raise ValueError("Key ID must contain only letters, numbers, underscores, and hyphens")
- if key_id in existing_keys:
- raise ValueError(f"Key ID '{key_id}' already exists")
- def validate_file_path(file_path: str) -> None:
- """Validate file path and create directory if needed"""
- path = Path(file_path)
- # Create parent directory if it doesn't exist
- path.parent.mkdir(parents=True, exist_ok=True)
- # Check if we can write to the directory
- if not os.access(path.parent, os.W_OK):
- raise ValueError(f"Cannot write to directory: {path.parent}")
- def validate_backends(backends: list, available_backends: Optional[list] = None) -> None:
- """Validate notification backends"""
- if not backends:
- raise ValueError("At least one notification backend must be specified")
- if available_backends:
- invalid_backends = [b for b in backends if b not in available_backends]
- if invalid_backends:
- raise ValueError(f"Invalid backends: {invalid_backends}")
- def get_available_backends(config: Dict[str, Any]) -> list:
- """Extract available backends from ntfy config"""
- try:
- ntfy_config_path = config.get('notifications', {}).get('config_path', '/etc/emergency-access/ntfy.yml')
- if os.path.exists(ntfy_config_path):
- with open(ntfy_config_path, 'r') as f:
- ntfy_config = yaml.safe_load(f)
- if 'backends' in ntfy_config:
- return list(ntfy_config['backends'].keys())
- except Exception:
- pass
- return []
- def create_key_file(file_path: str, content: str = None) -> None:
- """Create the key file with content or placeholder"""
- path = Path(file_path)
- if content is None:
- content = f"# Emergency Access Key File\n# Generated on: {os.popen('date').read().strip()}\n# TODO: Replace this with your actual key content\n"
- with open(path, 'w') as f:
- f.write(content)
- # Set secure permissions (owner read/write only)
- os.chmod(path, 0o600)
- print(f"🔑 Created key file: {file_path} (permissions: 600)")
- def add_key_interactive(config: Dict[str, Any], config_path: str) -> None:
- """Interactive key addition"""
- print("\n" + "=" * 60)
- print("INTERACTIVE KEY ADDITION")
- print("=" * 60)
- existing_keys = config.get('keys', {})
- available_backends = get_available_backends(config)
- # Get key ID
- while True:
- key_id = input("\nEnter key ID (e.g., 'backup', 'master', 'recovery'): ").strip()
- try:
- validate_key_id(key_id, existing_keys)
- break
- except ValueError as e:
- print(f"❌ {e}")
- # Get route (optional, defaults to /emergency-key-{key_id})
- default_route = f"/emergency-key-{key_id}"
- route = input(f"Enter route (default: {default_route}): ").strip()
- if not route:
- route = default_route
- # Get file path
- while True:
- default_file = f"/etc/emergency-access/{key_id}-key.txt"
- file_path = input(f"Enter key file path (default: {default_file}): ").strip()
- if not file_path:
- file_path = default_file
- try:
- validate_file_path(file_path)
- break
- except ValueError as e:
- print(f"❌ {e}")
- # Get username (optional, defaults to emergency_{key_id})
- default_username = f"emergency_{key_id}"
- username = input(f"Enter username (default: {default_username}): ").strip()
- if not username:
- username = default_username
- # Get password
- import getpass
- while True:
- password = getpass.getpass("Enter password (leave empty to generate): ").strip()
- if not password:
- password = generate_secure_password(64)
- print(f"🔐 Generated password: {password}")
- break
- elif len(password) < 12:
- print("❌ Password must be at least 12 characters")
- else:
- break
- password_hash = generate_password_hash(password)
- # Get notification backends
- if available_backends:
- print(f"\nAvailable backends: {', '.join(available_backends)}")
- while True:
- backends_input = input("Enter notification backends (comma-separated): ").strip()
- backends = [b.strip() for b in backends_input.split(',') if b.strip()]
- try:
- validate_backends(backends, available_backends if available_backends else None)
- break
- except ValueError as e:
- print(f"❌ {e}")
- # Get custom message (optional)
- default_message = f"🚨 EMERGENCY: Key {key_id} accessed from server"
- message = input(f"Enter notification message (default: {default_message}): ").strip()
- if not message:
- message = default_message
- # Create key file
- create_file = input(f"\nCreate key file at {file_path}? (y/N): ").strip().lower()
- if create_file == 'y':
- key_content = input("Enter key content (leave empty for placeholder): ").strip()
- create_key_file(file_path, key_content if key_content else None)
- # Add key to config
- add_key_to_config(config, key_id, {
- 'route': route,
- 'file': file_path,
- 'username': username,
- 'password_hash': password_hash,
- 'backends': backends,
- 'message': message
- })
- save_config(config, config_path)
- print(f"\n✅ Successfully added key '{key_id}'")
- print(f" Route: {route}")
- print(f" File: {file_path}")
- print(f" Username: {username}")
- print(f" Password: {password}")
- print(f" Backends: {', '.join(backends)}")
- def add_key_to_config(config: Dict[str, Any], key_id: str, key_config: Dict[str, Any]) -> None:
- """Add key configuration to config"""
- if 'keys' not in config:
- config['keys'] = {}
- config['keys'][key_id] = key_config
- def add_key_programmatic(config: Dict[str, Any], config_path: str, args) -> None:
- """Add key programmatically using command line arguments"""
- existing_keys = config.get('keys', {})
- available_backends = get_available_backends(config)
- # Validate inputs
- validate_key_id(args.key_id, existing_keys)
- validate_file_path(args.file)
- validate_backends(args.backends, available_backends if available_backends else None)
- # Generate password if not provided
- if args.password:
- password = args.password
- else:
- password = generate_secure_password(64)
- print(f"🔐 Generated password: {password}")
- password_hash = generate_password_hash(password)
- # Set defaults
- route = args.route or f"/emergency-key-{args.key_id}"
- username = args.username or f"emergency_{args.key_id}"
- message = args.message or f"🚨 EMERGENCY: Key {args.key_id} accessed from server"
- # Create key file if requested
- if args.create_file:
- create_key_file(args.file, args.key_content)
- # Add key to config
- add_key_to_config(config, args.key_id, {
- 'route': route,
- 'file': args.file,
- 'username': username,
- 'password_hash': password_hash,
- 'backends': args.backends,
- 'message': message
- })
- save_config(config, config_path)
- print(f"\n✅ Successfully added key '{args.key_id}'")
- print(f" Route: {route}")
- print(f" File: {args.file}")
- print(f" Username: {username}")
- print(f" Password: {password}")
- print(f" Backends: {', '.join(args.backends)}")
- def list_keys(config: Dict[str, Any]) -> None:
- """List existing keys"""
- keys = config.get('keys', {})
- if not keys:
- print("No keys configured")
- return
- print("\n" + "=" * 60)
- print("CONFIGURED KEYS")
- print("=" * 60)
- for key_id, key_config in keys.items():
- print(f"\n📋 Key ID: {key_id}")
- print(f" Route: {key_config.get('route', 'N/A')}")
- print(f" File: {key_config.get('file', 'N/A')}")
- print(f" Username: {key_config.get('username', 'N/A')}")
- print(f" Backends: {', '.join(key_config.get('backends', []))}")
- print(f" Message: {key_config.get('message', 'N/A')}")
- def remove_key(config: Dict[str, Any], config_path: str, key_id: str, remove_file: bool = False) -> None:
- """Remove a key from configuration"""
- keys = config.get('keys', {})
- if key_id not in keys:
- print(f"❌ Key '{key_id}' not found")
- return
- key_config = keys[key_id]
- # Remove file if requested
- if remove_file and 'file' in key_config:
- file_path = key_config['file']
- if os.path.exists(file_path):
- os.remove(file_path)
- print(f"🗑️ Removed file: {file_path}")
- # Remove from config
- del keys[key_id]
- save_config(config, config_path)
- print(f"✅ Removed key '{key_id}' from configuration")
- def main():
- parser = argparse.ArgumentParser(
- description="Add, list, or remove emergency access keys",
- formatter_class=argparse.RawDescriptionHelpFormatter,
- epilog="""
- Examples:
- # Interactive key addition
- python add_key.py --interactive
- # Add key programmatically
- python add_key.py --key-id backup --file /etc/emergency-access/backup.txt --backends matrix_sec,email
- # List existing keys
- python add_key.py --list
- # Remove a key
- python add_key.py --remove backup --remove-file
- # Add key with all options
- python add_key.py --key-id master \\
- --file /etc/emergency-access/master.txt \\
- --route /emergency-master \\
- --username master_user \\
- --password "secure_password" \\
- --backends matrix_sec,pushover \\
- --message "Master key accessed!" \\
- --create-file \\
- --key-content "actual key content here"
- """
- )
- parser.add_argument(
- '--config', '-c',
- default='config.json',
- help='Configuration file path (default: config.json)'
- )
- # Add key options
- parser.add_argument(
- '--interactive', '-i',
- action='store_true',
- help='Interactive key addition'
- )
- parser.add_argument(
- '--key-id',
- help='Key identifier'
- )
- parser.add_argument(
- '--file', '-f',
- help='Path to key file'
- )
- parser.add_argument(
- '--route',
- help='HTTP route for key access (default: /emergency-key-{key_id})'
- )
- parser.add_argument(
- '--username', '-u',
- help='HTTP Basic Auth username (default: emergency_{key_id})'
- )
- parser.add_argument(
- '--password', '-p',
- help='HTTP Basic Auth password (generated if not provided)'
- )
- parser.add_argument(
- '--backends', '-b',
- nargs='+',
- help='Notification backends (space or comma separated)'
- )
- parser.add_argument(
- '--message', '-m',
- help='Notification message'
- )
- parser.add_argument(
- '--create-file',
- action='store_true',
- help='Create the key file'
- )
- parser.add_argument(
- '--key-content',
- help='Content for the key file'
- )
- # List/remove options
- parser.add_argument(
- '--list', '-l',
- action='store_true',
- help='List existing keys'
- )
- parser.add_argument(
- '--remove', '-r',
- help='Remove key by ID'
- )
- parser.add_argument(
- '--remove-file',
- action='store_true',
- help='Also remove the key file when removing a key'
- )
- args = parser.parse_args()
- # Handle backends input (support both space and comma separated)
- if args.backends:
- backends = []
- for backend_group in args.backends:
- backends.extend([b.strip() for b in backend_group.replace(',', ' ').split() if b.strip()])
- args.backends = backends
- # Load configuration
- try:
- config = load_config(args.config)
- except Exception as e:
- print(f"❌ {e}")
- sys.exit(1)
- try:
- if args.list:
- list_keys(config)
- elif args.remove:
- remove_key(config, args.config, args.remove, args.remove_file)
- elif args.interactive:
- add_key_interactive(config, args.config)
- elif args.key_id and args.file and args.backends:
- add_key_programmatic(config, args.config, args)
- else:
- parser.print_help()
- print("\n❌ Missing required arguments for key addition")
- print(" Use --interactive for guided setup")
- print(" Or provide --key-id, --file, and --backends")
- except Exception as e:
- print(f"❌ {e}")
- sys.exit(1)
- if __name__ == '__main__':
- main()
|