From 6027f0d0e1ebeab406b57d0657f8d9cd2f89918a Mon Sep 17 00:00:00 2001 From: z66 <1415243231@qq.com> Date: Fri, 12 Sep 2025 10:48:17 +0800 Subject: [PATCH] =?UTF-8?q?minio=E5=AF=B9=E8=B1=A1=E5=AD=98=E5=82=A8?= =?UTF-8?q?=E6=95=B0=E6=8D=AE=E5=BA=93=E9=93=BE=E6=8E=A5?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- config.py | 16 + config/__init__.py | 273 ------------ config/settings.py | 409 ------------------ doc/{任务调度操作 => 任务调度操作.md} | 0 logs/application.log | 4 + readme.md | 107 +++-- system_management/scheduler/task_scheduler.py | 2 +- test/对象存储数据库链接测试.py | 171 ++++++++ tools/SQL.sql | 1 + utils/minio_agent.py | 383 ++++++++++++++++ 10 files changed, 651 insertions(+), 715 deletions(-) create mode 100644 config.py delete mode 100644 config/__init__.py delete mode 100644 config/settings.py rename doc/{任务调度操作 => 任务调度操作.md} (100%) create mode 100644 test/对象存储数据库链接测试.py create mode 100644 tools/SQL.sql create mode 100644 utils/minio_agent.py diff --git a/config.py b/config.py new file mode 100644 index 0000000..9d56e06 --- /dev/null +++ b/config.py @@ -0,0 +1,16 @@ +class Config: + + MYSQL_CONFIG = { + 'host': 'localhost', + 'port': 3306, + 'user': 'root', + 'password': '123123', + 'max_connections': 10 + } + + MINIO_CONFIG = { + 'endpoint': '127.0.0.1:9005', + 'access_key': 'admin', + 'secret_key': 'abc88888888', + 'secure': False # 社区版默认不启用SSL + } \ No newline at end of file diff --git a/config/__init__.py b/config/__init__.py deleted file mode 100644 index 6c574fc..0000000 --- a/config/__init__.py +++ /dev/null @@ -1,273 +0,0 @@ -#!/usr/bin/env python3 -# -*- coding: utf-8 -*- -""" -配置初始化模块 -功能: -1. 自动生成默认配置文件 -2. 多环境配置支持(dev/test/prod) -3. 敏感信息加密存储 -4. 
配置完整性检查与修复 -""" - -import os -import json -import platform -from pathlib import Path -from typing import Dict, Any, Optional -import logging -from cryptography.fernet import Fernet -import hashlib - -# 初始化日志 -logging.basicConfig(level=logging.INFO) -logger = logging.getLogger('config_init') - -class ConfigInitializer: - """配置初始化工具类""" - - def __init__(self, app_name: str = "intelligence_system"): - self.system = platform.system().lower() - self.app_name = app_name - self.config_dir = self._get_config_dir() - self.config_file = self.config_dir / "config.json" - self.secret_key_file = self.config_dir / ".secret.key" - self._fernet = None - - # 确保配置目录存在 - self.config_dir.mkdir(parents=True, exist_ok=True) - - # 设置文件权限(非Windows) - if self.system != 'windows': - os.chmod(self.config_dir, 0o700) - - def _get_config_dir(self) -> Path: - """获取适合当前平台的配置目录路径""" - if self.system == 'windows': - return Path(os.environ['APPDATA']) / self.app_name - elif self.system == 'darwin': # macOS - return Path.home() / "Library" / "Application Support" / self.app_name - else: # Linux及其他Unix-like - xdg_config = os.getenv('XDG_CONFIG_HOME', '~/.config') - return Path(xdg_config).expanduser() / self.app_name - - def _init_encryption(self): - """初始化加密模块""" - if not self.secret_key_file.exists(): - self.secret_key_file.write_bytes(Fernet.generate_key()) - if self.system != 'windows': - self.secret_key_file.chmod(0o600) # 仅用户可读写 - - self._fernet = Fernet(self.secret_key_file.read_bytes()) - - def encrypt_value(self, plaintext: str) -> str: - """加密敏感信息""" - if not self._fernet: - self._init_encryption() - return self._fernet.encrypt(plaintext.encode()).decode() - - def decrypt_value(self, ciphertext: str) -> str: - """解密信息""" - if not self._fernet: - self._init_encryption() - return self._fernet.decrypt(ciphertext.encode()).decode() - - def _get_default_config(self) -> Dict[str, Any]: - """获取默认配置模板""" - return { - "system": { - "env": "dev", # dev/test/prod - "log_level": "INFO", - 
"max_threads": max(1, os.cpu_count() or 4), - "data_dir": str(self.config_dir / "data") - }, - "api": { - "newsapi": { - "endpoint": "https://newsapi.org/v2", - "key": "" # 需加密存储 - }, - "weibo": { - "version": "2", - "access_token": "" # 需加密存储 - } - }, - "database": { - "type": "sqlite", - "path": str(self.config_dir / "data.db") - }, - "network": { - "timeout": 30, - "retries": 3, - "proxy": "" # 示例: http://user:pass@proxy:port - } - } - - def _migrate_old_config(self, config: Dict[str, Any]) -> Dict[str, Any]: - """旧配置迁移(兼容性处理)""" - # 示例:将旧版api_key迁移到新版结构 - if 'api_key' in config: - config.setdefault('api', {})['newsapi'] = { - 'key': config.pop('api_key') - } - return config - - def _validate_config(self, config: Dict[str, Any]) -> bool: - """验证配置完整性""" - required_keys = { - "system": ["env", "log_level"], - "api/newsapi": ["endpoint"] - } - - for path, keys in required_keys.items(): - current = config - for part in path.split('/'): - current = current.get(part, {}) - if not isinstance(current, dict): - return False - - for key in keys: - if key not in current: - return False - return True - - def _repair_config(self, config: Dict[str, Any]) -> Dict[str, Any]: - """自动修复缺失的配置项""" - default_config = self._get_default_config() - - def _merge(current, default): - for key, value in default.items(): - if key not in current: - current[key] = value - elif isinstance(value, dict): - _merge(current[key], value) - return current - - return _merge(config, default_config) - - def init_config(self, force: bool = False) -> bool: - """ - 初始化配置文件 - 参数: - force: 是否强制重新生成配置 - 返回: - bool: 是否创建了新配置 - """ - config = None - - # 已有配置文件且不强制重置 - if self.config_file.exists() and not force: - try: - with open(self.config_file, 'r', encoding='utf-8') as f: - config = json.load(f) - - # 配置迁移和修复 - config = self._migrate_old_config(config) - if not self._validate_config(config): - config = self._repair_config(config) - logger.warning("自动修复不完整的配置文件") - - except Exception as e: - 
logger.error(f"加载现有配置失败: {str(e)}") - config = None - - # 需要创建新配置 - if config is None: - config = self._get_default_config() - logger.info("创建新的配置文件") - - # 加密敏感字段 - self._init_encryption() - for field in [ - "api/newsapi/key", - "api/weibo/access_token", - "network/proxy" - ]: - parts = field.split('/') - current = config - for part in parts[:-1]: - current = current.setdefault(part, {}) - - if parts[-1] in current and current[parts[-1]]: - current[parts[-1]] = self.encrypt_value(current[parts[-1]]) - - # 保存配置 - with open(self.config_file, 'w', encoding='utf-8') as f: - json.dump(config, f, indent=2, ensure_ascii=False) - - # 设置文件权限(非Windows) - if self.system != 'windows': - os.chmod(self.config_file, 0o600) - - return True - - def get_config_hash(self) -> str: - """获取配置文件哈希值(用于检测变更)""" - if not self.config_file.exists(): - return "" - - with open(self.config_file, 'rb') as f: - return hashlib.sha256(f.read()).hexdigest() - - def create_env_specific_config(self, env: str = None) -> bool: - """ - 创建环境特定配置 - 参数: - env: 环境类型(dev/test/prod) - """ - if not self.config_file.exists(): - self.init_config() - - with open(self.config_file, 'r', encoding='utf-8') as f: - base_config = json.load(f) - - env = env or base_config['system']['env'] - env_config = { - f"env_{env}": { - "api": { - "newsapi": {"endpoint": self._get_env_endpoint(env)} - }, - "database": { - "path": str(self.config_dir / f"data_{env}.db") - } - } - } - - env_file = self.config_dir / f"config.{env}.json" - with open(env_file, 'w', encoding='utf-8') as f: - json.dump(env_config, f, indent=2) - - return True - - def _get_env_endpoint(self, env: str) -> str: - """获取环境特定的API端点""" - endpoints = { - "dev": "http://dev-api.example.com", - "test": "https://test-api.example.com", - "prod": "https://api.example.com" - } - return endpoints.get(env, endpoints['dev']) - -# 快捷初始化函数 -def init_app_config(app_name: str = None, force: bool = False) -> bool: - """ - 快速初始化应用配置 - 参数: - app_name: 应用名称 - force: 是否强制重新初始化 - 
""" - return ConfigInitializer(app_name).init_config(force) - -# 测试代码 -if __name__ == "__main__": - # 初始化配置 - initializer = ConfigInitializer() - if initializer.init_config(): - print("配置文件已生成:", initializer.config_file) - - # 创建环境配置示例 - initializer.create_env_specific_config("prod") - print("生产环境配置已生成") - - # 加密演示 - encrypted = initializer.encrypt_value("my_secret_key") - print("加密示例:", encrypted) - print("解密测试:", initializer.decrypt_value(encrypted)) \ No newline at end of file diff --git a/config/settings.py b/config/settings.py deleted file mode 100644 index f119cfb..0000000 --- a/config/settings.py +++ /dev/null @@ -1,409 +0,0 @@ -import os -import sys -import platform -import pandas as pd -import pymysql -from pymysql import cursors -from pymysql.err import MySQLError -from dbutils.pooled_db import PooledDB -from typing import Union, List, Dict, Any, Optional, Tuple -import threading -from datetime import datetime -import numpy as np -from pathlib import Path - -# 导入您的日志系统 -from utils.logger import log as logger - -class MySQLAgent: - """ - 全平台兼容的MySQL数据库操作类 - 支持Windows/macOS/Linux系统 - """ - - _instance = None - _lock = threading.Lock() - - # 各平台特定的配置 - PLATFORM_CONFIG = { - 'Windows': { - 'socket_timeout': 30, - 'connect_timeout': 10, - 'ssl': None - }, - 'Darwin': { # macOS - 'socket_timeout': 60, - 'connect_timeout': 15, - 'ssl': {'ca': '/usr/local/etc/openssl/cert.pem'} - }, - 'Linux': { - 'socket_timeout': 60, - 'connect_timeout': 15, - 'ssl': None - } - } - - def __new__(cls, *args, **kwargs): - if not cls._instance: - with cls._lock: - if not cls._instance: - cls._instance = super().__new__(cls) - return cls._instance - - def __init__(self, config: dict = None): - if hasattr(self, '_pool') and self._pool: - return - - if not config: - from config.settings import DATABASE_CONFIG - config = DATABASE_CONFIG - - # 获取当前平台配置 - current_platform = platform.system() - platform_config = self.PLATFORM_CONFIG.get(current_platform, {}) - - # 基础配置 - self.config = { 
- 'host': config.get('host', 'localhost'), - 'port': config.get('port', 3306), - 'user': config.get('user', 'root'), - 'password': config.get('password', ''), - 'database': config.get('database', 'intelligence_system'), - 'charset': config.get('charset', 'utf8mb4'), - 'cursorclass': cursors.DictCursor, - 'autocommit': True, - **platform_config # 合并平台特定配置 - } - - # 处理各平台路径差异 - if current_platform == 'Windows': - self.config['ssl'] = None # Windows通常不需要SSL配置 - - # macOS特殊处理 - elif current_platform == 'Darwin': - if not os.path.exists(self.config['ssl']['ca']): - self.config['ssl'] = None - logger.warning("macOS SSL certificate not found, disabling SSL") - - self.pool_size = config.get('max_connections', 5) - self._pool = self._create_pool() - self.logger = logger.bind(module=f"MySQLAgent({current_platform})") - - def _create_pool(self) -> PooledDB: - """创建跨平台兼容的连接池""" - try: - # 各平台连接池参数调整 - pool_config = { - 'creator': pymysql, - 'maxconnections': self.pool_size, - 'mincached': 1, - 'maxcached': 3, - 'blocking': True, - 'ping': 1, # 定期检查连接有效性 - **self.config - } - - # Windows平台需要更短的超时时间 - if platform.system() == 'Windows': - pool_config['ping'] = 0 # Windows上ping有时不稳定 - - pool = PooledDB(**pool_config) - self.logger.info(f"Connection pool created for {platform.system()}") - return pool - - except Exception as e: - self.logger.critical("Failed to create connection pool", - error=str(e), - exc_info=True) - raise - - def _handle_path(self, path: str) -> str: - """处理跨平台路径问题""" - if platform.system() == 'Windows': - return path.replace('/', '\\') - return path - - def get_connection(self) -> pymysql.connections.Connection: - """ - 获取数据库连接(跨平台兼容) - - Returns: - pymysql.connections.Connection: 数据库连接 - - Raises: - MySQLError: 如果连接失败 - """ - try: - conn = self._pool.connection() - - # macOS需要特殊处理SSL - if platform.system() == 'Darwin' and self.config.get('ssl'): - conn.ping(reconnect=True) - - self.logger.trace("Connection obtained") - return conn - - except Exception as e: - 
error_msg = str(e) - - # Windows特定错误处理 - if platform.system() == 'Windows' and "timed out" in error_msg: - self.logger.warning("Windows connection timeout, retrying...") - return self._retry_connection() - - self.logger.error("Connection failed", - error=error_msg, - exc_info=True) - raise - - def _retry_connection(self, max_retries: int = 3) -> pymysql.connections.Connection: - """Windows平台连接重试机制""" - for attempt in range(max_retries): - try: - conn = self._pool.connection() - self.logger.info(f"Connection established after {attempt+1} attempts") - return conn - except Exception: - if attempt == max_retries - 1: - raise - import time - time.sleep(1) - - def query_to_df(self, sql: str, params: Union[tuple, dict, None] = None, - parse_dates: Union[List[str], bool] = True) -> pd.DataFrame: - """ - 跨平台兼容的SQL查询 - - Args: - sql (str): SQL语句 - params (Union[tuple, dict, None]): 参数 - parse_dates (Union[List[str], bool]): 日期解析 - - Returns: - pd.DataFrame: 查询结果 - """ - try: - with self.get_connection() as conn: - # Linux/macOS需要更长的查询超时 - if platform.system() != 'Windows': - conn.cursor().execute("SET SESSION wait_timeout=600") - - df = pd.read_sql(sql, conn, params=params, parse_dates=parse_dates) - - # Windows平台需要手动关闭游标 - if platform.system() == 'Windows': - conn.cursor().close() - - self.logger.info("Query executed", rows=len(df)) - return df - - except Exception as e: - self.logger.error("Query failed", - sql=sql, - params=params, - error=str(e), - exc_info=True) - raise - - def insert_from_df(self, table_name: str, df: pd.DataFrame, - chunk_size: int = 1000, replace: bool = False) -> int: - """ - 跨平台数据插入 - - Args: - table_name (str): 表名 - df (pd.DataFrame): 数据 - chunk_size (int): 分批大小 - replace (bool): 是否替换 - - Returns: - int: 插入行数 - """ - if df.empty: - self.logger.warning("Empty DataFrame", table=table_name) - return 0 - - try: - method = 'replace' if replace else 'append' - total_rows = 0 - - with self.get_connection() as conn: - # 各平台不同的分批策略 - if platform.system() 
== 'Windows': - chunk_size = min(chunk_size, 500) # Windows上减小批次 - - for i in range(0, len(df), chunk_size): - chunk = df.iloc[i:i + chunk_size] - - # macOS需要特殊处理datetime - if platform.system() == 'Darwin': - for col in chunk.select_dtypes(include=['datetime64']): - chunk[col] = chunk[col].dt.strftime('%Y-%m-%d %H:%M:%S') - - chunk.to_sql( - table_name, - conn, - if_exists=method, - index=False, - method='multi' - ) - total_rows += len(chunk) - method = 'append' - - self.logger.info("Data inserted", table=table_name, rows=total_rows) - return total_rows - - except Exception as e: - self.logger.error("Insert failed", - table=table_name, - error=str(e), - exc_info=True) - raise - - def execute_sql(self, sql: str, params: Union[tuple, dict, None] = None, - fetch: bool = False) -> Union[int, List[Dict[str, Any]]]: - """ - 跨平台SQL执行 - - Args: - sql (str): SQL语句 - params (Union[tuple, dict, None]): 参数 - fetch (bool): 是否获取结果 - - Returns: - Union[int, List[Dict[str, Any]]]: 结果 - """ - conn = None - cursor = None - try: - conn = self.get_connection() - cursor = conn.cursor() - - # Linux/macOS需要更长的执行时间 - if platform.system() != 'Windows': - cursor.execute("SET SESSION max_execution_time=600000") - - cursor.execute(sql, params) - - if fetch: - result = cursor.fetchall() - self.logger.debug("Query executed", rows=len(result)) - return result - else: - affected_rows = cursor.rowcount - self.logger.debug("Update executed", affected_rows=affected_rows) - return affected_rows - - except Exception as e: - self.logger.error("SQL execution failed", - sql=sql, - params=params, - error=str(e), - exc_info=True) - raise - finally: - if cursor: - cursor.close() - if conn: - conn.close() - - def begin_transaction(self) -> pymysql.connections.Connection: - """开始事务(跨平台兼容)""" - try: - conn = self.get_connection() - conn.autocommit(False) - - # macOS需要特殊处理事务隔离级别 - if platform.system() == 'Darwin': - conn.cursor().execute("SET SESSION TRANSACTION ISOLATION LEVEL READ COMMITTED") - - 
self.logger.debug("Transaction started") - return conn - except Exception as e: - self.logger.error("Begin transaction failed", error=str(e)) - raise - - def commit_transaction(self, conn: pymysql.connections.Connection) -> None: - """提交事务(跨平台兼容)""" - try: - conn.commit() - self.logger.debug("Transaction committed") - except Exception as e: - self.logger.error("Commit failed", error=str(e)) - raise - finally: - conn.close() - - def rollback_transaction(self, conn: pymysql.connections.Connection) -> None: - """回滚事务(跨平台兼容)""" - try: - conn.rollback() - self.logger.warning("Transaction rolled back") - except Exception as e: - self.logger.error("Rollback failed", error=str(e)) - finally: - conn.close() - - def __del__(self): - """析构函数(跨平台资源清理)""" - if hasattr(self, '_pool'): - try: - self._pool.close() - self.logger.info("Connection pool closed") - except Exception as e: - self.logger.error("Failed to close pool", error=str(e)) - - -# 平台特定的默认配置 -def get_default_config(): - """获取各平台默认配置""" - current_platform = platform.system() - - base_config = { - 'host': 'localhost', - 'port': 3306, - 'user': 'root', - 'password': '', - 'database': 'intelligence_system', - 'max_connections': 5 - } - - if current_platform == 'Windows': - return { - **base_config, - 'connect_timeout': 10, - 'read_timeout': 30, - 'write_timeout': 30 - } - elif current_platform == 'Darwin': - return { - **base_config, - 'connect_timeout': 15, - 'read_timeout': 60, - 'write_timeout': 60, - 'ssl': {'ca': '/usr/local/etc/openssl/cert.pem'} - } - else: # Linux和其他平台 - return { - **base_config, - 'connect_timeout': 15, - 'read_timeout': 60, - 'write_timeout': 60 - } - - -# 使用示例 -if __name__ == "__main__": - # 自动获取适合当前平台的配置 - config = get_default_config() - - # 初始化数据库连接 - db = MySQLAgent(config) - - # 测试查询 - try: - df = db.query_to_df("SELECT VERSION() as version") - print(f"Database version: {df['version'].iloc[0]}") - print(f"Running on: {platform.system()} {platform.release()}") - except Exception as e: - 
print(f"Error: {str(e)}") \ No newline at end of file diff --git a/doc/任务调度操作 b/doc/任务调度操作.md similarity index 100% rename from doc/任务调度操作 rename to doc/任务调度操作.md diff --git a/logs/application.log b/logs/application.log index 317b32b..f74f80d 100644 --- a/logs/application.log +++ b/logs/application.log @@ -106,3 +106,7 @@ 2025-08-06 17:25:57.519 | CRITICAL | mysql_agent:107 - Failed to create connection pool 2025-08-06 17:26:18.578 | INFO | mysql_agent:103 - Connection pool created 2025-08-06 17:28:09.242 | INFO | mysql_agent:103 - Connection pool created +2025-09-12 10:45:53.283 | INFO | minio_agent:83 - MinIO客户端创建成功 +2025-09-12 10:45:53.291 | INFO | minio_agent:94 - 成功连接到MinIO服务:127.0.0.1:9005 +2025-09-12 10:45:53.297 | INFO | minio_agent:112 - 存储桶创建成功:test-bucket-20250912104553 +2025-09-12 10:45:53.302 | INFO | minio_agent:331 - 对象列表查询成功 diff --git a/readme.md b/readme.md index 358cc39..a259c3f 100644 --- a/readme.md +++ b/readme.md @@ -6,11 +6,6 @@ https://alidocs.dingtalk.com/i/nodes/NZQYprEoWoexdo1ohPdxXvDbJ1waOeDk?utm_scene= ### 程序框架 ```angular2html intelligence_system/ -├── config/ # 系统配置中心 -│ ├── __init__.py # 配置包初始化 -│ ├── config.py # 配置加载与管理 -│ └── constants.py # 系统常量定义 - ├── data_collection/ # 数据采集层 │ ├── spiders/ # 网络爬虫子系统 │ │ ├── weibo_spider.py # 黑猫爬虫 @@ -20,7 +15,7 @@ intelligence_system/ │ │ │ └── internal/ # 内部数据收集 │ ├── jian_dao_cloud.py # 简道云表单收集器 - +│ ├── data_processing/ # 数据处理层 │ ├── structured/ # 结构化数据处理 │ │ ├── data_cleaner.py # 数据清洗(去重/标准化) @@ -35,7 +30,7 @@ intelligence_system/ │ ├── nlp_processor.py # 自然语言处理引擎 │ ├── sentiment_analyzer.py # 情感分析模型 │ └── topic_modeler.py # LDA主题建模工具 - +│ ├── services/ # 应用服务层 │ ├── monitoring/ # 舆情监控 │ │ ├── opinion_monitor.py # 实时舆情追踪 @@ -52,7 +47,7 @@ intelligence_system/ │ └── alert/ # 预警服务 │ ├── alert_trigger.py # 动态阈值告警 │ └── notification_center.py # 邮件/短信通知 - +│ ├── system_management/ # 系统管理层 │ ├── scheduler/ # 任务调度 │ │ └── task_scheduler.py # 任务调度器 @@ -60,13 +55,14 @@ intelligence_system/ │ └── 
monitor/ # 系统监控 │ ├── health_monitor.py # 服务健康检测 │ └── performance_watcher.py # 资源占用监控 - +│ ├── utils/ # 工具库 │ ├── file_handler.py # 通用文件操作 │ ├── logger.py # 日志系统 │ ├── mysql_agent.py # MySQL读写管理器 │ └── datetime_parser.py # 时间格式处理 - +│ +├── config.py # 配置加载与管理 └── main.py # 系统入口(启动所有服务) ``` @@ -74,6 +70,7 @@ intelligence_system/ 1. 所有程序尽可能在py文件中运行,尽量避免使用命令行执行 2. 配置需要在配置类中定义 3. 密钥等信息直接放在配置类中 +4. 数据存储遵循"结构化存MySQL,非结构化存MinIO"原则,通过元数据关联 ### 主程序设计 主程序需要一次启动,一直运行,启动时运行一次(在代码中可取消),之后每天定时生成一次报告 @@ -87,32 +84,78 @@ intelligence_system/ 主程序应包含日报、周报等生成,根据时间定时生成报告,报告需要存储 ### 日志设计 -日志系统应兼容多个平台,如win、mac和linux,日志需要保存为log文件,并且在日志大于20mb时自动压缩 +日志系统兼容Windows、Mac、Linux平台,以`log`文件形式存储,超过20MB自动压缩。新增存储相关日志内容: +- MySQL操作:批量插入行数、表结构变更、事务状态 +- MinIO操作:文件上传/下载状态、路径、大小、耗时 +- 关联日志:MySQL记录与MinIO对象的绑定关系(如"ID:123 关联文件: collector/images/xxx.jpg") +- 异常日志:MySQL连接失败、MinIO上传超时、数据关联不一致等告警信息 -### 数据库链接设计 -数据存储放在数据库中,数据库类型为mysql,数据库名称为intelligence_system +### 存储系统设计(MinIO+MySQL) +#### 核心存储分工 +| 存储类型 | 适用数据 | 核心作用 | +|----------|----------|----------| +| MySQL | 结构化数据、元数据、关系型数据 | 存储业务逻辑数据、非结构化数据的索引信息、任务调度信息等 | +| MinIO | 非结构化数据 | 存储图片、视频、PDF文档、原始爬取文件等二进制/大文件数据 | -数据库表的命名规则与目录一致,数据采集类以collector_为开头,数据处理类以processor_为开 -头,数据存储类以storage_为开头,应用层类以application_为开头 -依次类推。 -数据库链接为通用配置,要求数据采集或处理类等,可以直接调用封装好的数据库 -链接,不必每次都重新写, -该链接包含表的增删改查功能,以及执行sql语句功能 +#### 核心存储配置 +1. **MySQL配置** + - 数据库名称:`intelligence_system` + - 连接管理:通过`utils/mysql_agent.py`封装线程安全的连接池,提供结构化数据的增删改查及SQL执行能力 + - 适配特性:支持多平台(Windows/macOS/Linux)的超时配置和批处理优化 -数据库结构: -1. collector_news_api:新闻api数据表 -2. collector_complaint_spider:投诉数据表 -3. processor_text_processor:文本处理数据表 -4. processor_image_processor:图片处理数据表 -5. main_task 任务调度表 -6. application_reporter_daily:日报数据表 -7. application_reporter_monthly:周报数据表 +2. 
**MinIO配置** + - 存储桶命名规则:按数据类型划分,如`collector-images`(采集层图片)、`processor-videos`(处理层视频) + - 连接管理:通过`utils/minio_agent.py`封装客户端,提供对象上传、下载、删除、查询URL等能力 + - 路径规则:`{数据层}/{来源}/{时间戳}_{唯一ID}.{后缀}`(例:`collector/weibo_spider/20240520_12345.jpg`) + + +#### 表命名规则(扩展) +- 数据采集类:以`collector_`为前缀(存储采集到的结构化数据及MinIO对象元数据) +- 数据处理类:以`processor_`为前缀(存储处理结果的结构化数据及MinIO处理后对象的元数据) +- 数据存储类:以`storage_`为前缀(存储MinIO对象的索引信息,如哈希、大小、访问权限等) +- 应用层类:以`application_`为前缀(对应业务应用数据) +- 系统类:如任务调度表等采用功能命名(如`main_task`) + + +#### 核心表结构 +1. `collector_news_api`:新闻API采集数据表(存储新闻标题、内容等结构化数据) +2. `collector_complaint_spider`:投诉信息爬虫数据表(含投诉文本、附件MinIO路径`attachment_minio_path`等) +3. `collector_image_source`:采集层图片元数据表(存储图片URL、MinIO路径、格式、大小等) +4. `processor_text_processor`:文本处理结果表(存储NLP分析结果、关联原文ID等) +5. `processor_image_processor`:图片处理结果表(存储识别标签、特征向量、处理后图片MinIO路径`result_minio_path`等) +6. `storage_object_index`:MinIO对象索引表(存储所有对象的MinIO路径、哈希值、创建时间、过期时间等) +7. `main_task`:任务调度表(存储任务名称、路径、执行频率、上次/下次执行时间等) +8. `application_reporter_daily`:日报数据表(存储日报结构化内容、报表文件MinIO路径等) +9. `application_reporter_monthly`:月报数据表(存储月报结构化内容、报表文件MinIO路径等) + + +#### 数据交互特性 +1. **MySQL交互** + - 支持DataFrame直接读写,提供分块处理(`chunksize`)和批量插入能力 + - 自动适配平台特性(如Windows小批次写入优化) + - 完善的事务机制确保结构化数据一致性 + +2. **MinIO交互** + - 支持大文件分片上传、断点续传 + +3. **联动机制** + - 非结构化数据存储时,先上传至MinIO获取路径,再将路径及元数据写入MySQL + - 读取非结构化数据时,先从MySQL获取MinIO路径,再通过路径从MinIO下载 + - 日志同步记录MySQL操作和MinIO对象操作(如"上传文件至MinIO: {path},关联MySQL记录ID: {id}") ### 数据采集设计 -每一个数据采集均为独立python文件,里面执行主程序均为main,以方便调度 -每一个数据采集均会根据规则创建数据库表,数据处理类以processor_为开头,(或者统一维护到一个表中,按来源去区分) +1. 结构化数据(如新闻文本、投诉内容):直接写入对应`collector_`前缀表 +2. 非结构化数据(如爬取的图片、附件): + - 调用`minio_agent.py`上传至对应存储桶 + - 将MinIO路径、文件大小、格式等元数据写入`collector_`前缀表或`storage_object_index`表 +3. 每个采集模块(独立py文件,`main`方法入口)需同时处理MySQL和MinIO交互,确保数据关联完整 -### 数据处理 -从多个数据库库表中获取数据,对数据进行处理,处理完成后将结果保存到数据库中,处理结果可能存储在多个表中 -数据处理数据库表以processor_为开头 + +### 数据处理设计 +1. 结构化数据处理:从MySQL读取原始数据,处理后写入`processor_`前缀表 +2. 
非结构化数据处理: + - 从MySQL获取MinIO路径,通过`minio_agent.py`下载原始文件 + - 处理后(如图片识别、视频帧提取)将结果文件上传至MinIO(处理层存储桶) + - 将处理结果的结构化信息(如识别标签)和处理后文件的MinIO路径写入`processor_`前缀表 +3. 支持多表关联存储,通过`source_id`关联原始数据与处理结果 diff --git a/system_management/scheduler/task_scheduler.py b/system_management/scheduler/task_scheduler.py index f04c1be..fc92ce0 100644 --- a/system_management/scheduler/task_scheduler.py +++ b/system_management/scheduler/task_scheduler.py @@ -6,7 +6,7 @@ import croniter import pytz from concurrent.futures import ThreadPoolExecutor, as_completed import pandas as pd -from storage.mysql_agent import MySQLAgent +from utils.mysql_agent import MySQLAgent from utils.logger import CrossPlatformLog # 初始化调度器日志 diff --git a/test/对象存储数据库链接测试.py b/test/对象存储数据库链接测试.py new file mode 100644 index 0000000..ef37e9c --- /dev/null +++ b/test/对象存储数据库链接测试.py @@ -0,0 +1,171 @@ +import unittest +import os +import tempfile +import hashlib +from datetime import datetime +from utils.minio_agent import MinIOAgent # 导入之前的MinIO操作类 + + +class TestMinIOAgent(unittest.TestCase): + # 测试配置 - 本地MinIO社区版 + MINIO_CONFIG = { + 'endpoint': '127.0.0.1:9005', + 'access_key': 'admin', # 默认账号 + 'secret_key': 'abc88888888', # 默认密码 + 'secure': False # 社区版默认不启用SSL + } + + @classmethod + def setUpClass(cls): + """初始化测试环境""" + # 创建唯一测试桶(避免冲突) + timestamp = datetime.now().strftime("%Y%m%d%H%M%S") + cls.test_bucket = f"test-bucket-{timestamp}" + cls.test_object = "test-data/sample.txt" + cls.test_content = b"this is MinIO test data: 1234567890" + + # 初始化客户端 + cls.minio_agent = MinIOAgent(cls.MINIO_CONFIG) + + # 确保测试桶存在 + cls.minio_agent.create_bucket(cls.test_bucket) + + @classmethod + def tearDownClass(cls): + """清理测试环境""" + try: + # 列出并删除桶内所有对象 + objects = cls.minio_agent.list_objects(cls.test_bucket) + for obj in objects: + cls.minio_agent.delete_object(cls.test_bucket, obj['object_name']) + + # 删除测试桶(MinIO要求桶为空才能删除) + cls.minio_agent._client.remove_bucket(cls.test_bucket) + print(f"\n测试清理完成,已删除桶: {cls.test_bucket}") + 
except Exception as e: + print(f"清理测试环境失败: {str(e)}") + + def test_01_create_bucket(self): + """测试创建存储桶""" + new_bucket = f"temp-bucket-{datetime.now().microsecond}" + result = self.minio_agent.create_bucket(new_bucket) + self.assertTrue(result, "存储桶创建失败") + + # 验证桶是否存在 + exists = self.minio_agent._client.bucket_exists(new_bucket) + self.assertTrue(exists, "存储桶创建后未检测到存在") + + # 清理临时桶 + self.minio_agent._client.remove_bucket(new_bucket) + + def test_02_upload_download(self): + """测试上传与下载功能""" + # 上传数据 + upload_meta = self.minio_agent.upload_bytes( + bucket=self.test_bucket, + object_name=self.test_object, + data=self.test_content + ) + + # 验证上传结果 + self.assertEqual(upload_meta['size'], len(self.test_content), "上传数据大小不匹配") + self.assertEqual(upload_meta['local_hash'], hashlib.md5(self.test_content).hexdigest(), "本地哈希校验失败") + + # 下载数据到临时文件 + with tempfile.NamedTemporaryFile(delete=False) as temp_file: + temp_path = temp_file.name + + download_meta = self.minio_agent.download_file( + bucket=self.test_bucket, + object_name=self.test_object, + local_path=temp_path + ) + + # 验证下载内容 + with open(temp_path, 'rb') as f: + downloaded_content = f.read() + + self.assertEqual(downloaded_content, self.test_content, "下载数据与原始数据不匹配") + self.assertEqual(download_meta['size'], len(self.test_content), "下载文件大小不匹配") + + # 清理临时文件 + os.unlink(temp_path) + + def test_03_presigned_url(self): + """测试生成预签名URL""" + # 先上传测试文件 + self.minio_agent.upload_bytes( + self.test_bucket, + self.test_object, + self.test_content + ) + + # 生成URL(有效期30秒) + url_info = self.minio_agent.get_presigned_url( + bucket=self.test_bucket, + object_name=self.test_object, + expires=30 + ) + + # 验证URL格式 + self.assertIn("http://127.0.0.1:9005", url_info['presigned_url'], "预签名URL格式不正确") + self.assertEqual(url_info['expires_in'], 30, "过期时间设置不正确") + + def test_04_list_objects(self): + """测试列出对象功能""" + # 上传多个测试对象 + test_objects = [ + "test-folder/file1.txt", + "test-folder/file2.csv", + "another-folder/image.jpg" + ] + + for 
obj in test_objects: + self.minio_agent.upload_bytes( + self.test_bucket, + obj, + b"tese_list_obj" + ) + + # 列出所有对象 + all_objects = self.minio_agent.list_objects(self.test_bucket) + self.assertEqual(len(all_objects), len(test_objects) + 1, "列出对象数量不匹配") # +1是之前的test_object + + # 按前缀筛选 + filtered_objects = self.minio_agent.list_objects( + self.test_bucket, + prefix="test-folder/" + ) + self.assertEqual(len(filtered_objects), 2, "按前缀筛选结果不正确") + + def test_05_delete_object(self): + """测试删除对象功能""" + # 创建测试对象 + delete_obj = "to-delete/temp.txt" + self.minio_agent.upload_bytes( + self.test_bucket, + delete_obj, + b"will be delete" + ) + + # 执行删除 + result = self.minio_agent.delete_object(self.test_bucket, delete_obj) + self.assertTrue(result, "删除对象失败") + + # 验证删除 + objects = self.minio_agent.list_objects(self.test_bucket, prefix="to-delete/") + self.assertEqual(len(objects), 0, "对象删除后仍存在") + + def test_06_upload_empty_data(self): + """测试上传空数据的异常处理""" + with self.assertRaises(ValueError, msg="未捕获空数据上传异常"): + self.minio_agent.upload_bytes( + self.test_bucket, + "empty.txt", + b"" + ) + + +if __name__ == "__main__": + # 执行测试并显示详细结果 + unittest.main(verbosity=2) \ No newline at end of file diff --git a/tools/SQL.sql b/tools/SQL.sql new file mode 100644 index 0000000..a7fac10 --- /dev/null +++ b/tools/SQL.sql @@ -0,0 +1 @@ +DROP DATABASE test \ No newline at end of file diff --git a/utils/minio_agent.py b/utils/minio_agent.py new file mode 100644 index 0000000..690aafb --- /dev/null +++ b/utils/minio_agent.py @@ -0,0 +1,383 @@ +import os +import sys +import platform +import threading +from typing import List, Dict, Optional, BinaryIO, Tuple, Any +from datetime import datetime, timedelta +import hashlib +from io import BytesIO +from minio import Minio +from minio.error import S3Error, MinioException +from utils.logger import log + + +class MinIOAgent: + """ + 全平台兼容的MinIO对象存储操作类 + 支持Windows/macOS/Linux系统,提供对象存储的上传、下载、查询等功能 + 专注于二进制数据处理,返回元数据用于与MySQL关联 + """ + _instance = None # 
单例模式实例 + _lock = threading.Lock() # 线程锁,保证单例线程安全 + + def __new__(cls, *args, **kwargs): + """单例模式实现,确保全局只有一个实例""" + if not cls._instance: + with cls._lock: + if not cls._instance: + cls._instance = super().__new__(cls) + return cls._instance + + def __init__(self, config: dict): + """ + 初始化MinIO连接 + + 参数: + config (dict): MinIO配置字典,包含以下键: + - endpoint: 服务端点(例:'localhost:9000') + - access_key: 访问密钥 + - secret_key: 密钥 + - [可选] secure: 是否使用SSL(默认False) + - [可选] region: 区域 + - [可选] timeout: 超时时间(秒,默认30) + """ + # 避免重复初始化 + if hasattr(self, '_client') and self._client: + return + + # 验证必要配置参数 + required_keys = ['endpoint', 'access_key', 'secret_key'] + if not all(key in config for key in required_keys): + raise ValueError(f"MinIO配置缺少必要参数,需要: {required_keys}") + + # 整合配置,设置默认值 + self.config = { + 'endpoint': config['endpoint'], + 'access_key': config['access_key'], + 'secret_key': config['secret_key'], + 'secure': config.get('secure', False), + 'region': config.get('region'), + 'timeout': config.get('timeout', 30) + } + + # 初始化日志,绑定当前平台信息 + current_platform = platform.system() + self.log = log.bind(module=f"MinIOAgent({current_platform})") + + # 创建客户端实例 + self._client = self._create_client() + + # 验证连接是否有效 + self._verify_connection() + + def _create_client(self) -> Minio: + """创建MinIO客户端实例""" + try: + client = Minio( + endpoint=self.config['endpoint'], + access_key=self.config['access_key'], + secret_key=self.config['secret_key'], + secure=self.config['secure'], + region=self.config['region'] + ) + self.log.info("MinIO客户端创建成功") + return client + except Exception as e: + self.log.critical("创建MinIO客户端失败", 错误=str(e), exc_info=True) + raise + + def _verify_connection(self) -> None: + """验证与MinIO服务的连接是否正常""" + try: + # 通过列出存储桶来验证连接 + self._client.list_buckets() + self.log.info(f"成功连接到MinIO服务:{self.config['endpoint']}") + except Exception as e: + self.log.critical("连接验证失败", 错误=str(e), exc_info=True) + raise + + def create_bucket(self, bucket_name: str) -> bool: + """ + 
def upload_bytes(self, bucket: str, object_name: str, data: bytes) -> Dict[str, Any]:
    """
    Upload an in-memory bytes payload to MinIO.

    Args:
        bucket: Target bucket name; ``create_bucket`` is called first.
        object_name: Object key (path) inside the bucket.
        data: Payload to upload; must be non-empty.

    Returns:
        A metadata dict with the keys:
        - bucket: bucket name
        - object_name: object key
        - size: payload size in bytes
        - etag: ETag reported by the server
        - content_type: MIME type that was sent with the upload
        - upload_time: naive datetime expressed in UTC
        - local_hash: MD5 hex digest computed locally

    Raises:
        ValueError: If ``data`` is empty.
        MinioException: Re-raised after logging when the upload fails.
    """
    # Local import: only ``datetime``/``timedelta`` are known to be in file scope.
    from datetime import timezone

    if not data:
        raise ValueError("上传数据不能为空")

    # Make sure the bucket exists before writing into it.
    self.create_bucket(bucket)

    # Resolve these once so the values sent are exactly the values reported.
    content_type = self._guess_content_type(object_name)
    size = len(data)

    # Local digest, kept so callers can verify integrity later.
    local_hash = hashlib.md5(data).hexdigest()

    try:
        result = self._client.put_object(
            bucket_name=bucket,
            object_name=object_name,
            data=BytesIO(data),
            length=size,
            content_type=content_type,
        )
    except MinioException as e:
        self.log.error(
            "文件上传失败",
            存储桶=bucket,
            对象名称=object_name,
            错误=str(e),
            exc_info=True
        )
        raise

    # The write result is not relied on for ``content_type`` or a populated
    # ``last_modified``: reading them unconditionally crashed after a
    # successful upload. Fall back to "now" when the server gave no timestamp.
    last_modified = getattr(result, 'last_modified', None)
    if last_modified is not None:
        upload_time = datetime.fromtimestamp(
            last_modified.timestamp(), timezone.utc
        ).replace(tzinfo=None)
    else:
        upload_time = datetime.now(timezone.utc).replace(tzinfo=None)

    metadata = {
        'bucket': bucket,
        'object_name': object_name,
        'size': size,
        'etag': result.etag,
        'content_type': content_type,
        'upload_time': upload_time,
        'local_hash': local_hash
    }

    self.log.info(
        "文件上传成功",
        存储桶=bucket,
        对象名称=object_name,
        大小=size
    )
    return metadata


def download_file(self, bucket: str, object_name: str, local_path: str) -> Dict[str, Any]:
    """
    Download an object from MinIO to a local file.

    Args:
        bucket: Bucket name.
        object_name: Object key (path) inside the bucket.
        local_path: Destination path on the local filesystem. Missing parent
            directories are created; a bare filename is written relative to
            the current working directory.

    Returns:
        A dict with the keys:
        - local_path: the destination path
        - size: size of the downloaded file in bytes
        - download_time: elapsed transfer time in seconds
        - downloaded_at: local datetime at which the download finished

    Raises:
        MinioException: Re-raised after logging when the download fails.
        IOError: Re-raised after logging when a local file operation fails.
    """
    # Local import: ``time`` is not known to be imported at file level.
    import time

    try:
        # ``os.makedirs('')`` raises, so only create a parent when there is one.
        parent_dir = os.path.dirname(local_path)
        if parent_dir:
            os.makedirs(parent_dir, exist_ok=True)

        # Monotonic clock: immune to wall-clock adjustments mid-transfer.
        started = time.perf_counter()
        self._client.fget_object(bucket, object_name, local_path)
        elapsed = time.perf_counter() - started

        stat = os.stat(local_path)

        result = {
            'local_path': local_path,
            'size': stat.st_size,
            'download_time': elapsed,
            'downloaded_at': datetime.now()
        }

        self.log.info(
            "文件下载成功",
            存储桶=bucket,
            对象名称=object_name,
            本地路径=local_path,
            大小=stat.st_size
        )
        return result

    except MinioException as e:
        self.log.error(
            "文件下载失败",
            存储桶=bucket,
            对象名称=object_name,
            错误=str(e),
            exc_info=True
        )
        raise
    except IOError as e:
        self.log.error(
            "本地文件操作失败",
            本地路径=local_path,
            错误=str(e),
            exc_info=True
        )
        raise


def get_presigned_url(self, bucket: str, object_name: str, expires: int = 3600) -> Dict[str, Any]:
    """
    Generate a temporary (presigned) GET URL for an object.

    Args:
        bucket: Bucket name.
        object_name: Object key (path) inside the bucket.
        expires: Lifetime of the URL in seconds (default 3600).

    Returns:
        A dict with the keys:
        - presigned_url: the signed URL
        - expires_in: lifetime in seconds, as passed in
        - expires_at: local datetime at which the URL expires
        - bucket: bucket name
        - object_name: object key

    Raises:
        MinioException: Re-raised after logging when signing fails.
    """
    # The client API takes the lifetime as a timedelta, not a bare integer.
    ttl = timedelta(seconds=expires)

    try:
        url = self._client.presigned_get_object(
            bucket_name=bucket,
            object_name=object_name,
            expires=ttl
        )

        result = {
            'presigned_url': url,
            'expires_in': expires,
            'expires_at': datetime.now() + ttl,
            'bucket': bucket,
            'object_name': object_name
        }

        self.log.debug(
            "预签名URL生成成功",
            存储桶=bucket,
            对象名称=object_name,
            过期时间=expires
        )
        return result

    except MinioException as e:
        self.log.error(
            "生成预签名URL失败",
            存储桶=bucket,
            对象名称=object_name,
            错误=str(e),
            exc_info=True
        )
        raise


def list_objects(self, bucket: str, prefix: str = "") -> List[Dict[str, Any]]:
    """
    List objects under a prefix (recursively) together with their metadata.

    Args:
        bucket: Bucket name.
        prefix: Object key prefix to filter on; empty lists the whole bucket.

    Returns:
        One dict per object with the keys:
        - bucket: bucket name
        - object_name: object key
        - size: size taken from the listing entry
        - last_modified: timestamp taken from the listing entry
        - etag: ETag taken from ``stat_object``
        - content_type: content type taken from ``stat_object``

    Raises:
        MinioException: Re-raised after logging when listing or stat fails.
    """
    try:
        listing = self._client.list_objects(
            bucket_name=bucket,
            prefix=prefix,
            recursive=True
        )

        result = []
        for entry in listing:
            # NOTE: one extra stat_object round trip per object; it is the
            # source of the etag/content_type values reported below.
            detail = self._client.stat_object(bucket, entry.object_name)
            result.append({
                'bucket': bucket,
                'object_name': entry.object_name,
                'size': entry.size,
                'last_modified': entry.last_modified,
                'etag': detail.etag,
                'content_type': detail.content_type
            })

        self.log.info(
            "对象列表查询成功",
            存储桶=bucket,
            前缀=prefix,
            数量=len(result)
        )
        return result

    except MinioException as e:
        self.log.error(
            "查询对象列表失败",
            存储桶=bucket,
            前缀=prefix,
            错误=str(e),
            exc_info=True
        )
        raise


def delete_object(self, bucket: str, object_name: str) -> bool:
    """
    Delete a single object.

    Args:
        bucket: Bucket name.
        object_name: Object key (path) inside the bucket.

    Returns:
        True when the remove call completed; False when it raised a
        MinioException (the error is logged, not propagated).
    """
    try:
        self._client.remove_object(bucket, object_name)
    except MinioException as e:
        self.log.error(
            "删除对象失败",
            存储桶=bucket,
            对象名称=object_name,
            错误=str(e),
            exc_info=True
        )
        return False

    self.log.info(
        "对象删除成功",
        存储桶=bucket,
        对象名称=object_name
    )
    return True


@staticmethod
def _guess_content_type(object_name: str) -> str:
    """Guess the MIME type from the object name's extension.

    Falls back to ``application/octet-stream`` when the type is unknown.
    """
    from mimetypes import guess_type

    mime_type, _ = guess_type(object_name)
    return mime_type or 'application/octet-stream'