# RSS数据处理模块 - 汽车后市场新闻分词和过滤 import pandas as pd import jieba import jieba.posseg as pseg import os import sys from typing import List, Dict, Any, Optional from datetime import datetime # 添加项目根目录到路径 current_dir = os.path.dirname(os.path.abspath(__file__)) parent_dir = os.path.dirname(current_dir) if parent_dir not in sys.path: sys.path.insert(0, parent_dir) from utils.mysql_agent import MySQLAgent from utils.logger import log from config import Config class RSSDataProcessor: """RSS数据处理器 - 专门处理汽车后市场相关新闻""" def __init__(self): """初始化处理器""" self.log = log.bind(module="RSSDataProcessor") self.db_agent = MySQLAgent(Config.MYSQL_CONFIG) self.table_name = "collector_rss_subscriptions" self.processed_table_name = "processed_rss_data" # 获取项目根目录 current_dir = os.path.dirname(os.path.abspath(__file__)) self.project_root = os.path.dirname(current_dir) # 设置文件路径(相对于项目根目录) self.keywords_file = os.path.join(self.project_root, "processors", "keywords.txt") self.stopwords_file = os.path.join(self.project_root, "processors", "stopwords.txt") # 汽车后市场相关关键词(默认值,实际从文件加载) self.auto_aftermarket_keywords = { '汽车配件', '汽车维修', '汽车保养', '汽车改装', '汽车美容', '汽车装饰', '轮胎', '机油', '刹车片', '火花塞', '滤清器', '蓄电池', '车灯', '保险杠', '车门', '座椅', '方向盘', '仪表盘', '音响', '导航', '汽车用品', '车载设备', '汽车电子', '汽车安全', '汽车保险', '二手车', '汽车交易', '汽车金融', '汽车租赁', '汽车服务', '4S店', '汽修店', '汽车后市场', '汽车产业链', '汽车供应链', '汽车', '车' } # 停用词表(默认值,实际从文件加载) self.stopwords = { '的', '了', '在', '是', '我', '有', '和', '就', '不', '人', '都', '一', '一个', '上', '也', '很', '到', '说', '要', '去', '你', '会', '着', '没有', '看', '好', '自己', '这', '那', '它', '他', '她', '我们', '你们', '他们', '什么', '怎么', '为什么', '因为', '所以', '但是', '然后', '如果', '虽然', '而且', '或者', '可以', '应该', '必须', '需要', '想要', '希望', '觉得', '认为', '知道', '了解', '明白', '清楚', '简单', '容易', '困难', '重要', '主要', '基本', '一般', '特别', '非常', '十分', '相当', '比较', '更加', '最', '更', '已经', '正在', '将要', '可能', '也许', '大概', '大约', '左右', '上下', '今天', '明天', '昨天', '现在', '以前', '以后', '时候', '时间', '地方', '这里', '那里', '这样', '那样', '如此', '这样', '那样', '如何', '怎样' } # 缓存关键词,避免重复加载 self._cached_keywords = None self.log.info("RSS数据处理器初始化完成") def load_keywords(self, keywords_file: Optional[str] = None) -> set: """从文件加载汽车后市场关键词(带缓存)""" # 如果已经缓存,直接返回 if self._cached_keywords is not None: return self._cached_keywords # 使用默认路径(项目根目录下的文件) if keywords_file is None: keywords_file = self.keywords_file keywords = set() try: if os.path.exists(keywords_file): with open(keywords_file, 'r', encoding='utf-8') as f: keywords = set(line.strip() for line in f if line.strip()) self.log.info(f"成功加载汽车后市场关键词,共 {len(keywords)} 个") else: self.log.warning(f"关键词文件不存在: {keywords_file}") # 使用默认关键词 keywords = self.auto_aftermarket_keywords except Exception as e: self.log.error(f"加载关键词失败: {str(e)}") keywords = self.auto_aftermarket_keywords # 缓存关键词 self._cached_keywords = keywords return keywords def load_rss_data(self, limit: int = 1000) -> pd.DataFrame: """从数据库加载未处理的RSS数据""" try: sql = f""" SELECT id, 文章标题, 文章摘要, 发布时间, 来源URL, 文章链接 FROM {self.table_name} WHERE 是否已处理 = 0 ORDER BY 发布时间 DESC LIMIT %s """ df = self.db_agent.query_to_df(sql, params=(limit,), is_print=False) self.log.info(f"成功加载 {len(df)} 条未处理的RSS数据") return df except Exception as e: self.log.error(f"加载RSS数据失败: {str(e)}", exc_info=True) return pd.DataFrame() def mark_as_processed(self, ids: List[int]) -> bool: """标记指定ID的数据为已处理""" if not ids: return True try: # 将ID列表转换为字符串格式用于SQL IN语句 id_placeholders = ','.join(['%s'] * len(ids)) sql = f""" UPDATE {self.table_name} SET 是否已处理 = 1 WHERE id IN ({id_placeholders}) """ result = self.db_agent.execute_sql(sql, params=ids) self.log.info(f"成功标记 {len(ids)} 条数据为已处理") return True except Exception as e: self.log.error(f"标记数据为已处理失败: {str(e)}", exc_info=True) return False def load_stopwords(self, stopwords_file: Optional[str] = None) -> set: """加载停用词表""" # 使用默认路径(项目根目录下的文件) if stopwords_file is None: stopwords_file = self.stopwords_file try: if os.path.exists(stopwords_file): with open(stopwords_file, 'r', encoding='utf-8') as f: stopwords = set(line.strip() for line in f if line.strip()) self.log.info(f"成功加载停用词表,共 {len(stopwords)} 个词") return stopwords else: self.log.warning(f"停用词文件不存在: {stopwords_file},使用默认停用词") return self.stopwords except Exception as e: self.log.error(f"加载停用词表失败: {str(e)}") return self.stopwords def add_custom_dict(self, custom_dict_file: Optional[str] = None): """添加自定义词典""" if custom_dict_file and os.path.exists(custom_dict_file): try: jieba.load_userdict(custom_dict_file) self.log.info("成功加载自定义词典") except Exception as e: self.log.warning(f"加载自定义词典失败: {str(e)}") # 从文件加载汽车后市场关键词并添加到jieba词典 keywords = self.load_keywords() for keyword in keywords: jieba.add_word(keyword, freq=1000, tag='n') def segment_and_pos(self, text: str, stopwords: set) -> List[str]: """分词并标注词性,过滤停用词""" if not text or pd.isna(text): return [] words = pseg.cut(str(text)) result = [] # 汽车后市场相关的词性标签 allowed_flags = {'n', 'vn', 'np', 'ns', 'nr', 'nt'} # 名词、动词、动名词、名词短语、处所词、人名、机构名 for word, flag in words: word = word.strip() if (len(word) >= 1 and word not in stopwords and flag in allowed_flags and not word.isdigit()): # 过滤纯数字 result.append(word) return result def is_auto_aftermarket_related(self, text: str) -> bool: """判断文本是否与汽车后市场相关""" if not text: return False text_lower = str(text).lower() # 从文件加载关键词 keywords = self.load_keywords() # 检查是否包含汽车后市场关键词 for keyword in keywords: if keyword in text_lower: return True # 检查分词结果中是否包含相关词汇 words = self.segment_and_pos(text, self.stopwords) for word in words: if word in keywords: return True return False def process_dataframe(self, df: pd.DataFrame, stopwords: set) -> pd.DataFrame: """处理整个DataFrame,进行分词和过滤""" if df.empty: self.log.warning("输入的DataFrame为空") return df # 确保所有文本都是字符串,并处理NaN值 df['文章标题'] = df['文章标题'].fillna('').astype(str) df['文章摘要'] = df['文章摘要'].fillna('').astype(str) # 合并标题和摘要进行分词 df['combined_text'] = df['文章标题'] + ' ' + df['文章摘要'] # 分词处理 df['segmented_words'] = df['combined_text'].apply(lambda x: self.segment_and_pos(x, stopwords)) # 判断是否与汽车后市场相关(只要出现关键词就入库) df['is_auto_related'] = df['combined_text'].apply(self.is_auto_aftermarket_related) df['is_filtered'] = df['is_auto_related'] # 添加处理时间 df['processed_time'] = datetime.now().strftime('%Y-%m-%d %H:%M:%S') self.log.info(f"数据处理完成,共处理 {len(df)} 条记录") return df def filter_auto_aftermarket_news(self, df: pd.DataFrame) -> pd.DataFrame: """过滤出汽车后市场相关的新闻""" if df.empty: return df # 过滤出包含关键词的文章 filtered_df = df[df['is_filtered'] == True].copy() self.log.info(f"过滤出 {len(filtered_df)} 条汽车后市场相关新闻") return filtered_df def save_to_database(self, df: pd.DataFrame) -> bool: """保存处理结果到数据库""" if df.empty: self.log.warning("没有数据需要保存") return False try: # 准备保存的数据 save_df = df[['文章标题', '文章摘要', '发布时间', '来源URL', '文章链接', 'segmented_words', 'is_auto_related', 'processed_time']].copy() # 将分词结果转换为字符串 save_df['分词结果'] = save_df['segmented_words'].apply(lambda x: ' '.join(x)) # 重命名列名为中文 save_df = save_df.rename(columns={ 'is_auto_related': '是否汽车相关', 'processed_time': '处理时间' }) # 删除不需要的列 save_df = save_df.drop('segmented_words', axis=1) # 检查目标表是否存在,不存在则创建 if not self.db_agent.table_exists(self.processed_table_name): self.create_processed_table() # 插入数据 inserted_rows = self.db_agent.insert_from_df( table_name=self.processed_table_name, df=save_df, ignore_duplicates=True ) self.log.info(f"成功保存 {inserted_rows} 条处理结果到数据库") return True except Exception as e: self.log.error(f"保存到数据库失败: {str(e)}", exc_info=True) return False def create_processed_table(self): """创建处理结果表""" create_sql = f""" CREATE TABLE IF NOT EXISTS {self.processed_table_name} ( id INT AUTO_INCREMENT PRIMARY KEY, 文章标题 TEXT, 文章摘要 TEXT, 发布时间 DATETIME, 来源URL VARCHAR(1024), 文章链接 VARCHAR(1024), 分词结果 TEXT, 是否汽车相关 BOOLEAN, 处理时间 DATETIME, 创建时间 TIMESTAMP DEFAULT CURRENT_TIMESTAMP, 更新时间 TIMESTAMP DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_unicode_ci """ try: self.db_agent.execute_sql(create_sql) self.log.info(f"成功创建处理结果表: {self.processed_table_name}") except Exception as e: self.log.error(f"创建表失败: {str(e)}", exc_info=True) raise def get_processing_statistics(self, df: pd.DataFrame) -> Dict[str, Any]: """获取处理统计信息""" if df.empty: return {} total_count = len(df) filtered_count = len(df[df['is_filtered'] == True]) stats = { 'total_articles': total_count, 'filtered_articles': filtered_count, 'filter_rate': filtered_count / total_count if total_count > 0 else 0, 'processing_time': datetime.now().strftime('%Y-%m-%d %H:%M:%S') } return stats def process_rss_data(self, limit: int = 1000, save_to_db: bool = True) -> Dict[str, Any]: """处理RSS数据的主函数""" try: self.log.info("开始处理RSS数据...") # 1. 加载RSS数据 df = self.load_rss_data(limit) if df.empty: self.log.warning("没有加载到RSS数据") return {'success': False, 'message': '没有数据可处理'} # 2. 加载停用词表 stopwords = self.load_stopwords() # 3. 添加自定义词典 self.add_custom_dict() # 4. 处理数据 processed_df = self.process_dataframe(df, stopwords) # 5. 过滤汽车后市场相关新闻 filtered_df = self.filter_auto_aftermarket_news(processed_df) # 6. 获取统计信息 stats = self.get_processing_statistics(processed_df) # 7. 保存到数据库 if save_to_db and not filtered_df.empty: save_success = self.save_to_database(filtered_df) stats['save_success'] = save_success # 8. 标记数据为已处理 if not df.empty and 'id' in df.columns: processed_ids = df['id'].tolist() mark_success = self.mark_as_processed(processed_ids) stats['mark_success'] = mark_success if not mark_success: self.log.warning("部分数据标记为已处理失败") # 9. 输出结果 self.log.info("RSS数据处理完成", **stats) return { 'success': True, 'message': 'RSS数据处理完成', 'statistics': stats, 'filtered_data': filtered_df } except Exception as e: self.log.error(f"RSS数据处理失败: {str(e)}", exc_info=True) return {'success': False, 'message': f'处理失败: {str(e)}'} def main(): """主函数入口""" try: # 创建处理器实例 processor = RSSDataProcessor() # 处理RSS数据 result = processor.process_rss_data( limit=5000, # 处理最近5000条数据 save_to_db=True # 保存到数据库 ) if result['success']: print("RSS数据处理完成!") print(f"处理统计: {result['statistics']}") else: print(f"处理失败: {result['message']}") except Exception as e: print(f"程序运行出错: {str(e)}") if __name__ == "__main__": main()