本地化&2.0

This commit is contained in:
z66
2025-12-02 14:01:39 +08:00
parent ec1baf539c
commit a9eda60493
15 changed files with 409 additions and 140 deletions
@@ -9,8 +9,8 @@
# 使用本代码即表示您同意遵守上述原则和LICENSE中的所有条款。 # 使用本代码即表示您同意遵守上述原则和LICENSE中的所有条款。
# 基础配置 # 基础配置
PLATFORM = "bili" # 平台,xhs | dy | ks | bili | wb | tieba | zhihu PLATFORM = "zhihu" # 平台,xhs | dy | ks | bili | wb | tieba | zhihu
KEYWORDS = "电影鬼灭之刃,亲属想侵吞3姐妹亡父赔偿款,网警斩断侵害未成年人网络黑色产业链,2007年后出生的人不能在马尔代夫吸烟,沈月,是公主也是自己的骑士,以军虐囚视频,唐朝诡事录,广州地铁回应APP乘车码频繁弹窗广告,全红婵的减肥计划精确到克" # 关键词搜索配置,以英文逗号分隔 KEYWORDS = "F6智慧门店,南京爱福路汽车科技有限公司,汽车后市场,汽修店,新康众" # 关键词搜索配置,以英文逗号分隔
LOGIN_TYPE = "qrcode" # qrcode or phone or cookie LOGIN_TYPE = "qrcode" # qrcode or phone or cookie
COOKIES = "" COOKIES = ""
CRAWLER_TYPE = "search" # 爬取类型,search(关键词搜索) | detail(帖子详情)| creator(创作者主页数据) CRAWLER_TYPE = "search" # 爬取类型,search(关键词搜索) | detail(帖子详情)| creator(创作者主页数据)
@@ -61,7 +61,7 @@ BROWSER_LAUNCH_TIMEOUT = 30
AUTO_CLOSE_BROWSER = True AUTO_CLOSE_BROWSER = True
# 数据保存类型选项配置,支持五种类型:csv、db、json、sqlite、postgresql, 最好保存到DB,有排重的功能。 # 数据保存类型选项配置,支持五种类型:csv、db、json、sqlite、postgresql, 最好保存到DB,有排重的功能。
SAVE_DATA_OPTION = "postgresql" # csv or db or json or sqlite or postgresql SAVE_DATA_OPTION = "db" # csv or db or json or sqlite or postgresql
# 用户浏览器缓存的浏览器文件配置 # 用户浏览器缓存的浏览器文件配置
USER_DATA_DIR = "%s_user_data_dir" # %s will be replaced by platform name USER_DATA_DIR = "%s_user_data_dir" # %s will be replaced by platform name
@@ -70,7 +70,7 @@ USER_DATA_DIR = "%s_user_data_dir" # %s will be replaced by platform name
START_PAGE = 1 START_PAGE = 1
# 爬取视频/帖子的数量控制 # 爬取视频/帖子的数量控制
CRAWLER_MAX_NOTES_COUNT = 5 CRAWLER_MAX_NOTES_COUNT = 50
# 并发爬虫数量控制 # 并发爬虫数量控制
MAX_CONCURRENCY_NUM = 1 MAX_CONCURRENCY_NUM = 1
@@ -84,6 +84,11 @@ ENABLE_GET_COMMENTS = True
# 爬取一级评论的数量控制(单视频/帖子) # 爬取一级评论的数量控制(单视频/帖子)
CRAWLER_MAX_COMMENTS_COUNT_SINGLENOTES = 20 CRAWLER_MAX_COMMENTS_COUNT_SINGLENOTES = 20
# 是否对评论做去重及重复页跳出(针对贴吧等平台)
ENABLE_COMMENT_DEDUP = True
# 连续多少页没有新评论时中断评论循环
COMMENT_DUP_BREAK_THRESHOLD = 2
# 是否开启爬二级评论模式, 默认不开启爬二级评论 # 是否开启爬二级评论模式, 默认不开启爬二级评论
# 老版本项目使用了 db, 则需参考 schema/tables.sql line 287 增加表字段 # 老版本项目使用了 db, 则需参考 schema/tables.sql line 287 增加表字段
ENABLE_GET_SUB_COMMENTS = False ENABLE_GET_SUB_COMMENTS = False
@@ -12,10 +12,10 @@
import os import os
# mysql config - 使用MindSpider的数据库配置 # mysql config - 使用MindSpider的数据库配置
MYSQL_DB_PWD = "bettafish" MYSQL_DB_PWD = "123123"
MYSQL_DB_USER = "bettafish" MYSQL_DB_USER = "intelligence"
MYSQL_DB_HOST = "127.0.0.1" MYSQL_DB_HOST = "123.60.167.249"
MYSQL_DB_PORT = 5444 MYSQL_DB_PORT = 3306
MYSQL_DB_NAME = "bettafish" MYSQL_DB_NAME = "bettafish"
mysql_db_config = { mysql_db_config = {
@@ -48,7 +48,7 @@ sqlite_db_config = {
POSTGRESQL_DB_PWD = os.getenv("POSTGRESQL_DB_PWD", "bettafish") POSTGRESQL_DB_PWD = os.getenv("POSTGRESQL_DB_PWD", "bettafish")
POSTGRESQL_DB_USER = os.getenv("POSTGRESQL_DB_USER", "bettafish") POSTGRESQL_DB_USER = os.getenv("POSTGRESQL_DB_USER", "bettafish")
POSTGRESQL_DB_HOST = os.getenv("POSTGRESQL_DB_HOST", "127.0.0.1") POSTGRESQL_DB_HOST = os.getenv("POSTGRESQL_DB_HOST", "127.0.0.1")
POSTGRESQL_DB_PORT = os.getenv("POSTGRESQL_DB_PORT", "5444") POSTGRESQL_DB_PORT = os.getenv("POSTGRESQL_DB_PORT", "5432")
POSTGRESQL_DB_NAME = os.getenv("POSTGRESQL_DB_NAME", "bettafish") POSTGRESQL_DB_NAME = os.getenv("POSTGRESQL_DB_NAME", "bettafish")
postgresql_db_config = { postgresql_db_config = {
@@ -69,8 +69,8 @@ async def main():
print(f"Database {args.init_db} initialized successfully.") print(f"Database {args.init_db} initialized successfully.")
return # Exit the main function cleanly return # Exit the main function cleanly
crawler = None
try:
crawler = CrawlerFactory.create_crawler(platform=config.PLATFORM) crawler = CrawlerFactory.create_crawler(platform=config.PLATFORM)
await crawler.start() await crawler.start()
@@ -85,14 +85,24 @@ async def main():
await file_writer.generate_wordcloud_from_comments() await file_writer.generate_wordcloud_from_comments()
except Exception as e: except Exception as e:
print(f"Error generating wordcloud: {e}") print(f"Error generating wordcloud: {e}")
finally:
# 确保爬虫结束后关闭浏览器
if crawler:
try:
await crawler.close()
print(f"[MediaCrawler] 浏览器已关闭")
except Exception as e:
print(f"[MediaCrawler] 关闭浏览器时出错: {e}")
def cleanup(): def cleanup():
if crawler: # 注意:crawler.close() 已经在 main() 的 finally 块中调用
# asyncio.run(crawler.close()) # 这里只处理数据库关闭
pass if config.SAVE_DATA_OPTION in ["db", "sqlite", "postgresql"]:
if config.SAVE_DATA_OPTION in ["db", "sqlite"]: try:
asyncio.run(db.close()) asyncio.run(db.close())
except Exception as e:
print(f"[MediaCrawler] 关闭数据库连接时出错: {e}")
if __name__ == "__main__": if __name__ == "__main__":
@@ -362,13 +362,16 @@ class DouYinCrawler(AbstractCrawler):
async def close(self) -> None: async def close(self) -> None:
"""Close browser context""" """Close browser context"""
try:
# 如果使用CDP模式,需要特殊处理 # 如果使用CDP模式,需要特殊处理
if self.cdp_manager: if self.cdp_manager:
await self.cdp_manager.cleanup() await self.cdp_manager.cleanup()
self.cdp_manager = None self.cdp_manager = None
else: elif self.browser_context:
await self.browser_context.close() await self.browser_context.close()
utils.logger.info("[DouYinCrawler.close] Browser context closed ...") utils.logger.info("[DouYinCrawler.close] Browser context closed ...")
except Exception as e:
utils.logger.error(f"[DouYinCrawler.close] An error occurred during close: {e}")
async def get_aweme_media(self, aweme_item: Dict): async def get_aweme_media(self, aweme_item: Dict):
""" """
@@ -426,10 +426,13 @@ class KuaishouCrawler(AbstractCrawler):
async def close(self): async def close(self):
"""Close browser context""" """Close browser context"""
try:
# 如果使用CDP模式,需要特殊处理 # 如果使用CDP模式,需要特殊处理
if self.cdp_manager: if self.cdp_manager:
await self.cdp_manager.cleanup() await self.cdp_manager.cleanup()
self.cdp_manager = None self.cdp_manager = None
else: elif self.browser_context:
await self.browser_context.close() await self.browser_context.close()
utils.logger.info("[KuaishouCrawler.close] Browser context closed ...") utils.logger.info("[KuaishouCrawler.close] Browser context closed ...")
except Exception as e:
utils.logger.error(f"[KuaishouCrawler.close] An error occurred during close: {e}")
@@ -10,7 +10,7 @@
import asyncio import asyncio
import json import json
from typing import Any, Callable, Dict, List, Optional, Union from typing import Any, Callable, Dict, List, Optional, Union, Set
from urllib.parse import urlencode, quote from urllib.parse import urlencode, quote
import requests import requests
@@ -328,6 +328,8 @@ class BaiduTieBaClient(AbstractApiClient):
result: List[TiebaComment] = [] result: List[TiebaComment] = []
current_page = 1 current_page = 1
seen_comment_ids: Set[str] = set()
duplicate_page_count = 0
while note_detail.total_replay_page >= current_page and len(result) < max_count: while note_detail.total_replay_page >= current_page and len(result) < max_count:
# 构造评论页URL # 构造评论页URL
@@ -353,6 +355,26 @@ class BaiduTieBaClient(AbstractApiClient):
utils.logger.info(f"[BaiduTieBaClient.get_note_all_comments] 第{current_page}页没有评论,停止爬取") utils.logger.info(f"[BaiduTieBaClient.get_note_all_comments] 第{current_page}页没有评论,停止爬取")
break break
if config.ENABLE_COMMENT_DEDUP:
new_comments: List[TiebaComment] = []
for comment in comments:
comment_id = getattr(comment, "comment_id", None)
if comment_id and comment_id not in seen_comment_ids:
seen_comment_ids.add(comment_id)
new_comments.append(comment)
if not new_comments:
duplicate_page_count += 1
utils.logger.info(
f"[BaiduTieBaClient.get_note_all_comments] 第{current_page}页没有出现新的评论(重复数据),计数={duplicate_page_count}"
)
if duplicate_page_count >= config.COMMENT_DUP_BREAK_THRESHOLD:
utils.logger.info(f"[BaiduTieBaClient.get_note_all_comments] 连续 {duplicate_page_count} 页无新增评论,提前结束抓取")
break
current_page += 1
continue
comments = new_comments
duplicate_page_count = 0
# 限制评论数量 # 限制评论数量
if len(result) + len(comments) > max_count: if len(result) + len(comments) > max_count:
comments = comments[:max_count - len(result)] comments = comments[:max_count - len(result)]
@@ -408,6 +430,8 @@ class BaiduTieBaClient(AbstractApiClient):
current_page = 1 current_page = 1
max_sub_page_num = parment_comment.sub_comment_count // 10 + 1 max_sub_page_num = parment_comment.sub_comment_count // 10 + 1
seen_sub_ids: Set[str] = set()
duplicate_page_count = 0
while max_sub_page_num >= current_page: while max_sub_page_num >= current_page:
# 构造子评论URL # 构造子评论URL
@@ -442,6 +466,28 @@ class BaiduTieBaClient(AbstractApiClient):
) )
break break
if config.ENABLE_COMMENT_DEDUP:
new_sub_comments: List[TiebaComment] = []
for sub_comment in sub_comments:
sub_comment_id = getattr(sub_comment, "comment_id", None)
if sub_comment_id and sub_comment_id not in seen_sub_ids:
seen_sub_ids.add(sub_comment_id)
new_sub_comments.append(sub_comment)
if not new_sub_comments:
duplicate_page_count += 1
utils.logger.info(
f"[BaiduTieBaClient.get_comments_all_sub_comments] 评论{parment_comment.comment_id}{current_page}页未出现新子评论,计数={duplicate_page_count}"
)
if duplicate_page_count >= config.COMMENT_DUP_BREAK_THRESHOLD:
utils.logger.info(
f"[BaiduTieBaClient.get_comments_all_sub_comments] 评论{parment_comment.comment_id}连续 {duplicate_page_count} 页无新增子评论,提前结束"
)
break
current_page += 1
continue
sub_comments = new_sub_comments
duplicate_page_count = 0
if callback: if callback:
await callback(parment_comment.note_id, sub_comments) await callback(parment_comment.note_id, sub_comments)
@@ -662,10 +662,13 @@ class TieBaCrawler(AbstractCrawler):
Returns: Returns:
""" """
try:
# 如果使用CDP模式,需要特殊处理 # 如果使用CDP模式,需要特殊处理
if self.cdp_manager: if self.cdp_manager:
await self.cdp_manager.cleanup() await self.cdp_manager.cleanup()
self.cdp_manager = None self.cdp_manager = None
else: elif self.browser_context:
await self.browser_context.close() await self.browser_context.close()
utils.logger.info("[BaiduTieBaCrawler.close] Browser context closed ...") utils.logger.info("[BaiduTieBaCrawler.close] Browser context closed ...")
except Exception as e:
utils.logger.error(f"[BaiduTieBaCrawler.close] An error occurred during close: {e}")
@@ -383,10 +383,13 @@ class WeiboCrawler(AbstractCrawler):
async def close(self): async def close(self):
"""Close browser context""" """Close browser context"""
try:
# 如果使用CDP模式,需要特殊处理 # 如果使用CDP模式,需要特殊处理
if self.cdp_manager: if self.cdp_manager:
await self.cdp_manager.cleanup() await self.cdp_manager.cleanup()
self.cdp_manager = None self.cdp_manager = None
else: elif self.browser_context:
await self.browser_context.close() await self.browser_context.close()
utils.logger.info("[WeiboCrawler.close] Browser context closed ...") utils.logger.info("[WeiboCrawler.close] Browser context closed ...")
except Exception as e:
utils.logger.error(f"[WeiboCrawler.close] An error occurred during close: {e}")
@@ -437,13 +437,16 @@ class XiaoHongShuCrawler(AbstractCrawler):
async def close(self): async def close(self):
"""Close browser context""" """Close browser context"""
try:
# 如果使用CDP模式,需要特殊处理 # 如果使用CDP模式,需要特殊处理
if self.cdp_manager: if self.cdp_manager:
await self.cdp_manager.cleanup() await self.cdp_manager.cleanup()
self.cdp_manager = None self.cdp_manager = None
else: elif self.browser_context:
await self.browser_context.close() await self.browser_context.close()
utils.logger.info("[XiaoHongShuCrawler.close] Browser context closed ...") utils.logger.info("[XiaoHongShuCrawler.close] Browser context closed ...")
except Exception as e:
utils.logger.error(f"[XiaoHongShuCrawler.close] An error occurred during close: {e}")
async def get_notice_media(self, note_detail: Dict): async def get_notice_media(self, note_detail: Dict):
if not config.ENABLE_GET_MEIDAS: if not config.ENABLE_GET_MEIDAS:
@@ -139,6 +139,12 @@ class ZhihuCrawler(AbstractCrawler):
if config.CRAWLER_MAX_NOTES_COUNT < zhihu_limit_count: if config.CRAWLER_MAX_NOTES_COUNT < zhihu_limit_count:
config.CRAWLER_MAX_NOTES_COUNT = zhihu_limit_count config.CRAWLER_MAX_NOTES_COUNT = zhihu_limit_count
start_page = config.START_PAGE start_page = config.START_PAGE
# 统计信息
total_saved_contents = 0
total_failed_contents = 0
total_saved_comments = 0
for keyword in config.KEYWORDS.split(","): for keyword in config.KEYWORDS.split(","):
source_keyword_var.set(keyword) source_keyword_var.set(keyword)
utils.logger.info( utils.logger.info(
@@ -164,7 +170,7 @@ class ZhihuCrawler(AbstractCrawler):
) )
) )
utils.logger.info( utils.logger.info(
f"[ZhihuCrawler.search] Search contents :{content_list}" f"[ZhihuCrawler.search] Search contents :{len(content_list)}"
) )
if not content_list: if not content_list:
utils.logger.info("No more content!") utils.logger.info("No more content!")
@@ -175,14 +181,42 @@ class ZhihuCrawler(AbstractCrawler):
utils.logger.info(f"[ZhihuCrawler.search] Sleeping for {config.CRAWLER_MAX_SLEEP_SEC} seconds after page {page-1}") utils.logger.info(f"[ZhihuCrawler.search] Sleeping for {config.CRAWLER_MAX_SLEEP_SEC} seconds after page {page-1}")
page += 1 page += 1
# 保存内容,添加异常处理和统计
saved_count = 0
failed_count = 0
for content in content_list: for content in content_list:
try:
await zhihu_store.update_zhihu_content(content) await zhihu_store.update_zhihu_content(content)
saved_count += 1
except Exception as e:
failed_count += 1
utils.logger.error(
f"[ZhihuCrawler.search] 保存内容失败 (content_id={content.content_id}): {e}"
)
if saved_count > 0:
utils.logger.info(
f"[ZhihuCrawler.search] 关键词 '{keyword}'{page-1} 页: 成功保存 {saved_count} 条内容"
)
total_saved_contents += saved_count
if failed_count > 0:
utils.logger.warning(
f"[ZhihuCrawler.search] 关键词 '{keyword}'{page-1} 页: 保存失败 {failed_count} 条内容"
)
total_failed_contents += failed_count
await self.batch_get_content_comments(content_list) await self.batch_get_content_comments(content_list)
except DataFetchError: except DataFetchError:
utils.logger.error("[ZhihuCrawler.search] Search content error") utils.logger.error("[ZhihuCrawler.search] Search content error")
return return
# 输出最终统计信息
utils.logger.info(
f"[ZhihuCrawler.search] 关键词搜索完成统计: "
f"成功保存 {total_saved_contents} 条内容, "
f"失败 {total_failed_contents} 条内容"
)
async def batch_get_content_comments(self, content_list: List[ZhihuContent]): async def batch_get_content_comments(self, content_list: List[ZhihuContent]):
""" """
Batch get content comments Batch get content comments
@@ -473,10 +507,13 @@ class ZhihuCrawler(AbstractCrawler):
async def close(self): async def close(self):
"""Close browser context""" """Close browser context"""
try:
# 如果使用CDP模式,需要特殊处理 # 如果使用CDP模式,需要特殊处理
if self.cdp_manager: if self.cdp_manager:
await self.cdp_manager.cleanup() await self.cdp_manager.cleanup()
self.cdp_manager = None self.cdp_manager = None
else: elif self.browser_context:
await self.browser_context.close() await self.browser_context.close()
utils.logger.info("[ZhihuCrawler.close] Browser context closed ...") utils.logger.info("[ZhihuCrawler.close] Browser context closed ...")
except Exception as e:
utils.logger.error(f"[ZhihuCrawler.close] An error occurred during close: {e}")
@@ -36,9 +36,11 @@ class ZhihuStoreFactory:
def create_store() -> AbstractStore: def create_store() -> AbstractStore:
store_class = ZhihuStoreFactory.STORES.get(config.SAVE_DATA_OPTION) store_class = ZhihuStoreFactory.STORES.get(config.SAVE_DATA_OPTION)
if not store_class: if not store_class:
raise ValueError("[ZhihuStoreFactory.create_store] Invalid save option only supported csv or db or json or sqlite or postgresql ...") raise ValueError(
"[ZhihuStoreFactory.create_store] Invalid save option only supported csv or db or json or sqlite or postgresql ...")
return store_class() return store_class()
async def batch_update_zhihu_contents(contents: List[ZhihuContent]): async def batch_update_zhihu_contents(contents: List[ZhihuContent]):
""" """
批量更新知乎内容 批量更新知乎内容
@@ -54,6 +56,7 @@ async def batch_update_zhihu_contents(contents: List[ZhihuContent]):
for content_item in contents: for content_item in contents:
await update_zhihu_content(content_item) await update_zhihu_content(content_item)
async def update_zhihu_content(content_item: ZhihuContent): async def update_zhihu_content(content_item: ZhihuContent):
""" """
更新知乎内容 更新知乎内容
@@ -70,7 +73,6 @@ async def update_zhihu_content(content_item: ZhihuContent):
await ZhihuStoreFactory.create_store().store_content(local_db_item) await ZhihuStoreFactory.create_store().store_content(local_db_item)
async def batch_update_zhihu_note_comments(comments: List[ZhihuComment]): async def batch_update_zhihu_note_comments(comments: List[ZhihuComment]):
""" """
批量更新知乎内容评论 批量更新知乎内容评论
@@ -94,6 +94,7 @@ class ZhihuDbStoreImplement(AbstractStore):
content_item: content item dict content_item: content item dict
""" """
content_id = content_item.get("content_id") content_id = content_item.get("content_id")
try:
async with get_session() as session: async with get_session() as session:
stmt = select(ZhihuContent).where(ZhihuContent.content_id == content_id) stmt = select(ZhihuContent).where(ZhihuContent.content_id == content_id)
result = await session.execute(stmt) result = await session.execute(stmt)
@@ -101,10 +102,16 @@ class ZhihuDbStoreImplement(AbstractStore):
if existing_content: if existing_content:
for key, value in content_item.items(): for key, value in content_item.items():
setattr(existing_content, key, value) setattr(existing_content, key, value)
utils.logger.debug(f"[ZhihuDbStore] 更新内容: {content_id}")
else: else:
new_content = ZhihuContent(**content_item) new_content = ZhihuContent(**content_item)
session.add(new_content) session.add(new_content)
utils.logger.debug(f"[ZhihuDbStore] 新增内容: {content_id}")
await session.commit() await session.commit()
utils.logger.info(f"[ZhihuDbStore] 成功保存内容到数据库: {content_id}")
except Exception as e:
utils.logger.error(f"[ZhihuDbStore] 保存内容失败 (content_id={content_id}): {e}")
raise
async def store_comment(self, comment_item: Dict): async def store_comment(self, comment_item: Dict):
""" """
@@ -113,6 +120,7 @@ class ZhihuDbStoreImplement(AbstractStore):
comment_item: comment item dict comment_item: comment item dict
""" """
comment_id = comment_item.get("comment_id") comment_id = comment_item.get("comment_id")
try:
async with get_session() as session: async with get_session() as session:
stmt = select(ZhihuComment).where(ZhihuComment.comment_id == comment_id) stmt = select(ZhihuComment).where(ZhihuComment.comment_id == comment_id)
result = await session.execute(stmt) result = await session.execute(stmt)
@@ -120,10 +128,16 @@ class ZhihuDbStoreImplement(AbstractStore):
if existing_comment: if existing_comment:
for key, value in comment_item.items(): for key, value in comment_item.items():
setattr(existing_comment, key, value) setattr(existing_comment, key, value)
utils.logger.debug(f"[ZhihuDbStore] 更新评论: {comment_id}")
else: else:
new_comment = ZhihuComment(**comment_item) new_comment = ZhihuComment(**comment_item)
session.add(new_comment) session.add(new_comment)
utils.logger.debug(f"[ZhihuDbStore] 新增评论: {comment_id}")
await session.commit() await session.commit()
utils.logger.info(f"[ZhihuDbStore] 成功保存评论到数据库: {comment_id}")
except Exception as e:
utils.logger.error(f"[ZhihuDbStore] 保存评论失败 (comment_id={comment_id}): {e}")
raise
async def store_creator(self, creator: Dict): async def store_creator(self, creator: Dict):
""" """
@@ -32,6 +32,7 @@ class KeywordManager:
def __init__(self): def __init__(self):
"""初始化关键词管理器""" """初始化关键词管理器"""
self.engine: Engine = None self.engine: Engine = None
self.custom_keywords_path: Optional[Path] = self._resolve_custom_keywords_path()
self.connect() self.connect()
def connect(self): def connect(self):
@@ -68,24 +69,31 @@ class KeywordManager:
Returns: Returns:
关键词列表 关键词列表
""" """
if not getattr(settings, "ENABLE_KEYWORD_SEARCH", True):
logger.info("关键词搜索已通过配置禁用,返回默认关键词列表")
return self._limit_keywords(self._get_default_keywords(), max_keywords)
if not target_date: if not target_date:
target_date = date.today() target_date = date.today()
if getattr(settings, "USE_DEFAULT_KEYWORDS_ONLY", False):
logger.info("配置启用默认关键词模式,直接返回默认关键词")
return self._limit_keywords(self._get_default_keywords(), max_keywords)
logger.info(f"正在获取 {target_date} 的关键词...") logger.info(f"正在获取 {target_date} 的关键词...")
# 优先使用自定义关键词
custom_keywords = self._get_custom_keywords(target_date, max_keywords)
if custom_keywords:
return custom_keywords
# 首先尝试获取指定日期的关键词 # 首先尝试获取指定日期的关键词
topics_data = self.get_daily_topics(target_date) topics_data = self.get_daily_topics(target_date)
if topics_data and topics_data.get('keywords'): if topics_data and topics_data.get('keywords'):
keywords = topics_data['keywords'] keywords = topics_data['keywords']
logger.info(f"成功获取 {target_date}{len(keywords)} 个关键词") logger.info(f"成功获取 {target_date}{len(keywords)} 个关键词")
return self._limit_keywords(keywords, max_keywords)
# 如果关键词太多,随机选择指定数量
if len(keywords) > max_keywords:
keywords = random.sample(keywords, max_keywords)
logger.info(f"随机选择了 {max_keywords} 个关键词")
return keywords
# 如果没有当天的关键词,尝试获取最近几天的 # 如果没有当天的关键词,尝试获取最近几天的
logger.info(f"{target_date} 没有关键词数据,尝试获取最近的关键词...") logger.info(f"{target_date} 没有关键词数据,尝试获取最近的关键词...")
@@ -100,15 +108,14 @@ class KeywordManager:
# 去重并限制数量 # 去重并限制数量
unique_keywords = list(set(all_keywords)) unique_keywords = list(set(all_keywords))
if len(unique_keywords) > max_keywords: limited_keywords = self._limit_keywords(unique_keywords, max_keywords)
unique_keywords = random.sample(unique_keywords, max_keywords)
logger.info(f"从最近7天的数据中获取到 {len(unique_keywords)} 个关键词") logger.info(f"从最近7天的数据中获取到 {len(limited_keywords)} 个关键词")
return unique_keywords return limited_keywords
# 如果都没有,返回默认关键词 # 如果都没有,返回默认关键词
logger.info("没有找到任何关键词数据,使用默认关键词") logger.info("没有找到任何关键词数据,使用默认关键词")
return self._get_default_keywords() return self._limit_keywords(self._get_default_keywords(), max_keywords)
def get_daily_topics(self, extract_date: date = None) -> Optional[Dict]: def get_daily_topics(self, extract_date: date = None) -> Optional[Dict]:
""" """
@@ -177,16 +184,74 @@ class KeywordManager:
logger.exception(f"获取最近话题分析失败: {e}") logger.exception(f"获取最近话题分析失败: {e}")
return [] return []
def _resolve_custom_keywords_path(self) -> Optional[Path]:
"""解析自定义关键词文件路径"""
raw_path = getattr(settings, "CUSTOM_KEYWORDS_FILE", None)
if not raw_path:
return None
path = Path(raw_path).expanduser()
if not path.is_absolute():
path = project_root / path
return path
@staticmethod
def _limit_keywords(keywords: List[str], max_keywords: int) -> List[str]:
"""根据最大数量限制关键词"""
if not keywords:
return []
if max_keywords and len(keywords) > max_keywords:
keywords = random.sample(keywords, max_keywords)
return keywords
def _get_custom_keywords(self, target_date: date, max_keywords: int) -> Optional[List[str]]:
"""
从自定义关键词文件中获取指定日期的关键词
支持格式
1. ["AI", "大模型"]
2. {"2025-11-26": ["AI"], "default": ["科技"]}
"""
if not self.custom_keywords_path:
return None
path = self.custom_keywords_path
if not path.exists():
logger.warning(f"自定义关键词文件不存在: {path}")
return None
try:
with path.open("r", encoding="utf-8") as f:
data = json.load(f)
except Exception as e:
logger.error(f"读取自定义关键词文件失败({path}): {e}")
return None
keywords: Optional[List[str]] = None
if isinstance(data, list):
keywords = data
elif isinstance(data, dict):
date_key = target_date.isoformat()
if date_key in data and isinstance(data[date_key], list):
keywords = data[date_key]
elif "default" in data and isinstance(data["default"], list):
keywords = data["default"]
elif "keywords" in data and isinstance(data["keywords"], list):
keywords = data["keywords"]
if not keywords:
return None
cleaned_keywords = [kw.strip() for kw in keywords if isinstance(kw, str) and kw.strip()]
if not cleaned_keywords:
return None
limited = self._limit_keywords(cleaned_keywords, max_keywords)
logger.info(f"使用自定义关键词({path})共 {len(limited)}")
return limited
def _get_default_keywords(self) -> List[str]: def _get_default_keywords(self) -> List[str]:
"""获取默认关键词列表""" """获取默认关键词列表"""
return [ return [
"科技", "人工智能", "AI", "编程", "互联网", "F6智慧门店","南京爱福路汽车科技有限公司","汽车后市场","汽修店","新康众"
"创业", "投资", "理财", "股市", "经济",
"教育", "学习", "考试", "大学", "就业",
"健康", "养生", "运动", "美食", "旅游",
"时尚", "美妆", "购物", "生活", "家居",
"电影", "音乐", "游戏", "娱乐", "明星",
"新闻", "热点", "社会", "政策", "环保"
] ]
def get_all_keywords_for_platforms(self, platforms: List[str], target_date: date = None, def get_all_keywords_for_platforms(self, platforms: List[str], target_date: date = None,
@@ -286,6 +351,16 @@ class KeywordManager:
if not target_date: if not target_date:
target_date = date.today() target_date = date.today()
# 当配置为仅使用默认关键词时,直接返回默认关键词的摘要,避免上层误判为“无数据”
if getattr(settings, "USE_DEFAULT_KEYWORDS_ONLY", False):
default_keywords = self._get_default_keywords()
return {
'date': target_date,
'keywords_count': len(default_keywords),
'summary': '使用默认关键词模式',
'has_data': bool(default_keywords)
}
topics_data = self.get_daily_topics(target_date) topics_data = self.get_daily_topics(target_date)
if topics_data: if topics_data:
@@ -251,16 +251,30 @@ postgresql_db_config = {{
logger.info(f"执行命令: {' '.join(cmd)}") logger.info(f"执行命令: {' '.join(cmd)}")
# 切换到MediaCrawler目录并执行 # 切换到MediaCrawler目录并执行,捕获输出
result = subprocess.run( result = subprocess.run(
cmd, cmd,
cwd=self.mediacrawler_path, cwd=self.mediacrawler_path,
timeout=3600 # 60分钟超时 timeout=3600, # 60分钟超时
capture_output=True,
text=True,
encoding='utf-8',
errors='replace'
) )
end_time = datetime.now() end_time = datetime.now()
duration = (end_time - start_time).total_seconds() duration = (end_time - start_time).total_seconds()
# 解析输出,提取实际保存的数据量
output_lines = result.stdout.split('\n') if result.stdout else []
error_lines = result.stderr.split('\n') if result.stderr else []
# 合并所有输出行用于解析
all_lines = output_lines + error_lines
# 解析统计信息
parsed_stats = self._parse_crawl_output(all_lines, error_lines)
# 创建统计信息 # 创建统计信息
crawl_stats = { crawl_stats = {
"platform": platform, "platform": platform,
@@ -270,9 +284,11 @@ postgresql_db_config = {{
"end_time": end_time.isoformat(), "end_time": end_time.isoformat(),
"return_code": result.returncode, "return_code": result.returncode,
"success": result.returncode == 0, "success": result.returncode == 0,
"notes_count": 0, "notes_count": parsed_stats.get("notes_count", 0),
"comments_count": 0, "comments_count": parsed_stats.get("comments_count", 0),
"errors_count": 0 "errors_count": parsed_stats.get("errors_count", 0),
"output_preview": '\n'.join(output_lines[-20:]) if output_lines else "", # 最后20行输出
"error_preview": '\n'.join(error_lines[-20:]) if error_lines else "" # 最后20行错误
} }
# 保存统计信息 # 保存统计信息
@@ -280,8 +296,16 @@ postgresql_db_config = {{
if result.returncode == 0: if result.returncode == 0:
logger.info(f"{platform} 爬取完成,耗时: {duration:.1f}") logger.info(f"{platform} 爬取完成,耗时: {duration:.1f}")
logger.info(f" 保存内容: {crawl_stats['notes_count']} 条,评论: {crawl_stats['comments_count']}")
if crawl_stats['notes_count'] == 0 and crawl_stats['comments_count'] == 0:
logger.warning(f"⚠️ {platform} 爬取成功但未保存任何数据,请检查数据库连接和保存逻辑")
# 输出部分日志用于调试
if crawl_stats['error_preview']:
logger.warning(f" 错误信息: {crawl_stats['error_preview'][:500]}")
else: else:
logger.error(f"{platform} 爬取失败,返回码: {result.returncode}") logger.error(f"{platform} 爬取失败,返回码: {result.returncode}")
if error_lines:
logger.error(f" 错误信息: {crawl_stats['error_preview'][:500]}")
return crawl_stats return crawl_stats
@@ -294,6 +318,7 @@ postgresql_db_config = {{
def _parse_crawl_output(self, output_lines: List[str], error_lines: List[str]) -> Dict: def _parse_crawl_output(self, output_lines: List[str], error_lines: List[str]) -> Dict:
"""解析爬取输出,提取统计信息""" """解析爬取输出,提取统计信息"""
import re
stats = { stats = {
"notes_count": 0, "notes_count": 0,
"comments_count": 0, "comments_count": 0,
@@ -301,32 +326,60 @@ postgresql_db_config = {{
"login_required": False "login_required": False
} }
# 解析输出行 # 合并所有行用于解析
for line in output_lines: all_lines = output_lines + error_lines
if "条笔记" in line or "条内容" in line:
# 解析输出行,查找各种可能的数据保存信息
for line in all_lines:
line_lower = line.lower()
# 查找保存的内容数量(多种可能的格式)
# 例如:"保存了 10 条笔记"、"成功保存 5 条内容"、"inserted 3 records"等
if any(keyword in line_lower for keyword in ["条笔记", "条内容", "条视频", "条帖子", "条回答"]):
try: try:
# 提取数字 # 提取数字,优先取第一个数字
import re
numbers = re.findall(r'\d+', line) numbers = re.findall(r'\d+', line)
if numbers: if numbers:
stats["notes_count"] = int(numbers[0]) # 如果找到多个数字,取最大的(通常是总数)
potential_count = max([int(n) for n in numbers])
if potential_count > stats["notes_count"]:
stats["notes_count"] = potential_count
except: except:
pass pass
elif "条评论" in line:
# 查找保存的评论数量
if "条评论" in line_lower:
try: try:
import re
numbers = re.findall(r'\d+', line) numbers = re.findall(r'\d+', line)
if numbers: if numbers:
stats["comments_count"] = int(numbers[0]) potential_count = max([int(n) for n in numbers])
if potential_count > stats["comments_count"]:
stats["comments_count"] = potential_count
except: except:
pass pass
elif "登录" in line or "扫码" in line:
# 查找数据库相关错误
if any(keyword in line_lower for keyword in ["数据库", "database", "connection", "连接失败", "保存失败"]):
if "error" in line_lower or "失败" in line_lower or "异常" in line_lower:
stats["errors_count"] += 1
# 查找登录相关
if any(keyword in line_lower for keyword in ["登录", "扫码", "login", "需要登录"]):
stats["login_required"] = True stats["login_required"] = True
# 解析错误行 # 如果没有找到明确的保存数量,尝试从数据库操作日志中提取
for line in error_lines: if stats["notes_count"] == 0 and stats["comments_count"] == 0:
if "error" in line.lower() or "异常" in line: # 查找可能的数据库插入信息
stats["errors_count"] += 1 for line in all_lines:
# 查找类似 "insert into" 或 "保存到" 的信息
if "insert" in line_lower or "保存到" in line_lower:
try:
numbers = re.findall(r'\d+', line)
if numbers:
# 尝试提取可能的记录数
pass # 这里可以进一步解析
except:
pass
return stats return stats
+12
View File
@@ -22,6 +22,18 @@ class Settings(BaseSettings):
DB_PASSWORD: str = Field("your_password", description="数据库密码") DB_PASSWORD: str = Field("your_password", description="数据库密码")
DB_NAME: str = Field("mindspider", description="数据库名称") DB_NAME: str = Field("mindspider", description="数据库名称")
DB_CHARSET: str = Field("utf8mb4", description="数据库字符集") DB_CHARSET: str = Field("utf8mb4", description="数据库字符集")
CUSTOM_KEYWORDS_FILE: Optional[str] = Field(
None,
description="自定义关键词文件路径(可为绝对路径或相对MindSpider目录的路径)"
)
USE_DEFAULT_KEYWORDS_ONLY: bool = Field(
True,
description="为True时忽略数据库/自定义结果,直接使用默认关键词"
)
ENABLE_KEYWORD_SEARCH: bool = Field(
True,
description="开启后运行基于关键词的爬取流程,关闭则完全跳过关键词搜索"
)
MINDSPIDER_API_KEY: Optional[str] = Field(None, description="MINDSPIDER API密钥") MINDSPIDER_API_KEY: Optional[str] = Field(None, description="MINDSPIDER API密钥")
MINDSPIDER_BASE_URL: Optional[str] = Field("https://api.deepseek.com", description="MINDSPIDER API基础URL,推荐deepseek-chat模型使用https://api.deepseek.com") MINDSPIDER_BASE_URL: Optional[str] = Field("https://api.deepseek.com", description="MINDSPIDER API基础URL,推荐deepseek-chat模型使用https://api.deepseek.com")
MINDSPIDER_MODEL_NAME: Optional[str] = Field("deepseek-chat", description="MINDSPIDER API模型名称, 推荐deepseek-chat") MINDSPIDER_MODEL_NAME: Optional[str] = Field("deepseek-chat", description="MINDSPIDER API模型名称, 推荐deepseek-chat")