ai初期模板

This commit is contained in:
2025-08-05 15:00:46 +08:00
commit 71e9c7c5bc
21 changed files with 1446 additions and 0 deletions
+273
View File
@@ -0,0 +1,273 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
配置初始化模块
功能:
1. 自动生成默认配置文件
2. 多环境配置支持(dev/test/prod
3. 敏感信息加密存储
4. 配置完整性检查与修复
"""
import os
import json
import platform
from pathlib import Path
from typing import Dict, Any, Optional
import logging
from cryptography.fernet import Fernet
import hashlib
# 初始化日志
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger('config_init')
class ConfigInitializer:
"""配置初始化工具类"""
def __init__(self, app_name: str = "intelligence_system"):
self.system = platform.system().lower()
self.app_name = app_name
self.config_dir = self._get_config_dir()
self.config_file = self.config_dir / "config.json"
self.secret_key_file = self.config_dir / ".secret.key"
self._fernet = None
# 确保配置目录存在
self.config_dir.mkdir(parents=True, exist_ok=True)
# 设置文件权限(非Windows
if self.system != 'windows':
os.chmod(self.config_dir, 0o700)
def _get_config_dir(self) -> Path:
"""获取适合当前平台的配置目录路径"""
if self.system == 'windows':
return Path(os.environ['APPDATA']) / self.app_name
elif self.system == 'darwin': # macOS
return Path.home() / "Library" / "Application Support" / self.app_name
else: # Linux及其他Unix-like
xdg_config = os.getenv('XDG_CONFIG_HOME', '~/.config')
return Path(xdg_config).expanduser() / self.app_name
def _init_encryption(self):
"""初始化加密模块"""
if not self.secret_key_file.exists():
self.secret_key_file.write_bytes(Fernet.generate_key())
if self.system != 'windows':
self.secret_key_file.chmod(0o600) # 仅用户可读写
self._fernet = Fernet(self.secret_key_file.read_bytes())
def encrypt_value(self, plaintext: str) -> str:
"""加密敏感信息"""
if not self._fernet:
self._init_encryption()
return self._fernet.encrypt(plaintext.encode()).decode()
def decrypt_value(self, ciphertext: str) -> str:
"""解密信息"""
if not self._fernet:
self._init_encryption()
return self._fernet.decrypt(ciphertext.encode()).decode()
def _get_default_config(self) -> Dict[str, Any]:
"""获取默认配置模板"""
return {
"system": {
"env": "dev", # dev/test/prod
"log_level": "INFO",
"max_threads": max(1, os.cpu_count() or 4),
"data_dir": str(self.config_dir / "data")
},
"api": {
"newsapi": {
"endpoint": "https://newsapi.org/v2",
"key": "" # 需加密存储
},
"weibo": {
"version": "2",
"access_token": "" # 需加密存储
}
},
"database": {
"type": "sqlite",
"path": str(self.config_dir / "data.db")
},
"network": {
"timeout": 30,
"retries": 3,
"proxy": "" # 示例: http://user:pass@proxy:port
}
}
def _migrate_old_config(self, config: Dict[str, Any]) -> Dict[str, Any]:
"""旧配置迁移(兼容性处理)"""
# 示例:将旧版api_key迁移到新版结构
if 'api_key' in config:
config.setdefault('api', {})['newsapi'] = {
'key': config.pop('api_key')
}
return config
def _validate_config(self, config: Dict[str, Any]) -> bool:
"""验证配置完整性"""
required_keys = {
"system": ["env", "log_level"],
"api/newsapi": ["endpoint"]
}
for path, keys in required_keys.items():
current = config
for part in path.split('/'):
current = current.get(part, {})
if not isinstance(current, dict):
return False
for key in keys:
if key not in current:
return False
return True
def _repair_config(self, config: Dict[str, Any]) -> Dict[str, Any]:
"""自动修复缺失的配置项"""
default_config = self._get_default_config()
def _merge(current, default):
for key, value in default.items():
if key not in current:
current[key] = value
elif isinstance(value, dict):
_merge(current[key], value)
return current
return _merge(config, default_config)
def init_config(self, force: bool = False) -> bool:
"""
初始化配置文件
参数:
force: 是否强制重新生成配置
返回:
bool: 是否创建了新配置
"""
config = None
# 已有配置文件且不强制重置
if self.config_file.exists() and not force:
try:
with open(self.config_file, 'r', encoding='utf-8') as f:
config = json.load(f)
# 配置迁移和修复
config = self._migrate_old_config(config)
if not self._validate_config(config):
config = self._repair_config(config)
logger.warning("自动修复不完整的配置文件")
except Exception as e:
logger.error(f"加载现有配置失败: {str(e)}")
config = None
# 需要创建新配置
if config is None:
config = self._get_default_config()
logger.info("创建新的配置文件")
# 加密敏感字段
self._init_encryption()
for field in [
"api/newsapi/key",
"api/weibo/access_token",
"network/proxy"
]:
parts = field.split('/')
current = config
for part in parts[:-1]:
current = current.setdefault(part, {})
if parts[-1] in current and current[parts[-1]]:
current[parts[-1]] = self.encrypt_value(current[parts[-1]])
# 保存配置
with open(self.config_file, 'w', encoding='utf-8') as f:
json.dump(config, f, indent=2, ensure_ascii=False)
# 设置文件权限(非Windows
if self.system != 'windows':
os.chmod(self.config_file, 0o600)
return True
def get_config_hash(self) -> str:
"""获取配置文件哈希值(用于检测变更)"""
if not self.config_file.exists():
return ""
with open(self.config_file, 'rb') as f:
return hashlib.sha256(f.read()).hexdigest()
def create_env_specific_config(self, env: str = None) -> bool:
"""
创建环境特定配置
参数:
env: 环境类型(dev/test/prod
"""
if not self.config_file.exists():
self.init_config()
with open(self.config_file, 'r', encoding='utf-8') as f:
base_config = json.load(f)
env = env or base_config['system']['env']
env_config = {
f"env_{env}": {
"api": {
"newsapi": {"endpoint": self._get_env_endpoint(env)}
},
"database": {
"path": str(self.config_dir / f"data_{env}.db")
}
}
}
env_file = self.config_dir / f"config.{env}.json"
with open(env_file, 'w', encoding='utf-8') as f:
json.dump(env_config, f, indent=2)
return True
def _get_env_endpoint(self, env: str) -> str:
"""获取环境特定的API端点"""
endpoints = {
"dev": "http://dev-api.example.com",
"test": "https://test-api.example.com",
"prod": "https://api.example.com"
}
return endpoints.get(env, endpoints['dev'])
# 快捷初始化函数
def init_app_config(app_name: str = None, force: bool = False) -> bool:
"""
快速初始化应用配置
参数:
app_name: 应用名称
force: 是否强制重新初始化
"""
return ConfigInitializer(app_name).init_config(force)
# 测试代码
if __name__ == "__main__":
# 初始化配置
initializer = ConfigInitializer()
if initializer.init_config():
print("配置文件已生成:", initializer.config_file)
# 创建环境配置示例
initializer.create_env_specific_config("prod")
print("生产环境配置已生成")
# 加密演示
encrypted = initializer.encrypt_value("my_secret_key")
print("加密示例:", encrypted)
print("解密测试:", initializer.decrypt_value(encrypted))