Compare commits

..

6 Commits

Author SHA1 Message Date
panda c894e344aa test 2025-10-29 10:45:08 +08:00
panda 5d1155bd20 test 2025-10-29 10:44:53 +08:00
panda fc18fa74c3 娣诲姞RSS鏁版嵁澶勭悊鍣ㄥ拰浠诲姟璋冨害鍔熻兘锛屾洿鏂伴厤缃拰鏃ュ織鏂囦欢 2025-10-29 10:39:13 +08:00
panda c5f6e8288d ai提取rss相关数据 2025-10-28 13:43:06 +08:00
panda e1db06dd79 rss订阅数据爬取及数据处理 2025-10-23 17:18:49 +08:00
panda fd67231866 优化任务调度说明 2025-10-17 17:59:28 +08:00
41 changed files with 141581 additions and 130957 deletions
Binary file not shown.
Binary file not shown.
Binary file not shown.
+28 -1
View File
@@ -1,11 +1,23 @@
import os
class Config:
MYSQL_CONFIG = {
'host': '123.60.167.249',
'port': 3306,
'user': 'intelligence',
'password': '123123',
'database': "intelligence_system",
'max_connections': 10
}
OFFLINE_MYSQL_CONFIG = {
'host': 'localhost',
'port': 3306,
'user': 'root',
'password': '123123',
'database':"intelligence_system",
'database': "intelligence_system",
'max_connections': 10
}
@@ -15,3 +27,18 @@ class Config:
'secret_key': 'abc88888888',
'secure': False # 社区版默认不启用SSL
}
# 百度AI API配置(千帆平台)
# 优先从环境变量读取,如果没有则使用默认值(需要用户自行配置)
BAIDU_AI_CONFIG = {
'api_key': os.getenv('BAIDU_API_KEY', 'bce-v3/ALTAK-SFA4vEP3uBYLsyqCZcERg/1f43596d40d9a2c8318b13d5888a5e8e4e7a7f30'), # 百度千帆API Key
'model': 'ernie-x1-turbo-32k', # 使用的模型
}
# AI处理器配置
AI_PROCESSOR_CONFIG = {
'batch_size': 10, # 批量处理的默认大小
'delay': 1.5, # 每条记录之间的延迟(秒),避免API限流
'source_table': 'processed_rss_data', # 源数据表
'result_table': 'ai_processor_rss_analysis', # AI分析结果表
}
+2
View File
@@ -0,0 +1,2 @@
## 开发进度
###
+133613 -130378
View File
File diff suppressed because it is too large Load Diff
+5804
View File
File diff suppressed because it is too large Load Diff
+65 -5
View File
@@ -11,11 +11,17 @@ log = CrossPlatformLog.get_logger("Main")
class IntelligenceSystem:
def __init__(self, db_config=None):
"""初始化系统(仅作为容器,不包含业务逻辑)"""
def __init__(self, db_config=None, run_all_on_startup=False):
"""初始化系统(仅作为容器,不包含业务逻辑)
Args:
db_config: 数据库配置
run_all_on_startup: 启动时是否立即执行所有到期任务(默认False)
"""
self.scheduler = TaskScheduler(Config.MYSQL_CONFIG, max_workers=5)
self._running = False
log.info("情报系统已初始化(Cron模式)")
self.run_all_on_startup = run_all_on_startup
log.info(f"情报系统已初始化(Cron模式),启动时执行任务: {run_all_on_startup}")
def start(self):
"""启动系统主入口"""
@@ -23,11 +29,40 @@ class IntelligenceSystem:
self._setup_signal_handlers()
log.info("系统启动 - 运行在Cron调度模式")
# 启动时执行所有到期任务(如果开关开启)
if self.run_all_on_startup:
print(f"\n{'='*60}")
print("🚀 启动时执行所有到期任务...")
print(f"{'='*60}\n")
log.info("启动时执行所有到期任务")
result = self.scheduler.check_and_run_tasks(print_empty_status=True)
print(f"\n启动任务执行完成: 总数={result['总任务数']}, 成功={result['成功']}, 失败={result['失败']}\n")
# 时间追踪变量
last_status_print_time = time.time() # 上次打印状态的时间
last_hourly_report_time = time.time() # 上次小时统计的时间
status_print_interval = 60 # 每分钟打印一次状态(60秒)
hourly_report_interval = 3600 # 每小时统计一次(3600秒)
try:
# 主循环 - 仅负责定期检查任务
while self._running:
current_time = time.time()
# 判断是否需要打印状态(每分钟一次)
should_print_status = (current_time - last_status_print_time) >= status_print_interval
# 检查并执行到期任务
self.scheduler.check_and_run_tasks()
self.scheduler.check_and_run_tasks(print_empty_status=should_print_status)
# 更新最后打印时间
if should_print_status:
last_status_print_time = current_time
# 检查是否需要进行小时统计(每小时一次)
if (current_time - last_hourly_report_time) >= hourly_report_interval:
self._print_hourly_stats()
last_hourly_report_time = current_time
# 短间隔轮询(每10秒检查一次,保证Cron时间精度)
time.sleep(10)
@@ -48,6 +83,29 @@ class IntelligenceSystem:
log.info(f"收到关闭信号 {signum},开始关闭系统")
self._running = False
def _print_hourly_stats(self):
"""打印并重置小时统计信息"""
stats = self.scheduler.get_and_reset_hourly_stats()
now = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
print(f"\n{'='*60}")
print(f"📊 小时任务统计报告 - {now}")
print(f"{'='*60}")
print(f" 总任务数: {stats['总数']}")
print(f" 成功: {stats['成功']}")
print(f" 失败: {stats['失败']}")
if stats['总数'] > 0:
success_rate = (stats['成功'] / stats['总数']) * 100
print(f" 成功率: {success_rate:.1f}%")
print(f"{'='*60}\n")
log.info(
"小时任务统计",
总任务数=stats['总数'],
成功=stats['成功'],
失败=stats['失败']
)
def shutdown(self):
"""优雅关闭系统"""
log.info("开始优雅关闭系统")
@@ -67,7 +125,9 @@ class IntelligenceSystem:
if __name__ == "__main__":
try:
# 启动系统 - 仅作为入口,不包含调度逻辑
system = IntelligenceSystem()
# run_all_on_startup=True: 启动时立即执行所有到期任务
# run_all_on_startup=False: 启动时不执行任务,等待下次调度周期
system = IntelligenceSystem(run_all_on_startup=False)
system.start()
except Exception as e:
log.critical("情报系统启动失败", exc_info=True)
Binary file not shown.
@@ -0,0 +1,453 @@
# RSS数据AI处理模块
import os
import sys
import json
import time
import pandas as pd
from typing import List, Dict, Any, Optional
from datetime import datetime
from openai import OpenAI
# 添加项目根目录到路径
current_dir = os.path.dirname(os.path.abspath(__file__))
parent_dir = os.path.dirname(os.path.dirname(current_dir))
if parent_dir not in sys.path:
sys.path.insert(0, parent_dir)
from utils.mysql_agent import MySQLAgent
from utils.logger import log
from config import Config
class RSSDataAIProcessor:
"""RSS数据AI处理主类
负责:
- 从数据库加载未处理的RSS数据
- 调用AI进行分析
- 保存分析结果
- 更新处理状态
"""
def __init__(self):
"""初始化AI处理器"""
self.log = log.bind(module="RSSDataAIProcessor")
self.db_agent = MySQLAgent(Config.MYSQL_CONFIG)
# 从Config读取配置
self.source_table = Config.AI_PROCESSOR_CONFIG['source_table']
self.ai_table = Config.AI_PROCESSOR_CONFIG['result_table']
self.default_batch_size = Config.AI_PROCESSOR_CONFIG['batch_size']
self.default_delay = Config.AI_PROCESSOR_CONFIG['delay']
# 初始化百度千帆API客户端
self.api_key = Config.BAIDU_AI_CONFIG.get('api_key')
if self.api_key:
self.ai_client = OpenAI(
base_url='https://qianfan.baidubce.com/v2',
api_key=self.api_key
)
self.model = Config.BAIDU_AI_CONFIG.get('model', 'ernie-x1-turbo-32k')
self.log.info("RSS数据AI处理器初始化完成")
else:
self.ai_client = None
self.log.warning("百度AI未配置,AI处理功能将不可用")
self.log.warning("请在config.py中配置 BAIDU_AI_CONFIG['api_key']")
def is_configured(self) -> bool:
"""检查是否已配置API"""
return self.ai_client is not None
def main(self, batch_size: Optional[int] = 200, delay: Optional[float] = None) -> Dict[str, Any]:
"""主程序:批量处理RSS数据的完整流程
Args:
batch_size: 批量处理的记录数,None则使用配置的默认值
delay: 每条记录之间的延迟(秒),None则使用配置的默认值
Returns:
dict: 处理结果统计信息
"""
# 使用传入参数或默认配置
batch_size = batch_size or self.default_batch_size
delay = delay or self.default_delay
try:
# 1. 检查配置
if not self.is_configured():
error_msg = "百度AI未配置,请在config.py中配置 BAIDU_AI_CONFIG['api_key']"
self.log.error(error_msg)
return {
'success': False,
'message': error_msg,
'processed_count': 0,
'failed_count': 0
}
self.log.info(f"开始批量处理数据,批次大小: {batch_size}, 延迟: {delay}")
# 2. 准备数据库表结构
self.ensure_ai_processed_column()
if not self.db_agent.table_exists(self.ai_table):
self.create_ai_result_table()
# 3. 加载未处理的数据
df = self.load_unprocessed_data(batch_size)
if df.empty:
self.log.info("没有需要处理的数据")
return {
'success': True,
'message': '没有需要处理的数据',
'processed_count': 0,
'failed_count': 0
}
# 4. 处理每条记录
results = []
processed_ids = []
failed_count = 0
for idx, record in df.iterrows():
try:
self.log.debug(f"处理记录 {record['id']} ({idx + 1}/{len(df)})")
result = self.process_single_record(record.to_dict())
if result:
results.append(result)
processed_ids.append(record['id'])
else:
failed_count += 1
# 延迟,避免API限流
if delay > 0 and idx < len(df) - 1:
time.sleep(delay)
except Exception as e:
self.log.error(f"处理记录 {record['id']} 异常: {str(e)}", exc_info=True)
failed_count += 1
# 5. 保存结果
saved_count = 0
if results:
saved_count = self.save_ai_results(results)
# 6. 标记为已处理
if processed_ids:
self.mark_as_processed(processed_ids)
# 7. 返回统计信息
stats = {
'success': True,
'message': 'AI处理完成',
'total_count': len(df),
'processed_count': len(processed_ids),
'saved_count': saved_count,
'failed_count': failed_count,
'relevant_count': sum(1 for r in results if r.get('是否相关')),
'processing_time': datetime.now().strftime('%Y-%m-%d %H:%M:%S')
}
self.log.info("批量处理完成", **stats)
return stats
except Exception as e:
error_msg = f"批量处理失败: {str(e)}"
self.log.error(error_msg, exc_info=True)
return {
'success': False,
'message': error_msg,
'processed_count': 0,
'failed_count': 0
}
def ensure_ai_processed_column(self):
"""确保processed_rss_data表有"是否ai处理"字段"""
try:
# 检查字段是否存在
check_sql = """
SELECT COUNT(*) as count
FROM information_schema.COLUMNS
WHERE TABLE_SCHEMA = %s
AND TABLE_NAME = %s
AND COLUMN_NAME = '是否ai处理'
"""
result = self.db_agent.execute_sql(
check_sql,
params=(Config.MYSQL_CONFIG['database'], self.source_table),
fetch=True
)
if result[0][0] == 0:
# 字段不存在,添加字段
alter_sql = f"""
ALTER TABLE {self.source_table}
ADD COLUMN 是否ai处理 TINYINT(1) DEFAULT 0 COMMENT 'AI处理标记:0-未处理,1-已处理'
"""
self.db_agent.execute_sql(alter_sql)
self.log.info(f"成功为表 {self.source_table} 添加 '是否ai处理' 字段")
else:
self.log.debug(f"{self.source_table} 已存在 '是否ai处理' 字段")
except Exception as e:
self.log.error(f"检查/添加字段失败: {str(e)}", exc_info=True)
raise
def create_ai_result_table(self):
"""创建AI处理结果表"""
create_sql = f"""
CREATE TABLE IF NOT EXISTS {self.ai_table} (
id INT AUTO_INCREMENT PRIMARY KEY COMMENT '主键ID',
source_id INT NOT NULL COMMENT '来源数据IDprocessed_rss_data.id',
文章标题 TEXT COMMENT '文章标题',
文章摘要 TEXT COMMENT '文章摘要',
发布时间 DATETIME COMMENT '发布时间',
来源URL VARCHAR(1024) COMMENT '来源URL',
文章链接 VARCHAR(1024) COMMENT '文章链接',
是否相关 BOOLEAN COMMENT 'AI判断是否与汽车后市场相关',
相关度评分 INT COMMENT '相关度评分(0-100',
标签 TEXT COMMENT 'AI生成的标签(JSON数组)',
分类 VARCHAR(100) COMMENT 'AI判断的主要分类',
分析说明 TEXT COMMENT 'AI分析说明',
处理时间 DATETIME COMMENT 'AI处理时间',
创建时间 TIMESTAMP DEFAULT CURRENT_TIMESTAMP COMMENT '记录创建时间',
更新时间 TIMESTAMP DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '记录更新时间',
INDEX idx_source_id (source_id),
INDEX idx_是否相关 (是否相关),
INDEX idx_分类 (分类),
INDEX idx_处理时间 (处理时间)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_unicode_ci COMMENT='RSS数据AI分析结果表'
"""
try:
self.db_agent.execute_sql(create_sql)
self.log.info(f"成功创建AI结果表: {self.ai_table}")
except Exception as e:
self.log.error(f"创建AI结果表失败: {str(e)}", exc_info=True)
raise
def load_unprocessed_data(self, limit: int = 100) -> pd.DataFrame:
"""加载未经AI处理的数据
Args:
limit: 每次处理的记录数量
Returns:
未处理的数据DataFrame
"""
try:
sql = f"""
SELECT id, 文章标题, 文章摘要, 发布时间, 来源URL, 文章链接
FROM {self.source_table}
WHERE 是否ai处理 = 0 OR 是否ai处理 IS NULL
ORDER BY 创建时间 DESC
LIMIT %s
"""
df = self.db_agent.query_to_df(sql, params=(limit,), is_print=False)
self.log.info(f"成功加载 {len(df)} 条未处理的数据")
return df
except Exception as e:
self.log.error(f"加载未处理数据失败: {str(e)}", exc_info=True)
return pd.DataFrame()
def analyze_news(self, title: str, summary: str) -> Dict[str, Any]:
"""调用AI分析新闻(保留原有提示词)"""
# 构建提示词(保留原有格式)
prompt = f"""分析以下新闻是否与汽车后市场相关,返回JSON格式:
标题:{title}
摘要:{summary}
返回格式:
{{
"is_relevant": true/false,
"relevance_score": 0-100,
"tags": ["标签1", "标签2"],
"category": "分类(配件/维修/保养/改装/美容/装饰/二手车/金融/保险/其他)",
"analysis": "简要说明"
}}
注意:只返回JSON格式的结果,不要包含其他说明文字。"""
try:
# 调用百度千帆API
response = self.ai_client.chat.completions.create(
model=self.model,
messages=[{
"role": "user",
"content": prompt
}]
)
# 获取响应内容
raw_content = response.choices[0].message.content
# 解析JSON(处理markdown包裹)
if '```json' in raw_content:
json_str = raw_content.split('```json')[1].split('```')[0].strip()
elif '```' in raw_content:
json_str = raw_content.split('```')[1].split('```')[0].strip()
else:
json_str = raw_content.strip()
result = json.loads(json_str)
# 补充缺失字段
return {
'is_relevant': result.get('is_relevant', False),
'relevance_score': result.get('relevance_score', 0),
'tags': result.get('tags', []),
'category': result.get('category', '其他'),
'analysis': result.get('analysis', '')
}
except json.JSONDecodeError as e:
self.log.warning(f"JSON解析失败: {str(e)}, 原始响应: {raw_content[:200]}")
return {
'is_relevant': False,
'relevance_score': 0,
'tags': [],
'category': '其他',
'analysis': f"解析失败: {raw_content[:100]}"
}
except Exception as e:
self.log.error(f"AI调用异常: {str(e)}", exc_info=True)
return {
'is_relevant': False,
'relevance_score': 0,
'tags': [],
'category': '其他',
'analysis': f"处理异常: {str(e)}"
}
def process_single_record(self, record: Dict[str, Any]) -> Optional[Dict[str, Any]]:
"""处理单条记录
Args:
record: 记录字典
Returns:
处理结果字典
"""
if not self.is_configured():
self.log.error("AI客户端未配置,无法处理数据")
return None
try:
title = str(record.get('文章标题', '')).strip()
summary = str(record.get('文章摘要', '')).strip()
if not title and not summary:
self.log.warning(f"记录 {record.get('id')} 标题和摘要均为空,跳过处理")
return None
# 调用AI分析
analysis_result = self.analyze_news(title, summary)
# 构建结果记录
result = {
'source_id': record['id'],
'文章标题': title,
'文章摘要': summary,
'发布时间': record.get('发布时间'),
'来源URL': record.get('来源URL'),
'文章链接': record.get('文章链接'),
'是否相关': analysis_result.get('is_relevant', False),
'相关度评分': analysis_result.get('relevance_score', 0),
'标签': json.dumps(analysis_result.get('tags', []), ensure_ascii=False),
'分类': analysis_result.get('category', '其他'),
'分析说明': analysis_result.get('analysis', ''),
'处理时间': datetime.now()
}
return result
except Exception as e:
self.log.error(f"处理记录 {record.get('id')} 失败: {str(e)}", exc_info=True)
return None
def save_ai_results(self, results: List[Dict[str, Any]]) -> int:
"""保存AI处理结果
Args:
results: 处理结果列表
Returns:
成功保存的记录数
"""
if not results:
return 0
try:
df = pd.DataFrame(results)
inserted = self.db_agent.insert_from_df(
table_name=self.ai_table,
df=df,
ignore_duplicates=True
)
self.log.info(f"成功保存 {inserted} 条AI处理结果")
return inserted
except Exception as e:
self.log.error(f"保存AI处理结果失败: {str(e)}", exc_info=True)
return 0
def mark_as_processed(self, ids: List[int]) -> bool:
"""标记记录为已处理
Args:
ids: 记录ID列表
Returns:
是否成功
"""
if not ids:
return True
try:
id_placeholders = ','.join(['%s'] * len(ids))
sql = f"""
UPDATE {self.source_table}
SET 是否ai处理 = 1
WHERE id IN ({id_placeholders})
"""
self.db_agent.execute_sql(sql, params=ids)
self.log.info(f"成功标记 {len(ids)} 条记录为已处理")
return True
except Exception as e:
self.log.error(f"标记记录为已处理失败: {str(e)}", exc_info=True)
return False
if __name__ == "__main__":
"""命令行直接运行"""
# 实例化处理器并调用main方法
processor = RSSDataAIProcessor()
result = processor.main()
# 输出结果
if result['success']:
print("\n" + "=" * 60)
print("✓ AI处理完成")
print("=" * 60)
print(f"总记录数: {result.get('total_count', 0)}")
print(f"成功处理: {result.get('processed_count', 0)}")
print(f"保存记录: {result.get('saved_count', 0)}")
print(f"失败记录: {result.get('failed_count', 0)}")
print(f"相关记录: {result.get('relevant_count', 0)}")
print(f"处理时间: {result.get('processing_time', '')}")
print("=" * 60 + "\n")
else:
print("\n" + "=" * 60)
print("✗ 处理失败")
print("=" * 60)
print(f"错误信息: {result['message']}")
print("\n提示: 请设置环境变量")
print(" Windows: $env:BAIDU_API_KEY = 'your_key'")
print(" Linux/Mac: export BAIDU_API_KEY='your_key'")
print("=" * 60 + "\n")
View File
+37
View File
@@ -0,0 +1,37 @@
汽车配件
汽车维修
汽车保养
汽车改装
汽车美容
汽车装饰
轮胎
机油
刹车片
火花塞
滤清器
蓄电池
车灯
保险杠
车门
座椅
方向盘
仪表盘
音响
导航
汽车用品
车载设备
汽车电子
汽车安全
汽车保险
二手车
汽车交易
汽车金融
汽车租赁
汽车服务
4S店
汽修店
汽车后市场
汽车产业链
汽车供应链
汽车
+409
View File
@@ -0,0 +1,409 @@
# RSS数据处理模块 - 汽车后市场新闻分词和过滤
import pandas as pd
import jieba
import jieba.posseg as pseg
import os
import sys
from typing import List, Dict, Any, Optional
from datetime import datetime
# 添加项目根目录到路径
current_dir = os.path.dirname(os.path.abspath(__file__))
parent_dir = os.path.dirname(current_dir)
if parent_dir not in sys.path:
sys.path.insert(0, parent_dir)
from utils.mysql_agent import MySQLAgent
from utils.logger import log
from config import Config
class RSSDataProcessor:
"""RSS数据处理器 - 专门处理汽车后市场相关新闻"""
def __init__(self):
"""初始化处理器"""
self.log = log.bind(module="RSSDataProcessor")
self.db_agent = MySQLAgent(Config.MYSQL_CONFIG)
self.table_name = "collector_rss_subscriptions"
self.processed_table_name = "processed_rss_data"
# 获取项目根目录
current_dir = os.path.dirname(os.path.abspath(__file__))
self.project_root = os.path.dirname(current_dir)
# 设置文件路径(相对于项目根目录)
self.keywords_file = os.path.join(self.project_root, "processors", "keywords.txt")
self.stopwords_file = os.path.join(self.project_root, "processors", "stopwords.txt")
# 汽车后市场相关关键词(默认值,实际从文件加载)
self.auto_aftermarket_keywords = {
'汽车配件', '汽车维修', '汽车保养', '汽车改装', '汽车美容', '汽车装饰',
'轮胎', '机油', '刹车片', '火花塞', '滤清器', '蓄电池', '车灯',
'保险杠', '车门', '座椅', '方向盘', '仪表盘', '音响', '导航',
'汽车用品', '车载设备', '汽车电子', '汽车安全', '汽车保险',
'二手车', '汽车交易', '汽车金融', '汽车租赁', '汽车服务',
'4S店', '汽修店', '汽车后市场', '汽车产业链', '汽车供应链', '汽车', ''
}
# 停用词表(默认值,实际从文件加载)
self.stopwords = {
'', '', '', '', '', '', '', '', '', '', '', '', '一个',
'', '', '', '', '', '', '', '', '', '', '没有', '', '',
'自己', '', '', '', '', '', '我们', '你们', '他们', '什么', '怎么',
'为什么', '因为', '所以', '但是', '然后', '如果', '虽然', '而且', '或者',
'可以', '应该', '必须', '需要', '想要', '希望', '觉得', '认为', '知道',
'了解', '明白', '清楚', '简单', '容易', '困难', '重要', '主要', '基本',
'一般', '特别', '非常', '十分', '相当', '比较', '更加', '', '',
'已经', '正在', '将要', '可能', '也许', '大概', '大约', '左右', '上下',
'今天', '明天', '昨天', '现在', '以前', '以后', '时候', '时间', '地方',
'这里', '那里', '这样', '那样', '如此', '这样', '那样', '如何', '怎样'
}
# 缓存关键词,避免重复加载
self._cached_keywords = None
self.log.info("RSS数据处理器初始化完成")
def load_keywords(self, keywords_file: Optional[str] = None) -> set:
"""从文件加载汽车后市场关键词(带缓存)"""
# 如果已经缓存,直接返回
if self._cached_keywords is not None:
return self._cached_keywords
# 使用默认路径(项目根目录下的文件)
if keywords_file is None:
keywords_file = self.keywords_file
keywords = set()
try:
if os.path.exists(keywords_file):
with open(keywords_file, 'r', encoding='utf-8') as f:
keywords = set(line.strip() for line in f if line.strip())
self.log.info(f"成功加载汽车后市场关键词,共 {len(keywords)}")
else:
self.log.warning(f"关键词文件不存在: {keywords_file}")
# 使用默认关键词
keywords = self.auto_aftermarket_keywords
except Exception as e:
self.log.error(f"加载关键词失败: {str(e)}")
keywords = self.auto_aftermarket_keywords
# 缓存关键词
self._cached_keywords = keywords
return keywords
def load_rss_data(self, limit: int = 1000) -> pd.DataFrame:
"""从数据库加载未处理的RSS数据"""
try:
sql = f"""
SELECT id, 文章标题, 文章摘要, 发布时间, 来源URL, 文章链接
FROM {self.table_name}
WHERE 是否已处理 = 0
ORDER BY 发布时间 DESC
LIMIT %s
"""
df = self.db_agent.query_to_df(sql, params=(limit,), is_print=False)
self.log.info(f"成功加载 {len(df)} 条未处理的RSS数据")
return df
except Exception as e:
self.log.error(f"加载RSS数据失败: {str(e)}", exc_info=True)
return pd.DataFrame()
def mark_as_processed(self, ids: List[int]) -> bool:
"""标记指定ID的数据为已处理"""
if not ids:
return True
try:
# 将ID列表转换为字符串格式用于SQL IN语句
id_placeholders = ','.join(['%s'] * len(ids))
sql = f"""
UPDATE {self.table_name}
SET 是否已处理 = 1
WHERE id IN ({id_placeholders})
"""
result = self.db_agent.execute_sql(sql, params=ids)
self.log.info(f"成功标记 {len(ids)} 条数据为已处理")
return True
except Exception as e:
self.log.error(f"标记数据为已处理失败: {str(e)}", exc_info=True)
return False
def load_stopwords(self, stopwords_file: Optional[str] = None) -> set:
"""加载停用词表"""
# 使用默认路径(项目根目录下的文件)
if stopwords_file is None:
stopwords_file = self.stopwords_file
try:
if os.path.exists(stopwords_file):
with open(stopwords_file, 'r', encoding='utf-8') as f:
stopwords = set(line.strip() for line in f if line.strip())
self.log.info(f"成功加载停用词表,共 {len(stopwords)} 个词")
return stopwords
else:
self.log.warning(f"停用词文件不存在: {stopwords_file},使用默认停用词")
return self.stopwords
except Exception as e:
self.log.error(f"加载停用词表失败: {str(e)}")
return self.stopwords
def add_custom_dict(self, custom_dict_file: Optional[str] = None):
"""添加自定义词典"""
if custom_dict_file and os.path.exists(custom_dict_file):
try:
jieba.load_userdict(custom_dict_file)
self.log.info("成功加载自定义词典")
except Exception as e:
self.log.warning(f"加载自定义词典失败: {str(e)}")
# 从文件加载汽车后市场关键词并添加到jieba词典
keywords = self.load_keywords()
for keyword in keywords:
jieba.add_word(keyword, freq=1000, tag='n')
def segment_and_pos(self, text: str, stopwords: set) -> List[str]:
"""分词并标注词性,过滤停用词"""
if not text or pd.isna(text):
return []
words = pseg.cut(str(text))
result = []
# 汽车后市场相关的词性标签
allowed_flags = {'n', 'vn', 'np', 'ns', 'nr', 'nt'} # 名词、动词、动名词、名词短语、处所词、人名、机构名
for word, flag in words:
word = word.strip()
if (len(word) >= 1 and
word not in stopwords and
flag in allowed_flags and
not word.isdigit()): # 过滤纯数字
result.append(word)
return result
def is_auto_aftermarket_related(self, text: str) -> bool:
"""判断文本是否与汽车后市场相关"""
if not text:
return False
text_lower = str(text).lower()
# 从文件加载关键词
keywords = self.load_keywords()
# 检查是否包含汽车后市场关键词
for keyword in keywords:
if keyword in text_lower:
return True
# 检查分词结果中是否包含相关词汇
words = self.segment_and_pos(text, self.stopwords)
for word in words:
if word in keywords:
return True
return False
def process_dataframe(self, df: pd.DataFrame, stopwords: set) -> pd.DataFrame:
"""处理整个DataFrame,进行分词和过滤"""
if df.empty:
self.log.warning("输入的DataFrame为空")
return df
# 确保所有文本都是字符串,并处理NaN值
df['文章标题'] = df['文章标题'].fillna('').astype(str)
df['文章摘要'] = df['文章摘要'].fillna('').astype(str)
# 合并标题和摘要进行分词
df['combined_text'] = df['文章标题'] + ' ' + df['文章摘要']
# 分词处理
df['segmented_words'] = df['combined_text'].apply(lambda x: self.segment_and_pos(x, stopwords))
# 判断是否与汽车后市场相关(只要出现关键词就入库)
df['is_auto_related'] = df['combined_text'].apply(self.is_auto_aftermarket_related)
df['is_filtered'] = df['is_auto_related']
# 添加处理时间
df['processed_time'] = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
self.log.info(f"数据处理完成,共处理 {len(df)} 条记录")
return df
def filter_auto_aftermarket_news(self, df: pd.DataFrame) -> pd.DataFrame:
"""过滤出汽车后市场相关的新闻"""
if df.empty:
return df
# 过滤出包含关键词的文章
filtered_df = df[df['is_filtered'] == True].copy()
self.log.info(f"过滤出 {len(filtered_df)} 条汽车后市场相关新闻")
return filtered_df
def save_to_database(self, df: pd.DataFrame) -> bool:
"""保存处理结果到数据库"""
if df.empty:
self.log.warning("没有数据需要保存")
return False
try:
# 准备保存的数据
save_df = df[['文章标题', '文章摘要', '发布时间', '来源URL', '文章链接',
'segmented_words', 'is_auto_related', 'processed_time']].copy()
# 将分词结果转换为字符串
save_df['分词结果'] = save_df['segmented_words'].apply(lambda x: ' '.join(x))
# 重命名列名为中文
save_df = save_df.rename(columns={
'is_auto_related': '是否汽车相关',
'processed_time': '处理时间'
})
# 删除不需要的列
save_df = save_df.drop('segmented_words', axis=1)
# 检查目标表是否存在,不存在则创建
if not self.db_agent.table_exists(self.processed_table_name):
self.create_processed_table()
# 插入数据
inserted_rows = self.db_agent.insert_from_df(
table_name=self.processed_table_name,
df=save_df,
ignore_duplicates=True
)
self.log.info(f"成功保存 {inserted_rows} 条处理结果到数据库")
return True
except Exception as e:
self.log.error(f"保存到数据库失败: {str(e)}", exc_info=True)
return False
def create_processed_table(self):
"""创建处理结果表"""
create_sql = f"""
CREATE TABLE IF NOT EXISTS {self.processed_table_name} (
id INT AUTO_INCREMENT PRIMARY KEY,
文章标题 TEXT,
文章摘要 TEXT,
发布时间 DATETIME,
来源URL VARCHAR(1024),
文章链接 VARCHAR(1024),
分词结果 TEXT,
是否汽车相关 BOOLEAN,
处理时间 DATETIME,
创建时间 TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
更新时间 TIMESTAMP DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_unicode_ci
"""
try:
self.db_agent.execute_sql(create_sql)
self.log.info(f"成功创建处理结果表: {self.processed_table_name}")
except Exception as e:
self.log.error(f"创建表失败: {str(e)}", exc_info=True)
raise
def get_processing_statistics(self, df: pd.DataFrame) -> Dict[str, Any]:
"""获取处理统计信息"""
if df.empty:
return {}
total_count = len(df)
filtered_count = len(df[df['is_filtered'] == True])
stats = {
'total_articles': total_count,
'filtered_articles': filtered_count,
'filter_rate': filtered_count / total_count if total_count > 0 else 0,
'processing_time': datetime.now().strftime('%Y-%m-%d %H:%M:%S')
}
return stats
def process_rss_data(self, limit: int = 1000, save_to_db: bool = True) -> Dict[str, Any]:
"""处理RSS数据的主函数"""
try:
self.log.info("开始处理RSS数据...")
# 1. 加载RSS数据
df = self.load_rss_data(limit)
if df.empty:
self.log.warning("没有加载到RSS数据")
return {'success': False, 'message': '没有数据可处理'}
# 2. 加载停用词表
stopwords = self.load_stopwords()
# 3. 添加自定义词典
self.add_custom_dict()
# 4. 处理数据
processed_df = self.process_dataframe(df, stopwords)
# 5. 过滤汽车后市场相关新闻
filtered_df = self.filter_auto_aftermarket_news(processed_df)
# 6. 获取统计信息
stats = self.get_processing_statistics(processed_df)
# 7. 保存到数据库
if save_to_db and not filtered_df.empty:
save_success = self.save_to_database(filtered_df)
stats['save_success'] = save_success
# 8. 标记数据为已处理
if not df.empty and 'id' in df.columns:
processed_ids = df['id'].tolist()
mark_success = self.mark_as_processed(processed_ids)
stats['mark_success'] = mark_success
if not mark_success:
self.log.warning("部分数据标记为已处理失败")
# 9. 输出结果
self.log.info("RSS数据处理完成", **stats)
return {
'success': True,
'message': 'RSS数据处理完成',
'statistics': stats,
'filtered_data': filtered_df
}
except Exception as e:
self.log.error(f"RSS数据处理失败: {str(e)}", exc_info=True)
return {'success': False, 'message': f'处理失败: {str(e)}'}
def main():
"""主函数入口"""
try:
# 创建处理器实例
processor = RSSDataProcessor()
# 处理RSS数据
result = processor.process_rss_data(
limit=5000, # 处理最近5000条数据
save_to_db=True # 保存到数据库
)
if result['success']:
print("RSS数据处理完成!")
print(f"处理统计: {result['statistics']}")
else:
print(f"处理失败: {result['message']}")
except Exception as e:
print(f"程序运行出错: {str(e)}")
if __name__ == "__main__":
main()
+100
View File
@@ -0,0 +1,100 @@
一个
没有
自己
我们
你们
他们
什么
怎么
为什么
因为
所以
但是
然后
如果
虽然
而且
或者
可以
应该
必须
需要
想要
希望
觉得
认为
知道
了解
明白
清楚
简单
容易
困难
重要
主要
基本
一般
特别
非常
十分
相当
比较
更加
已经
正在
将要
可能
也许
大概
大约
左右
上下
今天
明天
昨天
现在
以前
以后
时候
时间
地方
这里
那里
这样
那样
如此
这样
那样
如何
怎样
View File
+91 -104
View File
@@ -3,67 +3,57 @@
### 参考文档
https://alidocs.dingtalk.com/i/nodes/NZQYprEoWoexdo1ohPdxXvDbJ1waOeDk?utm_scene=team_space
### 程序框架
### 程序框架(当前实现)
```angular2html
intelligence_system/
├── data_collection/ # 数据采集层
│ ├── spiders/ # 网络爬虫子系统
│ ├── weibo_spider.py # 黑猫爬虫
├── api_integration/ # API接口子系统
│ │ ├── news_api.py # 新闻接口
│ │
│ └── internal/ # 内部数据收集
│ ├── jian_dao_cloud.py # 简道云表单收集器
├── collectors/ # 数据采集层
│ ├── complaint_spider.py # 投诉信息爬虫(结构化入库/附件走MinIO)
│ ├── rss_subscriptions.py # RSS 订阅抓取
└── internal/ # 内部数据收集(保留)
└── jian_dao_cloud.py # 简道云表单收集器(示例/占位)
├── data_processing/ # 数据处理层
│ ├── structured/ # 结构化数据处理
│ ├── data_cleaner.py # 数据清洗(去重/标准化
│ └── schema_mapper.py # 数据结构转换器
├── unstructured/ # 非结构化数据处理
│ │ ├── text_parser.py # 文本解析(PDF/HTML等)
│ │ ├── image_analyzer.py # 图像识别(OpenCV集成)
│ │ └── video_processor.py # 音视频分离分析
│ │
│ └── ai_engine/ # AI分析核心
│ ├── nlp_processor.py # 自然语言处理引擎
│ ├── sentiment_analyzer.py # 情感分析模型
│ └── topic_modeler.py # LDA主题建模工具
├── processors/ # 数据处理层
│ ├── processor_rss_data.py # RSS数据清洗、分词、过滤与入库
│ ├── keywords.txt # 行业关键词(用于分词/过滤
├── stopwords.txt # 停用词
└── ai_engine/
└── ai_proessor_rss_data # 预留(AI分析扩展占位)
├── services/ # 应用服务层
│ ├── monitoring/ # 舆情监控
│ │ ├── opinion_monitor.py # 实时舆情追踪
│ │ └── brand_reputation.py # 品牌口碑分析
│ ├── analysis/ # 竞品分析
│ │ ── competitor_tracker.py # 竞品动态监控
│ └── swot_generator.py # SWOT分析报告
│ │
├── reporting/ # 报告服务
│ ├── daily_reporter.py # 自动化日报生成
── weekly_digest.py # 周报汇编系统
│ └── alert/ # 预警服务
│ ├── alert_trigger.py # 动态阈值告警
│ └── notification_center.py # 邮件/短信通知
├── services/ # 应用服务层(保留)
│ ├── monitoring/ # 舆情监控
│ │ ├── opinion_monitor.py # 实时舆情追踪(占位)
│ │ └── brand_reputation.py # 品牌口碑分析(占位)
├── analysis/ # 竞品分析
│ ├── competitor_tracker.py # 竞品动态监控(占位)
│ │ ── swot_generator.py # SWOT分析报告(占位)
├── reporting/ # 报告服务
│ │ ├── daily_reporter.py # 自动化日报生成(占位)
│ └── weekly_digest.py # 周报汇编系统(占位)
└── alert/ # 预警服务
── alert_trigger.py # 动态阈值告警(占位)
└── notification_center.py # 邮件/短信通知(占位)
├── system_management/ # 系统管理
│ ├── scheduler/ # 任务调度
│ └── task_scheduler.py # 任务调度器
│ └── monitor/ # 系统监控
│ ├── health_monitor.py # 服务健康检测
│ └── performance_watcher.py # 资源占用监控
├── applications/ # 应用
│ ├── alert.py # 告警触发/通知(占位/实现中)
│ └── reporter/
├── daily.py # 日报生成
└── monthly.py # 月报生成
├── utils/ # 工具库
│ ├── file_handler.py # 通用文件操作
│ ├── logger.py # 日志系统
├── mysql_agent.py # MySQL读写管理器
│ └── datetime_parser.py # 时间格式处理
├── system_management/ # 系统管理层
│ ├── scheduler/
│ ├── task_scheduler.py # 任务调度器(Cron表达式 + 线程池)
│ └── task_management.py # 任务管理辅助
│ └── monitor/ # 系统监控(目录占位)
├── config.py # 配置加载与管理
└── main.py # 系统入口(启动所有服务)
├── utils/ # 工具库
│ ├── file_handler.py # 通用文件操作
│ ├── logger.py # 跨平台日志系统(Loguru)
│ ├── mysql_agent.py # MySQL读写管理器
│ └── minio_agent.py # MinIO对象存储客户端
├── config.py # 配置加载与管理(含数据库/存储配置)
├── main.py # 系统入口(Cron轮询 + 调度执行)
└── requirements.txt # 依赖清单
```
### 程序设计原则
@@ -72,23 +62,32 @@ intelligence_system/
3. 密钥等信息直接放在配置类中
4. 数据存储遵循"结构化存MySQL,非结构化存MinIO"原则,通过元数据关联
### 主程序设计
主程序需要一次启动,一直运行,启动时运行一次(在代码中可取消),之后每天定时生成一次报告
### 主程序与调度设计(已实现)
主程序以长运行进程方式启动,进入轻量轮询循环(每10秒)。调度器按Cron表达式在`main_task`表中拉取到期任务,使用线程池异步执行,并在每分钟输出运行状态、每小时汇总统计。
主程序包含爬虫/api调度器。该调度器通过查询mysql中任务调度情况按需执行,db文件中应包含任务名称、
任务路径、任务执行频率(支持按天、按周,按分钟)、上次执行时间、下次执行时间等信息
- 调度器能力:
- 基于`croniter`解析Cron表达式,支持时区(默认`Asia/Shanghai`
- 线程池并发执行,信号量限制最大并发(与`max_workers`一致)
- 任务入口动态解析:支持`package.module``package.module.ClassName.main``package.module.func` 等形式
- 成功/失败后自动计算`next_run_time`或设置15分钟后重试
- 关键字段自动更新:`is_running``last_run_time``last_run_status``run_count``next_run_time`
主程序应包含数据处理调度器,根据数据类别分别处理,如文本数据处理调度器、图片数据处理调度器等,
每天定时拉取db获取到的原始数据,分别进行处理,处理完成后将结果保存到mysql中
- 主循环:
- 每10秒检查一次待运行任务
- 每分钟打印当前周期统计;每小时写入累计统计日志
- 支持`SIGINT/SIGTERM`优雅关闭,等待正在运行的任务完成
主程序应包含日报、周报等生成,根据时间定时生成报告,报告需要存储
### 日志设计(已实现)
跨平台日志系统(Loguru)输出至`logs/`目录:
### 日志设计
日志系统兼容Windows、Mac、Linux平台,以`log`文件形式存储,超过20MB自动压缩。新增存储相关日志内容:
- MySQL操作:批量插入行数、表结构变更、事务状态
- MinIO操作:文件上传/下载状态、路径、大小、耗时
- 关联日志:MySQL记录与MinIO对象的绑定关系(如"ID:123 关联文件: collector/images/xxx.jpg"
- 异常日志:MySQL连接失败、MinIO上传超时、数据关联不一致等告警信息
- application.log:主日志,`rotation = 20MB`,达到阈值后压缩为`application.log.YYYYMMDD.zip``retention = 30天`
- errors.log:错误日志(ERROR及以上),`rotation = 10MB``retention = 90天`
- 结构化扩展字段:日志支持`extra`键值对,自动美化并对长字段(如`sql``params`)截断
建议记录的业务事件:
- MySQL读写操作要点(表名、影响行数、事务状态)
- MinIO对象操作(对象路径、大小、耗时、状态)
- 任务执行上下文(task_id、task_name、module_path、耗时、状态)
### 存储系统设计(MinIO+MySQL
#### 核心存储分工
@@ -118,44 +117,32 @@ intelligence_system/
- 系统类:如任务调度表等采用功能命名(如`main_task`
#### 核心表结构
1. `collector_news_api`:新闻API采集数据表(存储新闻标题、内容等结构化数据
2. `collector_complaint_spider`:投诉信息爬虫数据表(含投诉文本、附件MinIO路径`attachment_minio_path`等)
3. `collector_image_source`:采集层图片元数据表(存储图片URL、MinIO路径、格式、大小等)
4. `processor_text_processor`:文本处理结果表(存储NLP分析结果、关联原文ID等)
5. `processor_image_processor`:图片处理结果表(存储识别标签、特征向量、处理后图片MinIO路径`result_minio_path`
6. `storage_object_index`:MinIO对象索引表(存储所有对象的MinIO路径、哈希值、创建时间、过期时间等)
7. `main_task`:任务调度表(存储任务名称、路径、执行频率、上次/下次执行时间等)
8. `application_reporter_daily`:日报数据表(存储日报结构化内容、报表文件MinIO路径等)
9. `application_reporter_monthly`:月报数据表(存储月报结构化内容、报表文件MinIO路径等)
#### 数据交互特性
1. **MySQL交互**
- 支持DataFrame直接读写,提供分块处理(`chunksize`)和批量插入能力
- 自动适配平台特性(如Windows小批次写入优化)
- 完善的事务机制确保结构化数据一致性
2. **MinIO交互**
- 支持大文件分片上传、断点续传
3. **联动机制**
- 非结构化数据存储时,先上传至MinIO获取路径,再将路径及元数据写入MySQL
- 读取非结构化数据时,先从MySQL获取MinIO路径,再通过路径从MinIO下载
- 日志同步记录MySQL操作和MinIO对象操作(如"上传文件至MinIO: {path},关联MySQL记录ID: {id}"
#### 核心表结构(当前落地)
1. `main_task`:任务调度表(`task_name``task_type``module_path``cron_expression``time_zone``run_count``is_running``last_run_time``last_run_status``next_run_time``is_active`
2. `collector_rss_subscriptions`RSS源采集数据(`文章标题``文章摘要``发布时间``来源URL``文章链接``是否已处理` 等)
3. `processed_rss_data`RSS处理结果(`分词结果``是否汽车相关``处理时间` 等)
4. `collector_complaint_spider`:投诉信息爬虫数据(含文本与附件MinIO路径`attachment_minio_path`等)
5. 可选:`storage_object_index`(建议用于统一索引MinIO对象元数据
### 数据采集设计
1. 结构化数据(如新闻文本、投诉内容):直接写入对应`collector_`前缀表
2. 非结构化数据(如爬取的图片、附件):
- `minio_agent.py`上传至对应存储桶
-MinIO路径、文件大小、格式等元数据写入`collector_`前缀表或`storage_object_index`
3. 每个采集模块(独立py文件,`main`方法入口)需同时处理MySQLMinIO交互,确保数据关联完整
1. 结构化数据(RSS、投诉文本):写入`collector_`前缀表
2. 非结构化数据(附件/图片等):
- 使`utils/minio_agent.py`上传至对应存储桶
-对象路径与元数据写入业务表或`storage_object_index`
3. 采集模块需同时处理MySQLMinIO交互,确保关联完整
### 数据处理设计(RSS流程已实现)
`processors/processor_rss_data.py`流程:
-`collector_rss_subscriptions`加载未处理数据(可配置`limit`
- 加载停用词与行业关键词(`stopwords.txt` / `keywords.txt`),并动态注入`jieba`词典
- 标注词性并过滤停用词,仅保留与汽车后市场相关的词汇
- 标记与过滤:出现任一行业关键词即视为相关,进入保存
- 将结果写入`processed_rss_data`,并回写源表`是否已处理 = 1`
- 输出处理统计(总量、命中量、命中率、时间)
### 数据处理设计
1. 结构化数据处理:从MySQL读取原始数据,处理后写入`processor_`前缀表
2. 非结构化数据处理:
- 从MySQL获取MinIO路径,通过`minio_agent.py`下载原始文件
- 处理后(如图片识别、视频帧提取)将结果文件上传至MinIO(处理层存储桶)
- 将处理结果的结构化信息(如识别标签)和处理后文件的MinIO路径写入`processor_`前缀表
3. 支持多表关联存储,通过`source_id`关联原始数据与处理结果
### 依赖与运行
- 依赖:见`requirements.txt`pandas、SQLAlchemy、PyMySQL、croniter、pytz、loguru、jieba、feedparser、beautifulsoup4、minio 等)
- 配置:在`config.py`中设置`MYSQL_CONFIG`与MinIO参数
- 运行:
- 启动主程序:`python main.py`
- 添加任务:向`main_task`插入记录,`module_path`可指向如`processors.processor_rss_data.main`
+1
View File
@@ -15,3 +15,4 @@ feedparser==6.0.11
Markdown==3.9
openai==1.107.3
tqdm==4.67.1
jieba==0.42.1
+2
View File
@@ -0,0 +1,2 @@
# Makes system_management a package
+3
View File
@@ -0,0 +1,3 @@
# Makes system_management.scheduler a package
from .task_scheduler import TaskScheduler
@@ -1,8 +1,8 @@
import argparse
from datetime import datetime
from system_management.scheduler.task_scheduler import TaskScheduler
from system_management.scheduler.task_manager import TaskManager
from config.config import ConfigManager
from system_management.scheduler.task_scheduler import TaskManager
from config import Config
from utils.logger import CrossPlatformLog
# 初始化日志
@@ -11,8 +11,7 @@ log = CrossPlatformLog.get_logger("TaskManagement")
def main():
# 初始化配置和组件
config = ConfigManager()
scheduler = TaskScheduler(config.get("database"))
scheduler = TaskScheduler(Config.MYSQL_CONFIG)
manager = TaskManager(scheduler)
# 解析命令行参数
+227 -47
View File
@@ -1,4 +1,5 @@
import importlib
import threading
import time
from datetime import datetime
from typing import Dict, List, Optional, Any
@@ -19,10 +20,79 @@ class TaskScheduler:
"""初始化任务调度器(基于Cron表达式)"""
self.db = MySQLAgent(db_config or {})
self.executor = ThreadPoolExecutor(max_workers=max_workers)
# 并发容量控制:限制同时运行的后台任务不超过 max_workers
self._running_semaphore = threading.Semaphore(max_workers)
# 任务统计
self.hourly_stats = {'成功': 0, '失败': 0, '总数': 0}
self.hourly_stats_lock = threading.Lock()
log.info(f"任务调度器已初始化,最大工作线程数: {max_workers}")
def check_and_run_tasks(self) -> Dict[str, int]:
"""检查并执行所有到期的任务,优化空任务处理和异常容错"""
def _resolve_callable(self, module_path: str):
"""解析模块路径,支持模块、模块内类/函数,并返回可调用对象
兼容以下形式:
- package.module -> 期望模块内存在 main()
- package.module.ClassName -> 调用 ClassName.main() 或实例化后调用 main()
- package.module.func_name -> 直接调用该函数
- package.module.ClassName.method_name -> 调用指定方法
"""
if not module_path or not isinstance(module_path, str):
raise ImportError("无效的模块路径")
parts = module_path.split('.')
last_import_error = None
# 从最长前缀开始尝试导入模块,逐步回退
for i in range(len(parts), 0, -1):
module_name = '.'.join(parts[:i])
try:
module = importlib.import_module(module_name)
attr_chain = parts[i:]
# 从模块开始逐级解析属性
target = module
for attr in attr_chain:
if not hasattr(target, attr):
raise AttributeError(f"{target} 中未找到属性: {attr}")
target = getattr(target, attr)
# 若目标是类,优先尝试类方法/实例方法 main
if isinstance(target, type):
# 类方法 main
if hasattr(target, 'main') and callable(getattr(target, 'main')):
return getattr(target, 'main')
# 实例方法 main
try:
instance = target()
if hasattr(instance, 'main') and callable(getattr(instance, 'main')):
return getattr(instance, 'main')
except Exception:
pass
# 不把“类本身”当作任务入口(否则只会构造实例不执行 main)
raise AttributeError(f"{target.__name__} 缺少可调用的 main() 作为任务入口")
# 目标非类:若本身可调用,则直接作为入口返回
if callable(target):
return target
# 否则尝试对象上的 main()
if hasattr(target, 'main') and callable(getattr(target, 'main')):
return getattr(target, 'main')
raise AttributeError(f"路径 {module_path} 未解析到可调用入口(缺少 main 或不可调用)")
except Exception as e:
last_import_error = e
continue
# 如果所有尝试均失败,则抛出最后的错误
raise ImportError(f"模块 {module_path} 导入/解析失败: {str(last_import_error)}")
def check_and_run_tasks(self, print_empty_status: bool = False) -> Dict[str, int]:
"""检查并执行所有到期的任务,优化空任务处理和异常容错
Args:
print_empty_status: 是否打印空任务状态(默认False,避免频繁输出)
"""
result = {'总任务数': 0, '成功': 0, '失败': 0}
try:
@@ -39,12 +109,13 @@ class TaskScheduler:
AND next_run_time <= %s
AND is_running = 0
ORDER BY next_run_time
""", params=(now,))
""", params=(now,),is_print=False)
result['总任务数'] = len(tasks_df)
if tasks_df.empty:
# 空任务时输出INFO级日志,明确提示状态
log.info("当前没有到期的任务,等待新任务加入...")
# 空任务时根据参数决定是否输出
if print_empty_status:
print(f"当前没有到期的任务,等待新任务加入...{now.strftime('%Y-%m-%d %H:%M:%S')}")
return result
# 并发执行任务
@@ -65,6 +136,12 @@ class TaskScheduler:
log.error(f"任务线程执行失败: {str(e)}", exc_info=True)
result['失败'] += 1
# 更新小时统计
with self.hourly_stats_lock:
self.hourly_stats['成功'] += result['成功']
self.hourly_stats['失败'] += result['失败']
self.hourly_stats['总数'] += result['总任务数']
log.info(
"任务调度周期完成",
总任务数=result['总任务数'],
@@ -88,6 +165,9 @@ class TaskScheduler:
task_log.info(f"开始执行任务: {task_name}")
try:
# 阻塞等待可用的执行槽位,保证同时运行的任务不超过最大工作线程数
self._running_semaphore.acquire()
# 标记任务为运行中(使用当前时间的时区感知对象)
tz = pytz.timezone(task.get('time_zone', 'Asia/Shanghai'))
current_time = datetime.now(tz).replace(tzinfo=None)
@@ -97,24 +177,10 @@ class TaskScheduler:
'last_run_time': current_time
})
# 执行任务逻辑
self._execute_task_logic(task)
# 计算下次运行时间(基于Cron表达式)
next_run_time = self._calculate_next_run_time(
cron_expr=task['cron_expression'],
time_zone=task.get('time_zone', 'Asia/Shanghai')
)
# 更新任务状态为成功
self._update_task_status(task_id, {
'last_run_status': 'success',
'is_running': 0,
'run_count': task['run_count'] + 1,
'next_run_time': next_run_time
})
task_log.info(f"任务执行成功: {task_name}")
return True
# 将任务主体放到后台线程执行,当前线程快速返回
self.executor.submit(self._run_task_async, task.copy())
task_log.debug("任务已提交至后台执行队列")
return True # 表示已成功提交
except Exception as e:
task_log.error(f"任务执行失败: {str(e)}", exc_info=True)
@@ -132,32 +198,138 @@ class TaskScheduler:
except Exception as update_err:
task_log.error(f"任务失败后状态更新失败: {str(update_err)}", exc_info=True)
# 若已占用并发槽位,释放之
try:
self._running_semaphore.release()
except Exception:
pass
return False
def _execute_task_logic(self, task: Dict[str, Any]) -> None:
"""执行任务的具体逻辑(动态导入模块)"""
start_time = time.time()
def _run_task_async(self, task: Dict[str, Any]) -> None:
"""在后台线程中执行任务主体,并在结束后更新状态"""
task_id = task['task_id']
module_path = task['module_path']
task_log = log.bind(task_id=task_id, module=module_path)
task_name = task['task_name']
task_log = log.bind(task_id=task_id, task_name=task_name)
try:
# 如果 module_path 指向类,先实例化以触发初始化日志,然后执行 main
self._execute_task_logic(task)
# 成功后计算下次运行时间
next_run_time = self._calculate_next_run_time(
cron_expr=task['cron_expression'],
time_zone=task.get('time_zone', 'Asia/Shanghai')
)
self._update_task_status(task_id, {
'last_run_status': 'success',
'is_running': 0,
'run_count': task['run_count'] + 1,
'next_run_time': next_run_time
})
task_log.info(f"任务执行成功: {task_name}")
except Exception:
task_log.error("任务后台执行失败", exc_info=True)
next_retry_time = datetime.now() + pd.Timedelta(minutes=15)
try:
self._update_task_status(task_id, {
'last_run_status': 'failed',
'is_running': 0,
'next_run_time': next_retry_time
})
except Exception:
task_log.error("任务失败后状态更新失败(后台)", exc_info=True)
finally:
# 释放并发槽位
try:
self._running_semaphore.release()
except Exception:
pass
def _execute_task_logic(self, task):
"""
执行任务逻辑的核心方法
支持类方法、静态方法和实例方法的调用
"""
module_path = task.get('module_path')
if not module_path:
raise ValueError("任务缺少 module_path 配置")
# 解析模块路径和类名
try:
path_parts = module_path.split('.')
if len(path_parts) < 2:
raise ValueError(f"无效的模块路径: {module_path}")
module_name = '.'.join(path_parts[:-1])
class_name = path_parts[-1]
method_name = 'main' # 默认方法名
except Exception as e:
raise ValueError(f"解析模块路径失败: {str(e)}")
# 动态导入模块
try:
import importlib
module = importlib.import_module(module_name)
except ImportError as e:
raise ImportError(f"无法导入模块 {module_name}: {str(e)}")
# 获取类和方法
if not hasattr(module, class_name):
raise AttributeError(f"模块 {module_name} 中未找到类 {class_name}")
cls = getattr(module, class_name)
# 检查是否存在指定方法
if not hasattr(cls, method_name):
raise AttributeError(f"{class_name} 中未找到方法 {method_name}")
method = getattr(cls, method_name)
# 根据方法类型决定如何调用
import inspect
callable_entry = None
# 判断是否为静态方法或类方法
if isinstance(method, staticmethod):
# 静态方法可以直接调用
callable_entry = method
elif isinstance(method, classmethod):
# 类方法需要传入类作为第一个参数
callable_entry = method
else:
# 实例方法或普通函数
try:
# 尝试检查方法签名
sig = inspect.signature(method)
params = list(sig.parameters.values())
# 如果第一个参数是self且没有默认值,则认为是实例方法
if params and params[0].name == 'self' and params[0].default == inspect.Parameter.empty:
# 创建实例并获取绑定方法
instance = cls()
callable_entry = getattr(instance, method_name)
else:
# 可能是普通函数或者是带有默认self参数的方法
callable_entry = method
except Exception:
# 如果检查签名失败,默认尝试创建实例
try:
instance = cls()
callable_entry = getattr(instance, method_name)
except Exception:
# 如果创建实例也失败,则直接调用方法(适用于不需要self的特殊情况)
callable_entry = method
# 执行任务
if not callable(callable_entry):
raise TypeError(f"{module_path}.{method_name} 不是可调用对象")
try:
# 动态导入任务模块(增加模块存在性检查)
try:
module = importlib.import_module(module_path)
except ImportError as e:
raise ImportError(f"模块 {module_path} 导入失败: {str(e)}")
# 检查main函数是否存在
if not hasattr(module, 'main') or not callable(module.main):
raise AttributeError(f"模块 {module_path} 中未找到可调用的 main() 函数")
task_log.debug("开始执行模块中的 main() 函数")
module.main() # 调用任务主函数
task_log.info(f"任务执行完成,耗时: {time.time() - start_time:.2f}")
# 执行任务逻辑
callable_entry()
except Exception as e:
task_log.error("任务逻辑执行失败", exc_info=True)
self.logger.error(f"任务逻辑执行失败: {str(e)}")
raise
def _calculate_next_run_time(self, cron_expr: str, time_zone: str = 'Asia/Shanghai') -> datetime:
@@ -218,11 +390,11 @@ class TaskScheduler:
if not cron_expression:
raise ValueError("Cron表达式不能为空")
# 验证模块是否存在(提前检查,避免添加无效任务)
# 验证模块路径可解析(提前检查,避免添加无效任务)
try:
importlib.import_module(module_path)
except ImportError as e:
raise ValueError(f"模块 {module_path} 不存在: {str(e)}")
_ = self._resolve_callable(module_path)
except Exception as e:
raise ValueError(f"模块路径不可用: {module_path},错误: {str(e)}")
# 计算首次运行时间
first_run_time = self._calculate_next_run_time(cron_expression, time_zone)
@@ -302,3 +474,11 @@ class TaskScheduler:
except Exception as e:
log.error(f"查询待执行任务失败,将重试: {str(e)}", exc_info=True)
return []
def get_and_reset_hourly_stats(self) -> Dict[str, int]:
"""获取并重置小时统计数据(用于每小时统计)"""
with self.hourly_stats_lock:
stats = self.hourly_stats.copy()
# 重置统计
self.hourly_stats = {'成功': 0, '失败': 0, '总数': 0}
return stats
+1
View File
@@ -0,0 +1 @@
print("Hello, World!")
Binary file not shown.
+702 -397
View File
File diff suppressed because one or more lines are too long
+1 -1
View File
@@ -1 +1 @@
from .logger import CrossPlatformLog
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
+35 -17
View File
@@ -1,10 +1,11 @@
import os
import shutil
import zipfile
import pickle
import pandas as pd
from datetime import datetime
from pathlib import Path, PurePath
from typing import Union, Optional, List, Dict, Any
from typing import Union, Optional, List, Dict, Any, Callable
from utils.logger import log
class FileHandler:
@@ -71,6 +72,17 @@ class FileHandler:
df = pd.read_excel(file_path, **kwargs)
elif ext == 'json':
df = pd.read_json(file_path, encoding=encoding, **kwargs)
elif ext in ['pkl', 'pickle']:
# 统一将pickle内容转为DataFrame返回
obj = pd.read_pickle(file_path)
if isinstance(obj, pd.DataFrame):
df = obj
elif isinstance(obj, list):
df = pd.DataFrame(obj)
elif isinstance(obj, dict):
df = pd.DataFrame([obj])
else:
df = pd.DataFrame({'content': [obj]})
elif ext == 'parquet':
df = pd.read_parquet(file_path, **kwargs)
else:
@@ -102,25 +114,31 @@ class FileHandler:
if not parent_dir.exists():
self.create_dir(parent_dir)
# 统一数据格式
if isinstance(data, pd.DataFrame):
df = data
else:
df = pd.DataFrame(data if isinstance(data, list) else [data])
# 根据扩展名选择写入方式
ext = self.get_file_extension(file_path)
if ext in ['csv', 'txt']:
df.to_csv(file_path, encoding=encoding, index=False, **kwargs)
elif ext in ['xls', 'xlsx']:
df.to_excel(file_path, index=False, **kwargs)
elif ext == 'json':
df.to_json(file_path, force_ascii=False, **kwargs)
elif ext == 'parquet':
df.to_parquet(file_path, **kwargs)
if ext in ['pkl', 'pickle']:
# 直接按原始对象进行pickle序列化
with open(file_path, 'wb') as f:
pickle.dump(data, f)
else:
with open(file_path, 'w', encoding=encoding) as f:
f.write(str(data))
# 统一数据格式到DataFrame
if isinstance(data, pd.DataFrame):
df = data
else:
df = pd.DataFrame(data if isinstance(data, list) else [data])
if ext in ['csv', 'txt']:
df.to_csv(file_path, encoding=encoding, index=False, **kwargs)
elif ext in ['xls', 'xlsx']:
df.to_excel(file_path, index=False, **kwargs)
elif ext == 'json':
df.to_json(file_path, force_ascii=False, **kwargs)
elif ext == 'parquet':
df.to_parquet(file_path, **kwargs)
else:
with open(file_path, 'w', encoding=encoding) as f:
f.write(str(data))
# 返回成功结果
return self._format_result(
+3 -2
View File
@@ -110,7 +110,7 @@ class MySQLAgent:
time.sleep(1)
def query_to_df(self, sql: str, params: Union[tuple, dict, None] = None,
parse_dates: Union[List[str], bool] = True) -> pd.DataFrame:
parse_dates: Union[List[str], bool] = True,is_print = True) -> pd.DataFrame:
"""执行SQL查询并返回DataFrame(原有逻辑完全保留)"""
try:
self.log.debug("执行SQL查询", sql=sql)
@@ -130,7 +130,8 @@ class MySQLAgent:
# 执行查询
df = pd.read_sql(sql, engine, params=params, parse_dates=parse_dates)
self.log.info("查询执行成功", 行数=len(df))
if is_print:
self.log.info("查询执行成功", 行数=len(df))
return df