ai提取rss相关数据

This commit is contained in:
z66
2025-10-28 13:43:06 +08:00
parent e1db06dd79
commit c5f6e8288d
8 changed files with 53336 additions and 40 deletions
@@ -0,0 +1,453 @@
# RSS数据AI处理模块
import os
import sys
import json
import time
import pandas as pd
from typing import List, Dict, Any, Optional
from datetime import datetime
from openai import OpenAI
# 添加项目根目录到路径
current_dir = os.path.dirname(os.path.abspath(__file__))
parent_dir = os.path.dirname(os.path.dirname(current_dir))
if parent_dir not in sys.path:
sys.path.insert(0, parent_dir)
from utils.mysql_agent import MySQLAgent
from utils.logger import log
from config import Config
class RSSDataAIProcessor:
"""RSS数据AI处理主类
负责:
- 从数据库加载未处理的RSS数据
- 调用AI进行分析
- 保存分析结果
- 更新处理状态
"""
def __init__(self):
"""初始化AI处理器"""
self.log = log.bind(module="RSSDataAIProcessor")
self.db_agent = MySQLAgent(Config.MYSQL_CONFIG)
# 从Config读取配置
self.source_table = Config.AI_PROCESSOR_CONFIG['source_table']
self.ai_table = Config.AI_PROCESSOR_CONFIG['result_table']
self.default_batch_size = Config.AI_PROCESSOR_CONFIG['batch_size']
self.default_delay = Config.AI_PROCESSOR_CONFIG['delay']
# 初始化百度千帆API客户端
self.api_key = Config.BAIDU_AI_CONFIG.get('api_key')
if self.api_key:
self.ai_client = OpenAI(
base_url='https://qianfan.baidubce.com/v2',
api_key=self.api_key
)
self.model = Config.BAIDU_AI_CONFIG.get('model', 'ernie-x1-turbo-32k')
self.log.info("RSS数据AI处理器初始化完成")
else:
self.ai_client = None
self.log.warning("百度AI未配置,AI处理功能将不可用")
self.log.warning("请在config.py中配置 BAIDU_AI_CONFIG['api_key']")
def is_configured(self) -> bool:
"""检查是否已配置API"""
return self.ai_client is not None
def main(self, batch_size: Optional[int] = 200, delay: Optional[float] = None) -> Dict[str, Any]:
"""主程序:批量处理RSS数据的完整流程
Args:
batch_size: 批量处理的记录数,None则使用配置的默认值
delay: 每条记录之间的延迟(秒),None则使用配置的默认值
Returns:
dict: 处理结果统计信息
"""
# 使用传入参数或默认配置
batch_size = batch_size or self.default_batch_size
delay = delay or self.default_delay
try:
# 1. 检查配置
if not self.is_configured():
error_msg = "百度AI未配置,请在config.py中配置 BAIDU_AI_CONFIG['api_key']"
self.log.error(error_msg)
return {
'success': False,
'message': error_msg,
'processed_count': 0,
'failed_count': 0
}
self.log.info(f"开始批量处理数据,批次大小: {batch_size}, 延迟: {delay}")
# 2. 准备数据库表结构
self.ensure_ai_processed_column()
if not self.db_agent.table_exists(self.ai_table):
self.create_ai_result_table()
# 3. 加载未处理的数据
df = self.load_unprocessed_data(batch_size)
if df.empty:
self.log.info("没有需要处理的数据")
return {
'success': True,
'message': '没有需要处理的数据',
'processed_count': 0,
'failed_count': 0
}
# 4. 处理每条记录
results = []
processed_ids = []
failed_count = 0
for idx, record in df.iterrows():
try:
self.log.debug(f"处理记录 {record['id']} ({idx + 1}/{len(df)})")
result = self.process_single_record(record.to_dict())
if result:
results.append(result)
processed_ids.append(record['id'])
else:
failed_count += 1
# 延迟,避免API限流
if delay > 0 and idx < len(df) - 1:
time.sleep(delay)
except Exception as e:
self.log.error(f"处理记录 {record['id']} 异常: {str(e)}", exc_info=True)
failed_count += 1
# 5. 保存结果
saved_count = 0
if results:
saved_count = self.save_ai_results(results)
# 6. 标记为已处理
if processed_ids:
self.mark_as_processed(processed_ids)
# 7. 返回统计信息
stats = {
'success': True,
'message': 'AI处理完成',
'total_count': len(df),
'processed_count': len(processed_ids),
'saved_count': saved_count,
'failed_count': failed_count,
'relevant_count': sum(1 for r in results if r.get('是否相关')),
'processing_time': datetime.now().strftime('%Y-%m-%d %H:%M:%S')
}
self.log.info("批量处理完成", **stats)
return stats
except Exception as e:
error_msg = f"批量处理失败: {str(e)}"
self.log.error(error_msg, exc_info=True)
return {
'success': False,
'message': error_msg,
'processed_count': 0,
'failed_count': 0
}
def ensure_ai_processed_column(self):
"""确保processed_rss_data表有"是否ai处理"字段"""
try:
# 检查字段是否存在
check_sql = """
SELECT COUNT(*) as count
FROM information_schema.COLUMNS
WHERE TABLE_SCHEMA = %s
AND TABLE_NAME = %s
AND COLUMN_NAME = '是否ai处理'
"""
result = self.db_agent.execute_sql(
check_sql,
params=(Config.MYSQL_CONFIG['database'], self.source_table),
fetch=True
)
if result[0][0] == 0:
# 字段不存在,添加字段
alter_sql = f"""
ALTER TABLE {self.source_table}
ADD COLUMN 是否ai处理 TINYINT(1) DEFAULT 0 COMMENT 'AI处理标记:0-未处理,1-已处理'
"""
self.db_agent.execute_sql(alter_sql)
self.log.info(f"成功为表 {self.source_table} 添加 '是否ai处理' 字段")
else:
self.log.debug(f"{self.source_table} 已存在 '是否ai处理' 字段")
except Exception as e:
self.log.error(f"检查/添加字段失败: {str(e)}", exc_info=True)
raise
def create_ai_result_table(self):
"""创建AI处理结果表"""
create_sql = f"""
CREATE TABLE IF NOT EXISTS {self.ai_table} (
id INT AUTO_INCREMENT PRIMARY KEY COMMENT '主键ID',
source_id INT NOT NULL COMMENT '来源数据IDprocessed_rss_data.id',
文章标题 TEXT COMMENT '文章标题',
文章摘要 TEXT COMMENT '文章摘要',
发布时间 DATETIME COMMENT '发布时间',
来源URL VARCHAR(1024) COMMENT '来源URL',
文章链接 VARCHAR(1024) COMMENT '文章链接',
是否相关 BOOLEAN COMMENT 'AI判断是否与汽车后市场相关',
相关度评分 INT COMMENT '相关度评分(0-100',
标签 TEXT COMMENT 'AI生成的标签(JSON数组)',
分类 VARCHAR(100) COMMENT 'AI判断的主要分类',
分析说明 TEXT COMMENT 'AI分析说明',
处理时间 DATETIME COMMENT 'AI处理时间',
创建时间 TIMESTAMP DEFAULT CURRENT_TIMESTAMP COMMENT '记录创建时间',
更新时间 TIMESTAMP DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '记录更新时间',
INDEX idx_source_id (source_id),
INDEX idx_是否相关 (是否相关),
INDEX idx_分类 (分类),
INDEX idx_处理时间 (处理时间)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_unicode_ci COMMENT='RSS数据AI分析结果表'
"""
try:
self.db_agent.execute_sql(create_sql)
self.log.info(f"成功创建AI结果表: {self.ai_table}")
except Exception as e:
self.log.error(f"创建AI结果表失败: {str(e)}", exc_info=True)
raise
def load_unprocessed_data(self, limit: int = 100) -> pd.DataFrame:
"""加载未经AI处理的数据
Args:
limit: 每次处理的记录数量
Returns:
未处理的数据DataFrame
"""
try:
sql = f"""
SELECT id, 文章标题, 文章摘要, 发布时间, 来源URL, 文章链接
FROM {self.source_table}
WHERE 是否ai处理 = 0 OR 是否ai处理 IS NULL
ORDER BY 创建时间 DESC
LIMIT %s
"""
df = self.db_agent.query_to_df(sql, params=(limit,), is_print=False)
self.log.info(f"成功加载 {len(df)} 条未处理的数据")
return df
except Exception as e:
self.log.error(f"加载未处理数据失败: {str(e)}", exc_info=True)
return pd.DataFrame()
def analyze_news(self, title: str, summary: str) -> Dict[str, Any]:
"""调用AI分析新闻(保留原有提示词)"""
# 构建提示词(保留原有格式)
prompt = f"""分析以下新闻是否与汽车后市场相关,返回JSON格式:
标题:{title}
摘要:{summary}
返回格式:
{{
"is_relevant": true/false,
"relevance_score": 0-100,
"tags": ["标签1", "标签2"],
"category": "分类(配件/维修/保养/改装/美容/装饰/二手车/金融/保险/其他)",
"analysis": "简要说明"
}}
注意:只返回JSON格式的结果,不要包含其他说明文字。"""
try:
# 调用百度千帆API
response = self.ai_client.chat.completions.create(
model=self.model,
messages=[{
"role": "user",
"content": prompt
}]
)
# 获取响应内容
raw_content = response.choices[0].message.content
# 解析JSON(处理markdown包裹)
if '```json' in raw_content:
json_str = raw_content.split('```json')[1].split('```')[0].strip()
elif '```' in raw_content:
json_str = raw_content.split('```')[1].split('```')[0].strip()
else:
json_str = raw_content.strip()
result = json.loads(json_str)
# 补充缺失字段
return {
'is_relevant': result.get('is_relevant', False),
'relevance_score': result.get('relevance_score', 0),
'tags': result.get('tags', []),
'category': result.get('category', '其他'),
'analysis': result.get('analysis', '')
}
except json.JSONDecodeError as e:
self.log.warning(f"JSON解析失败: {str(e)}, 原始响应: {raw_content[:200]}")
return {
'is_relevant': False,
'relevance_score': 0,
'tags': [],
'category': '其他',
'analysis': f"解析失败: {raw_content[:100]}"
}
except Exception as e:
self.log.error(f"AI调用异常: {str(e)}", exc_info=True)
return {
'is_relevant': False,
'relevance_score': 0,
'tags': [],
'category': '其他',
'analysis': f"处理异常: {str(e)}"
}
def process_single_record(self, record: Dict[str, Any]) -> Optional[Dict[str, Any]]:
"""处理单条记录
Args:
record: 记录字典
Returns:
处理结果字典
"""
if not self.is_configured():
self.log.error("AI客户端未配置,无法处理数据")
return None
try:
title = str(record.get('文章标题', '')).strip()
summary = str(record.get('文章摘要', '')).strip()
if not title and not summary:
self.log.warning(f"记录 {record.get('id')} 标题和摘要均为空,跳过处理")
return None
# 调用AI分析
analysis_result = self.analyze_news(title, summary)
# 构建结果记录
result = {
'source_id': record['id'],
'文章标题': title,
'文章摘要': summary,
'发布时间': record.get('发布时间'),
'来源URL': record.get('来源URL'),
'文章链接': record.get('文章链接'),
'是否相关': analysis_result.get('is_relevant', False),
'相关度评分': analysis_result.get('relevance_score', 0),
'标签': json.dumps(analysis_result.get('tags', []), ensure_ascii=False),
'分类': analysis_result.get('category', '其他'),
'分析说明': analysis_result.get('analysis', ''),
'处理时间': datetime.now()
}
return result
except Exception as e:
self.log.error(f"处理记录 {record.get('id')} 失败: {str(e)}", exc_info=True)
return None
def save_ai_results(self, results: List[Dict[str, Any]]) -> int:
"""保存AI处理结果
Args:
results: 处理结果列表
Returns:
成功保存的记录数
"""
if not results:
return 0
try:
df = pd.DataFrame(results)
inserted = self.db_agent.insert_from_df(
table_name=self.ai_table,
df=df,
ignore_duplicates=True
)
self.log.info(f"成功保存 {inserted} 条AI处理结果")
return inserted
except Exception as e:
self.log.error(f"保存AI处理结果失败: {str(e)}", exc_info=True)
return 0
def mark_as_processed(self, ids: List[int]) -> bool:
"""标记记录为已处理
Args:
ids: 记录ID列表
Returns:
是否成功
"""
if not ids:
return True
try:
id_placeholders = ','.join(['%s'] * len(ids))
sql = f"""
UPDATE {self.source_table}
SET 是否ai处理 = 1
WHERE id IN ({id_placeholders})
"""
self.db_agent.execute_sql(sql, params=ids)
self.log.info(f"成功标记 {len(ids)} 条记录为已处理")
return True
except Exception as e:
self.log.error(f"标记记录为已处理失败: {str(e)}", exc_info=True)
return False
if __name__ == "__main__":
"""命令行直接运行"""
# 实例化处理器并调用main方法
processor = RSSDataAIProcessor()
result = processor.main()
# 输出结果
if result['success']:
print("\n" + "=" * 60)
print("✓ AI处理完成")
print("=" * 60)
print(f"总记录数: {result.get('total_count', 0)}")
print(f"成功处理: {result.get('processed_count', 0)}")
print(f"保存记录: {result.get('saved_count', 0)}")
print(f"失败记录: {result.get('failed_count', 0)}")
print(f"相关记录: {result.get('relevant_count', 0)}")
print(f"处理时间: {result.get('processing_time', '')}")
print("=" * 60 + "\n")
else:
print("\n" + "=" * 60)
print("✗ 处理失败")
print("=" * 60)
print(f"错误信息: {result['message']}")
print("\n提示: 请设置环境变量")
print(" Windows: $env:BAIDU_API_KEY = 'your_key'")
print(" Linux/Mac: export BAIDU_API_KEY='your_key'")
print("=" * 60 + "\n")