minio对象存储数据库链接

This commit is contained in:
z66
2025-09-12 10:48:17 +08:00
parent 76beaa60bc
commit 6027f0d0e1
10 changed files with 651 additions and 715 deletions
+16
View File
@@ -0,0 +1,16 @@
class Config:
MYSQL_CONFIG = {
'host': 'localhost',
'port': 3306,
'user': 'root',
'password': '123123',
'max_connections': 10
}
MINIO_CONFIG = {
'endpoint': '127.0.0.1:9005',
'access_key': 'admin',
'secret_key': 'abc88888888',
'secure': False # 社区版默认不启用SSL
}
-273
View File
@@ -1,273 +0,0 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
配置初始化模块
功能:
1. 自动生成默认配置文件
2. 多环境配置支持(dev/test/prod
3. 敏感信息加密存储
4. 配置完整性检查与修复
"""
import os
import json
import platform
from pathlib import Path
from typing import Dict, Any, Optional
import logging
from cryptography.fernet import Fernet
import hashlib
# 初始化日志
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger('config_init')
class ConfigInitializer:
"""配置初始化工具类"""
def __init__(self, app_name: str = "intelligence_system"):
self.system = platform.system().lower()
self.app_name = app_name
self.config_dir = self._get_config_dir()
self.config_file = self.config_dir / "config.json"
self.secret_key_file = self.config_dir / ".secret.key"
self._fernet = None
# 确保配置目录存在
self.config_dir.mkdir(parents=True, exist_ok=True)
# 设置文件权限(非Windows
if self.system != 'windows':
os.chmod(self.config_dir, 0o700)
def _get_config_dir(self) -> Path:
"""获取适合当前平台的配置目录路径"""
if self.system == 'windows':
return Path(os.environ['APPDATA']) / self.app_name
elif self.system == 'darwin': # macOS
return Path.home() / "Library" / "Application Support" / self.app_name
else: # Linux及其他Unix-like
xdg_config = os.getenv('XDG_CONFIG_HOME', '~/.config')
return Path(xdg_config).expanduser() / self.app_name
def _init_encryption(self):
"""初始化加密模块"""
if not self.secret_key_file.exists():
self.secret_key_file.write_bytes(Fernet.generate_key())
if self.system != 'windows':
self.secret_key_file.chmod(0o600) # 仅用户可读写
self._fernet = Fernet(self.secret_key_file.read_bytes())
def encrypt_value(self, plaintext: str) -> str:
"""加密敏感信息"""
if not self._fernet:
self._init_encryption()
return self._fernet.encrypt(plaintext.encode()).decode()
def decrypt_value(self, ciphertext: str) -> str:
"""解密信息"""
if not self._fernet:
self._init_encryption()
return self._fernet.decrypt(ciphertext.encode()).decode()
def _get_default_config(self) -> Dict[str, Any]:
"""获取默认配置模板"""
return {
"system": {
"env": "dev", # dev/test/prod
"log_level": "INFO",
"max_threads": max(1, os.cpu_count() or 4),
"data_dir": str(self.config_dir / "data")
},
"api": {
"newsapi": {
"endpoint": "https://newsapi.org/v2",
"key": "" # 需加密存储
},
"weibo": {
"version": "2",
"access_token": "" # 需加密存储
}
},
"database": {
"type": "sqlite",
"path": str(self.config_dir / "data.db")
},
"network": {
"timeout": 30,
"retries": 3,
"proxy": "" # 示例: http://user:pass@proxy:port
}
}
def _migrate_old_config(self, config: Dict[str, Any]) -> Dict[str, Any]:
"""旧配置迁移(兼容性处理)"""
# 示例:将旧版api_key迁移到新版结构
if 'api_key' in config:
config.setdefault('api', {})['newsapi'] = {
'key': config.pop('api_key')
}
return config
def _validate_config(self, config: Dict[str, Any]) -> bool:
"""验证配置完整性"""
required_keys = {
"system": ["env", "log_level"],
"api/newsapi": ["endpoint"]
}
for path, keys in required_keys.items():
current = config
for part in path.split('/'):
current = current.get(part, {})
if not isinstance(current, dict):
return False
for key in keys:
if key not in current:
return False
return True
def _repair_config(self, config: Dict[str, Any]) -> Dict[str, Any]:
"""自动修复缺失的配置项"""
default_config = self._get_default_config()
def _merge(current, default):
for key, value in default.items():
if key not in current:
current[key] = value
elif isinstance(value, dict):
_merge(current[key], value)
return current
return _merge(config, default_config)
def init_config(self, force: bool = False) -> bool:
"""
初始化配置文件
参数:
force: 是否强制重新生成配置
返回:
bool: 是否创建了新配置
"""
config = None
# 已有配置文件且不强制重置
if self.config_file.exists() and not force:
try:
with open(self.config_file, 'r', encoding='utf-8') as f:
config = json.load(f)
# 配置迁移和修复
config = self._migrate_old_config(config)
if not self._validate_config(config):
config = self._repair_config(config)
logger.warning("自动修复不完整的配置文件")
except Exception as e:
logger.error(f"加载现有配置失败: {str(e)}")
config = None
# 需要创建新配置
if config is None:
config = self._get_default_config()
logger.info("创建新的配置文件")
# 加密敏感字段
self._init_encryption()
for field in [
"api/newsapi/key",
"api/weibo/access_token",
"network/proxy"
]:
parts = field.split('/')
current = config
for part in parts[:-1]:
current = current.setdefault(part, {})
if parts[-1] in current and current[parts[-1]]:
current[parts[-1]] = self.encrypt_value(current[parts[-1]])
# 保存配置
with open(self.config_file, 'w', encoding='utf-8') as f:
json.dump(config, f, indent=2, ensure_ascii=False)
# 设置文件权限(非Windows
if self.system != 'windows':
os.chmod(self.config_file, 0o600)
return True
def get_config_hash(self) -> str:
"""获取配置文件哈希值(用于检测变更)"""
if not self.config_file.exists():
return ""
with open(self.config_file, 'rb') as f:
return hashlib.sha256(f.read()).hexdigest()
def create_env_specific_config(self, env: str = None) -> bool:
"""
创建环境特定配置
参数:
env: 环境类型(dev/test/prod
"""
if not self.config_file.exists():
self.init_config()
with open(self.config_file, 'r', encoding='utf-8') as f:
base_config = json.load(f)
env = env or base_config['system']['env']
env_config = {
f"env_{env}": {
"api": {
"newsapi": {"endpoint": self._get_env_endpoint(env)}
},
"database": {
"path": str(self.config_dir / f"data_{env}.db")
}
}
}
env_file = self.config_dir / f"config.{env}.json"
with open(env_file, 'w', encoding='utf-8') as f:
json.dump(env_config, f, indent=2)
return True
def _get_env_endpoint(self, env: str) -> str:
"""获取环境特定的API端点"""
endpoints = {
"dev": "http://dev-api.example.com",
"test": "https://test-api.example.com",
"prod": "https://api.example.com"
}
return endpoints.get(env, endpoints['dev'])
# 快捷初始化函数
def init_app_config(app_name: str = None, force: bool = False) -> bool:
"""
快速初始化应用配置
参数:
app_name: 应用名称
force: 是否强制重新初始化
"""
return ConfigInitializer(app_name).init_config(force)
# 测试代码
if __name__ == "__main__":
# 初始化配置
initializer = ConfigInitializer()
if initializer.init_config():
print("配置文件已生成:", initializer.config_file)
# 创建环境配置示例
initializer.create_env_specific_config("prod")
print("生产环境配置已生成")
# 加密演示
encrypted = initializer.encrypt_value("my_secret_key")
print("加密示例:", encrypted)
print("解密测试:", initializer.decrypt_value(encrypted))
-409
View File
@@ -1,409 +0,0 @@
import os
import sys
import platform
import pandas as pd
import pymysql
from pymysql import cursors
from pymysql.err import MySQLError
from dbutils.pooled_db import PooledDB
from typing import Union, List, Dict, Any, Optional, Tuple
import threading
from datetime import datetime
import numpy as np
from pathlib import Path
# 导入您的日志系统
from utils.logger import log as logger
class MySQLAgent:
"""
全平台兼容的MySQL数据库操作类
支持Windows/macOS/Linux系统
"""
_instance = None
_lock = threading.Lock()
# 各平台特定的配置
PLATFORM_CONFIG = {
'Windows': {
'socket_timeout': 30,
'connect_timeout': 10,
'ssl': None
},
'Darwin': { # macOS
'socket_timeout': 60,
'connect_timeout': 15,
'ssl': {'ca': '/usr/local/etc/openssl/cert.pem'}
},
'Linux': {
'socket_timeout': 60,
'connect_timeout': 15,
'ssl': None
}
}
def __new__(cls, *args, **kwargs):
if not cls._instance:
with cls._lock:
if not cls._instance:
cls._instance = super().__new__(cls)
return cls._instance
def __init__(self, config: dict = None):
if hasattr(self, '_pool') and self._pool:
return
if not config:
from config.settings import DATABASE_CONFIG
config = DATABASE_CONFIG
# 获取当前平台配置
current_platform = platform.system()
platform_config = self.PLATFORM_CONFIG.get(current_platform, {})
# 基础配置
self.config = {
'host': config.get('host', 'localhost'),
'port': config.get('port', 3306),
'user': config.get('user', 'root'),
'password': config.get('password', ''),
'database': config.get('database', 'intelligence_system'),
'charset': config.get('charset', 'utf8mb4'),
'cursorclass': cursors.DictCursor,
'autocommit': True,
**platform_config # 合并平台特定配置
}
# 处理各平台路径差异
if current_platform == 'Windows':
self.config['ssl'] = None # Windows通常不需要SSL配置
# macOS特殊处理
elif current_platform == 'Darwin':
if not os.path.exists(self.config['ssl']['ca']):
self.config['ssl'] = None
logger.warning("macOS SSL certificate not found, disabling SSL")
self.pool_size = config.get('max_connections', 5)
self._pool = self._create_pool()
self.logger = logger.bind(module=f"MySQLAgent({current_platform})")
def _create_pool(self) -> PooledDB:
"""创建跨平台兼容的连接池"""
try:
# 各平台连接池参数调整
pool_config = {
'creator': pymysql,
'maxconnections': self.pool_size,
'mincached': 1,
'maxcached': 3,
'blocking': True,
'ping': 1, # 定期检查连接有效性
**self.config
}
# Windows平台需要更短的超时时间
if platform.system() == 'Windows':
pool_config['ping'] = 0 # Windows上ping有时不稳定
pool = PooledDB(**pool_config)
self.logger.info(f"Connection pool created for {platform.system()}")
return pool
except Exception as e:
self.logger.critical("Failed to create connection pool",
error=str(e),
exc_info=True)
raise
def _handle_path(self, path: str) -> str:
"""处理跨平台路径问题"""
if platform.system() == 'Windows':
return path.replace('/', '\\')
return path
def get_connection(self) -> pymysql.connections.Connection:
"""
获取数据库连接(跨平台兼容)
Returns:
pymysql.connections.Connection: 数据库连接
Raises:
MySQLError: 如果连接失败
"""
try:
conn = self._pool.connection()
# macOS需要特殊处理SSL
if platform.system() == 'Darwin' and self.config.get('ssl'):
conn.ping(reconnect=True)
self.logger.trace("Connection obtained")
return conn
except Exception as e:
error_msg = str(e)
# Windows特定错误处理
if platform.system() == 'Windows' and "timed out" in error_msg:
self.logger.warning("Windows connection timeout, retrying...")
return self._retry_connection()
self.logger.error("Connection failed",
error=error_msg,
exc_info=True)
raise
def _retry_connection(self, max_retries: int = 3) -> pymysql.connections.Connection:
"""Windows平台连接重试机制"""
for attempt in range(max_retries):
try:
conn = self._pool.connection()
self.logger.info(f"Connection established after {attempt+1} attempts")
return conn
except Exception:
if attempt == max_retries - 1:
raise
import time
time.sleep(1)
def query_to_df(self, sql: str, params: Union[tuple, dict, None] = None,
parse_dates: Union[List[str], bool] = True) -> pd.DataFrame:
"""
跨平台兼容的SQL查询
Args:
sql (str): SQL语句
params (Union[tuple, dict, None]): 参数
parse_dates (Union[List[str], bool]): 日期解析
Returns:
pd.DataFrame: 查询结果
"""
try:
with self.get_connection() as conn:
# Linux/macOS需要更长的查询超时
if platform.system() != 'Windows':
conn.cursor().execute("SET SESSION wait_timeout=600")
df = pd.read_sql(sql, conn, params=params, parse_dates=parse_dates)
# Windows平台需要手动关闭游标
if platform.system() == 'Windows':
conn.cursor().close()
self.logger.info("Query executed", rows=len(df))
return df
except Exception as e:
self.logger.error("Query failed",
sql=sql,
params=params,
error=str(e),
exc_info=True)
raise
def insert_from_df(self, table_name: str, df: pd.DataFrame,
chunk_size: int = 1000, replace: bool = False) -> int:
"""
跨平台数据插入
Args:
table_name (str): 表名
df (pd.DataFrame): 数据
chunk_size (int): 分批大小
replace (bool): 是否替换
Returns:
int: 插入行数
"""
if df.empty:
self.logger.warning("Empty DataFrame", table=table_name)
return 0
try:
method = 'replace' if replace else 'append'
total_rows = 0
with self.get_connection() as conn:
# 各平台不同的分批策略
if platform.system() == 'Windows':
chunk_size = min(chunk_size, 500) # Windows上减小批次
for i in range(0, len(df), chunk_size):
chunk = df.iloc[i:i + chunk_size]
# macOS需要特殊处理datetime
if platform.system() == 'Darwin':
for col in chunk.select_dtypes(include=['datetime64']):
chunk[col] = chunk[col].dt.strftime('%Y-%m-%d %H:%M:%S')
chunk.to_sql(
table_name,
conn,
if_exists=method,
index=False,
method='multi'
)
total_rows += len(chunk)
method = 'append'
self.logger.info("Data inserted", table=table_name, rows=total_rows)
return total_rows
except Exception as e:
self.logger.error("Insert failed",
table=table_name,
error=str(e),
exc_info=True)
raise
def execute_sql(self, sql: str, params: Union[tuple, dict, None] = None,
fetch: bool = False) -> Union[int, List[Dict[str, Any]]]:
"""
跨平台SQL执行
Args:
sql (str): SQL语句
params (Union[tuple, dict, None]): 参数
fetch (bool): 是否获取结果
Returns:
Union[int, List[Dict[str, Any]]]: 结果
"""
conn = None
cursor = None
try:
conn = self.get_connection()
cursor = conn.cursor()
# Linux/macOS需要更长的执行时间
if platform.system() != 'Windows':
cursor.execute("SET SESSION max_execution_time=600000")
cursor.execute(sql, params)
if fetch:
result = cursor.fetchall()
self.logger.debug("Query executed", rows=len(result))
return result
else:
affected_rows = cursor.rowcount
self.logger.debug("Update executed", affected_rows=affected_rows)
return affected_rows
except Exception as e:
self.logger.error("SQL execution failed",
sql=sql,
params=params,
error=str(e),
exc_info=True)
raise
finally:
if cursor:
cursor.close()
if conn:
conn.close()
def begin_transaction(self) -> pymysql.connections.Connection:
"""开始事务(跨平台兼容)"""
try:
conn = self.get_connection()
conn.autocommit(False)
# macOS需要特殊处理事务隔离级别
if platform.system() == 'Darwin':
conn.cursor().execute("SET SESSION TRANSACTION ISOLATION LEVEL READ COMMITTED")
self.logger.debug("Transaction started")
return conn
except Exception as e:
self.logger.error("Begin transaction failed", error=str(e))
raise
def commit_transaction(self, conn: pymysql.connections.Connection) -> None:
"""提交事务(跨平台兼容)"""
try:
conn.commit()
self.logger.debug("Transaction committed")
except Exception as e:
self.logger.error("Commit failed", error=str(e))
raise
finally:
conn.close()
def rollback_transaction(self, conn: pymysql.connections.Connection) -> None:
"""回滚事务(跨平台兼容)"""
try:
conn.rollback()
self.logger.warning("Transaction rolled back")
except Exception as e:
self.logger.error("Rollback failed", error=str(e))
finally:
conn.close()
def __del__(self):
"""析构函数(跨平台资源清理)"""
if hasattr(self, '_pool'):
try:
self._pool.close()
self.logger.info("Connection pool closed")
except Exception as e:
self.logger.error("Failed to close pool", error=str(e))
# 平台特定的默认配置
def get_default_config():
"""获取各平台默认配置"""
current_platform = platform.system()
base_config = {
'host': 'localhost',
'port': 3306,
'user': 'root',
'password': '',
'database': 'intelligence_system',
'max_connections': 5
}
if current_platform == 'Windows':
return {
**base_config,
'connect_timeout': 10,
'read_timeout': 30,
'write_timeout': 30
}
elif current_platform == 'Darwin':
return {
**base_config,
'connect_timeout': 15,
'read_timeout': 60,
'write_timeout': 60,
'ssl': {'ca': '/usr/local/etc/openssl/cert.pem'}
}
else: # Linux和其他平台
return {
**base_config,
'connect_timeout': 15,
'read_timeout': 60,
'write_timeout': 60
}
# 使用示例
if __name__ == "__main__":
# 自动获取适合当前平台的配置
config = get_default_config()
# 初始化数据库连接
db = MySQLAgent(config)
# 测试查询
try:
df = db.query_to_df("SELECT VERSION() as version")
print(f"Database version: {df['version'].iloc[0]}")
print(f"Running on: {platform.system()} {platform.release()}")
except Exception as e:
print(f"Error: {str(e)}")
+4
View File
@@ -106,3 +106,7 @@
2025-08-06 17:25:57.519 | CRITICAL | mysql_agent:107 - Failed to create connection pool
2025-08-06 17:26:18.578 | INFO | mysql_agent:103 - Connection pool created
2025-08-06 17:28:09.242 | INFO | mysql_agent:103 - Connection pool created
2025-09-12 10:45:53.283 | INFO | minio_agent:83 - MinIO客户端创建成功
2025-09-12 10:45:53.291 | INFO | minio_agent:94 - 成功连接到MinIO服务:127.0.0.1:9005
2025-09-12 10:45:53.297 | INFO | minio_agent:112 - 存储桶创建成功:test-bucket-20250912104553
2025-09-12 10:45:53.302 | INFO | minio_agent:331 - 对象列表查询成功
+75 -32
View File
@@ -6,11 +6,6 @@ https://alidocs.dingtalk.com/i/nodes/NZQYprEoWoexdo1ohPdxXvDbJ1waOeDk?utm_scene=
### 程序框架
```angular2html
intelligence_system/
├── config/ # 系统配置中心
│ ├── __init__.py # 配置包初始化
│ ├── config.py # 配置加载与管理
│ └── constants.py # 系统常量定义
├── data_collection/ # 数据采集层
│ ├── spiders/ # 网络爬虫子系统
│ │ ├── weibo_spider.py # 黑猫爬虫
@@ -20,7 +15,7 @@ intelligence_system/
│ │
│ └── internal/ # 内部数据收集
│ ├── jian_dao_cloud.py # 简道云表单收集器
├── data_processing/ # 数据处理层
│ ├── structured/ # 结构化数据处理
│ │ ├── data_cleaner.py # 数据清洗(去重/标准化)
@@ -35,7 +30,7 @@ intelligence_system/
│ ├── nlp_processor.py # 自然语言处理引擎
│ ├── sentiment_analyzer.py # 情感分析模型
│ └── topic_modeler.py # LDA主题建模工具
├── services/ # 应用服务层
│ ├── monitoring/ # 舆情监控
│ │ ├── opinion_monitor.py # 实时舆情追踪
@@ -52,7 +47,7 @@ intelligence_system/
│ └── alert/ # 预警服务
│ ├── alert_trigger.py # 动态阈值告警
│ └── notification_center.py # 邮件/短信通知
├── system_management/ # 系统管理层
│ ├── scheduler/ # 任务调度
│ │ └── task_scheduler.py # 任务调度器
@@ -60,13 +55,14 @@ intelligence_system/
│ └── monitor/ # 系统监控
│ ├── health_monitor.py # 服务健康检测
│ └── performance_watcher.py # 资源占用监控
├── utils/ # 工具库
│ ├── file_handler.py # 通用文件操作
│ ├── logger.py # 日志系统
│ ├── mysql_agent.py # MySQL读写管理器
│ └── datetime_parser.py # 时间格式处理
├── config.py # 配置加载与管理
└── main.py # 系统入口(启动所有服务)
```
@@ -74,6 +70,7 @@ intelligence_system/
1. 所有程序尽可能在py文件中运行,尽量避免使用命令行执行
2. 配置需要在配置类中定义
3. 密钥等信息直接放在配置类中
4. 数据存储遵循"结构化存MySQL,非结构化存MinIO"原则,通过元数据关联
### 主程序设计
主程序需要一次启动,一直运行,启动时运行一次(在代码中可取消),之后每天定时生成一次报告
@@ -87,32 +84,78 @@ intelligence_system/
主程序应包含日报、周报等生成,根据时间定时生成报告,报告需要存储
### 日志设计
日志系统兼容多个平台,如win、mac和linux,日志需要保存为log文件,并且在日志大于20mb时自动压缩
日志系统兼容Windows、Mac、Linux平台,以`log`文件形式存储,超过20MB自动压缩。新增存储相关日志内容:
- MySQL操作:批量插入行数、表结构变更、事务状态
- MinIO操作:文件上传/下载状态、路径、大小、耗时
- 关联日志:MySQL记录与MinIO对象的绑定关系(如"ID:123 关联文件: collector/images/xxx.jpg"
- 异常日志:MySQL连接失败、MinIO上传超时、数据关联不一致等告警信息
### 数据库链接设计
数据存储放在数据库中,数据库类型为mysql,数据库名称为intelligence_system
### 存储系统设计(MinIO+MySQL
#### 核心存储分工
| 存储类型 | 适用数据 | 核心作用 |
|----------|----------|----------|
| MySQL | 结构化数据、元数据、关系型数据 | 存储业务逻辑数据、非结构化数据的索引信息、任务调度信息等 |
| MinIO | 非结构化数据 | 存储图片、视频、PDF文档、原始爬取文件等二进制/大文件数据 |
数据库表的命名规则与目录一致,数据采集类以collector_为开头,数据处理类以processor_为开
头,数据存储类以storage_为开头,应用层类以application_为开头
依次类推。
数据库链接为通用配置,要求数据采集或处理类等,可以直接调用封装好的数据库
链接,不必每次都重新写,
该链接包含表的增删改查功能,以及执行sql语句功能
#### 核心存储配置
1. **MySQL配置**
- 数据库名称:`intelligence_system`
- 连接管理:通过`utils/mysql_agent.py`封装线程安全的连接池,提供结构化数据的增删改查及SQL执行能力
- 适配特性:支持多平台(Windows/macOS/Linux)的超时配置和批处理优化
数据库结构:
1. collector_news_api:新闻api数据表
2. collector_complaint_spider:投诉数据表
3. processor_text_processor:文本处理数据表
4. processor_image_processor:图片处理数据表
5. main_task 任务调度表
6. application_reporter_daily:日报数据表
7. application_reporter_monthly:周报数据
2. **MinIO配置**
- 存储桶命名规则:按数据类型划分,如`collector-images`(采集层图片)、`processor-videos`(处理层视频)
- 连接管理:通过`utils/minio_agent.py`封装客户端,提供对象上传、下载、删除、查询URL等能力
- 路径规则:`{数据层}/{来源}/{时间戳}_{唯一ID}.{后缀}`(例:`collector/weibo_spider/20240520_12345.jpg`
#### 表命名规则(扩展)
- 数据采集类:以`collector_`为前缀(存储采集到的结构化数据及MinIO对象元数据
- 数据处理类:以`processor_`为前缀(存储处理结果的结构化数据及MinIO处理后对象的元数据)
- 数据存储类:以`storage_`为前缀(存储MinIO对象的索引信息,如哈希、大小、访问权限等)
- 应用层类:以`application_`为前缀(对应业务应用数据)
- 系统类:如任务调度表等采用功能命名(如`main_task`
#### 核心表结构
1. `collector_news_api`:新闻API采集数据表(存储新闻标题、内容等结构化数据)
2. `collector_complaint_spider`:投诉信息爬虫数据表(含投诉文本、附件MinIO路径`attachment_minio_path`等)
3. `collector_image_source`:采集层图片元数据表(存储图片URL、MinIO路径、格式、大小等)
4. `processor_text_processor`:文本处理结果表(存储NLP分析结果、关联原文ID等)
5. `processor_image_processor`:图片处理结果表(存储识别标签、特征向量、处理后图片MinIO路径`result_minio_path`等)
6. `storage_object_index`:MinIO对象索引表(存储所有对象的MinIO路径、哈希值、创建时间、过期时间等)
7. `main_task`:任务调度表(存储任务名称、路径、执行频率、上次/下次执行时间等)
8. `application_reporter_daily`:日报数据表(存储日报结构化内容、报表文件MinIO路径等)
9. `application_reporter_monthly`:月报数据表(存储月报结构化内容、报表文件MinIO路径等)
#### 数据交互特性
1. **MySQL交互**
- 支持DataFrame直接读写,提供分块处理(`chunksize`)和批量插入能力
- 自动适配平台特性(如Windows小批次写入优化)
- 完善的事务机制确保结构化数据一致性
2. **MinIO交互**
- 支持大文件分片上传、断点续传
3. **联动机制**
- 非结构化数据存储时,先上传至MinIO获取路径,再将路径及元数据写入MySQL
- 读取非结构化数据时,先从MySQL获取MinIO路径,再通过路径从MinIO下载
- 日志同步记录MySQL操作和MinIO对象操作(如"上传文件至MinIO: {path},关联MySQL记录ID: {id}"
### 数据采集设计
每一个数据采集均为独立python文件,里面执行主程序均为main,以方便调度
每一个数据采集均会根据规则创建数据库表,数据处理类以processor_为开头,(或者统一维护到一个表中,按来源去区分)
1. 结构化数据(如新闻文本、投诉内容):直接写入对应`collector_`前缀表
2. 非结构化数据(如爬取的图片、附件):
- 调用`minio_agent.py`上传至对应存储桶
- 将MinIO路径、文件大小、格式等元数据写入`collector_`前缀表或`storage_object_index`
3. 每个采集模块(独立py文件,`main`方法入口)需同时处理MySQL和MinIO交互,确保数据关联完整
### 数据处理
从多个数据库库表中获取数据,对数据进行处理,处理完成后将结果保存到数据库中,处理结果可能存储在多个表中
数据处理数据库表以processor_为开头
### 数据处理设计
1. 结构化数据处理:从MySQL读取原始数据,处理后写入`processor_`前缀表
2. 非结构化数据处理:
- 从MySQL获取MinIO路径,通过`minio_agent.py`下载原始文件
- 处理后(如图片识别、视频帧提取)将结果文件上传至MinIO(处理层存储桶)
- 将处理结果的结构化信息(如识别标签)和处理后文件的MinIO路径写入`processor_`前缀表
3. 支持多表关联存储,通过`source_id`关联原始数据与处理结果
@@ -6,7 +6,7 @@ import croniter
import pytz
from concurrent.futures import ThreadPoolExecutor, as_completed
import pandas as pd
from storage.mysql_agent import MySQLAgent
from utils.mysql_agent import MySQLAgent
from utils.logger import CrossPlatformLog
# 初始化调度器日志
+171
View File
@@ -0,0 +1,171 @@
import unittest
import os
import tempfile
import hashlib
from datetime import datetime
from utils.minio_agent import MinIOAgent # 导入之前的MinIO操作类
class TestMinIOAgent(unittest.TestCase):
# 测试配置 - 本地MinIO社区版
MINIO_CONFIG = {
'endpoint': '127.0.0.1:9005',
'access_key': 'admin', # 默认账号
'secret_key': 'abc88888888', # 默认密码
'secure': False # 社区版默认不启用SSL
}
@classmethod
def setUpClass(cls):
"""初始化测试环境"""
# 创建唯一测试桶(避免冲突)
timestamp = datetime.now().strftime("%Y%m%d%H%M%S")
cls.test_bucket = f"test-bucket-{timestamp}"
cls.test_object = "test-data/sample.txt"
cls.test_content = b"this is MinIO test data: 1234567890"
# 初始化客户端
cls.minio_agent = MinIOAgent(cls.MINIO_CONFIG)
# 确保测试桶存在
cls.minio_agent.create_bucket(cls.test_bucket)
@classmethod
def tearDownClass(cls):
"""清理测试环境"""
try:
# 列出并删除桶内所有对象
objects = cls.minio_agent.list_objects(cls.test_bucket)
for obj in objects:
cls.minio_agent.delete_object(cls.test_bucket, obj['object_name'])
# 删除测试桶(MinIO要求桶为空才能删除)
cls.minio_agent._client.remove_bucket(cls.test_bucket)
print(f"\n测试清理完成,已删除桶: {cls.test_bucket}")
except Exception as e:
print(f"清理测试环境失败: {str(e)}")
def test_01_create_bucket(self):
"""测试创建存储桶"""
new_bucket = f"temp-bucket-{datetime.now().microsecond}"
result = self.minio_agent.create_bucket(new_bucket)
self.assertTrue(result, "存储桶创建失败")
# 验证桶是否存在
exists = self.minio_agent._client.bucket_exists(new_bucket)
self.assertTrue(exists, "存储桶创建后未检测到存在")
# 清理临时桶
self.minio_agent._client.remove_bucket(new_bucket)
def test_02_upload_download(self):
"""测试上传与下载功能"""
# 上传数据
upload_meta = self.minio_agent.upload_bytes(
bucket=self.test_bucket,
object_name=self.test_object,
data=self.test_content
)
# 验证上传结果
self.assertEqual(upload_meta['size'], len(self.test_content), "上传数据大小不匹配")
self.assertEqual(upload_meta['local_hash'], hashlib.md5(self.test_content).hexdigest(), "本地哈希校验失败")
# 下载数据到临时文件
with tempfile.NamedTemporaryFile(delete=False) as temp_file:
temp_path = temp_file.name
download_meta = self.minio_agent.download_file(
bucket=self.test_bucket,
object_name=self.test_object,
local_path=temp_path
)
# 验证下载内容
with open(temp_path, 'rb') as f:
downloaded_content = f.read()
self.assertEqual(downloaded_content, self.test_content, "下载数据与原始数据不匹配")
self.assertEqual(download_meta['size'], len(self.test_content), "下载文件大小不匹配")
# 清理临时文件
os.unlink(temp_path)
def test_03_presigned_url(self):
"""测试生成预签名URL"""
# 先上传测试文件
self.minio_agent.upload_bytes(
self.test_bucket,
self.test_object,
self.test_content
)
# 生成URL(有效期30秒)
url_info = self.minio_agent.get_presigned_url(
bucket=self.test_bucket,
object_name=self.test_object,
expires=30
)
# 验证URL格式
self.assertIn("http://127.0.0.1:9005", url_info['presigned_url'], "预签名URL格式不正确")
self.assertEqual(url_info['expires_in'], 30, "过期时间设置不正确")
def test_04_list_objects(self):
"""测试列出对象功能"""
# 上传多个测试对象
test_objects = [
"test-folder/file1.txt",
"test-folder/file2.csv",
"another-folder/image.jpg"
]
for obj in test_objects:
self.minio_agent.upload_bytes(
self.test_bucket,
obj,
b"tese_list_obj"
)
# 列出所有对象
all_objects = self.minio_agent.list_objects(self.test_bucket)
self.assertEqual(len(all_objects), len(test_objects) + 1, "列出对象数量不匹配") # +1是之前的test_object
# 按前缀筛选
filtered_objects = self.minio_agent.list_objects(
self.test_bucket,
prefix="test-folder/"
)
self.assertEqual(len(filtered_objects), 2, "按前缀筛选结果不正确")
def test_05_delete_object(self):
"""测试删除对象功能"""
# 创建测试对象
delete_obj = "to-delete/temp.txt"
self.minio_agent.upload_bytes(
self.test_bucket,
delete_obj,
b"will be delete"
)
# 执行删除
result = self.minio_agent.delete_object(self.test_bucket, delete_obj)
self.assertTrue(result, "删除对象失败")
# 验证删除
objects = self.minio_agent.list_objects(self.test_bucket, prefix="to-delete/")
self.assertEqual(len(objects), 0, "对象删除后仍存在")
def test_06_upload_empty_data(self):
"""测试上传空数据的异常处理"""
with self.assertRaises(ValueError, msg="未捕获空数据上传异常"):
self.minio_agent.upload_bytes(
self.test_bucket,
"empty.txt",
b""
)
if __name__ == "__main__":
# 执行测试并显示详细结果
unittest.main(verbosity=2)
+1
View File
@@ -0,0 +1 @@
DROP DATABASE test
+383
View File
@@ -0,0 +1,383 @@
import os
import sys
import platform
import threading
from typing import List, Dict, Optional, BinaryIO, Tuple, Any
from datetime import datetime, timedelta
import hashlib
from io import BytesIO
from minio import Minio
from minio.error import S3Error, MinioException
from utils.logger import log
class MinIOAgent:
"""
全平台兼容的MinIO对象存储操作类
支持Windows/macOS/Linux系统,提供对象存储的上传、下载、查询等功能
专注于二进制数据处理,返回元数据用于与MySQL关联
"""
_instance = None # 单例模式实例
_lock = threading.Lock() # 线程锁,保证单例线程安全
def __new__(cls, *args, **kwargs):
"""单例模式实现,确保全局只有一个实例"""
if not cls._instance:
with cls._lock:
if not cls._instance:
cls._instance = super().__new__(cls)
return cls._instance
def __init__(self, config: dict):
"""
初始化MinIO连接
参数:
config (dict): MinIO配置字典,包含以下键:
- endpoint: 服务端点(例:'localhost:9000'
- access_key: 访问密钥
- secret_key: 密钥
- [可选] secure: 是否使用SSL(默认False
- [可选] region: 区域
- [可选] timeout: 超时时间(秒,默认30)
"""
# 避免重复初始化
if hasattr(self, '_client') and self._client:
return
# 验证必要配置参数
required_keys = ['endpoint', 'access_key', 'secret_key']
if not all(key in config for key in required_keys):
raise ValueError(f"MinIO配置缺少必要参数,需要: {required_keys}")
# 整合配置,设置默认值
self.config = {
'endpoint': config['endpoint'],
'access_key': config['access_key'],
'secret_key': config['secret_key'],
'secure': config.get('secure', False),
'region': config.get('region'),
'timeout': config.get('timeout', 30)
}
# 初始化日志,绑定当前平台信息
current_platform = platform.system()
self.log = log.bind(module=f"MinIOAgent({current_platform})")
# 创建客户端实例
self._client = self._create_client()
# 验证连接是否有效
self._verify_connection()
def _create_client(self) -> Minio:
"""创建MinIO客户端实例"""
try:
client = Minio(
endpoint=self.config['endpoint'],
access_key=self.config['access_key'],
secret_key=self.config['secret_key'],
secure=self.config['secure'],
region=self.config['region']
)
self.log.info("MinIO客户端创建成功")
return client
except Exception as e:
self.log.critical("创建MinIO客户端失败", 错误=str(e), exc_info=True)
raise
def _verify_connection(self) -> None:
"""验证与MinIO服务的连接是否正常"""
try:
# 通过列出存储桶来验证连接
self._client.list_buckets()
self.log.info(f"成功连接到MinIO服务:{self.config['endpoint']}")
except Exception as e:
self.log.critical("连接验证失败", 错误=str(e), exc_info=True)
raise
def create_bucket(self, bucket_name: str) -> bool:
"""
创建存储桶(如不存在)
参数:
bucket_name: 存储桶名称
返回:
是否成功创建(或已存在)
"""
try:
if not self._client.bucket_exists(bucket_name):
self._client.make_bucket(bucket_name)
self.log.info(f"存储桶创建成功:{bucket_name}")
return True
self.log.debug(f"存储桶已存在:{bucket_name}")
return True
except MinioException as e:
self.log.error(f"创建存储桶 {bucket_name} 失败", 错误=str(e), exc_info=True)
return False
def upload_bytes(self, bucket: str, object_name: str, data: bytes) -> Dict[str, Any]:
"""
上传二进制数据至MinIO
参数:
bucket: 存储桶名称
object_name: 对象名称(路径)
data: 二进制数据
返回:
包含元数据的字典:
- bucket: 存储桶名称
- object_name: 对象路径
- size: 数据大小(字节)
- etag: 服务器生成的哈希值
- content_type: 内容类型
- upload_time: 上传时间(UTC)
- local_hash: 本地计算的MD5哈希
"""
if not data:
raise ValueError("上传数据不能为空")
# 确保存储桶存在
self.create_bucket(bucket)
try:
# 计算本地哈希(用于数据完整性校验)
local_hash = hashlib.md5(data).hexdigest()
# 上传数据
result = self._client.put_object(
bucket_name=bucket,
object_name=object_name,
data=BytesIO(data),
length=len(data),
content_type=self._guess_content_type(object_name)
)
# 构建元数据
metadata = {
'bucket': bucket,
'object_name': object_name,
'size': len(data),
'etag': result.etag,
'content_type': result.content_type,
'upload_time': datetime.utcfromtimestamp(result.last_modified.timestamp()),
'local_hash': local_hash
}
self.log.info(
"文件上传成功",
存储桶=bucket,
对象名称=object_name,
大小=len(data)
)
return metadata
except MinioException as e:
self.log.error(
"文件上传失败",
存储桶=bucket,
对象名称=object_name,
错误=str(e),
exc_info=True
)
raise
def download_file(self, bucket: str, object_name: str, local_path: str) -> Dict[str, Any]:
"""
从MinIO下载文件至本地
参数:
bucket: 存储桶名称
object_name: 对象名称(路径)
local_path: 本地保存路径
返回:
包含下载信息的字典:
- local_path: 本地路径
- size: 文件大小
- download_time: 下载时间
"""
try:
# 创建父目录(如果不存在)
os.makedirs(os.path.dirname(local_path), exist_ok=True)
# 下载文件
start_time = datetime.now()
self._client.fget_object(bucket, object_name, local_path)
download_time = datetime.now() - start_time
# 获取文件信息
stat = os.stat(local_path)
result = {
'local_path': local_path,
'size': stat.st_size,
'download_time': download_time.total_seconds(),
'downloaded_at': datetime.now()
}
self.log.info(
"文件下载成功",
存储桶=bucket,
对象名称=object_name,
本地路径=local_path,
大小=stat.st_size
)
return result
except MinioException as e:
self.log.error(
"文件下载失败",
存储桶=bucket,
对象名称=object_name,
错误=str(e),
exc_info=True
)
raise
except IOError as e:
self.log.error(
"本地文件操作失败",
本地路径=local_path,
错误=str(e),
exc_info=True
)
raise
def get_presigned_url(self, bucket: str, object_name: str, expires: int = 3600) -> Dict[str, str]:
"""
生成临时访问URL
参数:
bucket: 存储桶名称
object_name: 对象名称(路径)
expires: 过期时间(秒),默认3600秒
返回:
包含URL和过期信息的字典
"""
try:
url = self._client.presigned_get_object(
bucket_name=bucket,
object_name=object_name,
expires=expires
)
result = {
'presigned_url': url,
'expires_in': expires,
'expires_at': datetime.now() + timedelta(seconds=expires),
'bucket': bucket,
'object_name': object_name
}
self.log.debug(
"预签名URL生成成功",
存储桶=bucket,
对象名称=object_name,
过期时间=expires
)
return result
except MinioException as e:
self.log.error(
"生成预签名URL失败",
存储桶=bucket,
对象名称=object_name,
错误=str(e),
exc_info=True
)
raise
def list_objects(self, bucket: str, prefix: str = "") -> List[Dict[str, Any]]:
"""
查询指定前缀的对象列表及元数据
参数:
bucket: 存储桶名称
prefix: 对象路径前缀
返回:
对象信息列表,每个对象包含:
- bucket: 存储桶
- object_name: 对象名称
- size: 大小
- last_modified: 最后修改时间
- etag: 哈希值
- content_type: 内容类型
"""
try:
objects = self._client.list_objects(
bucket_name=bucket,
prefix=prefix,
recursive=True
)
result = []
for obj in objects:
# 获取详细元数据
stat = self._client.stat_object(bucket, obj.object_name)
result.append({
'bucket': bucket,
'object_name': obj.object_name,
'size': obj.size,
'last_modified': obj.last_modified,
'etag': stat.etag,
'content_type': stat.content_type
})
self.log.info(
"对象列表查询成功",
存储桶=bucket,
前缀=prefix,
数量=len(result)
)
return result
except MinioException as e:
self.log.error(
"查询对象列表失败",
存储桶=bucket,
前缀=prefix,
错误=str(e),
exc_info=True
)
raise
def delete_object(self, bucket: str, object_name: str) -> bool:
"""
删除指定对象
参数:
bucket: 存储桶名称
object_name: 对象名称(路径)
返回:
是否删除成功
"""
try:
self._client.remove_object(bucket, object_name)
self.log.info(
"对象删除成功",
存储桶=bucket,
对象名称=object_name
)
return True
except MinioException as e:
self.log.error(
"删除对象失败",
存储桶=bucket,
对象名称=object_name,
错误=str(e),
exc_info=True
)
return False
@staticmethod
def _guess_content_type(object_name: str) -> str:
"""根据文件名猜测内容类型"""
from mimetypes import guess_type
mime_type, _ = guess_type(object_name)
return mime_type or 'application/octet-stream' # 默认二进制流类型