From ff1ce2a3babb89f59f7efcddae5fb05c3f307046 Mon Sep 17 00:00:00 2001 From: z66 <1415243231@qq.com> Date: Tue, 16 Dec 2025 10:56:56 +0800 Subject: [PATCH] =?UTF-8?q?=E6=9B=B4=E6=96=B0=E9=83=A8=E5=88=86=E7=88=AC?= =?UTF-8?q?=E8=99=AB=E4=BB=A5=E5=85=BC=E5=AE=B9=E6=9C=AC=E5=9C=B0=E8=BF=90?= =?UTF-8?q?=E8=A1=8C=E5=8F=8A=E6=95=B0=E6=8D=AE=E5=BA=93=E5=AD=98=E5=82=A8?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../MediaCrawler/config/base_config.py | 10 +- .../MediaCrawler/database/db_session.py | 8 +- .../MediaCrawler/main.py | 56 ++-- .../media_platform/bilibili/client.py | 23 +- .../media_platform/bilibili/login.py | 13 + .../media_platform/douyin/client.py | 24 +- .../media_platform/douyin/core.py | 35 +- .../media_platform/kuaishou/client.py | 52 ++- .../media_platform/kuaishou/core.py | 21 +- .../media_platform/kuaishou/login.py | 55 +++- .../media_platform/tieba/client.py | 304 ++++++++++++++++++ .../media_platform/weibo/client.py | 40 ++- .../MediaCrawler/media_platform/weibo/core.py | 45 ++- .../media_platform/zhihu/client.py | 19 +- .../MediaCrawler/media_platform/zhihu/core.py | 91 +++++- .../store/bilibili/_store_impl.py | 26 ++ .../MediaCrawler/store/douyin/_store_impl.py | 24 ++ .../store/kuaishou/_store_impl.py | 24 ++ .../MediaCrawler/store/tieba/_store_impl.py | 26 ++ .../MediaCrawler/store/weibo/__init__.py | 14 +- .../MediaCrawler/store/weibo/_store_impl.py | 147 +++++++-- .../MediaCrawler/store/xhs/_store_impl.py | 28 ++ .../MediaCrawler/store/zhihu/__init__.py | 30 +- .../MediaCrawler/store/zhihu/_store_impl.py | 82 ++++- .../MediaCrawler/tools/utils.py | 180 ++++++++++- .../DeepSentimentCrawling/keyword_manager.py | 3 +- .../DeepSentimentCrawling/platform_crawler.py | 132 +++++++- MindSpider/config.py | 8 + 28 files changed, 1394 insertions(+), 126 deletions(-) diff --git a/MindSpider/DeepSentimentCrawling/MediaCrawler/config/base_config.py b/MindSpider/DeepSentimentCrawling/MediaCrawler/config/base_config.py index f59a864..5aafc36 100644 --- a/MindSpider/DeepSentimentCrawling/MediaCrawler/config/base_config.py +++ b/MindSpider/DeepSentimentCrawling/MediaCrawler/config/base_config.py @@ -9,8 +9,8 @@ # 使用本代码即表示您同意遵守上述原则和LICENSE中的所有条款。 # 基础配置 -PLATFORM = "zhihu" # 平台,xhs | dy | ks | bili | wb | tieba | zhihu -KEYWORDS = "F6智慧门店,南京爱福路汽车科技有限公司,汽车后市场,汽修店,新康众" # 关键词搜索配置,以英文逗号分隔 +PLATFORM = "ks" # 平台,xhs | dy | ks | bili | wb | tieba | zhihu +KEYWORDS = "F6智慧门店,F6智数,中国汽车后市场白皮书,南京爱福路汽车科技有限公司,汽车后市场,汽车修理厂,新康众,天猫养车,汽后,汽修厂,爱福路,康众" # 关键词搜索配置,以英文逗号分隔 LOGIN_TYPE = "qrcode" # qrcode or phone or cookie COOKIES = "" CRAWLER_TYPE = "search" # 爬取类型,search(关键词搜索) | detail(帖子详情)| creator(创作者主页数据) @@ -30,6 +30,12 @@ IP_PROXY_PROVIDER_NAME = "kuaidaili" # kuaidaili | wandouhttp # 抖音如果一直提示失败,打开浏览器看下是否扫码登录之后出现了手机号验证,如果出现了手动过一下再试。 HEADLESS = True +# HTTP/网络配置 +# 如内网有自签名证书导致 TLS 失败,可临时置为 False +HTTPX_VERIFY = False +# 若需要指定上游代理(如 http://user:pass@host:port),填此值;留空使用系统/环境变量 +HTTPX_PROXY = "" + # 是否保存登录状态 SAVE_LOGIN_STATE = True diff --git a/MindSpider/DeepSentimentCrawling/MediaCrawler/database/db_session.py b/MindSpider/DeepSentimentCrawling/MediaCrawler/database/db_session.py index 64e2647..65846a5 100644 --- a/MindSpider/DeepSentimentCrawling/MediaCrawler/database/db_session.py +++ b/MindSpider/DeepSentimentCrawling/MediaCrawler/database/db_session.py @@ -13,10 +13,11 @@ _engines = {} async def create_database_if_not_exists(db_type: str): if db_type == "mysql" or db_type == "db": # Connect to the server without a database - server_url = f"mysql+asyncmy://{mysql_db_config['user']}:{mysql_db_config['password']}@{mysql_db_config['host']}:{mysql_db_config['port']}" + server_url = f"mysql+asyncmy://{mysql_db_config['user']}:{mysql_db_config['password']}@{mysql_db_config['host']}:{mysql_db_config['port']}?charset=utf8mb4" engine = create_async_engine(server_url, echo=False) async with engine.connect() as conn: - await conn.execute(text(f"CREATE DATABASE IF NOT EXISTS {mysql_db_config['db_name']}")) + # 确保数据库使用utf8mb4字符集 + await conn.execute(text(f"CREATE DATABASE IF NOT EXISTS {mysql_db_config['db_name']} CHARACTER SET utf8mb4 COLLATE utf8mb4_unicode_ci")) await engine.dispose() elif db_type == "postgresql": # Connect to PostgreSQL default database (postgres) to create target database @@ -48,7 +49,8 @@ def get_async_engine(db_type: str = None): if db_type == "sqlite": db_url = f"sqlite+aiosqlite:///{sqlite_db_config['db_path']}" elif db_type == "mysql" or db_type == "db": - db_url = f"mysql+asyncmy://{mysql_db_config['user']}:{mysql_db_config['password']}@{mysql_db_config['host']}:{mysql_db_config['port']}/{mysql_db_config['db_name']}" + # 添加charset=utf8mb4以支持完整的UTF-8编码(包括emoji和中文) + db_url = f"mysql+asyncmy://{mysql_db_config['user']}:{mysql_db_config['password']}@{mysql_db_config['host']}:{mysql_db_config['port']}/{mysql_db_config['db_name']}?charset=utf8mb4" elif db_type == "postgresql": db_url = f"postgresql+asyncpg://{postgresql_db_config['user']}:{postgresql_db_config['password']}@{postgresql_db_config['host']}:{postgresql_db_config['port']}/{postgresql_db_config['db_name']}" else: diff --git a/MindSpider/DeepSentimentCrawling/MediaCrawler/main.py b/MindSpider/DeepSentimentCrawling/MediaCrawler/main.py index fc7a416..dabf584 100644 --- a/MindSpider/DeepSentimentCrawling/MediaCrawler/main.py +++ b/MindSpider/DeepSentimentCrawling/MediaCrawler/main.py @@ -11,42 +11,54 @@ import asyncio import sys -from typing import Optional +from typing import Dict, Optional, Type + +import importlib import cmd_arg import config from database import db from base.base_crawler import AbstractCrawler -from media_platform.bilibili import BilibiliCrawler -from media_platform.douyin import DouYinCrawler -from media_platform.kuaishou import KuaishouCrawler -from media_platform.tieba import TieBaCrawler -from media_platform.weibo import WeiboCrawler -from media_platform.xhs import XiaoHongShuCrawler -from media_platform.zhihu import ZhihuCrawler from tools.async_file_writer import AsyncFileWriter from var import crawler_type_var class CrawlerFactory: - CRAWLERS = { - "xhs": XiaoHongShuCrawler, - "dy": DouYinCrawler, - "ks": KuaishouCrawler, - "bili": BilibiliCrawler, - "wb": WeiboCrawler, - "tieba": TieBaCrawler, - "zhihu": ZhihuCrawler, + _CRAWLER_PATHS = { + "xhs": "media_platform.xhs.XiaoHongShuCrawler", + "dy": "media_platform.douyin.DouYinCrawler", + "ks": "media_platform.kuaishou.KuaishouCrawler", + "bili": "media_platform.bilibili.BilibiliCrawler", + "wb": "media_platform.weibo.WeiboCrawler", + "tieba": "media_platform.tieba.TieBaCrawler", + "zhihu": "media_platform.zhihu.ZhihuCrawler", } + _cache: Dict[str, Type[AbstractCrawler]] = {} @staticmethod def create_crawler(platform: str) -> AbstractCrawler: - crawler_class = CrawlerFactory.CRAWLERS.get(platform) - if not crawler_class: + path = CrawlerFactory._CRAWLER_PATHS.get(platform) + if not path: raise ValueError( "Invalid Media Platform Currently only supported xhs or dy or ks or bili ..." ) - return crawler_class() + + if platform not in CrawlerFactory._cache: + module_name, class_name = path.rsplit(".", 1) + try: + module = importlib.import_module(module_name) + crawler_class = getattr(module, class_name) + except ModuleNotFoundError as exc: + hint = ( + "Please install optional dependency 'xhshow' (pip install xhshow) " + "or disable the xhs platform." + if platform == "xhs" and exc.name == "xhshow" + else f"Missing dependency while importing {module_name}" + ) + raise ModuleNotFoundError(f"{exc}: {hint}") from exc + CrawlerFactory._cache[platform] = crawler_class + + return CrawlerFactory._cache[platform]() crawler: Optional[AbstractCrawler] = None @@ -59,6 +71,12 @@ crawler: Optional[AbstractCrawler] = None async def main(): # Init crawler global crawler + + # 导入工具模块以初始化日志 + from tools import utils + utils.logger.info("=" * 60) + utils.logger.info("MediaCrawler 启动") + utils.logger.info("=" * 60) # parse cmd args = await cmd_arg.parse_cmd() diff --git a/MindSpider/DeepSentimentCrawling/MediaCrawler/media_platform/bilibili/client.py b/MindSpider/DeepSentimentCrawling/MediaCrawler/media_platform/bilibili/client.py index 7019c10..f4ae3b7 100644 --- a/MindSpider/DeepSentimentCrawling/MediaCrawler/media_platform/bilibili/client.py +++ b/MindSpider/DeepSentimentCrawling/MediaCrawler/media_platform/bilibili/client.py @@ -49,8 +49,27 @@ class BilibiliClient(AbstractApiClient): self.cookie_dict = cookie_dict async def request(self, method, url, **kwargs) -> Any: - async with httpx.AsyncClient(proxy=self.proxy) as client: - response = await client.request(method, url, timeout=self.timeout, **kwargs) + """ + Basic HTTP request wrapper with retries for transient network errors. + """ + verify = getattr(config, "HTTPX_VERIFY", True) + # 优先使用传入 proxy,其次是 config.HTTPX_PROXY,最后走系统环境变量 + proxy = self.proxy or getattr(config, "HTTPX_PROXY", "") or None + + async with httpx.AsyncClient(proxy=proxy, timeout=self.timeout, verify=verify) as client: + # 简单重试,处理短暂的连接失败 + last_exc: Optional[Exception] = None + for attempt in range(3): + try: + response = await client.request(method, url, **kwargs) + break + except httpx.HTTPError as e: + last_exc = e + if attempt == 2: + # 3rd failure -> give up + utils.logger.error(f"[BilibiliClient.request] Network error on {method} {url}: {repr(e)}") + raise DataFetchError(f"network error: {e}") from e + await asyncio.sleep(1) try: data: Dict = response.json() except json.JSONDecodeError: diff --git a/MindSpider/DeepSentimentCrawling/MediaCrawler/media_platform/bilibili/login.py b/MindSpider/DeepSentimentCrawling/MediaCrawler/media_platform/bilibili/login.py index ffefb63..3bf3a8c 100644 --- a/MindSpider/DeepSentimentCrawling/MediaCrawler/media_platform/bilibili/login.py +++ b/MindSpider/DeepSentimentCrawling/MediaCrawler/media_platform/bilibili/login.py @@ -68,10 +68,23 @@ class BilibiliLogin(AbstractLogin): return True return False + async def _has_valid_login_cookie(self) -> bool: + """ + 快速检查当前上下文是否已有登录态,用于避免重复扫码。 + """ + current_cookie = await self.browser_context.cookies() + _, cookie_dict = utils.convert_cookies(current_cookie) + return bool(cookie_dict.get("SESSDATA") or cookie_dict.get("DedeUserID")) + async def login_by_qrcode(self): """login bilibili website and keep webdriver login state""" utils.logger.info("[BilibiliLogin.login_by_qrcode] Begin login bilibili by qrcode ...") + # 如果已经登录则直接跳过扫码流程 + if await self._has_valid_login_cookie(): + utils.logger.info("[BilibiliLogin.login_by_qrcode] 已检测到有效登录态,跳过扫码登录") + return + # click login button login_button_ele = self.context_page.locator( "xpath=//div[@class='right-entry__outside go-login-btn']//div" diff --git a/MindSpider/DeepSentimentCrawling/MediaCrawler/media_platform/douyin/client.py b/MindSpider/DeepSentimentCrawling/MediaCrawler/media_platform/douyin/client.py index 5d980ec..8fe9902 100644 --- a/MindSpider/DeepSentimentCrawling/MediaCrawler/media_platform/douyin/client.py +++ b/MindSpider/DeepSentimentCrawling/MediaCrawler/media_platform/douyin/client.py @@ -95,15 +95,25 @@ class DouYinClient(AbstractApiClient): params["a_bogus"] = a_bogus async def request(self, method, url, **kwargs): - async with httpx.AsyncClient(proxy=self.proxy) as client: - response = await client.request(method, url, timeout=self.timeout, **kwargs) try: - if response.text == "" or response.text == "blocked": - utils.logger.error(f"request params incrr, response.text: {response.text}") - raise Exception("account blocked") - return response.json() + async with httpx.AsyncClient(proxy=self.proxy) as client: + response = await client.request(method, url, timeout=self.timeout, **kwargs) + try: + if response.text == "" or response.text == "blocked": + utils.logger.error(f"request params incrr, response.text: {response.text}") + raise Exception("account blocked") + return response.json() + except Exception as e: + raise DataFetchError(f"{e}, {response.text}") + except (httpx.ConnectError, httpx.ConnectTimeout, httpx.ReadTimeout, httpx.WriteTimeout) as e: + utils.logger.error(f"网络连接错误: {type(e).__name__}: {e}") + raise DataFetchError(f"网络连接失败: {type(e).__name__}: {e}") + except httpx.TimeoutException as e: + utils.logger.error(f"请求超时: {e}") + raise DataFetchError(f"请求超时: {e}") except Exception as e: - raise DataFetchError(f"{e}, {response.text}") + utils.logger.error(f"请求异常: {type(e).__name__}: {e}") + raise DataFetchError(f"请求失败: {type(e).__name__}: {e}") async def get(self, uri: str, params: Optional[Dict] = None, headers: Optional[Dict] = None): """ diff --git a/MindSpider/DeepSentimentCrawling/MediaCrawler/media_platform/douyin/core.py b/MindSpider/DeepSentimentCrawling/MediaCrawler/media_platform/douyin/core.py index d2bb921..1a7db6d 100644 --- a/MindSpider/DeepSentimentCrawling/MediaCrawler/media_platform/douyin/core.py +++ b/MindSpider/DeepSentimentCrawling/MediaCrawler/media_platform/douyin/core.py @@ -121,6 +121,8 @@ class DouYinCrawler(AbstractCrawler): utils.logger.info(f"[DouYinCrawler.search] Skip {page}") page += 1 continue + posts_res = None + retry_success = False try: utils.logger.info(f"[DouYinCrawler.search] search douyin keyword: {keyword}, page: {page}") posts_res = await self.dy_client.search_info_by_keyword( @@ -129,11 +131,36 @@ class DouYinCrawler(AbstractCrawler): publish_time=PublishTimeType(config.PUBLISH_TIME_TYPE), search_id=dy_search_id, ) - if posts_res.get("data") is None or posts_res.get("data") == []: - utils.logger.info(f"[DouYinCrawler.search] search douyin keyword: {keyword}, page: {page} is empty,{posts_res.get('data')}`") + retry_success = True + except DataFetchError as e: + utils.logger.error(f"[DouYinCrawler.search] search douyin keyword: {keyword} failed: {e}") + # 如果是网络连接错误,等待后重试一次 + if "网络连接" in str(e) or "ConnectError" in str(e) or "超时" in str(e): + utils.logger.warning(f"[DouYinCrawler.search] 网络错误,等待3秒后重试...") + await asyncio.sleep(3) + try: + posts_res = await self.dy_client.search_info_by_keyword( + keyword=keyword, + offset=page * dy_limit_count - dy_limit_count, + publish_time=PublishTimeType(config.PUBLISH_TIME_TYPE), + search_id=dy_search_id, + ) + retry_success = True + except Exception as retry_e: + utils.logger.error(f"[DouYinCrawler.search] 重试失败: {retry_e}") + break + else: break - except DataFetchError: - utils.logger.error(f"[DouYinCrawler.search] search douyin keyword: {keyword} failed") + except Exception as e: + utils.logger.error(f"[DouYinCrawler.search] search douyin keyword: {keyword} unexpected error: {type(e).__name__}: {e}") + break + + # 如果请求失败(包括重试失败),跳过后续处理 + if not retry_success or posts_res is None: + break + + if posts_res.get("data") is None or posts_res.get("data") == []: + utils.logger.info(f"[DouYinCrawler.search] search douyin keyword: {keyword}, page: {page} is empty,{posts_res.get('data')}`") break page += 1 diff --git a/MindSpider/DeepSentimentCrawling/MediaCrawler/media_platform/kuaishou/client.py b/MindSpider/DeepSentimentCrawling/MediaCrawler/media_platform/kuaishou/client.py index 11401ed..1614d40 100644 --- a/MindSpider/DeepSentimentCrawling/MediaCrawler/media_platform/kuaishou/client.py +++ b/MindSpider/DeepSentimentCrawling/MediaCrawler/media_platform/kuaishou/client.py @@ -45,13 +45,51 @@ class KuaiShouClient(AbstractApiClient): self.graphql = KuaiShouGraphQL() async def request(self, method, url, **kwargs) -> Any: - async with httpx.AsyncClient(proxy=self.proxy) as client: - response = await client.request(method, url, timeout=self.timeout, **kwargs) - data: Dict = response.json() - if data.get("errors"): - raise DataFetchError(data.get("errors", "unkonw error")) - else: - return data.get("data", {}) + """Make HTTP request with retry and proxy fallback.""" + max_retries = 3 + + # build proxy attempts: try proxy first (if set), then no-proxy + proxy_attempts: List[Optional[str]] = [] + if self.proxy: + proxy_attempts.append(self.proxy) + proxy_attempts.append(None) # always allow a direct attempt + + last_exc: Optional[Exception] = None + + for attempt in range(max_retries): + proxy_to_use = proxy_attempts[min(attempt, len(proxy_attempts) - 1)] + try: + async with httpx.AsyncClient(proxy=proxy_to_use) as client: + response = await client.request(method, url, timeout=self.timeout, **kwargs) + data: Dict = response.json() + if data.get("errors"): + raise DataFetchError(data.get("errors", "unkonw error")) + return data.get("data", {}) + except (httpx.ConnectError, httpx.ConnectTimeout, httpx.NetworkError) as e: + last_exc = e + utils.logger.warning( + f"[KuaiShouClient.request] Network error (attempt {attempt+1}/{max_retries}) " + f"proxy={proxy_to_use} url={url} err={e!r}" + ) + if attempt < max_retries - 1: + await asyncio.sleep(1) + continue + utils.logger.error( + f"[KuaiShouClient.request] Network failed after {max_retries} attempts " + f"proxy={proxy_to_use} url={url} err={e!r}" + ) + raise + except Exception as e: + # For other exceptions (like DataFetchError), don't retry + last_exc = e + utils.logger.error( + f"[KuaiShouClient.request] Request failed proxy={proxy_to_use} url={url} err={e!r}" + ) + raise + + # If somehow we exit the loop without returning, raise last exception + if last_exc: + raise last_exc async def get(self, uri: str, params=None) -> Dict: final_uri = uri diff --git a/MindSpider/DeepSentimentCrawling/MediaCrawler/media_platform/kuaishou/core.py b/MindSpider/DeepSentimentCrawling/MediaCrawler/media_platform/kuaishou/core.py index 1f5669b..e4df586 100644 --- a/MindSpider/DeepSentimentCrawling/MediaCrawler/media_platform/kuaishou/core.py +++ b/MindSpider/DeepSentimentCrawling/MediaCrawler/media_platform/kuaishou/core.py @@ -83,7 +83,26 @@ class KuaishouCrawler(AbstractCrawler): self.context_page = await self.browser_context.new_page() - await self.context_page.goto(f"{self.index_url}?isHome=1") + # 添加重试机制处理网络连接错误 + max_retries = 3 + retry_count = 0 + while retry_count < max_retries: + try: + await self.context_page.goto(f"{self.index_url}?isHome=1", timeout=30000) + break + except Exception as e: + retry_count += 1 + error_msg = str(e) + if "ERR_CONNECTION_RESET" in error_msg or "net::" in error_msg or "Connection" in error_msg: + if retry_count < max_retries: + utils.logger.warning(f"[KuaishouCrawler] 网络连接错误,第 {retry_count} 次重试: {e}") + await asyncio.sleep(2 * retry_count) # 递增等待时间 + else: + utils.logger.error(f"[KuaishouCrawler] 网络连接失败,已重试 {max_retries} 次: {e}") + raise + else: + # 非网络错误直接抛出 + raise # Create a client to interact with the kuaishou website. self.ks_client = await self.create_ks_client(httpx_proxy_format) diff --git a/MindSpider/DeepSentimentCrawling/MediaCrawler/media_platform/kuaishou/login.py b/MindSpider/DeepSentimentCrawling/MediaCrawler/media_platform/kuaishou/login.py index 432cf96..1dafb86 100644 --- a/MindSpider/DeepSentimentCrawling/MediaCrawler/media_platform/kuaishou/login.py +++ b/MindSpider/DeepSentimentCrawling/MediaCrawler/media_platform/kuaishou/login.py @@ -49,6 +49,21 @@ class KuaishouLogin(AbstractLogin): else: raise ValueError("[KuaishouLogin.begin] Invalid Login Type Currently only supported qrcode or phone or cookie ...") + async def _quick_check_login_state(self) -> bool: + """ + Quick check if the current login status is successful without retry + Returns True if logged in, False otherwise + """ + try: + current_cookie = await self.browser_context.cookies() + _, cookie_dict = utils.convert_cookies(current_cookie) + kuaishou_pass_token = cookie_dict.get("passToken") + if kuaishou_pass_token: + return True + return False + except Exception: + return False + @retry(stop=stop_after_attempt(600), wait=wait_fixed(1), retry=retry_if_result(lambda value: value is False)) async def check_login_state(self) -> bool: """ @@ -67,11 +82,47 @@ class KuaishouLogin(AbstractLogin): """login kuaishou website and keep webdriver login state""" utils.logger.info("[KuaishouLogin.login_by_qrcode] Begin login kuaishou by qrcode ...") - # click login button + # Check if already logged in (quick check without retry) + is_logged_in = await self._quick_check_login_state() + if is_logged_in: + utils.logger.info("[KuaishouLogin.login_by_qrcode] Already logged in, skipping login button click ...") + return + + # Check if login button exists (if not, might already be logged in) login_button_ele = self.context_page.locator( "xpath=//p[text()='登录']" ) - await login_button_ele.click() + + try: + # Wait for the element to be visible with a shorter timeout + await login_button_ele.wait_for(state="visible", timeout=3000) + utils.logger.info("[KuaishouLogin.login_by_qrcode] Login button found, attempting to click ...") + + # Try normal click first + await login_button_ele.click(timeout=5000) + except Exception as e: + # If login button is not found, might already be logged in + if "timeout" in str(e).lower() or "waiting for" in str(e).lower(): + utils.logger.info("[KuaishouLogin.login_by_qrcode] Login button not found, checking if already logged in ...") + # Double check login state (quick check) + is_logged_in = await self._quick_check_login_state() + if is_logged_in: + utils.logger.info("[KuaishouLogin.login_by_qrcode] Already logged in, skipping login ...") + return + utils.logger.warning(f"[KuaishouLogin.login_by_qrcode] Login button not found and not logged in: {e}") + raise + else: + utils.logger.warning(f"[KuaishouLogin.login_by_qrcode] Normal click failed: {e}, trying force click...") + try: + # If normal click fails, try force click to bypass overlay + await login_button_ele.click(force=True, timeout=5000) + except Exception as e2: + utils.logger.warning(f"[KuaishouLogin.login_by_qrcode] Force click failed: {e2}, trying JavaScript click...") + # If force click also fails, use JavaScript to click directly + await login_button_ele.evaluate("element => element.click()") + + # Wait a moment for the login modal to appear + await asyncio.sleep(1) # find login qrcode qrcode_img_selector = "//div[@class='qrcode-img']//img" diff --git a/MindSpider/DeepSentimentCrawling/MediaCrawler/media_platform/tieba/client.py b/MindSpider/DeepSentimentCrawling/MediaCrawler/media_platform/tieba/client.py index 005adae..f8469f7 100644 --- a/MindSpider/DeepSentimentCrawling/MediaCrawler/media_platform/tieba/client.py +++ b/MindSpider/DeepSentimentCrawling/MediaCrawler/media_platform/tieba/client.py @@ -48,6 +48,8 @@ class BaiduTieBaClient(AbstractApiClient): self._page_extractor = TieBaExtractor() self.default_ip_proxy = default_ip_proxy self.playwright_page = playwright_page # Playwright页面对象 + self._last_captcha_check_time = 0 # 上次验证码检测时间 + self._captcha_verified_recently = False # 是否最近完成过验证码 def _sync_request(self, method, url, proxy=None, **kwargs): """ @@ -210,6 +212,287 @@ class BaiduTieBaClient(AbstractApiClient): self.headers["Cookie"] = cookie_str utils.logger.info("[BaiduTieBaClient.update_cookies] Cookie has been updated") + async def _wait_for_captcha_completion(self, max_wait_time: int = 300): + """ + 检测并等待百度验证码完成(在爬虫过程中使用) + 等待用户手动拖动验证码,验证成功后自动识别并继续 + + Args: + max_wait_time: 最大等待时间(秒),默认120秒 + """ + if not self.playwright_page: + return + + import time + + async def _detect_captcha() -> bool: + """更全面地检测验证码(包含文本、URL 及常见容器)""" + # DOM 选择器 + selector_hits = [ + '.tang-pass-slider', + '#captcha', + '.vcode-img', + '.pass-verify', + '.tang-pass-verify', + '.pass-verify-slider', + 'div[id*="captcha"]', + 'div[class*="verify"]', + 'div[class*="captcha"]', + 'text=安全验证', + 'text=请输入验证码', + 'text=拖动', + 'text=滑动', + ] + for selector in selector_hits: + try: + element = await self.playwright_page.query_selector(selector) + if element and await element.is_visible(): + return True + except Exception: + continue + + # URL 关键词 + url_lower = (self.playwright_page.url or "").lower() + if any(key in url_lower for key in ["verify", "captcha", "wappass"]): + return True + + # 页面文本关键词(截断以降低开销) + try: + page_text = (await self.playwright_page.content())[:4000] + if any( + kw in page_text + for kw in ["安全验证", "请输入验证码", "完成验证", "滑块", "拖动完成验证"] + ): + return True + except Exception: + pass + return False + + # 如果最近5秒内刚完成过验证码,跳过检测(避免重复检测) + if self._captcha_verified_recently: + time_since_last_check = time.time() - self._last_captcha_check_time + if time_since_last_check < 5: + utils.logger.debug( + f"[BaiduTieBaClient] 最近 {time_since_last_check:.1f} 秒内完成过验证码,跳过检测" + ) + return + else: + self._captcha_verified_recently = False + + # 基础选择器(用于后续反复检测) + captcha_selectors = [ + '.tang-pass-slider', + '#captcha', + '.vcode-img', + '.pass-verify', + '.tang-pass-verify', + '.pass-verify-slider', + 'div[id*="captcha"]', + 'div[class*="verify"]', + 'div[class*="captcha"]', + ] + success_selectors = [ + '.tang-pass-success', + '.pass-verify-success', + 'div[class*="success"]', + ] + + # 检测验证码是否存在 + captcha_found = await _detect_captcha() + if captcha_found: + utils.logger.warning("[BaiduTieBaClient] 🔐 检测到验证码,请手动拖动完成验证...") + if not captcha_found: + return + + # 记录当前URL,用于检测页面跳转 + initial_url = self.playwright_page.url + utils.logger.info(f"[BaiduTieBaClient] 当前页面URL: {initial_url}") + utils.logger.info(f"[BaiduTieBaClient] ⏳ 等待用户手动完成验证码(最多等待 {max_wait_time} 秒)...") + + start_time = time.time() + last_log_time = 0 + check_interval = 1 # 检查间隔改为1秒,更快响应 + + while True: + # 检查是否超时 + elapsed_time = time.time() - start_time + if elapsed_time >= max_wait_time: + utils.logger.warning( + f"[BaiduTieBaClient] ⏰ 等待验证码超时({max_wait_time}秒),跳过当前百度贴吧爬取任务" + ) + # 超时直接中断本次百度贴吧爬虫,交给上层捕获处理 + raise TimeoutError( + f"Baidu captcha wait timeout ({max_wait_time}s), skip tieba crawling" + ) + + try: + # 检测验证成功的标识 + verification_success = False + for selector in success_selectors: + try: + element = await self.playwright_page.query_selector(selector) + if element: + is_visible = await element.is_visible() + if is_visible: + verification_success = True + utils.logger.info(f"[BaiduTieBaClient] ✅ 检测到验证成功标识 (selector: {selector})") + break + except Exception: + continue + + # 检测验证码是否还存在 + captcha_still_exists = False + for selector in captcha_selectors: + try: + element = await self.playwright_page.query_selector(selector) + if element: + is_visible = await element.is_visible() + if is_visible: + captcha_still_exists = True + break + except Exception: + continue + + # 检测页面URL是否变化(验证成功后可能会跳转) + current_url = self.playwright_page.url + url_changed = current_url != initial_url + + # 判断验证是否成功 + # 成功条件:1. 验证码消失 2. 或者检测到成功标识 3. 或者URL变化(且不是验证码页面) + if verification_success or (not captcha_still_exists and url_changed): + # 验证码消失且URL变化,可能是验证成功后的跳转 + utils.logger.info("[BaiduTieBaClient] 🔍 验证码已消失,检测到页面变化,等待3秒确认验证完成...") + await asyncio.sleep(3) + + # 再次确认验证码是否真的消失了 + captcha_still_exists = False + for selector in captcha_selectors: + try: + element = await self.playwright_page.query_selector(selector) + if element: + is_visible = await element.is_visible() + if is_visible: + captcha_still_exists = True + break + except Exception: + continue + + if not captcha_still_exists: + # 确认验证成功 + final_url = self.playwright_page.url + utils.logger.info(f"[BaiduTieBaClient] ✅ 验证码验证成功!") + if url_changed: + utils.logger.info(f"[BaiduTieBaClient] 📍 页面已跳转: {initial_url} -> {final_url}") + else: + utils.logger.info(f"[BaiduTieBaClient] 📍 页面URL未变化,验证在当前页面完成") + + # 标记最近完成过验证码,避免立即再次检测 + self._captcha_verified_recently = True + import time + self._last_captcha_check_time = time.time() + + # 等待页面稳定,避免立即再次检测验证码 + await asyncio.sleep(3) + + # 验证成功后,再次检查是否又出现了验证码(防止跳转到新的验证码页面) + utils.logger.info("[BaiduTieBaClient] 🔍 验证成功后,检查是否又出现验证码...") + await asyncio.sleep(2) + + captcha_reappeared = False + for selector in captcha_selectors: + try: + element = await self.playwright_page.query_selector(selector) + if element: + is_visible = await element.is_visible() + if is_visible: + captcha_reappeared = True + utils.logger.warning(f"[BaiduTieBaClient] ⚠️ 验证成功后检测到新的验证码 (selector: {selector}),继续等待...") + break + except Exception: + continue + + if not captcha_reappeared: + utils.logger.info("[BaiduTieBaClient] ✅ 确认验证成功,未出现新的验证码,继续执行...") + break + else: + # 如果又出现了验证码,重置状态继续等待 + utils.logger.warning("[BaiduTieBaClient] ⚠️ 检测到新的验证码,重置等待状态...") + initial_url = self.playwright_page.url + start_time = time.time() + continue + else: + # 验证码又出现了,可能验证失败或页面刷新 + utils.logger.warning("[BaiduTieBaClient] ⚠️ 验证码重新出现,可能验证失败,继续等待...") + elif not captcha_still_exists and not url_changed: + # 验证码消失但URL未变化,可能是验证成功但未跳转 + utils.logger.info("[BaiduTieBaClient] 🔍 验证码已消失,等待3秒确认验证完成...") + await asyncio.sleep(3) + + # 再次确认 + captcha_still_exists = False + for selector in captcha_selectors: + try: + element = await self.playwright_page.query_selector(selector) + if element: + is_visible = await element.is_visible() + if is_visible: + captcha_still_exists = True + break + except Exception: + continue + + if not captcha_still_exists: + utils.logger.info("[BaiduTieBaClient] ✅ 验证码验证成功!") + + # 标记最近完成过验证码 + self._captcha_verified_recently = True + import time + self._last_captcha_check_time = time.time() + + # 等待页面稳定 + await asyncio.sleep(3) + + # 验证成功后,再次检查是否又出现了验证码 + utils.logger.info("[BaiduTieBaClient] 🔍 验证成功后,检查是否又出现验证码...") + await asyncio.sleep(2) + + captcha_reappeared = False + for selector in captcha_selectors: + try: + element = await self.playwright_page.query_selector(selector) + if element: + is_visible = await element.is_visible() + if is_visible: + captcha_reappeared = True + utils.logger.warning(f"[BaiduTieBaClient] ⚠️ 验证成功后检测到新的验证码 (selector: {selector}),继续等待...") + break + except Exception: + continue + + if not captcha_reappeared: + utils.logger.info("[BaiduTieBaClient] ✅ 确认验证成功,未出现新的验证码,继续执行...") + break + else: + # 如果又出现了验证码,重置状态继续等待 + utils.logger.warning("[BaiduTieBaClient] ⚠️ 检测到新的验证码,重置等待状态...") + initial_url = self.playwright_page.url + start_time = time.time() + continue + + except Exception as e: + # 如果检测过程中出现异常,继续等待 + utils.logger.debug(f"[BaiduTieBaClient] 验证码检测异常: {e}") + + # 等待一段时间后再次检查 + await asyncio.sleep(check_interval) + + # 每10秒输出一次提示 + current_time = int(elapsed_time) + if current_time != last_log_time and current_time % 10 == 0 and current_time > 0: + remaining_time = max_wait_time - current_time + utils.logger.info(f"[BaiduTieBaClient] ⏳ 仍在等待验证码完成...(剩余 {remaining_time} 秒)") + last_log_time = current_time + async def get_notes_by_keyword( self, keyword: str, @@ -253,6 +536,9 @@ class BaiduTieBaClient(AbstractApiClient): # 使用Playwright访问搜索页面 await self.playwright_page.goto(full_url, wait_until="domcontentloaded") + # 检测并等待验证码完成 + await self._wait_for_captcha_completion() + # 等待页面加载,使用配置文件中的延时设置 await asyncio.sleep(config.CRAWLER_MAX_SLEEP_SEC) @@ -290,6 +576,9 @@ class BaiduTieBaClient(AbstractApiClient): # 使用Playwright访问帖子详情页面 await self.playwright_page.goto(note_url, wait_until="domcontentloaded") + # 检测并等待验证码完成 + await self._wait_for_captcha_completion() + # 等待页面加载,使用配置文件中的延时设置 await asyncio.sleep(config.CRAWLER_MAX_SLEEP_SEC) @@ -340,6 +629,9 @@ class BaiduTieBaClient(AbstractApiClient): # 使用Playwright访问评论页面 await self.playwright_page.goto(comment_url, wait_until="domcontentloaded") + # 检测并等待验证码完成 + await self._wait_for_captcha_completion() + # 等待页面加载,使用配置文件中的延时设置 await asyncio.sleep(config.CRAWLER_MAX_SLEEP_SEC) @@ -448,6 +740,9 @@ class BaiduTieBaClient(AbstractApiClient): # 使用Playwright访问子评论页面 await self.playwright_page.goto(sub_comment_url, wait_until="domcontentloaded") + # 检测并等待验证码完成 + await self._wait_for_captcha_completion() + # 等待页面加载,使用配置文件中的延时设置 await asyncio.sleep(config.CRAWLER_MAX_SLEEP_SEC) @@ -527,6 +822,9 @@ class BaiduTieBaClient(AbstractApiClient): # 使用Playwright访问贴吧页面 await self.playwright_page.goto(tieba_url, wait_until="domcontentloaded") + # 检测并等待验证码完成 + await self._wait_for_captcha_completion() + # 等待页面加载,使用配置文件中的延时设置 await asyncio.sleep(config.CRAWLER_MAX_SLEEP_SEC) @@ -562,6 +860,9 @@ class BaiduTieBaClient(AbstractApiClient): # 使用Playwright访问创作者主页 await self.playwright_page.goto(creator_url, wait_until="domcontentloaded") + # 检测并等待验证码完成 + await self._wait_for_captcha_completion() + # 等待页面加载,使用配置文件中的延时设置 await asyncio.sleep(config.CRAWLER_MAX_SLEEP_SEC) @@ -597,6 +898,9 @@ class BaiduTieBaClient(AbstractApiClient): # 使用Playwright访问创作者帖子列表页面 await self.playwright_page.goto(creator_url, wait_until="domcontentloaded") + # 检测并等待验证码完成 + await self._wait_for_captcha_completion() + # 等待页面加载,使用配置文件中的延时设置 await asyncio.sleep(config.CRAWLER_MAX_SLEEP_SEC) diff --git a/MindSpider/DeepSentimentCrawling/MediaCrawler/media_platform/weibo/client.py b/MindSpider/DeepSentimentCrawling/MediaCrawler/media_platform/weibo/client.py index 08c82da..7e7f67e 100644 --- a/MindSpider/DeepSentimentCrawling/MediaCrawler/media_platform/weibo/client.py +++ b/MindSpider/DeepSentimentCrawling/MediaCrawler/media_platform/weibo/client.py @@ -58,11 +58,47 @@ class WeiboClient: if enable_return_response: return response - data: Dict = response.json() + # 检查响应状态码 + if response.status_code != 200: + error_msg = f"HTTP {response.status_code}: {response.text[:200]}" + utils.logger.error(f"[WeiboClient.request] request {method}:{url} failed with status {response.status_code}") + raise DataFetchError(error_msg) + + # 检查响应内容类型 + content_type = response.headers.get("content-type", "").lower() + if "application/json" not in content_type and "text/json" not in content_type: + # 可能是HTML响应(如登录页面) + response_text = response.text[:500] + utils.logger.warning(f"[WeiboClient.request] Unexpected content type: {content_type}, response preview: {response_text}") + # 如果看起来像是HTML,可能是需要登录 + if " None: """Search for notes and retrieve their comment information.""" - utils.logger.info("[ZhihuCrawler.search] Begin search zhihu keywords") + utils.logger.info("[ZhihuCrawler.search] ========== 开始搜索知乎关键词 ==========") zhihu_limit_count = 20 # zhihu limit page fixed value if config.CRAWLER_MAX_NOTES_COUNT < zhihu_limit_count: config.CRAWLER_MAX_NOTES_COUNT = zhihu_limit_count @@ -145,7 +192,19 @@ class ZhihuCrawler(AbstractCrawler): total_failed_contents = 0 total_saved_comments = 0 - for keyword in config.KEYWORDS.split(","): + # 安全地处理关键词列表 + if not config.KEYWORDS or not config.KEYWORDS.strip(): + utils.logger.error("[ZhihuCrawler.search] 关键词配置为空,无法执行搜索任务") + return + + keywords_list = [k.strip() for k in config.KEYWORDS.split(",") if k.strip()] + if not keywords_list: + utils.logger.error("[ZhihuCrawler.search] 关键词列表为空,无法执行搜索任务") + return + + utils.logger.info(f"[ZhihuCrawler.search] 关键词列表: {keywords_list}, 共 {len(keywords_list)} 个关键词") + + for keyword in keywords_list: source_keyword_var.set(keyword) utils.logger.info( f"[ZhihuCrawler.search] Current search keyword: {keyword}" @@ -420,6 +479,18 @@ class ZhihuCrawler(AbstractCrawler): cookie_str, cookie_dict = utils.convert_cookies( await self.browser_context.cookies() ) + + # 获取用户配置的关键词用于 referer,如果没有则使用默认值 + referer_keyword = "test" + if config.KEYWORDS and config.KEYWORDS.strip(): + keywords_list = [k.strip() for k in config.KEYWORDS.split(",") if k.strip()] + if keywords_list: + referer_keyword = keywords_list[0] + + from urllib.parse import quote + encoded_referer_keyword = quote(referer_keyword) + referer_url = f"https://www.zhihu.com/search?q={encoded_referer_keyword}&time_interval=a_year&type=content" + zhihu_client_obj = ZhiHuClient( proxy=httpx_proxy, headers={ @@ -427,7 +498,7 @@ class ZhihuCrawler(AbstractCrawler): "accept-language": "zh-CN,zh;q=0.9", "cookie": cookie_str, "priority": "u=1, i", - "referer": "https://www.zhihu.com/search?q=python&time_interval=a_year&type=content", + "referer": referer_url, "user-agent": self.user_agent, "x-api-version": "3.0.91", "x-app-za": "OS=Web", diff --git a/MindSpider/DeepSentimentCrawling/MediaCrawler/store/bilibili/_store_impl.py b/MindSpider/DeepSentimentCrawling/MediaCrawler/store/bilibili/_store_impl.py index 95f0ca8..91de616 100644 --- a/MindSpider/DeepSentimentCrawling/MediaCrawler/store/bilibili/_store_impl.py +++ b/MindSpider/DeepSentimentCrawling/MediaCrawler/store/bilibili/_store_impl.py @@ -119,6 +119,32 @@ class BiliDbStoreImplement(AbstractStore): content_item: content item dict """ video_id = content_item.get("video_id") + if not video_id: + return + + # 关键词过滤:仅在落库时进行,仅对主贴/视频过滤,不过滤评论 + # 支持精确匹配和模糊匹配两种模式 + try: + import sys + from pathlib import Path + project_root = Path(__file__).resolve().parents[4] + if str(project_root) not in sys.path: + sys.path.insert(0, str(project_root)) + from config import settings + + title = content_item.get("title", "") + desc = content_item.get("desc", "") + content_text = title + " " + desc + strict_keywords = getattr(settings, 'STRICT_KEYWORDS', None) + fuzzy_keywords = getattr(settings, 'FUZZY_KEYWORDS', None) + + if strict_keywords or fuzzy_keywords: + if not utils.check_keyword_match_with_modes(content_text, strict_keywords, fuzzy_keywords): + utils.logger.warning(f"[BilibiliDbStoreImplement.store_content] ❌ Filtered video {video_id} - content does not match any keyword") + return + except Exception as e: + utils.logger.debug(f"[BilibiliDbStoreImplement.store_content] Failed to load keyword config: {e}") + # 确保 video_id 为整数类型,匹配数据库 BigInteger 字段 if video_id is not None: video_id = int(video_id) if not isinstance(video_id, int) else video_id diff --git a/MindSpider/DeepSentimentCrawling/MediaCrawler/store/douyin/_store_impl.py b/MindSpider/DeepSentimentCrawling/MediaCrawler/store/douyin/_store_impl.py index 85312fe..2ec213a 100644 --- a/MindSpider/DeepSentimentCrawling/MediaCrawler/store/douyin/_store_impl.py +++ b/MindSpider/DeepSentimentCrawling/MediaCrawler/store/douyin/_store_impl.py @@ -88,6 +88,30 @@ class DouyinDbStoreImplement(AbstractStore): content_item: content item dict """ aweme_id = content_item.get("aweme_id") + if not aweme_id: + return + + # 关键词过滤:仅在落库时进行,仅对主贴/视频过滤,不过滤评论 + # 支持精确匹配和模糊匹配两种模式 + try: + import sys + from pathlib import Path + project_root = Path(__file__).resolve().parents[4] + if str(project_root) not in sys.path: + sys.path.insert(0, str(project_root)) + from config import settings + + desc = content_item.get("desc", "") + strict_keywords = getattr(settings, 'STRICT_KEYWORDS', None) + fuzzy_keywords = getattr(settings, 'FUZZY_KEYWORDS', None) + + if strict_keywords or fuzzy_keywords: + if not utils.check_keyword_match_with_modes(desc, strict_keywords, fuzzy_keywords): + utils.logger.warning(f"[DouyinDbStoreImplement.store_content] ❌ Filtered aweme {aweme_id} - content does not match any keyword") + return + except Exception as e: + utils.logger.debug(f"[DouyinDbStoreImplement.store_content] Failed to load keyword config: {e}") + async with get_session() as session: result = await session.execute(select(DouyinAweme).where(DouyinAweme.aweme_id == aweme_id)) aweme_detail = result.scalar_one_or_none() diff --git a/MindSpider/DeepSentimentCrawling/MediaCrawler/store/kuaishou/_store_impl.py b/MindSpider/DeepSentimentCrawling/MediaCrawler/store/kuaishou/_store_impl.py index 4292cb0..fde0284 100644 --- a/MindSpider/DeepSentimentCrawling/MediaCrawler/store/kuaishou/_store_impl.py +++ b/MindSpider/DeepSentimentCrawling/MediaCrawler/store/kuaishou/_store_impl.py @@ -89,6 +89,30 @@ class KuaishouDbStoreImplement(AbstractStore): content_item: content item dict """ video_id = content_item.get("video_id") + if not video_id: + return + + # 关键词过滤:仅在落库时进行,仅对主贴/视频过滤,不过滤评论 + # 支持精确匹配和模糊匹配两种模式 + try: + import sys + from pathlib import Path + project_root = Path(__file__).resolve().parents[4] + if str(project_root) not in sys.path: + sys.path.insert(0, str(project_root)) + from config import settings + + caption = content_item.get("caption", "") + strict_keywords = getattr(settings, 'STRICT_KEYWORDS', None) + fuzzy_keywords = getattr(settings, 'FUZZY_KEYWORDS', None) + + if strict_keywords or fuzzy_keywords: + if not utils.check_keyword_match_with_modes(caption, strict_keywords, fuzzy_keywords): + utils.logger.warning(f"[KuaishouDbStoreImplement.store_content] ❌ Filtered video {video_id} - content does not match any keyword") + return + except Exception as e: + utils.logger.debug(f"[KuaishouDbStoreImplement.store_content] Failed to load keyword config: {e}") + async with get_session() as session: result = await session.execute(select(KuaishouVideo).where(KuaishouVideo.video_id == video_id)) video_detail = result.scalar_one_or_none() diff --git a/MindSpider/DeepSentimentCrawling/MediaCrawler/store/tieba/_store_impl.py b/MindSpider/DeepSentimentCrawling/MediaCrawler/store/tieba/_store_impl.py index 723d1f4..b7f8045 100644 --- a/MindSpider/DeepSentimentCrawling/MediaCrawler/store/tieba/_store_impl.py +++ b/MindSpider/DeepSentimentCrawling/MediaCrawler/store/tieba/_store_impl.py @@ -95,6 +95,32 @@ class TieBaDbStoreImplement(AbstractStore): content_item: content item dict """ note_id = content_item.get("note_id") + if not note_id: + return + + # 关键词过滤:仅在落库时进行,仅对主贴/视频过滤,不过滤评论 + # 支持精确匹配和模糊匹配两种模式 + try: + import sys + from pathlib import Path + project_root = Path(__file__).resolve().parents[4] + if str(project_root) not in sys.path: + sys.path.insert(0, str(project_root)) + from config import settings + + title = content_item.get("title", "") + text = content_item.get("text", "") + content_text = title + " " + text + strict_keywords = getattr(settings, 'STRICT_KEYWORDS', None) + fuzzy_keywords = getattr(settings, 'FUZZY_KEYWORDS', None) + + if strict_keywords or fuzzy_keywords: + if not utils.check_keyword_match_with_modes(content_text, strict_keywords, fuzzy_keywords): + utils.logger.warning(f"[TiebaDbStoreImplement.store_content] ❌ Filtered note {note_id} - content does not match any keyword") + return + except Exception as e: + utils.logger.debug(f"[TiebaDbStoreImplement.store_content] Failed to load keyword config: {e}") + async with get_session() as session: stmt = select(TiebaNote).where(TiebaNote.note_id == note_id) res = await session.execute(stmt) diff --git a/MindSpider/DeepSentimentCrawling/MediaCrawler/store/weibo/__init__.py b/MindSpider/DeepSentimentCrawling/MediaCrawler/store/weibo/__init__.py index 8f6286e..ee42689 100644 --- a/MindSpider/DeepSentimentCrawling/MediaCrawler/store/weibo/__init__.py +++ b/MindSpider/DeepSentimentCrawling/MediaCrawler/store/weibo/__init__.py @@ -93,7 +93,12 @@ async def update_weibo_note(note_item: Dict): "source_keyword": source_keyword_var.get(), } utils.logger.info(f"[store.weibo.update_weibo_note] weibo note id:{note_id}, title:{save_content_item.get('content')[:24]} ...") - await WeibostoreFactory.create_store().store_content(content_item=save_content_item) + try: + await WeibostoreFactory.create_store().store_content(content_item=save_content_item) + utils.logger.debug(f"[store.weibo.update_weibo_note] Successfully saved note {note_id}") + except Exception as e: + utils.logger.error(f"[store.weibo.update_weibo_note] Failed to save note {note_id}: {e}", exc_info=True) + raise async def batch_update_weibo_note_comments(note_id: str, comments: List[Dict]): @@ -148,7 +153,12 @@ async def update_weibo_note_comment(note_id: str, comment_item: Dict): "avatar": user_info.get("profile_image_url", ""), } utils.logger.info(f"[store.weibo.update_weibo_note_comment] Weibo note comment: {comment_id}, content: {save_comment_item.get('content', '')[:24]} ...") - await WeibostoreFactory.create_store().store_comment(comment_item=save_comment_item) + try: + await WeibostoreFactory.create_store().store_comment(comment_item=save_comment_item) + utils.logger.debug(f"[store.weibo.update_weibo_note_comment] Successfully saved comment {comment_id}") + except Exception as e: + utils.logger.error(f"[store.weibo.update_weibo_note_comment] Failed to save comment {comment_id}: {e}", exc_info=True) + raise async def update_weibo_note_image(picid: str, pic_content, extension_file_name): diff --git a/MindSpider/DeepSentimentCrawling/MediaCrawler/store/weibo/_store_impl.py b/MindSpider/DeepSentimentCrawling/MediaCrawler/store/weibo/_store_impl.py index f7503db..6ca831a 100644 --- a/MindSpider/DeepSentimentCrawling/MediaCrawler/store/weibo/_store_impl.py +++ b/MindSpider/DeepSentimentCrawling/MediaCrawler/store/weibo/_store_impl.py @@ -21,7 +21,7 @@ import pathlib from typing import Dict import aiofiles -from sqlalchemy import select +from sqlalchemy import select, text from sqlalchemy.ext.asyncio import AsyncSession import config @@ -29,7 +29,7 @@ from base.base_crawler import AbstractStore from database.models import WeiboCreator, WeiboNote, WeiboNoteComment from tools import utils, words from tools.async_file_writer import AsyncFileWriter -from database.db_session import get_session +from database.db_session import get_session, get_async_engine from var import crawler_type_var @@ -88,6 +88,33 @@ class WeiboCsvStoreImplement(AbstractStore): class WeiboDbStoreImplement(AbstractStore): + + def __init__(self, **kwargs): + super().__init__(**kwargs) + + async def _check_connection(self): + """检查数据库连接是否正常(使用类变量缓存检查结果)""" + # 使用类变量缓存检查结果,避免重复检查 + if not hasattr(WeiboDbStoreImplement, '_global_connection_checked'): + WeiboDbStoreImplement._global_connection_checked = False + + if WeiboDbStoreImplement._global_connection_checked: + return True + + try: + engine = get_async_engine(config.SAVE_DATA_OPTION) + if engine is None: + utils.logger.error(f"[WeiboDbStoreImplement._check_connection] Engine is None for SAVE_DATA_OPTION={config.SAVE_DATA_OPTION}") + return False + + async with engine.connect() as conn: + await conn.execute(text("SELECT 1")) + WeiboDbStoreImplement._global_connection_checked = True + utils.logger.info(f"[WeiboDbStoreImplement._check_connection] Database connection verified") + return True + except Exception as e: + utils.logger.error(f"[WeiboDbStoreImplement._check_connection] Database connection failed: {e}", exc_info=True) + return False async def store_content(self, content_item: Dict): """ @@ -99,21 +126,62 @@ class WeiboDbStoreImplement(AbstractStore): """ note_id = content_item.get("note_id") - async with get_session() as session: - stmt = select(WeiboNote).where(WeiboNote.note_id == note_id) - res = await session.execute(stmt) - db_note = res.scalar_one_or_none() - if db_note: - db_note.last_modify_ts = utils.get_current_timestamp() - for key, value in content_item.items(): - if hasattr(db_note, key): - setattr(db_note, key, value) - else: - content_item["add_ts"] = utils.get_current_timestamp() - content_item["last_modify_ts"] = utils.get_current_timestamp() - db_note = WeiboNote(**content_item) - session.add(db_note) - await session.commit() + if not note_id: + utils.logger.error(f"[WeiboDbStoreImplement.store_content] note_id is missing in content_item: {content_item}") + return + + # 关键词过滤:仅在落库时进行,仅对主贴/视频过滤,不过滤评论 + # 支持精确匹配和模糊匹配两种模式 + try: + import sys + from pathlib import Path + project_root = Path(__file__).resolve().parents[4] + if str(project_root) not in sys.path: + sys.path.insert(0, str(project_root)) + from config import settings + + content_text = content_item.get("content", "") + strict_keywords = getattr(settings, 'STRICT_KEYWORDS', None) + fuzzy_keywords = getattr(settings, 'FUZZY_KEYWORDS', None) + + if strict_keywords or fuzzy_keywords: + if not utils.check_keyword_match_with_modes(content_text, strict_keywords, fuzzy_keywords): + utils.logger.warning(f"[WeiboDbStoreImplement.store_content] ❌ Filtered note {note_id} - content does not match any keyword") + return + except Exception as e: + utils.logger.debug(f"[WeiboDbStoreImplement.store_content] Failed to load keyword config: {e}") + + # 检查数据库连接 + if not await self._check_connection(): + utils.logger.error(f"[WeiboDbStoreImplement.store_content] Database connection check failed, skipping save for note {note_id}") + return + + try: + async with get_session() as session: + if session is None: + utils.logger.error(f"[WeiboDbStoreImplement.store_content] Database session is None, check SAVE_DATA_OPTION config") + return + + stmt = select(WeiboNote).where(WeiboNote.note_id == note_id) + res = await session.execute(stmt) + db_note = res.scalar_one_or_none() + if db_note: + utils.logger.debug(f"[WeiboDbStoreImplement.store_content] Updating existing note {note_id}") + db_note.last_modify_ts = utils.get_current_timestamp() + for key, value in content_item.items(): + if hasattr(db_note, key): + setattr(db_note, key, value) + else: + utils.logger.debug(f"[WeiboDbStoreImplement.store_content] Creating new note {note_id}") + content_item["add_ts"] = utils.get_current_timestamp() + content_item["last_modify_ts"] = utils.get_current_timestamp() + db_note = WeiboNote(**content_item) + session.add(db_note) + await session.commit() + utils.logger.debug(f"[WeiboDbStoreImplement.store_content] Successfully committed note {note_id} to database") + except Exception as e: + utils.logger.error(f"[WeiboDbStoreImplement.store_content] Database error saving note {note_id}: {e}", exc_info=True) + raise async def store_comment(self, comment_item: Dict): """ @@ -125,21 +193,36 @@ class WeiboDbStoreImplement(AbstractStore): """ comment_id = comment_item.get("comment_id") - async with get_session() as session: - stmt = select(WeiboNoteComment).where(WeiboNoteComment.comment_id == comment_id) - res = await session.execute(stmt) - db_comment = res.scalar_one_or_none() - if db_comment: - db_comment.last_modify_ts = utils.get_current_timestamp() - for key, value in comment_item.items(): - if hasattr(db_comment, key): - setattr(db_comment, key, value) - else: - comment_item["add_ts"] = utils.get_current_timestamp() - comment_item["last_modify_ts"] = utils.get_current_timestamp() - db_comment = WeiboNoteComment(**comment_item) - session.add(db_comment) - await session.commit() + if not comment_id: + utils.logger.error(f"[WeiboDbStoreImplement.store_comment] comment_id is missing in comment_item: {comment_item}") + return + + try: + async with get_session() as session: + if session is None: + utils.logger.error(f"[WeiboDbStoreImplement.store_comment] Database session is None, check SAVE_DATA_OPTION config") + return + + stmt = select(WeiboNoteComment).where(WeiboNoteComment.comment_id == comment_id) + res = await session.execute(stmt) + db_comment = res.scalar_one_or_none() + if db_comment: + utils.logger.debug(f"[WeiboDbStoreImplement.store_comment] Updating existing comment {comment_id}") + db_comment.last_modify_ts = utils.get_current_timestamp() + for key, value in comment_item.items(): + if hasattr(db_comment, key): + setattr(db_comment, key, value) + else: + utils.logger.debug(f"[WeiboDbStoreImplement.store_comment] Creating new comment {comment_id}") + comment_item["add_ts"] = utils.get_current_timestamp() + comment_item["last_modify_ts"] = utils.get_current_timestamp() + db_comment = WeiboNoteComment(**comment_item) + session.add(db_comment) + await session.commit() + utils.logger.debug(f"[WeiboDbStoreImplement.store_comment] Successfully committed comment {comment_id} to database") + except Exception as e: + utils.logger.error(f"[WeiboDbStoreImplement.store_comment] Database error saving comment {comment_id}: {e}", exc_info=True) + raise async def store_creator(self, creator: Dict): """ diff --git a/MindSpider/DeepSentimentCrawling/MediaCrawler/store/xhs/_store_impl.py b/MindSpider/DeepSentimentCrawling/MediaCrawler/store/xhs/_store_impl.py index 1bffddd..394817a 100644 --- a/MindSpider/DeepSentimentCrawling/MediaCrawler/store/xhs/_store_impl.py +++ b/MindSpider/DeepSentimentCrawling/MediaCrawler/store/xhs/_store_impl.py @@ -89,6 +89,34 @@ class XhsDbStoreImplement(AbstractStore): note_id = content_item.get("note_id") if not note_id: return + + # 关键词过滤:仅在落库时进行,仅对主贴/视频过滤,不过滤评论 + # 支持精确匹配和模糊匹配两种模式 + try: + import sys + from pathlib import Path + # 添加项目根目录到路径,以便导入 MindSpider 的 config + project_root = Path(__file__).resolve().parents[4] + if str(project_root) not in sys.path: + sys.path.insert(0, str(project_root)) + from config import settings + + title = content_item.get("title", "") + desc = content_item.get("desc", "") + content_text = title + " " + desc + + strict_keywords = getattr(settings, 'STRICT_KEYWORDS', None) + fuzzy_keywords = getattr(settings, 'FUZZY_KEYWORDS', None) + + # 如果配置了关键词,进行匹配检查 + if strict_keywords or fuzzy_keywords: + if not utils.check_keyword_match_with_modes(content_text, strict_keywords, fuzzy_keywords): + utils.logger.warning(f"[XhsDbStoreImplement.store_content] ❌ Filtered note {note_id} - content does not match any keyword") + return + except Exception as e: + # 如果配置读取失败,记录警告但不阻止保存(向后兼容) + utils.logger.debug(f"[XhsDbStoreImplement.store_content] Failed to load keyword config: {e}") + async with get_session() as session: if await self.content_is_exist(session, note_id): await self.update_content(session, content_item) diff --git a/MindSpider/DeepSentimentCrawling/MediaCrawler/store/zhihu/__init__.py b/MindSpider/DeepSentimentCrawling/MediaCrawler/store/zhihu/__init__.py index 45130f3..02d6dba 100644 --- a/MindSpider/DeepSentimentCrawling/MediaCrawler/store/zhihu/__init__.py +++ b/MindSpider/DeepSentimentCrawling/MediaCrawler/store/zhihu/__init__.py @@ -85,8 +85,21 @@ async def batch_update_zhihu_note_comments(comments: List[ZhihuComment]): if not comments: return + success_count = 0 + error_count = 0 for comment_item in comments: - await update_zhihu_content_comment(comment_item) + try: + await update_zhihu_content_comment(comment_item) + success_count += 1 + except Exception as e: + error_count += 1 + comment_id = getattr(comment_item, 'comment_id', 'unknown') + utils.logger.error(f"[store.zhihu.batch_update_zhihu_note_comments] 保存评论失败 (comment_id={comment_id}): {e}", exc_info=True) + + if error_count > 0: + utils.logger.warning(f"[store.zhihu.batch_update_zhihu_note_comments] 批量保存完成: 成功 {success_count} 条, 失败 {error_count} 条") + else: + utils.logger.info(f"[store.zhihu.batch_update_zhihu_note_comments] 批量保存完成: 成功 {success_count} 条") async def update_zhihu_content_comment(comment_item: ZhihuComment): @@ -98,10 +111,17 @@ async def update_zhihu_content_comment(comment_item: ZhihuComment): Returns: """ - local_db_item = comment_item.model_dump() - local_db_item.update({"last_modify_ts": utils.get_current_timestamp()}) - utils.logger.info(f"[store.zhihu.update_zhihu_note_comment] zhihu content comment:{local_db_item}") - await ZhihuStoreFactory.create_store().store_comment(local_db_item) + try: + local_db_item = comment_item.model_dump() + local_db_item.update({"last_modify_ts": utils.get_current_timestamp()}) + # 使用更安全的日志记录方式,避免编码问题导致日志输出异常 + comment_id = local_db_item.get('comment_id', 'unknown') + utils.logger.debug(f"[store.zhihu.update_zhihu_note_comment] 准备保存评论: comment_id={comment_id}") + await ZhihuStoreFactory.create_store().store_comment(local_db_item) + except Exception as e: + comment_id = getattr(comment_item, 'comment_id', 'unknown') + utils.logger.error(f"[store.zhihu.update_zhihu_note_comment] 保存评论异常 (comment_id={comment_id}): {e}", exc_info=True) + raise async def save_creator(creator: ZhihuCreator): diff --git a/MindSpider/DeepSentimentCrawling/MediaCrawler/store/zhihu/_store_impl.py b/MindSpider/DeepSentimentCrawling/MediaCrawler/store/zhihu/_store_impl.py index d659504..e4b3ac0 100644 --- a/MindSpider/DeepSentimentCrawling/MediaCrawler/store/zhihu/_store_impl.py +++ b/MindSpider/DeepSentimentCrawling/MediaCrawler/store/zhihu/_store_impl.py @@ -94,23 +94,71 @@ class ZhihuDbStoreImplement(AbstractStore): content_item: content item dict """ content_id = content_item.get("content_id") + if not content_id: + return + + # 关键词过滤:仅在落库时进行,仅对主贴/视频过滤,不过滤评论 + # 支持精确匹配和模糊匹配两种模式 try: + import sys + from pathlib import Path + project_root = Path(__file__).resolve().parents[4] + if str(project_root) not in sys.path: + sys.path.insert(0, str(project_root)) + from config import settings + + title = content_item.get("title", "") + content = content_item.get("content", "") + content_text = title + " " + content + strict_keywords = getattr(settings, 'STRICT_KEYWORDS', None) + fuzzy_keywords = getattr(settings, 'FUZZY_KEYWORDS', None) + + if strict_keywords or fuzzy_keywords: + if not utils.check_keyword_match_with_modes(content_text, strict_keywords, fuzzy_keywords): + utils.logger.warning(f"[ZhihuDbStoreImplement.store_content] ❌ Filtered content {content_id} - content does not match any keyword") + return + except Exception as e: + utils.logger.debug(f"[ZhihuDbStoreImplement.store_content] Failed to load keyword config: {e}") + + try: + # 确保所有字符串值都是正确的UTF-8编码 + cleaned_item = {} + for key, value in content_item.items(): + if isinstance(value, bytes): + # 如果是bytes类型,尝试解码为UTF-8 + try: + value = value.decode('utf-8') + except UnicodeDecodeError: + # 如果UTF-8解码失败,尝试其他编码 + try: + value = value.decode('gbk', errors='replace') + except: + value = value.decode('utf-8', errors='replace') + elif isinstance(value, str): + # 确保字符串是有效的UTF-8 + try: + value.encode('utf-8') + except UnicodeEncodeError: + # 如果编码失败,尝试修复 + value = value.encode('utf-8', errors='replace').decode('utf-8', errors='replace') + cleaned_item[key] = value + async with get_session() as session: stmt = select(ZhihuContent).where(ZhihuContent.content_id == content_id) result = await session.execute(stmt) existing_content = result.scalars().first() if existing_content: - for key, value in content_item.items(): + for key, value in cleaned_item.items(): setattr(existing_content, key, value) utils.logger.debug(f"[ZhihuDbStore] 更新内容: {content_id}") else: - new_content = ZhihuContent(**content_item) + new_content = ZhihuContent(**cleaned_item) session.add(new_content) utils.logger.debug(f"[ZhihuDbStore] 新增内容: {content_id}") await session.commit() utils.logger.info(f"[ZhihuDbStore] 成功保存内容到数据库: {content_id}") except Exception as e: - utils.logger.error(f"[ZhihuDbStore] 保存内容失败 (content_id={content_id}): {e}") + utils.logger.error(f"[ZhihuDbStore] 保存内容失败 (content_id={content_id}): {e}", exc_info=True) raise async def store_comment(self, comment_item: Dict): @@ -121,22 +169,44 @@ class ZhihuDbStoreImplement(AbstractStore): """ comment_id = comment_item.get("comment_id") try: + # 确保所有字符串值都是正确的UTF-8编码 + cleaned_item = {} + for key, value in comment_item.items(): + if isinstance(value, bytes): + # 如果是bytes类型,尝试解码为UTF-8 + try: + value = value.decode('utf-8') + except UnicodeDecodeError: + # 如果UTF-8解码失败,尝试其他编码 + try: + value = value.decode('gbk', errors='replace') + except: + value = value.decode('utf-8', errors='replace') + elif isinstance(value, str): + # 确保字符串是有效的UTF-8 + try: + value.encode('utf-8') + except UnicodeEncodeError: + # 如果编码失败,尝试修复 + value = value.encode('utf-8', errors='replace').decode('utf-8', errors='replace') + cleaned_item[key] = value + async with get_session() as session: stmt = select(ZhihuComment).where(ZhihuComment.comment_id == comment_id) result = await session.execute(stmt) existing_comment = result.scalars().first() if existing_comment: - for key, value in comment_item.items(): + for key, value in cleaned_item.items(): setattr(existing_comment, key, value) utils.logger.debug(f"[ZhihuDbStore] 更新评论: {comment_id}") else: - new_comment = ZhihuComment(**comment_item) + new_comment = ZhihuComment(**cleaned_item) session.add(new_comment) utils.logger.debug(f"[ZhihuDbStore] 新增评论: {comment_id}") await session.commit() utils.logger.info(f"[ZhihuDbStore] 成功保存评论到数据库: {comment_id}") except Exception as e: - utils.logger.error(f"[ZhihuDbStore] 保存评论失败 (comment_id={comment_id}): {e}") + utils.logger.error(f"[ZhihuDbStore] 保存评论失败 (comment_id={comment_id}): {e}", exc_info=True) raise async def store_creator(self, creator: Dict): diff --git a/MindSpider/DeepSentimentCrawling/MediaCrawler/tools/utils.py b/MindSpider/DeepSentimentCrawling/MediaCrawler/tools/utils.py index 20c72c8..a61bfbb 100644 --- a/MindSpider/DeepSentimentCrawling/MediaCrawler/tools/utils.py +++ b/MindSpider/DeepSentimentCrawling/MediaCrawler/tools/utils.py @@ -11,6 +11,11 @@ import argparse import logging +import os +import re +import sys +from logging.handlers import RotatingFileHandler +from pathlib import Path from .crawler_util import * from .slider_util import * @@ -19,17 +24,80 @@ from .time_util import * def init_loging_config(): level = logging.INFO - logging.basicConfig( - level=level, - format="%(asctime)s %(name)s %(levelname)s (%(filename)s:%(lineno)d) - %(message)s", - datefmt='%Y-%m-%d %H:%M:%S' - ) + + # 日志格式 + log_format = "%(asctime)s %(name)s %(levelname)s (%(filename)s:%(lineno)d) - %(message)s" + date_format = '%Y-%m-%d %H:%M:%S' + + # 创建日志目录(项目根目录的 logs 文件夹) + # 从当前文件位置向上查找,直到找到包含 logs 目录的项目根目录 + current_file = Path(__file__).resolve() + project_root = None + + # 方法1: 向上查找直到找到 logs 目录 + for parent in current_file.parents: + logs_dir = parent / "logs" + if logs_dir.exists() or parent.name == "BettaFish-1.2.0": + project_root = parent + break + + # 方法2: 如果没找到,使用当前工作目录 + if project_root is None: + project_root = Path.cwd() + # 如果当前在 MediaCrawler 目录,向上查找 + if project_root.name == "MediaCrawler": + project_root = project_root.parent.parent + + log_dir = project_root / "logs" + log_dir.mkdir(exist_ok=True) + + # 日志文件路径 + log_file = log_dir / "mediacrawler.log" + + # 配置根日志记录器 + root_logger = logging.getLogger() + root_logger.setLevel(level) + + # 清除已有的处理器,避免重复 + root_logger.handlers.clear() + + # 控制台处理器 - 明确使用 sys.stdout 确保输出到控制台 + console_handler = logging.StreamHandler(sys.stdout) + console_handler.setLevel(level) + console_formatter = logging.Formatter(log_format, datefmt=date_format) + console_handler.setFormatter(console_formatter) + root_logger.addHandler(console_handler) + + # 确保输出立即刷新 + sys.stdout.flush() + sys.stderr.flush() + + # 文件处理器(带轮转,最大10MB,保留5个备份) + try: + file_handler = RotatingFileHandler( + log_file, + maxBytes=10 * 1024 * 1024, # 10MB + backupCount=5, + encoding='utf-8' + ) + file_handler.setLevel(level) + file_formatter = logging.Formatter(log_format, datefmt=date_format) + file_handler.setFormatter(file_formatter) + root_logger.addHandler(file_handler) + except Exception as e: + # 如果文件日志初始化失败,至少保证控制台日志可用 + print(f"警告: 无法初始化文件日志: {e}") + + # 创建 MediaCrawler 专用日志记录器 _logger = logging.getLogger("MediaCrawler") _logger.setLevel(level) - + # 关闭 httpx 的 INFO 日志 logging.getLogger("httpx").setLevel(logging.WARNING) - + + # 输出日志文件位置 + _logger.info(f"日志文件: {log_file}") + return _logger @@ -44,3 +112,101 @@ def str2bool(v): return False else: raise argparse.ArgumentTypeError('Boolean value expected.') + + +def check_keyword_match_strict(content: str, keyword: str) -> bool: + """ + 严格关键词匹配:检查内容是否包含关键词(严格模式) + + Args: + content: 要检查的内容文本 + keyword: 关键词(可以是单个关键词,也可以是逗号分隔的多个关键词) + + Returns: + bool: 如果内容包含任意一个关键词返回True,否则返回False + """ + if not content or not keyword: + return False + + # 清理HTML标签 + clean_content = re.sub(r"<.*?>", "", content) + # 转换为小写进行匹配 + clean_content_lower = clean_content.lower() + + # 支持多个关键词(逗号分隔),只要匹配任意一个即可 + keywords = [k.strip().lower() for k in keyword.split(",") if k.strip()] + + # 检查内容是否包含任意一个关键词 + for kw in keywords: + if kw in clean_content_lower: + return True + + return False + + +def check_keyword_match_fuzzy(content: str, keyword: str) -> bool: + """ + 模糊关键词匹配:检查内容是否包含关键词(模糊模式,支持部分匹配) + + Args: + content: 要检查的内容文本 + keyword: 关键词(可以是单个关键词,也可以是逗号分隔的多个关键词) + + Returns: + bool: 如果内容包含任意一个关键词(或关键词的部分)返回True,否则返回False + """ + if not content or not keyword: + return False + + # 清理HTML标签 + clean_content = re.sub(r"<.*?>", "", content) + # 转换为小写进行匹配 + clean_content_lower = clean_content.lower() + + # 支持多个关键词(逗号分隔),只要匹配任意一个即可 + keywords = [k.strip().lower() for k in keyword.split(",") if k.strip()] + + # 检查内容是否包含任意一个关键词(或关键词的部分) + for kw in keywords: + # 精确匹配 + if kw in clean_content_lower: + return True + # 模糊匹配:如果关键词长度>=3,检查是否包含关键词的主要部分 + if len(kw) >= 3: + # 去除空格后匹配 + kw_no_space = kw.replace(" ", "") + content_no_space = clean_content_lower.replace(" ", "") + if kw_no_space in content_no_space: + return True + # 检查关键词的前半部分(至少2个字符) + if len(kw) >= 4: + half_kw = kw[:len(kw)//2] + if half_kw in clean_content_lower: + return True + + return False + + +def check_keyword_match_with_modes(content: str, strict_keywords: str = None, fuzzy_keywords: str = None) -> bool: + """ + 使用精确和模糊两种模式检查关键词匹配 + + Args: + content: 要检查的内容文本 + strict_keywords: 精确匹配关键词(逗号分隔) + fuzzy_keywords: 模糊匹配关键词(逗号分隔) + + Returns: + bool: 如果内容匹配任意一个关键词(精确或模糊)返回True,否则返回False + """ + # 先检查精确匹配关键词 + if strict_keywords: + if check_keyword_match_strict(content, strict_keywords): + return True + + # 再检查模糊匹配关键词 + if fuzzy_keywords: + if check_keyword_match_fuzzy(content, fuzzy_keywords): + return True + + return False diff --git a/MindSpider/DeepSentimentCrawling/keyword_manager.py b/MindSpider/DeepSentimentCrawling/keyword_manager.py index c87b479..9ef859a 100644 --- a/MindSpider/DeepSentimentCrawling/keyword_manager.py +++ b/MindSpider/DeepSentimentCrawling/keyword_manager.py @@ -251,7 +251,8 @@ class KeywordManager: def _get_default_keywords(self) -> List[str]: """获取默认关键词列表""" return [ - "F6智慧门店","南京爱福路汽车科技有限公司","汽车后市场","汽修店","新康众" + "F6智慧门店","F6智数","中国汽车后市场白皮书","南京爱福路汽车科技有限公司","汽车后市场","汽车修理厂", + "新康众","天猫养车","汽后","汽修厂","爱福路","康众", ] def get_all_keywords_for_platforms(self, platforms: List[str], target_date: date = None, diff --git a/MindSpider/DeepSentimentCrawling/platform_crawler.py b/MindSpider/DeepSentimentCrawling/platform_crawler.py index b7c17dd..a5cef42 100644 --- a/MindSpider/DeepSentimentCrawling/platform_crawler.py +++ b/MindSpider/DeepSentimentCrawling/platform_crawler.py @@ -252,14 +252,63 @@ postgresql_db_config = {{ logger.info(f"执行命令: {' '.join(cmd)}") # 切换到MediaCrawler目录并执行,捕获输出 - result = subprocess.run( + # 使用utf-8编码,errors='surrogateescape'可以更好地处理编码问题 + # 设置环境变量确保子进程使用UTF-8编码 + env = os.environ.copy() + env['PYTHONIOENCODING'] = 'utf-8' + env['PYTHONUTF8'] = '1' + + # 使用 Popen 实时输出日志,而不是等到结束才显示 + import subprocess as sp + process = sp.Popen( cmd, cwd=self.mediacrawler_path, - timeout=3600, # 60分钟超时 - capture_output=True, + stdout=sp.PIPE, + stderr=sp.STDOUT, # 将stderr合并到stdout text=True, encoding='utf-8', - errors='replace' + errors='surrogateescape', + env=env, + bufsize=1, # 行缓冲 + universal_newlines=True + ) + + # 实时读取并输出日志 + output_lines = [] + error_lines = [] + try: + for line in process.stdout: + line = line.rstrip() + if line: + output_lines.append(line) + # 实时输出到控制台 + print(f"[{platform}] {line}", flush=True) + logger.info(f"[{platform}] {line}") + + # 等待进程完成 + return_code = process.wait(timeout=3600) + except sp.TimeoutExpired: + process.kill() + process.wait() + return_code = -1 + logger.error(f"[{platform}] 爬取超时") + except Exception as e: + process.kill() + process.wait() + return_code = -1 + logger.error(f"[{platform}] 执行异常: {e}", exc_info=True) + + # 创建类似 subprocess.run 的 result 对象 + class Result: + def __init__(self, returncode, stdout, stderr): + self.returncode = returncode + self.stdout = stdout + self.stderr = stderr + + result = Result( + returncode=return_code, + stdout='\n'.join(output_lines), + stderr='\n'.join(error_lines) ) end_time = datetime.now() @@ -269,6 +318,19 @@ postgresql_db_config = {{ output_lines = result.stdout.split('\n') if result.stdout else [] error_lines = result.stderr.split('\n') if result.stderr else [] + # 输出日志到控制台和日志文件 + if output_lines: + logger.info(f"[{platform}] 爬虫标准输出:") + for line in output_lines: + if line.strip(): # 忽略空行 + logger.info(f"[{platform}] {line}") + + if error_lines: + logger.warning(f"[{platform}] 爬虫错误输出:") + for line in error_lines: + if line.strip(): # 忽略空行 + logger.warning(f"[{platform}] {line}") + # 合并所有输出行用于解析 all_lines = output_lines + error_lines @@ -329,10 +391,64 @@ postgresql_db_config = {{ # 合并所有行用于解析 all_lines = output_lines + error_lines + # 用于统计各平台的保存操作次数(通过日志关键字统计) + # 视频/内容保存操作的关键字 + content_save_keywords = [ + "[store.bilibili.update_bilibili_video]", + "update_bilibili_video", + "[store.douyin.update_dy_aweme]", + "update_dy_aweme", + "[store.kuaishou.update_kuaishou_video]", + "update_kuaishou_video", + "[store.xhs.update_xhs_note]", + "update_xhs_note", + "[store.weibo.update_weibo_note]", + "update_weibo_note", + "[store.tieba.update_tieba_note]", + "update_tieba_note", + "[store.zhihu.update_zhihu_content]", + "update_zhihu_content", + ] + + # 评论保存操作的关键字 + comment_save_keywords = [ + "[store.bilibili.update_bilibili_video_comment]", + "update_bilibili_video_comment", + "[store.douyin.update_dy_aweme_comment]", + "update_dy_aweme_comment", + "[store.kuaishou.update_ks_video_comment]", + "update_ks_video_comment", + "[store.xhs.update_xhs_note_comment]", + "update_xhs_note_comment", + "[store.weibo.update_weibo_note_comment]", + "update_weibo_note_comment", + "[store.tieba.update_tieba_note_comment]", + "update_tieba_note_comment", + "[store.zhihu.update_zhihu_content_comment]", + "update_zhihu_note_comment", + "update_zhihu_content_comment", + ] + + # 先统计日志关键字出现的次数(用于bilibili等没有汇总信息的平台) + log_keyword_content_count = 0 + log_keyword_comment_count = 0 + # 解析输出行,查找各种可能的数据保存信息 for line in all_lines: line_lower = line.lower() + # 统计视频/内容保存操作(通过日志关键字) + for keyword in content_save_keywords: + if keyword in line or keyword.lower() in line_lower: + log_keyword_content_count += 1 + break # 避免重复计数 + + # 统计评论保存操作(通过日志关键字) + for keyword in comment_save_keywords: + if keyword in line or keyword.lower() in line_lower: + log_keyword_comment_count += 1 + break # 避免重复计数 + # 查找保存的内容数量(多种可能的格式) # 例如:"保存了 10 条笔记"、"成功保存 5 条内容"、"inserted 3 records"等 if any(keyword in line_lower for keyword in ["条笔记", "条内容", "条视频", "条帖子", "条回答"]): @@ -367,10 +483,18 @@ postgresql_db_config = {{ if any(keyword in line_lower for keyword in ["登录", "扫码", "login", "需要登录"]): stats["login_required"] = True + # 如果通过汇总信息没有找到保存数量,使用日志关键字统计的结果 + # 这样可以支持bilibili等没有输出汇总信息的平台 + if stats["notes_count"] == 0 and log_keyword_content_count > 0: + stats["notes_count"] = log_keyword_content_count + if stats["comments_count"] == 0 and log_keyword_comment_count > 0: + stats["comments_count"] = log_keyword_comment_count + # 如果没有找到明确的保存数量,尝试从数据库操作日志中提取 if stats["notes_count"] == 0 and stats["comments_count"] == 0: # 查找可能的数据库插入信息 for line in all_lines: + line_lower = line.lower() # 查找类似 "insert into" 或 "保存到" 的信息 if "insert" in line_lower or "保存到" in line_lower: try: diff --git a/MindSpider/config.py b/MindSpider/config.py index 68043ca..e68c4d3 100644 --- a/MindSpider/config.py +++ b/MindSpider/config.py @@ -34,6 +34,14 @@ class Settings(BaseSettings): True, description="开启后运行基于关键词的爬取流程,关闭则完全跳过关键词搜索" ) + STRICT_KEYWORDS: Optional[str] = Field( + None, + description="精确匹配关键词(逗号分隔),内容必须完整包含这些关键词才能落库" + ) + FUZZY_KEYWORDS: Optional[str] = Field( + None, + description="模糊匹配关键词(逗号分隔),内容包含这些关键词的部分即可落库" + ) MINDSPIDER_API_KEY: Optional[str] = Field(None, description="MINDSPIDER API密钥") MINDSPIDER_BASE_URL: Optional[str] = Field("https://api.deepseek.com", description="MINDSPIDER API基础URL,推荐deepseek-chat模型使用https://api.deepseek.com") MINDSPIDER_MODEL_NAME: Optional[str] = Field("deepseek-chat", description="MINDSPIDER API模型名称, 推荐deepseek-chat")