432 lines
19 KiB
Python
432 lines
19 KiB
Python
# RSS数据处理模块 - 汽车后市场新闻分词和过滤
|
|
import pandas as pd
|
|
import jieba
|
|
import jieba.posseg as pseg
|
|
import os
|
|
import sys
|
|
from typing import List, Dict, Any, Optional
|
|
from datetime import datetime
|
|
|
|
# 添加项目根目录到路径
|
|
current_dir = os.path.dirname(os.path.abspath(__file__))
|
|
parent_dir = os.path.dirname(current_dir)
|
|
if parent_dir not in sys.path:
|
|
sys.path.insert(0, parent_dir)
|
|
|
|
from utils.mysql_agent import MySQLAgent
|
|
from utils.logger import log
|
|
from config import Config
|
|
|
|
class RSSDataProcessor:
|
|
"""RSS数据处理器 - 专门处理汽车后市场相关新闻"""
|
|
|
|
def __init__(self):
|
|
"""初始化处理器"""
|
|
self.log = log.bind(module="RSSDataProcessor")
|
|
self.db_agent = MySQLAgent(Config.MYSQL_CONFIG)
|
|
self.table_name = "collector_rss_subscriptions"
|
|
self.processed_table_name = "processed_rss_data"
|
|
|
|
# 获取项目根目录
|
|
current_dir = os.path.dirname(os.path.abspath(__file__))
|
|
self.project_root = os.path.dirname(current_dir)
|
|
|
|
# 设置文件路径(相对于项目根目录)
|
|
self.keywords_file = os.path.join(self.project_root, "processors", "keywords.txt")
|
|
self.stopwords_file = os.path.join(self.project_root, "processors", "stopwords.txt")
|
|
|
|
# 汽车后市场相关关键词(默认值,实际从文件加载)
|
|
self.auto_aftermarket_keywords = {
|
|
'汽车配件', '汽车维修', '汽车保养', '汽车改装', '汽车美容', '汽车装饰',
|
|
'轮胎', '机油', '刹车片', '火花塞', '滤清器', '蓄电池', '车灯',
|
|
'保险杠', '车门', '座椅', '方向盘', '仪表盘', '音响', '导航',
|
|
'汽车用品', '车载设备', '汽车电子', '汽车安全', '汽车保险',
|
|
'二手车', '汽车交易', '汽车金融', '汽车租赁', '汽车服务',
|
|
'4S店', '汽修店', '汽车后市场', '汽车产业链', '汽车供应链', '汽车', '车'
|
|
}
|
|
|
|
# 停用词表(默认值,实际从文件加载)
|
|
self.stopwords = {
|
|
'的', '了', '在', '是', '我', '有', '和', '就', '不', '人', '都', '一', '一个',
|
|
'上', '也', '很', '到', '说', '要', '去', '你', '会', '着', '没有', '看', '好',
|
|
'自己', '这', '那', '它', '他', '她', '我们', '你们', '他们', '什么', '怎么',
|
|
'为什么', '因为', '所以', '但是', '然后', '如果', '虽然', '而且', '或者',
|
|
'可以', '应该', '必须', '需要', '想要', '希望', '觉得', '认为', '知道',
|
|
'了解', '明白', '清楚', '简单', '容易', '困难', '重要', '主要', '基本',
|
|
'一般', '特别', '非常', '十分', '相当', '比较', '更加', '最', '更',
|
|
'已经', '正在', '将要', '可能', '也许', '大概', '大约', '左右', '上下',
|
|
'今天', '明天', '昨天', '现在', '以前', '以后', '时候', '时间', '地方',
|
|
'这里', '那里', '这样', '那样', '如此', '这样', '那样', '如何', '怎样'
|
|
}
|
|
|
|
# 缓存关键词,避免重复加载
|
|
self._cached_keywords = None
|
|
|
|
self.log.info("RSS数据处理器初始化完成")
|
|
|
|
def load_keywords(self, keywords_file: Optional[str] = None) -> set:
|
|
"""从文件加载汽车后市场关键词(带缓存)"""
|
|
# 如果已经缓存,直接返回
|
|
if self._cached_keywords is not None:
|
|
return self._cached_keywords
|
|
|
|
# 使用默认路径(项目根目录下的文件)
|
|
if keywords_file is None:
|
|
keywords_file = self.keywords_file
|
|
|
|
keywords = set()
|
|
try:
|
|
if os.path.exists(keywords_file):
|
|
with open(keywords_file, 'r', encoding='utf-8') as f:
|
|
keywords = set(line.strip() for line in f if line.strip())
|
|
self.log.info(f"成功加载汽车后市场关键词,共 {len(keywords)} 个")
|
|
else:
|
|
self.log.warning(f"关键词文件不存在: {keywords_file}")
|
|
# 使用默认关键词
|
|
keywords = self.auto_aftermarket_keywords
|
|
except Exception as e:
|
|
self.log.error(f"加载关键词失败: {str(e)}")
|
|
keywords = self.auto_aftermarket_keywords
|
|
|
|
# 缓存关键词
|
|
self._cached_keywords = keywords
|
|
return keywords
|
|
|
|
def load_rss_data(self, limit: int = 1000) -> pd.DataFrame:
|
|
"""从数据库加载未处理的RSS数据"""
|
|
try:
|
|
sql = f"""
|
|
SELECT id, 文章标题, 文章摘要, 发布时间, 来源URL, 文章链接
|
|
FROM {self.table_name}
|
|
WHERE 是否已处理 = 0
|
|
ORDER BY 发布时间 DESC
|
|
LIMIT %s
|
|
"""
|
|
|
|
df = self.db_agent.query_to_df(sql, params=(limit,), is_print=False)
|
|
self.log.info(f"成功加载 {len(df)} 条未处理的RSS数据")
|
|
return df
|
|
|
|
except Exception as e:
|
|
self.log.error(f"加载RSS数据失败: {str(e)}", exc_info=True)
|
|
return pd.DataFrame()
|
|
|
|
def mark_as_processed(self, ids: List[int]) -> bool:
|
|
"""标记指定ID的数据为已处理"""
|
|
if not ids:
|
|
return True
|
|
|
|
try:
|
|
# 将ID列表转换为字符串格式用于SQL IN语句
|
|
id_placeholders = ','.join(['%s'] * len(ids))
|
|
sql = f"""
|
|
UPDATE {self.table_name}
|
|
SET 是否已处理 = 1
|
|
WHERE id IN ({id_placeholders})
|
|
"""
|
|
|
|
result = self.db_agent.execute_sql(sql, params=ids)
|
|
self.log.info(f"成功标记 {len(ids)} 条数据为已处理")
|
|
return True
|
|
|
|
except Exception as e:
|
|
self.log.error(f"标记数据为已处理失败: {str(e)}", exc_info=True)
|
|
return False
|
|
|
|
def load_stopwords(self, stopwords_file: Optional[str] = None) -> set:
|
|
"""加载停用词表"""
|
|
# 使用默认路径(项目根目录下的文件)
|
|
if stopwords_file is None:
|
|
stopwords_file = self.stopwords_file
|
|
|
|
try:
|
|
if os.path.exists(stopwords_file):
|
|
with open(stopwords_file, 'r', encoding='utf-8') as f:
|
|
stopwords = set(line.strip() for line in f if line.strip())
|
|
self.log.info(f"成功加载停用词表,共 {len(stopwords)} 个词")
|
|
return stopwords
|
|
else:
|
|
self.log.warning(f"停用词文件不存在: {stopwords_file},使用默认停用词")
|
|
return self.stopwords
|
|
except Exception as e:
|
|
self.log.error(f"加载停用词表失败: {str(e)}")
|
|
return self.stopwords
|
|
|
|
def add_custom_dict(self, custom_dict_file: Optional[str] = None):
|
|
"""添加自定义词典"""
|
|
if custom_dict_file and os.path.exists(custom_dict_file):
|
|
try:
|
|
jieba.load_userdict(custom_dict_file)
|
|
self.log.info("成功加载自定义词典")
|
|
except Exception as e:
|
|
self.log.warning(f"加载自定义词典失败: {str(e)}")
|
|
|
|
# 从文件加载汽车后市场关键词并添加到jieba词典
|
|
keywords = self.load_keywords()
|
|
for keyword in keywords:
|
|
jieba.add_word(keyword, freq=1000, tag='n')
|
|
|
|
def segment_and_pos(self, text: str, stopwords: set) -> List[str]:
|
|
"""分词并标注词性,过滤停用词"""
|
|
if not text or pd.isna(text):
|
|
return []
|
|
|
|
words = pseg.cut(str(text))
|
|
result = []
|
|
# 汽车后市场相关的词性标签
|
|
allowed_flags = {'n', 'vn', 'np', 'ns', 'nr', 'nt'} # 名词、动词、动名词、名词短语、处所词、人名、机构名
|
|
|
|
for word, flag in words:
|
|
word = word.strip()
|
|
if (len(word) >= 1 and
|
|
word not in stopwords and
|
|
flag in allowed_flags and
|
|
not word.isdigit()): # 过滤纯数字
|
|
result.append(word)
|
|
|
|
return result
|
|
|
|
def is_auto_aftermarket_related(self, text: str) -> bool:
|
|
"""判断文本是否与汽车后市场相关"""
|
|
if not text:
|
|
return False
|
|
|
|
text_lower = str(text).lower()
|
|
|
|
# 从文件加载关键词
|
|
keywords = self.load_keywords()
|
|
|
|
# 检查是否包含汽车后市场关键词
|
|
for keyword in keywords:
|
|
if keyword in text_lower:
|
|
return True
|
|
|
|
# 检查分词结果中是否包含相关词汇
|
|
words = self.segment_and_pos(text, self.stopwords)
|
|
for word in words:
|
|
if word in keywords:
|
|
return True
|
|
|
|
return False
|
|
|
|
def process_dataframe(self, df: pd.DataFrame, stopwords: set) -> pd.DataFrame:
|
|
"""处理整个DataFrame,进行分词和过滤"""
|
|
if df.empty:
|
|
self.log.warning("输入的DataFrame为空")
|
|
return df
|
|
|
|
# 确保所有文本都是字符串,并处理NaN值
|
|
df['文章标题'] = df['文章标题'].fillna('').astype(str)
|
|
df['文章摘要'] = df['文章摘要'].fillna('').astype(str)
|
|
|
|
# 合并标题和摘要进行分词
|
|
df['combined_text'] = df['文章标题'] + ' ' + df['文章摘要']
|
|
|
|
# 分词处理
|
|
df['segmented_words'] = df['combined_text'].apply(lambda x: self.segment_and_pos(x, stopwords))
|
|
|
|
# 判断是否与汽车后市场相关(只要出现关键词就入库)
|
|
df['is_auto_related'] = df['combined_text'].apply(self.is_auto_aftermarket_related)
|
|
df['is_filtered'] = df['is_auto_related']
|
|
|
|
# 添加处理时间
|
|
df['processed_time'] = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
|
|
|
|
self.log.info(f"数据处理完成,共处理 {len(df)} 条记录")
|
|
return df
|
|
|
|
def filter_auto_aftermarket_news(self, df: pd.DataFrame) -> pd.DataFrame:
|
|
"""过滤出汽车后市场相关的新闻"""
|
|
if df.empty:
|
|
return df
|
|
|
|
# 过滤出包含关键词的文章
|
|
filtered_df = df[df['is_filtered'] == True].copy()
|
|
|
|
self.log.info(f"过滤出 {len(filtered_df)} 条汽车后市场相关新闻")
|
|
return filtered_df
|
|
|
|
def save_to_database(self, df: pd.DataFrame) -> bool:
|
|
"""保存处理结果到数据库"""
|
|
if df.empty:
|
|
self.log.warning("没有数据需要保存")
|
|
return False
|
|
|
|
try:
|
|
# 准备保存的数据
|
|
save_df = df[['文章标题', '文章摘要', '发布时间', '来源URL', '文章链接',
|
|
'segmented_words', 'is_auto_related', 'processed_time']].copy()
|
|
|
|
# 将分词结果转换为字符串
|
|
save_df['分词结果'] = save_df['segmented_words'].apply(lambda x: ' '.join(x))
|
|
|
|
# 重命名列名为中文
|
|
save_df = save_df.rename(columns={
|
|
'is_auto_related': '是否汽车相关',
|
|
'processed_time': '处理时间'
|
|
})
|
|
|
|
# 删除不需要的列
|
|
save_df = save_df.drop('segmented_words', axis=1)
|
|
|
|
# 检查目标表是否存在,不存在则创建
|
|
# 注意:如果连接失败,table_exists可能返回False,需要捕获异常
|
|
try:
|
|
table_exists = self.db_agent.table_exists(self.processed_table_name)
|
|
if not table_exists:
|
|
self.log.warning(f"表 {self.processed_table_name} 不存在,正在创建...")
|
|
self.create_processed_table()
|
|
else:
|
|
# 表存在时,也确保有唯一索引(安全操作,不会删除数据)
|
|
self.create_processed_table() # 这个方法会检查并添加索引,不会删除数据
|
|
except Exception as table_check_error:
|
|
# 如果检查表存在性时连接失败,记录错误但不中断
|
|
# 因为后续的插入操作会再次尝试连接
|
|
self.log.warning(f"检查表存在性时出错(可能是连接问题): {str(table_check_error)}")
|
|
# 尝试创建表(如果表已存在,CREATE TABLE IF NOT EXISTS不会报错)
|
|
try:
|
|
self.create_processed_table()
|
|
except Exception as create_error:
|
|
# 如果创建表也失败(可能是连接问题),记录错误
|
|
self.log.error(f"创建表时出错(可能是连接问题): {str(create_error)}")
|
|
# 继续尝试插入,如果表存在,插入会成功;如果表不存在,插入会失败并抛出异常
|
|
|
|
# 插入数据(ignore_duplicates=True 会跳过重复的文章链接)
|
|
# 注意:INSERT INTO + ignore_duplicates 只会跳过重复记录,不会覆盖或删除现有数据
|
|
# 如果数据库连接失败,此操作会抛出异常,不会部分成功
|
|
inserted_rows = self.db_agent.insert_from_df(
|
|
table_name=self.processed_table_name,
|
|
df=save_df,
|
|
ignore_duplicates=True # 跳过重复的文章链接,不会删除或覆盖现有数据
|
|
)
|
|
|
|
self.log.info(f"成功保存 {inserted_rows} 条处理结果到数据库")
|
|
return True
|
|
|
|
except Exception as e:
|
|
self.log.error(f"保存到数据库失败: {str(e)}", exc_info=True)
|
|
return False
|
|
|
|
def create_processed_table(self):
|
|
"""
|
|
创建处理结果表(带唯一索引保护,防止重复插入)
|
|
使用 MySQLAgent 的安全方法,确保不会删除现有数据
|
|
"""
|
|
create_sql = f"""
|
|
CREATE TABLE IF NOT EXISTS {self.processed_table_name} (
|
|
id INT AUTO_INCREMENT PRIMARY KEY,
|
|
文章标题 TEXT,
|
|
文章摘要 TEXT,
|
|
发布时间 DATETIME,
|
|
来源URL VARCHAR(1024),
|
|
文章链接 VARCHAR(1024),
|
|
分词结果 TEXT,
|
|
是否汽车相关 BOOLEAN,
|
|
处理时间 DATETIME,
|
|
创建时间 TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
|
|
更新时间 TIMESTAMP DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP
|
|
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_unicode_ci
|
|
"""
|
|
|
|
try:
|
|
# 使用安全方法创建表(如果不存在)
|
|
self.db_agent.create_table_if_not_exists(
|
|
table_name=self.processed_table_name,
|
|
create_sql=create_sql
|
|
)
|
|
|
|
# 使用安全方法添加唯一索引(如果不存在)
|
|
# 注意:唯一索引在创建表时不能直接包含,因为如果表已存在会报错
|
|
# 所以先创建表,再单独添加索引
|
|
self.db_agent.add_unique_index_if_not_exists(
|
|
table_name=self.processed_table_name,
|
|
index_name='uk_article_link',
|
|
column_name='文章链接',
|
|
column_length=500,
|
|
check_duplicates=True
|
|
)
|
|
|
|
except Exception as e:
|
|
# 如果创建表或添加索引失败(可能是连接问题),抛出异常
|
|
# 这样上层调用可以知道操作失败,不会误以为成功
|
|
self.log.error(f"创建/检查表失败(可能是数据库连接问题): {str(e)}", exc_info=True)
|
|
raise
|
|
|
|
def get_processing_statistics(self, df: pd.DataFrame) -> Dict[str, Any]:
|
|
"""获取处理统计信息"""
|
|
if df.empty:
|
|
return {}
|
|
|
|
total_count = len(df)
|
|
filtered_count = len(df[df['is_filtered'] == True])
|
|
|
|
stats = {
|
|
'total_articles': total_count,
|
|
'filtered_articles': filtered_count,
|
|
'filter_rate': filtered_count / total_count if total_count > 0 else 0,
|
|
'processing_time': datetime.now().strftime('%Y-%m-%d %H:%M:%S')
|
|
}
|
|
|
|
return stats
|
|
|
|
def process_rss_data(self, limit: int = 1000, save_to_db: bool = True) -> Dict[str, Any]:
|
|
"""处理RSS数据的主函数"""
|
|
try:
|
|
self.log.info("开始处理RSS数据...")
|
|
|
|
# 1. 加载RSS数据
|
|
df = self.load_rss_data(limit)
|
|
if df.empty:
|
|
self.log.warning("没有加载到RSS数据")
|
|
return {'success': False, 'message': '没有数据可处理'}
|
|
|
|
# 2. 加载停用词表
|
|
stopwords = self.load_stopwords()
|
|
|
|
# 3. 添加自定义词典
|
|
self.add_custom_dict()
|
|
|
|
# 4. 处理数据
|
|
processed_df = self.process_dataframe(df, stopwords)
|
|
|
|
# 5. 过滤汽车后市场相关新闻
|
|
filtered_df = self.filter_auto_aftermarket_news(processed_df)
|
|
|
|
# 6. 获取统计信息
|
|
stats = self.get_processing_statistics(processed_df)
|
|
|
|
# 7. 保存到数据库
|
|
if save_to_db and not filtered_df.empty:
|
|
save_success = self.save_to_database(filtered_df)
|
|
stats['save_success'] = save_success
|
|
|
|
# 8. 标记数据为已处理
|
|
if not df.empty and 'id' in df.columns:
|
|
processed_ids = df['id'].tolist()
|
|
mark_success = self.mark_as_processed(processed_ids)
|
|
stats['mark_success'] = mark_success
|
|
if not mark_success:
|
|
self.log.warning("部分数据标记为已处理失败")
|
|
|
|
# 9. 输出结果
|
|
self.log.info("RSS数据处理完成", **stats)
|
|
|
|
return {
|
|
'success': True,
|
|
'message': 'RSS数据处理完成',
|
|
'statistics': stats,
|
|
'filtered_data': filtered_df
|
|
}
|
|
|
|
except Exception as e:
|
|
self.log.error(f"RSS数据处理失败: {str(e)}", exc_info=True)
|
|
return {'success': False, 'message': f'处理失败: {str(e)}'}
|
|
|
|
|
|
def main(self, limit: int = 1000, save_to_db: bool = True) -> Dict[str, Any]:
|
|
"""主函数入口(实例方法),对外统一调用"""
|
|
return self.process_rss_data(limit=limit, save_to_db=save_to_db)
|
|
|
|
if __name__ == "__main__":
|
|
RSSDataProcessor().main(limit=5000, save_to_db=True)
|