#!/usr/bin/env python3 # -*- coding: utf-8 -*- """ 配置初始化模块 功能: 1. 自动生成默认配置文件 2. 多环境配置支持(dev/test/prod) 3. 敏感信息加密存储 4. 配置完整性检查与修复 """ import os import json import platform from pathlib import Path from typing import Dict, Any, Optional import logging from cryptography.fernet import Fernet import hashlib # 初始化日志 logging.basicConfig(level=logging.INFO) logger = logging.getLogger('config_init') class ConfigInitializer: """配置初始化工具类""" def __init__(self, app_name: str = "intelligence_system"): self.system = platform.system().lower() self.app_name = app_name self.config_dir = self._get_config_dir() self.config_file = self.config_dir / "config.json" self.secret_key_file = self.config_dir / ".secret.key" self._fernet = None # 确保配置目录存在 self.config_dir.mkdir(parents=True, exist_ok=True) # 设置文件权限(非Windows) if self.system != 'windows': os.chmod(self.config_dir, 0o700) def _get_config_dir(self) -> Path: """获取适合当前平台的配置目录路径""" if self.system == 'windows': return Path(os.environ['APPDATA']) / self.app_name elif self.system == 'darwin': # macOS return Path.home() / "Library" / "Application Support" / self.app_name else: # Linux及其他Unix-like xdg_config = os.getenv('XDG_CONFIG_HOME', '~/.config') return Path(xdg_config).expanduser() / self.app_name def _init_encryption(self): """初始化加密模块""" if not self.secret_key_file.exists(): self.secret_key_file.write_bytes(Fernet.generate_key()) if self.system != 'windows': self.secret_key_file.chmod(0o600) # 仅用户可读写 self._fernet = Fernet(self.secret_key_file.read_bytes()) def encrypt_value(self, plaintext: str) -> str: """加密敏感信息""" if not self._fernet: self._init_encryption() return self._fernet.encrypt(plaintext.encode()).decode() def decrypt_value(self, ciphertext: str) -> str: """解密信息""" if not self._fernet: self._init_encryption() return self._fernet.decrypt(ciphertext.encode()).decode() def _get_default_config(self) -> Dict[str, Any]: """获取默认配置模板""" return { "system": { "env": "dev", # dev/test/prod "log_level": "INFO", "max_threads": max(1, os.cpu_count() or 4), "data_dir": str(self.config_dir / "data") }, "api": { "newsapi": { "endpoint": "https://newsapi.org/v2", "key": "" # 需加密存储 }, "weibo": { "version": "2", "access_token": "" # 需加密存储 } }, "database": { "type": "sqlite", "path": str(self.config_dir / "data.db") }, "network": { "timeout": 30, "retries": 3, "proxy": "" # 示例: http://user:pass@proxy:port } } def _migrate_old_config(self, config: Dict[str, Any]) -> Dict[str, Any]: """旧配置迁移(兼容性处理)""" # 示例:将旧版api_key迁移到新版结构 if 'api_key' in config: config.setdefault('api', {})['newsapi'] = { 'key': config.pop('api_key') } return config def _validate_config(self, config: Dict[str, Any]) -> bool: """验证配置完整性""" required_keys = { "system": ["env", "log_level"], "api/newsapi": ["endpoint"] } for path, keys in required_keys.items(): current = config for part in path.split('/'): current = current.get(part, {}) if not isinstance(current, dict): return False for key in keys: if key not in current: return False return True def _repair_config(self, config: Dict[str, Any]) -> Dict[str, Any]: """自动修复缺失的配置项""" default_config = self._get_default_config() def _merge(current, default): for key, value in default.items(): if key not in current: current[key] = value elif isinstance(value, dict): _merge(current[key], value) return current return _merge(config, default_config) def init_config(self, force: bool = False) -> bool: """ 初始化配置文件 参数: force: 是否强制重新生成配置 返回: bool: 是否创建了新配置 """ config = None # 已有配置文件且不强制重置 if self.config_file.exists() and not force: try: with open(self.config_file, 'r', encoding='utf-8') as f: config = json.load(f) # 配置迁移和修复 config = self._migrate_old_config(config) if not self._validate_config(config): config = self._repair_config(config) logger.warning("自动修复不完整的配置文件") except Exception as e: logger.error(f"加载现有配置失败: {str(e)}") config = None # 需要创建新配置 if config is None: config = self._get_default_config() logger.info("创建新的配置文件") # 加密敏感字段 self._init_encryption() for field in [ "api/newsapi/key", "api/weibo/access_token", "network/proxy" ]: parts = field.split('/') current = config for part in parts[:-1]: current = current.setdefault(part, {}) if parts[-1] in current and current[parts[-1]]: current[parts[-1]] = self.encrypt_value(current[parts[-1]]) # 保存配置 with open(self.config_file, 'w', encoding='utf-8') as f: json.dump(config, f, indent=2, ensure_ascii=False) # 设置文件权限(非Windows) if self.system != 'windows': os.chmod(self.config_file, 0o600) return True def get_config_hash(self) -> str: """获取配置文件哈希值(用于检测变更)""" if not self.config_file.exists(): return "" with open(self.config_file, 'rb') as f: return hashlib.sha256(f.read()).hexdigest() def create_env_specific_config(self, env: str = None) -> bool: """ 创建环境特定配置 参数: env: 环境类型(dev/test/prod) """ if not self.config_file.exists(): self.init_config() with open(self.config_file, 'r', encoding='utf-8') as f: base_config = json.load(f) env = env or base_config['system']['env'] env_config = { f"env_{env}": { "api": { "newsapi": {"endpoint": self._get_env_endpoint(env)} }, "database": { "path": str(self.config_dir / f"data_{env}.db") } } } env_file = self.config_dir / f"config.{env}.json" with open(env_file, 'w', encoding='utf-8') as f: json.dump(env_config, f, indent=2) return True def _get_env_endpoint(self, env: str) -> str: """获取环境特定的API端点""" endpoints = { "dev": "http://dev-api.example.com", "test": "https://test-api.example.com", "prod": "https://api.example.com" } return endpoints.get(env, endpoints['dev']) # 快捷初始化函数 def init_app_config(app_name: str = None, force: bool = False) -> bool: """ 快速初始化应用配置 参数: app_name: 应用名称 force: 是否强制重新初始化 """ return ConfigInitializer(app_name).init_config(force) # 测试代码 if __name__ == "__main__": # 初始化配置 initializer = ConfigInitializer() if initializer.init_config(): print("配置文件已生成:", initializer.config_file) # 创建环境配置示例 initializer.create_env_specific_config("prod") print("生产环境配置已生成") # 加密演示 encrypted = initializer.encrypt_value("my_secret_key") print("加密示例:", encrypted) print("解密测试:", initializer.decrypt_value(encrypted))