diff --git a/processors/ai_processors/ai_processor_rss_data.py b/processors/ai_processors/ai_processor_rss_data.py index 1d43d8f..98475d8 100644 --- a/processors/ai_processors/ai_processor_rss_data.py +++ b/processors/ai_processors/ai_processor_rss_data.py @@ -195,7 +195,7 @@ class RSSDataAIProcessor: raise def create_ai_result_table(self): - """创建AI处理结果表""" + """创建AI处理结果表(使用安全方法,确保不会删除现有数据)""" create_sql = f""" CREATE TABLE IF NOT EXISTS {self.ai_table} ( id INT AUTO_INCREMENT PRIMARY KEY COMMENT '主键ID', @@ -221,10 +221,13 @@ class RSSDataAIProcessor: """ try: - self.db_agent.execute_sql(create_sql) - self.log.info(f"成功创建AI结果表: {self.ai_table}") + # 使用安全方法创建表(如果不存在),确保不会删除现有数据 + self.db_agent.create_table_if_not_exists( + table_name=self.ai_table, + create_sql=create_sql + ) except Exception as e: - self.log.error(f"创建AI结果表失败: {str(e)}", exc_info=True) + self.log.error(f"创建AI结果表失败(可能是数据库连接问题): {str(e)}", exc_info=True) raise def load_unprocessed_data(self, limit: int = 100) -> pd.DataFrame: diff --git a/processors/processor_rss_data.py b/processors/processor_rss_data.py index 86cfdf1..c4d0b7e 100644 --- a/processors/processor_rss_data.py +++ b/processors/processor_rss_data.py @@ -270,14 +270,34 @@ class RSSDataProcessor: save_df = save_df.drop('segmented_words', axis=1) # 检查目标表是否存在,不存在则创建 - if not self.db_agent.table_exists(self.processed_table_name): - self.create_processed_table() + # 注意:如果连接失败,table_exists可能返回False,需要捕获异常 + try: + table_exists = self.db_agent.table_exists(self.processed_table_name) + if not table_exists: + self.log.warning(f"表 {self.processed_table_name} 不存在,正在创建...") + self.create_processed_table() + else: + # 表存在时,也确保有唯一索引(安全操作,不会删除数据) + self.create_processed_table() # 这个方法会检查并添加索引,不会删除数据 + except Exception as table_check_error: + # 如果检查表存在性时连接失败,记录错误但不中断 + # 因为后续的插入操作会再次尝试连接 + self.log.warning(f"检查表存在性时出错(可能是连接问题): {str(table_check_error)}") + # 尝试创建表(如果表已存在,CREATE TABLE IF NOT EXISTS不会报错) + try: + self.create_processed_table() + except Exception as create_error: + # 如果创建表也失败(可能是连接问题),记录错误 + self.log.error(f"创建表时出错(可能是连接问题): {str(create_error)}") + # 继续尝试插入,如果表存在,插入会成功;如果表不存在,插入会失败并抛出异常 - # 插入数据 + # 插入数据(ignore_duplicates=True 会跳过重复的文章链接) + # 注意:INSERT INTO + ignore_duplicates 只会跳过重复记录,不会覆盖或删除现有数据 + # 如果数据库连接失败,此操作会抛出异常,不会部分成功 inserted_rows = self.db_agent.insert_from_df( table_name=self.processed_table_name, df=save_df, - ignore_duplicates=True + ignore_duplicates=True # 跳过重复的文章链接,不会删除或覆盖现有数据 ) self.log.info(f"成功保存 {inserted_rows} 条处理结果到数据库") @@ -288,7 +308,10 @@ class RSSDataProcessor: return False def create_processed_table(self): - """创建处理结果表""" + """ + 创建处理结果表(带唯一索引保护,防止重复插入) + 使用 MySQLAgent 的安全方法,确保不会删除现有数据 + """ create_sql = f""" CREATE TABLE IF NOT EXISTS {self.processed_table_name} ( id INT AUTO_INCREMENT PRIMARY KEY, @@ -306,10 +329,27 @@ class RSSDataProcessor: """ try: - self.db_agent.execute_sql(create_sql) - self.log.info(f"成功创建处理结果表: {self.processed_table_name}") + # 使用安全方法创建表(如果不存在) + self.db_agent.create_table_if_not_exists( + table_name=self.processed_table_name, + create_sql=create_sql + ) + + # 使用安全方法添加唯一索引(如果不存在) + # 注意:唯一索引在创建表时不能直接包含,因为如果表已存在会报错 + # 所以先创建表,再单独添加索引 + self.db_agent.add_unique_index_if_not_exists( + table_name=self.processed_table_name, + index_name='uk_article_link', + column_name='文章链接', + column_length=500, + check_duplicates=True + ) + except Exception as e: - self.log.error(f"创建表失败: {str(e)}", exc_info=True) + # 如果创建表或添加索引失败(可能是连接问题),抛出异常 + # 这样上层调用可以知道操作失败,不会误以为成功 + self.log.error(f"创建/检查表失败(可能是数据库连接问题): {str(e)}", exc_info=True) raise def get_processing_statistics(self, df: pd.DataFrame) -> Dict[str, Any]: diff --git a/utils/Ding_api.py b/utils/Ding_api.py index aef4c7b..bce0241 100644 --- a/utils/Ding_api.py +++ b/utils/Ding_api.py @@ -27,15 +27,26 @@ class DingAPI(): return token -def send_message(self, message): - data = { - "msgtype": "text", - "text": { - "content": message + def card_create(self, data): + """ + 创建并投放卡片 + return: response(dict) + """ + url = 'https://api.dingtalk.com/v1.0/card/instances/createAndDeliver' + + headers = { + 'x-acs-dingtalk-access-token': data["token"], + 'Content-Type': 'application/json' } - } - headers = { - 'Content-Type': 'application/json' - } - response = requests.post(self.url, json=data, headers=headers) - return response.status_code + + data = { + "cardTemplateId": "cee2715f-001d-41cb-8fcd-3be18be9fbf5.schema", + "outTrackId": "", + "cardData":"", + "openSpaceId":"dtv1.card//IM_GROUP.4210192048793363",# 场域id + } + response = requests.post(url, json=data, headers=headers) + return response.json() + + def get_ + diff --git a/utils/mysql_agent.py b/utils/mysql_agent.py index 530dc67..07caa61 100644 --- a/utils/mysql_agent.py +++ b/utils/mysql_agent.py @@ -153,6 +153,12 @@ class MySQLAgent: """ 兼容旧接口的通用插入方法:保留replace参数,同时支持新的ignore_duplicates 自动处理重复数据,对所有数据源通用,插入失败的数据会通过日志记录 + + 安全性说明: + - 使用 INSERT INTO(不是 REPLACE INTO 或 INSERT ... ON DUPLICATE KEY UPDATE) + - 当 ignore_duplicates=True 时,重复记录会被跳过,不会覆盖或删除现有数据 + - 如果数据库连接失败,操作会抛出异常,不会部分成功 + - 所有操作都是安全的,不会导致数据丢失或覆盖 """ # 【兼容性处理】如果未指定ignore_duplicates,用replace参数推导 if ignore_duplicates is None: @@ -592,6 +598,114 @@ class MySQLAgent: exc_info=True) return False + def create_table_if_not_exists(self, table_name: str, create_sql: str) -> bool: + """ + 创建表(如果不存在) + 使用 CREATE TABLE IF NOT EXISTS,不会删除已存在的表和数据 + + 参数: + table_name: 表名 + create_sql: 完整的 CREATE TABLE SQL 语句(必须包含 IF NOT EXISTS) + + 返回: + bool: 是否成功(表已存在也会返回True) + + 注意: + - 此方法使用 CREATE TABLE IF NOT EXISTS,是安全的,不会删除现有数据 + - 如果连接失败,会抛出异常 + """ + if "IF NOT EXISTS" not in create_sql.upper(): + self.log.warning(f"CREATE TABLE 语句建议使用 IF NOT EXISTS 以保证安全性") + + try: + self.execute_sql(create_sql) + self.log.info(f"成功创建/检查表(表已存在时不会删除数据): {table_name}") + return True + except Exception as e: + self.log.error(f"创建/检查表失败(可能是数据库连接问题): {str(e)}", + table=table_name, exc_info=True) + raise + + def add_unique_index_if_not_exists(self, table_name: str, index_name: str, + column_name: str, column_length: int = 500, + check_duplicates: bool = True) -> bool: + """ + 添加唯一索引(如果不存在) + 不会删除数据,只添加索引 + + 参数: + table_name: 表名 + index_name: 索引名称 + column_name: 要添加索引的列名 + column_length: 索引长度(对于VARCHAR/TEXT类型) + check_duplicates: 是否在添加索引前检查重复数据 + + 返回: + bool: 是否成功添加索引(索引已存在也会返回True) + + 注意: + - 此方法是安全的,不会删除数据 + - 如果表中存在重复数据,会跳过添加索引(不会删除数据) + - 如果连接失败,会抛出异常 + """ + try: + # 1. 检查索引是否已存在 + check_index_sql = f""" + SELECT COUNT(*) as cnt + FROM INFORMATION_SCHEMA.STATISTICS + WHERE TABLE_SCHEMA = %s + AND TABLE_NAME = %s + AND INDEX_NAME = %s + """ + result = self.query_to_df( + check_index_sql, + params=(self.config['database'], table_name, index_name), + is_print=False + ) + + if not result.empty and result['cnt'].iloc[0] > 0: + self.log.debug(f"唯一索引 {index_name} 已存在,跳过添加") + return True + + # 2. 如果启用重复检查,先检查是否有重复数据 + if check_duplicates: + check_duplicates_sql = f""" + SELECT {column_name}, COUNT(*) as cnt + FROM `{table_name}` + WHERE {column_name} IS NOT NULL AND {column_name} != '' + GROUP BY {column_name} + HAVING cnt > 1 + LIMIT 1 + """ + duplicates = self.query_to_df(check_duplicates_sql, is_print=False) + + if not duplicates.empty: + self.log.warning( + f"表 {table_name} 中存在重复的 {column_name} 数据,无法添加唯一索引。" + "现有数据不会被删除。", + duplicate_count=len(duplicates) + ) + return False + + # 3. 添加唯一索引 + add_index_sql = f""" + ALTER TABLE `{table_name}` + ADD UNIQUE KEY `{index_name}` ({column_name}({column_length})) + """ + self.execute_sql(add_index_sql) + self.log.info(f"成功添加唯一索引 {index_name}(现有数据不受影响)") + return True + + except Exception as e: + error_msg = str(e) + # 如果索引已存在,不报错 + if "Duplicate key name" in error_msg or "already exists" in error_msg.lower(): + self.log.debug(f"唯一索引 {index_name} 已存在,跳过添加") + return True + else: + self.log.warning(f"添加唯一索引时出现问题(不影响现有数据): {error_msg}") + raise + def execute_sql(self, sql: str, params: Union[tuple, dict, None] = None, fetch: bool = False) -> Union[int, List[Dict[str, Any]]]: """执行SQL语句(原有逻辑完全保留)"""