diff --git a/MindSpider/DeepSentimentCrawling/MediaCrawler/config/base_config.py b/MindSpider/DeepSentimentCrawling/MediaCrawler/config/base_config.py index dbea153..f59a864 100644 --- a/MindSpider/DeepSentimentCrawling/MediaCrawler/config/base_config.py +++ b/MindSpider/DeepSentimentCrawling/MediaCrawler/config/base_config.py @@ -9,8 +9,8 @@ # 使用本代码即表示您同意遵守上述原则和LICENSE中的所有条款。 # 基础配置 -PLATFORM = "bili" # 平台,xhs | dy | ks | bili | wb | tieba | zhihu -KEYWORDS = "电影鬼灭之刃,亲属想侵吞3姐妹亡父赔偿款,网警斩断侵害未成年人网络黑色产业链,2007年后出生的人不能在马尔代夫吸烟,沈月,是公主也是自己的骑士,以军虐囚视频,唐朝诡事录,广州地铁回应APP乘车码频繁弹窗广告,全红婵的减肥计划精确到克" # 关键词搜索配置,以英文逗号分隔 +PLATFORM = "zhihu" # 平台,xhs | dy | ks | bili | wb | tieba | zhihu +KEYWORDS = "F6智慧门店,南京爱福路汽车科技有限公司,汽车后市场,汽修店,新康众" # 关键词搜索配置,以英文逗号分隔 LOGIN_TYPE = "qrcode" # qrcode or phone or cookie COOKIES = "" CRAWLER_TYPE = "search" # 爬取类型,search(关键词搜索) | detail(帖子详情)| creator(创作者主页数据) @@ -61,7 +61,7 @@ BROWSER_LAUNCH_TIMEOUT = 30 AUTO_CLOSE_BROWSER = True # 数据保存类型选项配置,支持五种类型:csv、db、json、sqlite、postgresql, 最好保存到DB,有排重的功能。 -SAVE_DATA_OPTION = "postgresql" # csv or db or json or sqlite or postgresql +SAVE_DATA_OPTION = "db" # csv or db or json or sqlite or postgresql # 用户浏览器缓存的浏览器文件配置 USER_DATA_DIR = "%s_user_data_dir" # %s will be replaced by platform name @@ -70,7 +70,7 @@ USER_DATA_DIR = "%s_user_data_dir" # %s will be replaced by platform name START_PAGE = 1 # 爬取视频/帖子的数量控制 -CRAWLER_MAX_NOTES_COUNT = 5 +CRAWLER_MAX_NOTES_COUNT = 50 # 并发爬虫数量控制 MAX_CONCURRENCY_NUM = 1 @@ -84,6 +84,11 @@ ENABLE_GET_COMMENTS = True # 爬取一级评论的数量控制(单视频/帖子) CRAWLER_MAX_COMMENTS_COUNT_SINGLENOTES = 20 +# 是否对评论做去重及重复页跳出(针对贴吧等平台) +ENABLE_COMMENT_DEDUP = True +# 连续多少页没有新评论时中断评论循环 +COMMENT_DUP_BREAK_THRESHOLD = 2 + # 是否开启爬二级评论模式, 默认不开启爬二级评论 # 老版本项目使用了 db, 则需参考 schema/tables.sql line 287 增加表字段 ENABLE_GET_SUB_COMMENTS = False diff --git a/MindSpider/DeepSentimentCrawling/MediaCrawler/config/db_config.py b/MindSpider/DeepSentimentCrawling/MediaCrawler/config/db_config.py index 0b6d45b..52f2332 100644 --- a/MindSpider/DeepSentimentCrawling/MediaCrawler/config/db_config.py +++ b/MindSpider/DeepSentimentCrawling/MediaCrawler/config/db_config.py @@ -12,10 +12,10 @@ import os # mysql config - 使用MindSpider的数据库配置 -MYSQL_DB_PWD = "bettafish" -MYSQL_DB_USER = "bettafish" -MYSQL_DB_HOST = "127.0.0.1" -MYSQL_DB_PORT = 5444 +MYSQL_DB_PWD = "123123" +MYSQL_DB_USER = "intelligence" +MYSQL_DB_HOST = "123.60.167.249" +MYSQL_DB_PORT = 3306 MYSQL_DB_NAME = "bettafish" mysql_db_config = { @@ -48,7 +48,7 @@ sqlite_db_config = { POSTGRESQL_DB_PWD = os.getenv("POSTGRESQL_DB_PWD", "bettafish") POSTGRESQL_DB_USER = os.getenv("POSTGRESQL_DB_USER", "bettafish") POSTGRESQL_DB_HOST = os.getenv("POSTGRESQL_DB_HOST", "127.0.0.1") -POSTGRESQL_DB_PORT = os.getenv("POSTGRESQL_DB_PORT", "5444") +POSTGRESQL_DB_PORT = os.getenv("POSTGRESQL_DB_PORT", "5432") POSTGRESQL_DB_NAME = os.getenv("POSTGRESQL_DB_NAME", "bettafish") postgresql_db_config = { diff --git a/MindSpider/DeepSentimentCrawling/MediaCrawler/main.py b/MindSpider/DeepSentimentCrawling/MediaCrawler/main.py index b4c55a1..fc7a416 100644 --- a/MindSpider/DeepSentimentCrawling/MediaCrawler/main.py +++ b/MindSpider/DeepSentimentCrawling/MediaCrawler/main.py @@ -69,30 +69,40 @@ async def main(): print(f"Database {args.init_db} initialized successfully.") return # Exit the main function cleanly + crawler = None + try: + crawler = CrawlerFactory.create_crawler(platform=config.PLATFORM) + await crawler.start() - - crawler = CrawlerFactory.create_crawler(platform=config.PLATFORM) - await crawler.start() - - # Generate wordcloud after crawling is complete - # Only for JSON save mode - if config.SAVE_DATA_OPTION == "json" and config.ENABLE_GET_WORDCLOUD: - try: - file_writer = AsyncFileWriter( - platform=config.PLATFORM, - crawler_type=crawler_type_var.get() - ) - await file_writer.generate_wordcloud_from_comments() - except Exception as e: - print(f"Error generating wordcloud: {e}") + # Generate wordcloud after crawling is complete + # Only for JSON save mode + if config.SAVE_DATA_OPTION == "json" and config.ENABLE_GET_WORDCLOUD: + try: + file_writer = AsyncFileWriter( + platform=config.PLATFORM, + crawler_type=crawler_type_var.get() + ) + await file_writer.generate_wordcloud_from_comments() + except Exception as e: + print(f"Error generating wordcloud: {e}") + finally: + # 确保爬虫结束后关闭浏览器 + if crawler: + try: + await crawler.close() + print(f"[MediaCrawler] 浏览器已关闭") + except Exception as e: + print(f"[MediaCrawler] 关闭浏览器时出错: {e}") def cleanup(): - if crawler: - # asyncio.run(crawler.close()) - pass - if config.SAVE_DATA_OPTION in ["db", "sqlite"]: - asyncio.run(db.close()) + # 注意:crawler.close() 已经在 main() 的 finally 块中调用 + # 这里只处理数据库关闭 + if config.SAVE_DATA_OPTION in ["db", "sqlite", "postgresql"]: + try: + asyncio.run(db.close()) + except Exception as e: + print(f"[MediaCrawler] 关闭数据库连接时出错: {e}") if __name__ == "__main__": diff --git a/MindSpider/DeepSentimentCrawling/MediaCrawler/media_platform/douyin/core.py b/MindSpider/DeepSentimentCrawling/MediaCrawler/media_platform/douyin/core.py index c002155..d2bb921 100644 --- a/MindSpider/DeepSentimentCrawling/MediaCrawler/media_platform/douyin/core.py +++ b/MindSpider/DeepSentimentCrawling/MediaCrawler/media_platform/douyin/core.py @@ -362,13 +362,16 @@ class DouYinCrawler(AbstractCrawler): async def close(self) -> None: """Close browser context""" - # 如果使用CDP模式,需要特殊处理 - if self.cdp_manager: - await self.cdp_manager.cleanup() - self.cdp_manager = None - else: - await self.browser_context.close() - utils.logger.info("[DouYinCrawler.close] Browser context closed ...") + try: + # 如果使用CDP模式,需要特殊处理 + if self.cdp_manager: + await self.cdp_manager.cleanup() + self.cdp_manager = None + elif self.browser_context: + await self.browser_context.close() + utils.logger.info("[DouYinCrawler.close] Browser context closed ...") + except Exception as e: + utils.logger.error(f"[DouYinCrawler.close] An error occurred during close: {e}") async def get_aweme_media(self, aweme_item: Dict): """ diff --git a/MindSpider/DeepSentimentCrawling/MediaCrawler/media_platform/kuaishou/core.py b/MindSpider/DeepSentimentCrawling/MediaCrawler/media_platform/kuaishou/core.py index 4cd2eb8..1f5669b 100644 --- a/MindSpider/DeepSentimentCrawling/MediaCrawler/media_platform/kuaishou/core.py +++ b/MindSpider/DeepSentimentCrawling/MediaCrawler/media_platform/kuaishou/core.py @@ -426,10 +426,13 @@ class KuaishouCrawler(AbstractCrawler): async def close(self): """Close browser context""" - # 如果使用CDP模式,需要特殊处理 - if self.cdp_manager: - await self.cdp_manager.cleanup() - self.cdp_manager = None - else: - await self.browser_context.close() - utils.logger.info("[KuaishouCrawler.close] Browser context closed ...") + try: + # 如果使用CDP模式,需要特殊处理 + if self.cdp_manager: + await self.cdp_manager.cleanup() + self.cdp_manager = None + elif self.browser_context: + await self.browser_context.close() + utils.logger.info("[KuaishouCrawler.close] Browser context closed ...") + except Exception as e: + utils.logger.error(f"[KuaishouCrawler.close] An error occurred during close: {e}") diff --git a/MindSpider/DeepSentimentCrawling/MediaCrawler/media_platform/tieba/client.py b/MindSpider/DeepSentimentCrawling/MediaCrawler/media_platform/tieba/client.py index 5de7458..005adae 100644 --- a/MindSpider/DeepSentimentCrawling/MediaCrawler/media_platform/tieba/client.py +++ b/MindSpider/DeepSentimentCrawling/MediaCrawler/media_platform/tieba/client.py @@ -10,7 +10,7 @@ import asyncio import json -from typing import Any, Callable, Dict, List, Optional, Union +from typing import Any, Callable, Dict, List, Optional, Union, Set from urllib.parse import urlencode, quote import requests @@ -328,6 +328,8 @@ class BaiduTieBaClient(AbstractApiClient): result: List[TiebaComment] = [] current_page = 1 + seen_comment_ids: Set[str] = set() + duplicate_page_count = 0 while note_detail.total_replay_page >= current_page and len(result) < max_count: # 构造评论页URL @@ -353,6 +355,26 @@ class BaiduTieBaClient(AbstractApiClient): utils.logger.info(f"[BaiduTieBaClient.get_note_all_comments] 第{current_page}页没有评论,停止爬取") break + if config.ENABLE_COMMENT_DEDUP: + new_comments: List[TiebaComment] = [] + for comment in comments: + comment_id = getattr(comment, "comment_id", None) + if comment_id and comment_id not in seen_comment_ids: + seen_comment_ids.add(comment_id) + new_comments.append(comment) + if not new_comments: + duplicate_page_count += 1 + utils.logger.info( + f"[BaiduTieBaClient.get_note_all_comments] 第{current_page}页没有出现新的评论(重复数据),计数={duplicate_page_count}" + ) + if duplicate_page_count >= config.COMMENT_DUP_BREAK_THRESHOLD: + utils.logger.info(f"[BaiduTieBaClient.get_note_all_comments] 连续 {duplicate_page_count} 页无新增评论,提前结束抓取") + break + current_page += 1 + continue + comments = new_comments + duplicate_page_count = 0 + # 限制评论数量 if len(result) + len(comments) > max_count: comments = comments[:max_count - len(result)] @@ -408,6 +430,8 @@ class BaiduTieBaClient(AbstractApiClient): current_page = 1 max_sub_page_num = parment_comment.sub_comment_count // 10 + 1 + seen_sub_ids: Set[str] = set() + duplicate_page_count = 0 while max_sub_page_num >= current_page: # 构造子评论URL @@ -442,6 +466,28 @@ class BaiduTieBaClient(AbstractApiClient): ) break + if config.ENABLE_COMMENT_DEDUP: + new_sub_comments: List[TiebaComment] = [] + for sub_comment in sub_comments: + sub_comment_id = getattr(sub_comment, "comment_id", None) + if sub_comment_id and sub_comment_id not in seen_sub_ids: + seen_sub_ids.add(sub_comment_id) + new_sub_comments.append(sub_comment) + if not new_sub_comments: + duplicate_page_count += 1 + utils.logger.info( + f"[BaiduTieBaClient.get_comments_all_sub_comments] 评论{parment_comment.comment_id}第{current_page}页未出现新子评论,计数={duplicate_page_count}" + ) + if duplicate_page_count >= config.COMMENT_DUP_BREAK_THRESHOLD: + utils.logger.info( + f"[BaiduTieBaClient.get_comments_all_sub_comments] 评论{parment_comment.comment_id}连续 {duplicate_page_count} 页无新增子评论,提前结束" + ) + break + current_page += 1 + continue + sub_comments = new_sub_comments + duplicate_page_count = 0 + if callback: await callback(parment_comment.note_id, sub_comments) diff --git a/MindSpider/DeepSentimentCrawling/MediaCrawler/media_platform/tieba/core.py b/MindSpider/DeepSentimentCrawling/MediaCrawler/media_platform/tieba/core.py index 268cf26..c4453d8 100644 --- a/MindSpider/DeepSentimentCrawling/MediaCrawler/media_platform/tieba/core.py +++ b/MindSpider/DeepSentimentCrawling/MediaCrawler/media_platform/tieba/core.py @@ -662,10 +662,13 @@ class TieBaCrawler(AbstractCrawler): Returns: """ - # 如果使用CDP模式,需要特殊处理 - if self.cdp_manager: - await self.cdp_manager.cleanup() - self.cdp_manager = None - else: - await self.browser_context.close() - utils.logger.info("[BaiduTieBaCrawler.close] Browser context closed ...") + try: + # 如果使用CDP模式,需要特殊处理 + if self.cdp_manager: + await self.cdp_manager.cleanup() + self.cdp_manager = None + elif self.browser_context: + await self.browser_context.close() + utils.logger.info("[BaiduTieBaCrawler.close] Browser context closed ...") + except Exception as e: + utils.logger.error(f"[BaiduTieBaCrawler.close] An error occurred during close: {e}") diff --git a/MindSpider/DeepSentimentCrawling/MediaCrawler/media_platform/weibo/core.py b/MindSpider/DeepSentimentCrawling/MediaCrawler/media_platform/weibo/core.py index e78a212..69dfecf 100644 --- a/MindSpider/DeepSentimentCrawling/MediaCrawler/media_platform/weibo/core.py +++ b/MindSpider/DeepSentimentCrawling/MediaCrawler/media_platform/weibo/core.py @@ -383,10 +383,13 @@ class WeiboCrawler(AbstractCrawler): async def close(self): """Close browser context""" - # 如果使用CDP模式,需要特殊处理 - if self.cdp_manager: - await self.cdp_manager.cleanup() - self.cdp_manager = None - else: - await self.browser_context.close() - utils.logger.info("[WeiboCrawler.close] Browser context closed ...") + try: + # 如果使用CDP模式,需要特殊处理 + if self.cdp_manager: + await self.cdp_manager.cleanup() + self.cdp_manager = None + elif self.browser_context: + await self.browser_context.close() + utils.logger.info("[WeiboCrawler.close] Browser context closed ...") + except Exception as e: + utils.logger.error(f"[WeiboCrawler.close] An error occurred during close: {e}") diff --git a/MindSpider/DeepSentimentCrawling/MediaCrawler/media_platform/xhs/core.py b/MindSpider/DeepSentimentCrawling/MediaCrawler/media_platform/xhs/core.py index bbc8ee7..0730f79 100644 --- a/MindSpider/DeepSentimentCrawling/MediaCrawler/media_platform/xhs/core.py +++ b/MindSpider/DeepSentimentCrawling/MediaCrawler/media_platform/xhs/core.py @@ -437,13 +437,16 @@ class XiaoHongShuCrawler(AbstractCrawler): async def close(self): """Close browser context""" - # 如果使用CDP模式,需要特殊处理 - if self.cdp_manager: - await self.cdp_manager.cleanup() - self.cdp_manager = None - else: - await self.browser_context.close() - utils.logger.info("[XiaoHongShuCrawler.close] Browser context closed ...") + try: + # 如果使用CDP模式,需要特殊处理 + if self.cdp_manager: + await self.cdp_manager.cleanup() + self.cdp_manager = None + elif self.browser_context: + await self.browser_context.close() + utils.logger.info("[XiaoHongShuCrawler.close] Browser context closed ...") + except Exception as e: + utils.logger.error(f"[XiaoHongShuCrawler.close] An error occurred during close: {e}") async def get_notice_media(self, note_detail: Dict): if not config.ENABLE_GET_MEIDAS: diff --git a/MindSpider/DeepSentimentCrawling/MediaCrawler/media_platform/zhihu/core.py b/MindSpider/DeepSentimentCrawling/MediaCrawler/media_platform/zhihu/core.py index ad1b729..b0a05fd 100644 --- a/MindSpider/DeepSentimentCrawling/MediaCrawler/media_platform/zhihu/core.py +++ b/MindSpider/DeepSentimentCrawling/MediaCrawler/media_platform/zhihu/core.py @@ -139,6 +139,12 @@ class ZhihuCrawler(AbstractCrawler): if config.CRAWLER_MAX_NOTES_COUNT < zhihu_limit_count: config.CRAWLER_MAX_NOTES_COUNT = zhihu_limit_count start_page = config.START_PAGE + + # 统计信息 + total_saved_contents = 0 + total_failed_contents = 0 + total_saved_comments = 0 + for keyword in config.KEYWORDS.split(","): source_keyword_var.set(keyword) utils.logger.info( @@ -164,7 +170,7 @@ class ZhihuCrawler(AbstractCrawler): ) ) utils.logger.info( - f"[ZhihuCrawler.search] Search contents :{content_list}" + f"[ZhihuCrawler.search] Search contents :{len(content_list)} 条" ) if not content_list: utils.logger.info("No more content!") @@ -175,13 +181,41 @@ class ZhihuCrawler(AbstractCrawler): utils.logger.info(f"[ZhihuCrawler.search] Sleeping for {config.CRAWLER_MAX_SLEEP_SEC} seconds after page {page-1}") page += 1 + # 保存内容,添加异常处理和统计 + saved_count = 0 + failed_count = 0 for content in content_list: - await zhihu_store.update_zhihu_content(content) + try: + await zhihu_store.update_zhihu_content(content) + saved_count += 1 + except Exception as e: + failed_count += 1 + utils.logger.error( + f"[ZhihuCrawler.search] 保存内容失败 (content_id={content.content_id}): {e}" + ) + + if saved_count > 0: + utils.logger.info( + f"[ZhihuCrawler.search] 关键词 '{keyword}' 第 {page-1} 页: 成功保存 {saved_count} 条内容" + ) + total_saved_contents += saved_count + if failed_count > 0: + utils.logger.warning( + f"[ZhihuCrawler.search] 关键词 '{keyword}' 第 {page-1} 页: 保存失败 {failed_count} 条内容" + ) + total_failed_contents += failed_count await self.batch_get_content_comments(content_list) except DataFetchError: utils.logger.error("[ZhihuCrawler.search] Search content error") return + + # 输出最终统计信息 + utils.logger.info( + f"[ZhihuCrawler.search] 关键词搜索完成统计: " + f"成功保存 {total_saved_contents} 条内容, " + f"失败 {total_failed_contents} 条内容" + ) async def batch_get_content_comments(self, content_list: List[ZhihuContent]): """ @@ -473,10 +507,13 @@ class ZhihuCrawler(AbstractCrawler): async def close(self): """Close browser context""" - # 如果使用CDP模式,需要特殊处理 - if self.cdp_manager: - await self.cdp_manager.cleanup() - self.cdp_manager = None - else: - await self.browser_context.close() - utils.logger.info("[ZhihuCrawler.close] Browser context closed ...") + try: + # 如果使用CDP模式,需要特殊处理 + if self.cdp_manager: + await self.cdp_manager.cleanup() + self.cdp_manager = None + elif self.browser_context: + await self.browser_context.close() + utils.logger.info("[ZhihuCrawler.close] Browser context closed ...") + except Exception as e: + utils.logger.error(f"[ZhihuCrawler.close] An error occurred during close: {e}") diff --git a/MindSpider/DeepSentimentCrawling/MediaCrawler/store/zhihu/__init__.py b/MindSpider/DeepSentimentCrawling/MediaCrawler/store/zhihu/__init__.py index d34b933..45130f3 100644 --- a/MindSpider/DeepSentimentCrawling/MediaCrawler/store/zhihu/__init__.py +++ b/MindSpider/DeepSentimentCrawling/MediaCrawler/store/zhihu/__init__.py @@ -16,9 +16,9 @@ import config from base.base_crawler import AbstractStore from model.m_zhihu import ZhihuComment, ZhihuContent, ZhihuCreator from ._store_impl import (ZhihuCsvStoreImplement, - ZhihuDbStoreImplement, - ZhihuJsonStoreImplement, - ZhihuSqliteStoreImplement) + ZhihuDbStoreImplement, + ZhihuJsonStoreImplement, + ZhihuSqliteStoreImplement) from tools import utils from var import source_keyword_var @@ -36,9 +36,11 @@ class ZhihuStoreFactory: def create_store() -> AbstractStore: store_class = ZhihuStoreFactory.STORES.get(config.SAVE_DATA_OPTION) if not store_class: - raise ValueError("[ZhihuStoreFactory.create_store] Invalid save option only supported csv or db or json or sqlite or postgresql ...") + raise ValueError( + "[ZhihuStoreFactory.create_store] Invalid save option only supported csv or db or json or sqlite or postgresql ...") return store_class() + async def batch_update_zhihu_contents(contents: List[ZhihuContent]): """ 批量更新知乎内容 @@ -54,6 +56,7 @@ async def batch_update_zhihu_contents(contents: List[ZhihuContent]): for content_item in contents: await update_zhihu_content(content_item) + async def update_zhihu_content(content_item: ZhihuContent): """ 更新知乎内容 @@ -70,7 +73,6 @@ async def update_zhihu_content(content_item: ZhihuContent): await ZhihuStoreFactory.create_store().store_content(local_db_item) - async def batch_update_zhihu_note_comments(comments: List[ZhihuComment]): """ 批量更新知乎内容评论 @@ -82,7 +84,7 @@ async def batch_update_zhihu_note_comments(comments: List[ZhihuComment]): """ if not comments: return - + for comment_item in comments: await update_zhihu_content_comment(comment_item) diff --git a/MindSpider/DeepSentimentCrawling/MediaCrawler/store/zhihu/_store_impl.py b/MindSpider/DeepSentimentCrawling/MediaCrawler/store/zhihu/_store_impl.py index ac4dc1b..d659504 100644 --- a/MindSpider/DeepSentimentCrawling/MediaCrawler/store/zhihu/_store_impl.py +++ b/MindSpider/DeepSentimentCrawling/MediaCrawler/store/zhihu/_store_impl.py @@ -94,17 +94,24 @@ class ZhihuDbStoreImplement(AbstractStore): content_item: content item dict """ content_id = content_item.get("content_id") - async with get_session() as session: - stmt = select(ZhihuContent).where(ZhihuContent.content_id == content_id) - result = await session.execute(stmt) - existing_content = result.scalars().first() - if existing_content: - for key, value in content_item.items(): - setattr(existing_content, key, value) - else: - new_content = ZhihuContent(**content_item) - session.add(new_content) - await session.commit() + try: + async with get_session() as session: + stmt = select(ZhihuContent).where(ZhihuContent.content_id == content_id) + result = await session.execute(stmt) + existing_content = result.scalars().first() + if existing_content: + for key, value in content_item.items(): + setattr(existing_content, key, value) + utils.logger.debug(f"[ZhihuDbStore] 更新内容: {content_id}") + else: + new_content = ZhihuContent(**content_item) + session.add(new_content) + utils.logger.debug(f"[ZhihuDbStore] 新增内容: {content_id}") + await session.commit() + utils.logger.info(f"[ZhihuDbStore] 成功保存内容到数据库: {content_id}") + except Exception as e: + utils.logger.error(f"[ZhihuDbStore] 保存内容失败 (content_id={content_id}): {e}") + raise async def store_comment(self, comment_item: Dict): """ @@ -113,17 +120,24 @@ class ZhihuDbStoreImplement(AbstractStore): comment_item: comment item dict """ comment_id = comment_item.get("comment_id") - async with get_session() as session: - stmt = select(ZhihuComment).where(ZhihuComment.comment_id == comment_id) - result = await session.execute(stmt) - existing_comment = result.scalars().first() - if existing_comment: - for key, value in comment_item.items(): - setattr(existing_comment, key, value) - else: - new_comment = ZhihuComment(**comment_item) - session.add(new_comment) - await session.commit() + try: + async with get_session() as session: + stmt = select(ZhihuComment).where(ZhihuComment.comment_id == comment_id) + result = await session.execute(stmt) + existing_comment = result.scalars().first() + if existing_comment: + for key, value in comment_item.items(): + setattr(existing_comment, key, value) + utils.logger.debug(f"[ZhihuDbStore] 更新评论: {comment_id}") + else: + new_comment = ZhihuComment(**comment_item) + session.add(new_comment) + utils.logger.debug(f"[ZhihuDbStore] 新增评论: {comment_id}") + await session.commit() + utils.logger.info(f"[ZhihuDbStore] 成功保存评论到数据库: {comment_id}") + except Exception as e: + utils.logger.error(f"[ZhihuDbStore] 保存评论失败 (comment_id={comment_id}): {e}") + raise async def store_creator(self, creator: Dict): """ diff --git a/MindSpider/DeepSentimentCrawling/keyword_manager.py b/MindSpider/DeepSentimentCrawling/keyword_manager.py index 59a19db..c87b479 100644 --- a/MindSpider/DeepSentimentCrawling/keyword_manager.py +++ b/MindSpider/DeepSentimentCrawling/keyword_manager.py @@ -32,6 +32,7 @@ class KeywordManager: def __init__(self): """初始化关键词管理器""" self.engine: Engine = None + self.custom_keywords_path: Optional[Path] = self._resolve_custom_keywords_path() self.connect() def connect(self): @@ -68,24 +69,31 @@ class KeywordManager: Returns: 关键词列表 """ + if not getattr(settings, "ENABLE_KEYWORD_SEARCH", True): + logger.info("关键词搜索已通过配置禁用,返回默认关键词列表") + return self._limit_keywords(self._get_default_keywords(), max_keywords) + if not target_date: target_date = date.today() + if getattr(settings, "USE_DEFAULT_KEYWORDS_ONLY", False): + logger.info("配置启用默认关键词模式,直接返回默认关键词") + return self._limit_keywords(self._get_default_keywords(), max_keywords) + logger.info(f"正在获取 {target_date} 的关键词...") + # 优先使用自定义关键词 + custom_keywords = self._get_custom_keywords(target_date, max_keywords) + if custom_keywords: + return custom_keywords + # 首先尝试获取指定日期的关键词 topics_data = self.get_daily_topics(target_date) if topics_data and topics_data.get('keywords'): keywords = topics_data['keywords'] logger.info(f"成功获取 {target_date} 的 {len(keywords)} 个关键词") - - # 如果关键词太多,随机选择指定数量 - if len(keywords) > max_keywords: - keywords = random.sample(keywords, max_keywords) - logger.info(f"随机选择了 {max_keywords} 个关键词") - - return keywords + return self._limit_keywords(keywords, max_keywords) # 如果没有当天的关键词,尝试获取最近几天的 logger.info(f"{target_date} 没有关键词数据,尝试获取最近的关键词...") @@ -100,15 +108,14 @@ class KeywordManager: # 去重并限制数量 unique_keywords = list(set(all_keywords)) - if len(unique_keywords) > max_keywords: - unique_keywords = random.sample(unique_keywords, max_keywords) + limited_keywords = self._limit_keywords(unique_keywords, max_keywords) - logger.info(f"从最近7天的数据中获取到 {len(unique_keywords)} 个关键词") - return unique_keywords + logger.info(f"从最近7天的数据中获取到 {len(limited_keywords)} 个关键词") + return limited_keywords # 如果都没有,返回默认关键词 logger.info("没有找到任何关键词数据,使用默认关键词") - return self._get_default_keywords() + return self._limit_keywords(self._get_default_keywords(), max_keywords) def get_daily_topics(self, extract_date: date = None) -> Optional[Dict]: """ @@ -176,17 +183,75 @@ class KeywordManager: except Exception as e: logger.exception(f"获取最近话题分析失败: {e}") return [] + + def _resolve_custom_keywords_path(self) -> Optional[Path]: + """解析自定义关键词文件路径""" + raw_path = getattr(settings, "CUSTOM_KEYWORDS_FILE", None) + if not raw_path: + return None + path = Path(raw_path).expanduser() + if not path.is_absolute(): + path = project_root / path + return path + + @staticmethod + def _limit_keywords(keywords: List[str], max_keywords: int) -> List[str]: + """根据最大数量限制关键词""" + if not keywords: + return [] + if max_keywords and len(keywords) > max_keywords: + keywords = random.sample(keywords, max_keywords) + return keywords + + def _get_custom_keywords(self, target_date: date, max_keywords: int) -> Optional[List[str]]: + """ + 从自定义关键词文件中获取指定日期的关键词 + 支持格式: + 1. ["AI", "大模型"] + 2. {"2025-11-26": ["AI"], "default": ["科技"]} + """ + if not self.custom_keywords_path: + return None + + path = self.custom_keywords_path + if not path.exists(): + logger.warning(f"自定义关键词文件不存在: {path}") + return None + + try: + with path.open("r", encoding="utf-8") as f: + data = json.load(f) + except Exception as e: + logger.error(f"读取自定义关键词文件失败({path}): {e}") + return None + + keywords: Optional[List[str]] = None + if isinstance(data, list): + keywords = data + elif isinstance(data, dict): + date_key = target_date.isoformat() + if date_key in data and isinstance(data[date_key], list): + keywords = data[date_key] + elif "default" in data and isinstance(data["default"], list): + keywords = data["default"] + elif "keywords" in data and isinstance(data["keywords"], list): + keywords = data["keywords"] + + if not keywords: + return None + + cleaned_keywords = [kw.strip() for kw in keywords if isinstance(kw, str) and kw.strip()] + if not cleaned_keywords: + return None + + limited = self._limit_keywords(cleaned_keywords, max_keywords) + logger.info(f"使用自定义关键词({path})共 {len(limited)} 个") + return limited def _get_default_keywords(self) -> List[str]: """获取默认关键词列表""" return [ - "科技", "人工智能", "AI", "编程", "互联网", - "创业", "投资", "理财", "股市", "经济", - "教育", "学习", "考试", "大学", "就业", - "健康", "养生", "运动", "美食", "旅游", - "时尚", "美妆", "购物", "生活", "家居", - "电影", "音乐", "游戏", "娱乐", "明星", - "新闻", "热点", "社会", "政策", "环保" + "F6智慧门店","南京爱福路汽车科技有限公司","汽车后市场","汽修店","新康众" ] def get_all_keywords_for_platforms(self, platforms: List[str], target_date: date = None, @@ -286,6 +351,16 @@ class KeywordManager: if not target_date: target_date = date.today() + # 当配置为仅使用默认关键词时,直接返回默认关键词的摘要,避免上层误判为“无数据” + if getattr(settings, "USE_DEFAULT_KEYWORDS_ONLY", False): + default_keywords = self._get_default_keywords() + return { + 'date': target_date, + 'keywords_count': len(default_keywords), + 'summary': '使用默认关键词模式', + 'has_data': bool(default_keywords) + } + topics_data = self.get_daily_topics(target_date) if topics_data: diff --git a/MindSpider/DeepSentimentCrawling/platform_crawler.py b/MindSpider/DeepSentimentCrawling/platform_crawler.py index 93622b6..b7c17dd 100644 --- a/MindSpider/DeepSentimentCrawling/platform_crawler.py +++ b/MindSpider/DeepSentimentCrawling/platform_crawler.py @@ -251,16 +251,30 @@ postgresql_db_config = {{ logger.info(f"执行命令: {' '.join(cmd)}") - # 切换到MediaCrawler目录并执行 + # 切换到MediaCrawler目录并执行,捕获输出 result = subprocess.run( cmd, cwd=self.mediacrawler_path, - timeout=3600 # 60分钟超时 + timeout=3600, # 60分钟超时 + capture_output=True, + text=True, + encoding='utf-8', + errors='replace' ) end_time = datetime.now() duration = (end_time - start_time).total_seconds() + # 解析输出,提取实际保存的数据量 + output_lines = result.stdout.split('\n') if result.stdout else [] + error_lines = result.stderr.split('\n') if result.stderr else [] + + # 合并所有输出行用于解析 + all_lines = output_lines + error_lines + + # 解析统计信息 + parsed_stats = self._parse_crawl_output(all_lines, error_lines) + # 创建统计信息 crawl_stats = { "platform": platform, @@ -270,9 +284,11 @@ postgresql_db_config = {{ "end_time": end_time.isoformat(), "return_code": result.returncode, "success": result.returncode == 0, - "notes_count": 0, - "comments_count": 0, - "errors_count": 0 + "notes_count": parsed_stats.get("notes_count", 0), + "comments_count": parsed_stats.get("comments_count", 0), + "errors_count": parsed_stats.get("errors_count", 0), + "output_preview": '\n'.join(output_lines[-20:]) if output_lines else "", # 最后20行输出 + "error_preview": '\n'.join(error_lines[-20:]) if error_lines else "" # 最后20行错误 } # 保存统计信息 @@ -280,8 +296,16 @@ postgresql_db_config = {{ if result.returncode == 0: logger.info(f"✅ {platform} 爬取完成,耗时: {duration:.1f}秒") + logger.info(f" 保存内容: {crawl_stats['notes_count']} 条,评论: {crawl_stats['comments_count']} 条") + if crawl_stats['notes_count'] == 0 and crawl_stats['comments_count'] == 0: + logger.warning(f"⚠️ {platform} 爬取成功但未保存任何数据,请检查数据库连接和保存逻辑") + # 输出部分日志用于调试 + if crawl_stats['error_preview']: + logger.warning(f" 错误信息: {crawl_stats['error_preview'][:500]}") else: logger.error(f"❌ {platform} 爬取失败,返回码: {result.returncode}") + if error_lines: + logger.error(f" 错误信息: {crawl_stats['error_preview'][:500]}") return crawl_stats @@ -294,6 +318,7 @@ postgresql_db_config = {{ def _parse_crawl_output(self, output_lines: List[str], error_lines: List[str]) -> Dict: """解析爬取输出,提取统计信息""" + import re stats = { "notes_count": 0, "comments_count": 0, @@ -301,32 +326,60 @@ postgresql_db_config = {{ "login_required": False } - # 解析输出行 - for line in output_lines: - if "条笔记" in line or "条内容" in line: + # 合并所有行用于解析 + all_lines = output_lines + error_lines + + # 解析输出行,查找各种可能的数据保存信息 + for line in all_lines: + line_lower = line.lower() + + # 查找保存的内容数量(多种可能的格式) + # 例如:"保存了 10 条笔记"、"成功保存 5 条内容"、"inserted 3 records"等 + if any(keyword in line_lower for keyword in ["条笔记", "条内容", "条视频", "条帖子", "条回答"]): try: - # 提取数字 - import re + # 提取数字,优先取第一个数字 numbers = re.findall(r'\d+', line) if numbers: - stats["notes_count"] = int(numbers[0]) + # 如果找到多个数字,取最大的(通常是总数) + potential_count = max([int(n) for n in numbers]) + if potential_count > stats["notes_count"]: + stats["notes_count"] = potential_count except: pass - elif "条评论" in line: + + # 查找保存的评论数量 + if "条评论" in line_lower: try: - import re numbers = re.findall(r'\d+', line) if numbers: - stats["comments_count"] = int(numbers[0]) + potential_count = max([int(n) for n in numbers]) + if potential_count > stats["comments_count"]: + stats["comments_count"] = potential_count except: pass - elif "登录" in line or "扫码" in line: + + # 查找数据库相关错误 + if any(keyword in line_lower for keyword in ["数据库", "database", "connection", "连接失败", "保存失败"]): + if "error" in line_lower or "失败" in line_lower or "异常" in line_lower: + stats["errors_count"] += 1 + + # 查找登录相关 + if any(keyword in line_lower for keyword in ["登录", "扫码", "login", "需要登录"]): stats["login_required"] = True - # 解析错误行 - for line in error_lines: - if "error" in line.lower() or "异常" in line: - stats["errors_count"] += 1 + # 如果没有找到明确的保存数量,尝试从数据库操作日志中提取 + if stats["notes_count"] == 0 and stats["comments_count"] == 0: + # 查找可能的数据库插入信息 + for line in all_lines: + # 查找类似 "insert into" 或 "保存到" 的信息 + if "insert" in line_lower or "保存到" in line_lower: + try: + numbers = re.findall(r'\d+', line) + if numbers: + # 尝试提取可能的记录数 + pass # 这里可以进一步解析 + except: + pass return stats diff --git a/MindSpider/config.py b/MindSpider/config.py index 05d5206..68043ca 100644 --- a/MindSpider/config.py +++ b/MindSpider/config.py @@ -22,6 +22,18 @@ class Settings(BaseSettings): DB_PASSWORD: str = Field("your_password", description="数据库密码") DB_NAME: str = Field("mindspider", description="数据库名称") DB_CHARSET: str = Field("utf8mb4", description="数据库字符集") + CUSTOM_KEYWORDS_FILE: Optional[str] = Field( + None, + description="自定义关键词文件路径(可为绝对路径或相对MindSpider目录的路径)" + ) + USE_DEFAULT_KEYWORDS_ONLY: bool = Field( + True, + description="为True时忽略数据库/自定义结果,直接使用默认关键词" + ) + ENABLE_KEYWORD_SEARCH: bool = Field( + True, + description="开启后运行基于关键词的爬取流程,关闭则完全跳过关键词搜索" + ) MINDSPIDER_API_KEY: Optional[str] = Field(None, description="MINDSPIDER API密钥") MINDSPIDER_BASE_URL: Optional[str] = Field("https://api.deepseek.com", description="MINDSPIDER API基础URL,推荐deepseek-chat模型使用https://api.deepseek.com") MINDSPIDER_MODEL_NAME: Optional[str] = Field("deepseek-chat", description="MINDSPIDER API模型名称, 推荐deepseek-chat")