From a33e4b3ad2d37e816fabfb367b3c9e4aa19c21ee Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E6=88=92=E9=85=92=E7=9A=84=E6=9D=8E=E7=99=BD?= <670939375@qq.com> Date: Sat, 23 Aug 2025 11:50:14 +0800 Subject: [PATCH] Private database operation tools completed. --- InsightEngine/tools/search.py | 577 +++++++++++++++++++++++----------- 1 file changed, 393 insertions(+), 184 deletions(-) diff --git a/InsightEngine/tools/search.py b/InsightEngine/tools/search.py index cf7b611..bfb6257 100644 --- a/InsightEngine/tools/search.py +++ b/InsightEngine/tools/search.py @@ -1,240 +1,449 @@ """ -专为 AI Agent 设计的舆情搜索工具集 (Tavily) +专为 AI Agent 设计的本地舆情数据库查询工具集 (MediaCrawlerDB) -版本: 1.5 +版本: 3.0 最后更新: 2025-08-22 -此脚本将复杂的Tavily搜索功能分解为一系列目标明确、参数极少的独立工具, -专为AI Agent调用而设计。Agent只需根据任务意图选择合适的工具, -无需理解复杂的参数组合。所有工具默认搜索“新闻”(topic='news')。 +此脚本将复杂的本地MySQL数据库查询功能封装成一系列目标明确、参数清晰的独立工具, +专为AI Agent调用而设计。Agent只需根据任务意图(如搜索热点、全局搜索话题、 +按时间范围分析、获取评论)选择合适的工具,无需编写复杂的SQL语句。 -新特性: -- 新增 `basic_search_news` 工具,用于执行标准、通用的新闻搜索。 -- 每个搜索结果现在都包含 `published_date` (新闻发布日期)。 +V3.0 核心更新: +- 智能热度计算: `search_hot_content`不再需要`sort_by`参数,改为内部使用统一的加权热度算法, + 综合点赞、评论、分享、观看等数据计算热度分值,使结果更智能、更符合综合热度。 +- 新增平台精搜工具: 新增 `search_topic_on_platform` 工具,作为特例, + 允许Agent在特定平台(B站、微博等七大平台)上对某一话题进行精确搜索,并支持时间筛选。 +- 结构优化: 调整了数据结构与函数文档,以适应新功能。 主要工具: -- basic_search_news: (新增) 执行标准、快速的通用新闻搜索。 -- deep_search_news: 对主题进行最全面的深度分析。 -- search_news_last_24_hours: 获取24小时内的最新动态。 -- search_news_last_week: 获取过去一周的主要报道。 -- search_images_for_news: 查找与新闻主题相关的图片。 -- search_news_by_date: 在指定的历史日期范围内搜索。 +- search_hot_content: 查找指定时间范围内的综合热度最高的内容。 +- search_topic_globally: 在整个数据库中全局搜索与特定话题相关的所有内容和评论。 +- search_topic_by_date: 在指定的历史日期范围内搜索与特定话题相关的内容。 +- get_comments_for_topic: 专门提取公众对于某一特定话题的评论数据。 +- search_topic_on_platform: 在指定的单个社交媒体平台上搜索特定话题。 """ import os -from typing import List, Dict, Any, Optional +import json +import pymysql +import pymysql.cursors +from typing import List, Dict, Any, Optional, Literal from dataclasses import dataclass, field - -# 运行前请确保已安装Tavily库: pip install tavily-python -try: - from tavily import TavilyClient -except ImportError: - raise ImportError("Tavily库未安装,请运行 `pip install tavily-python` 进行安装。") +from datetime import datetime, timedelta, date # --- 1. 数据结构定义 --- @dataclass -class SearchResult: - """ - 网页搜索结果数据类 - 包含 published_date 属性来存储新闻发布日期 - """ - title: str - url: str - content: str - score: Optional[float] = None - raw_content: Optional[str] = None - published_date: Optional[str] = None +class QueryResult: + """统一的数据库查询结果数据类""" + platform: str + content_type: str + title_or_content: str + author_nickname: Optional[str] = None + url: Optional[str] = None + publish_time: Optional[datetime] = None + engagement: Dict[str, int] = field(default_factory=dict) + source_keyword: Optional[str] = None + hotness_score: float = 0.0 # 新增:综合热度分 + source_table: str = "" @dataclass -class ImageResult: - """图片搜索结果数据类""" - url: str - description: Optional[str] = None - -@dataclass -class TavilyResponse: - """封装Tavily API的完整返回结果,以便在工具间传递""" - query: str - answer: Optional[str] = None - results: List[SearchResult] = field(default_factory=list) - images: List[ImageResult] = field(default_factory=list) - response_time: Optional[float] = None - +class DBResponse: + """封装工具的完整返回结果""" + tool_name: str + parameters: Dict[str, Any] + results: List[QueryResult] = field(default_factory=list) + results_count: int = 0 + error_message: Optional[str] = None # --- 2. 核心客户端与专用工具集 --- -class TavilyNewsAgency: - """ - 一个包含多种专用新闻舆情搜索工具的客户端。 - 每个公共方法都设计为供 AI Agent 独立调用的工具。 - """ +class MediaCrawlerDB: + """包含多种专用舆情数据库查询工具的客户端""" + # 权重定义 + W_LIKE = 1.0 + W_COMMENT = 5.0 + W_SHARE = 10.0 # 分享/转发/收藏/投币等高价值互动 + W_VIEW = 0.1 + W_DANMAKU = 0.5 - def __init__(self, api_key: Optional[str] = None): + def __init__(self): """ - 初始化客户端。 - Args: - api_key: Tavily API密钥,若不提供则从环境变量 TAVILY_API_KEY 读取。 + 初始化客户端。连接信息从环境变量自动读取: + - DB_HOST, DB_USER, DB_PASSWORD, DB_NAME + - DB_PORT (可选, 默认 3306) + - DB_CHARSET (可选, 默认 utf8mb4) """ - if api_key is None: - api_key = os.getenv("TAVILY_API_KEY") - if not api_key: - raise ValueError("Tavily API Key未找到!请设置TAVILY_API_KEY环境变量或在初始化时提供") - self._client = TavilyClient(api_key=api_key) + self.db_config = { + 'host': os.getenv("DB_HOST"), + 'user': os.getenv("DB_USER"), + 'password': os.getenv("DB_PASSWORD"), + 'db': os.getenv("DB_NAME"), + 'port': int(os.getenv("DB_PORT", 3306)), + 'charset': os.getenv("DB_CHARSET", "utf8mb4"), + 'cursorclass': pymysql.cursors.DictCursor + } + required = ['host', 'user', 'password', 'db'] + if missing := [k for k in required if not self.db_config[k]]: + raise ValueError(f"数据库配置缺失! 请设置环境变量或在代码中提供: {', '.join([f'DB_{k.upper()}' for k in missing])}") - def _search_internal(self, **kwargs) -> TavilyResponse: - """内部通用的搜索执行器,所有工具最终都调用此方法""" + def _execute_query(self, query: str, params: tuple = None) -> List[Dict[str, Any]]: + conn = None try: - kwargs['topic'] = 'general' - api_params = {k: v for k, v in kwargs.items() if v is not None} - response_dict = self._client.search(**api_params) + conn = pymysql.connect(**self.db_config) + with conn.cursor() as cursor: + cursor.execute(query, params or ()) + return cursor.fetchall() + except pymysql.Error as e: + print(f"数据库查询时发生错误: {e}") + return [] + finally: + if conn: conn.close() + + @staticmethod + def _to_datetime(ts: Any) -> Optional[datetime]: + if not ts: return None + try: + if isinstance(ts, datetime): return ts + if isinstance(ts, date): return datetime.combine(ts, datetime.min.time()) + if isinstance(ts, (int, float)) or str(ts).isdigit(): + val = float(ts) + return datetime.fromtimestamp(val / 1000 if val > 1_000_000_000_000 else val) + if isinstance(ts, str): + return datetime.fromisoformat(ts.split('+')[0].strip()) + except (ValueError, TypeError): return None + + _table_columns_cache = {} + def _get_table_columns(self, table_name: str) -> List[str]: + if table_name in self._table_columns_cache: return self._table_columns_cache[table_name] + results = self._execute_query(f"SHOW COLUMNS FROM `{table_name}`") + columns = [row['Field'] for row in results] if results else [] + self._table_columns_cache[table_name] = columns + return columns + + def _extract_engagement(self, row: Dict[str, Any]) -> Dict[str, int]: + """从数据行中提取并统一互动指标""" + engagement = {} + mapping = { 'likes': ['liked_count', 'like_count', 'voteup_count', 'comment_like_count'], 'comments': ['video_comment', 'comments_count', 'comment_count', 'total_replay_num', 'sub_comment_count'], 'shares': ['video_share_count', 'shared_count', 'share_count', 'total_forwards'], 'views': ['video_play_count', 'viewd_count'], 'favorites': ['video_favorite_count', 'collected_count'], 'coins': ['video_coin_count'], 'danmaku': ['video_danmaku'], } + for key, potential_cols in mapping.items(): + for col in potential_cols: + if col in row and row[col] is not None: + try: engagement[key] = int(row[col]) + except (ValueError, TypeError): engagement[key] = 0 + break + return engagement + + def search_hot_content( + self, + time_period: Literal['24h', 'week', 'year'] = 'week', + limit: int = 10 + ) -> DBResponse: + """ + 【工具】查找热点内容: (已简化) 获取最近一段时间内综合热度最高的内容。 + + Args: + time_period (Literal['24h', 'week', 'year']): 时间范围,默认为 'week'。 + limit (int): 返回结果的最大数量,默认为 10。 + + Returns: + DBResponse: 包含按综合热度排序后的内容列表。 + """ + params_for_log = {'time_period': time_period, 'limit': limit} + print(f"--- TOOL: 查找热点内容 (params: {params_for_log}) ---") + + now = datetime.now() + start_time = now - timedelta(days={'24h': 1, 'week': 7}.get(time_period, 365)) + + # 定义各平台的热度计算SQL片段 + hotness_formulas = { + 'bilibili_video': f"(COALESCE(CAST(liked_count AS UNSIGNED), 0) * {self.W_LIKE} + COALESCE(CAST(video_comment AS UNSIGNED), 0) * {self.W_COMMENT} + COALESCE(CAST(video_share_count AS UNSIGNED), 0) * {self.W_SHARE} + COALESCE(CAST(video_favorite_count AS UNSIGNED), 0) * {self.W_SHARE} + COALESCE(CAST(video_coin_count AS UNSIGNED), 0) * {self.W_SHARE} + COALESCE(CAST(video_danmaku AS UNSIGNED), 0) * {self.W_DANMAKU} + COALESCE(CAST(video_play_count AS DECIMAL(20,2)), 0) * {self.W_VIEW})", + 'douyin_aweme': f"(COALESCE(CAST(liked_count AS UNSIGNED), 0) * {self.W_LIKE} + COALESCE(CAST(comment_count AS UNSIGNED), 0) * {self.W_COMMENT} + COALESCE(CAST(share_count AS UNSIGNED), 0) * {self.W_SHARE} + COALESCE(CAST(collected_count AS UNSIGNED), 0) * {self.W_SHARE})", + 'weibo_note': f"(COALESCE(CAST(liked_count AS UNSIGNED), 0) * {self.W_LIKE} + COALESCE(CAST(comments_count AS UNSIGNED), 0) * {self.W_COMMENT} + COALESCE(CAST(shared_count AS UNSIGNED), 0) * {self.W_SHARE})", + 'xhs_note': f"(COALESCE(CAST(liked_count AS UNSIGNED), 0) * {self.W_LIKE} + COALESCE(CAST(comment_count AS UNSIGNED), 0) * {self.W_COMMENT} + COALESCE(CAST(share_count AS UNSIGNED), 0) * {self.W_SHARE} + COALESCE(CAST(collected_count AS UNSIGNED), 0) * {self.W_SHARE})", + 'kuaishou_video': f"(COALESCE(CAST(liked_count AS UNSIGNED), 0) * {self.W_LIKE} + COALESCE(CAST(viewd_count AS DECIMAL(20,2)), 0) * {self.W_VIEW})", + 'zhihu_content': f"(COALESCE(CAST(voteup_count AS UNSIGNED), 0) * {self.W_LIKE} + COALESCE(CAST(comment_count AS UNSIGNED), 0) * {self.W_COMMENT})", + } + + all_queries, params = [], [] + for table, formula in hotness_formulas.items(): + time_filter_sql, time_filter_param = "", None + if table == 'weibo_note': time_filter_sql, time_filter_param = "`create_date_time` >= %s", start_time.strftime('%Y-%m-%d %H:%M:%S') + elif table in ['kuaishou_video', 'xhs_note', 'douyin_aweme']: time_col = 'time' if table == 'xhs_note' else 'create_time'; time_filter_sql, time_filter_param = f"`{time_col}` >= %s", str(int(start_time.timestamp() * 1000)) + elif table == 'zhihu_content': time_filter_sql, time_filter_param = "CAST(`created_time` AS UNSIGNED) >= %s", str(int(start_time.timestamp())) + else: time_filter_sql, time_filter_param = "`create_time` >= %s", str(int(start_time.timestamp())) + + content_type = 'note' if table in ['weibo_note', 'xhs_note'] else 'content' if table == 'zhihu_content' else 'video' + query_template = "SELECT '{platform}' as p, '{type}' as t, {title} as title, {author} as author, {url} as url, {ts} as ts, {formula} as hotness_score, source_keyword, '{tbl}' as tbl FROM `{tbl}` WHERE {time_filter}" - search_results = [ - SearchResult( - title=item.get('title'), - url=item.get('url'), - content=item.get('content'), - score=item.get('score'), - raw_content=item.get('raw_content'), - published_date=item.get('published_date') - ) for item in response_dict.get('results', []) - ] + field_subs = {'platform': table.split('_')[0], 'type': content_type, 'title': 'title', 'author': 'nickname', 'url': 'video_url', 'ts': 'create_time', 'formula': formula, 'tbl': table, 'time_filter': time_filter_sql} + if table == 'weibo_note': field_subs.update({'title': 'content', 'url': 'note_url', 'ts': 'create_date_time'}) + elif table == 'xhs_note': field_subs.update({'ts': 'time', 'url': 'note_url'}) + elif table == 'zhihu_content': field_subs.update({'author': 'user_nickname', 'url': 'content_url', 'ts': 'created_time'}) + elif table == 'douyin_aweme': field_subs.update({'url': 'aweme_url'}) + + all_queries.append(query_template.format(**field_subs)) + params.append(time_filter_param) + + final_query = f"({' ) UNION ALL ( '.join(all_queries)}) ORDER BY hotness_score DESC LIMIT %s" + raw_results = self._execute_query(final_query, tuple(params) + (limit,)) + + formatted_results = [QueryResult(platform=r['p'], content_type=r['t'], title_or_content=r['title'], author_nickname=r.get('author'), url=r['url'], publish_time=self._to_datetime(r['ts']), engagement=self._extract_engagement(r), hotness_score=r.get('hotness_score', 0.0), source_keyword=r.get('source_keyword'), source_table=r['tbl']) for r in raw_results] + return DBResponse("search_hot_content", params_for_log, results=formatted_results, results_count=len(formatted_results)) + + def search_topic_globally(self, topic: str, limit_per_table: int = 5) -> DBResponse: + """ + 【工具】全局话题搜索: 在数据库中(内容、评论、标签、来源关键字)全面搜索指定话题。 + + Args: + topic (str): 要搜索的话题关键词。 + limit_per_table (int): 从每个相关表中返回的最大记录数,默认为 5。 + + Returns: + DBResponse: 包含所有匹配结果的聚合列表。 + """ + params_for_log = {'topic': topic, 'limit_per_table': limit_per_table} + print(f"--- TOOL: 全局话题搜索 (params: {params_for_log}) ---") + + search_term, all_results = f"%{topic}%", [] + search_configs = { 'bilibili_video': {'fields': ['title', 'desc', 'source_keyword'], 'type': 'video'}, 'bilibili_video_comment': {'fields': ['content'], 'type': 'comment'}, 'douyin_aweme': {'fields': ['title', 'desc', 'source_keyword'], 'type': 'video'}, 'douyin_aweme_comment': {'fields': ['content'], 'type': 'comment'}, 'kuaishou_video': {'fields': ['title', 'desc', 'source_keyword'], 'type': 'video'}, 'kuaishou_video_comment': {'fields': ['content'], 'type': 'comment'}, 'weibo_note': {'fields': ['content', 'source_keyword'], 'type': 'note'}, 'weibo_note_comment': {'fields': ['content'], 'type': 'comment'}, 'xhs_note': {'fields': ['title', 'desc', 'tag_list', 'source_keyword'], 'type': 'note'}, 'xhs_note_comment': {'fields': ['content'], 'type': 'comment'}, 'zhihu_content': {'fields': ['title', 'desc', 'content_text', 'source_keyword'], 'type': 'content'}, 'zhihu_comment': {'fields': ['content'], 'type': 'comment'}, 'tieba_note': {'fields': ['title', 'desc', 'source_keyword'], 'type': 'note'}, 'tieba_comment': {'fields': ['content'], 'type': 'comment'}, 'daily_news': {'fields': ['title'], 'type': 'news'}, } + + for table, config in search_configs.items(): + where_clause = " OR ".join([f"`{field}` LIKE %s" for field in config['fields']]) + query = f"SELECT * FROM `{table}` WHERE {where_clause} ORDER BY id DESC LIMIT %s" + params = (search_term,) * len(config['fields']) + (limit_per_table,) + raw_results = self._execute_query(query, params) + for row in raw_results: + content = (row.get('title') or row.get('content') or row.get('desc') or row.get('content_text', '')) + time_key = row.get('create_time') or row.get('time') or row.get('created_time') or row.get('publish_time') or row.get('crawl_date') + all_results.append(QueryResult( + platform=table.split('_')[0], content_type=config['type'], + title_or_content=content[:500] if content else '', + author_nickname=row.get('nickname') or row.get('user_nickname') or row.get('user_name'), + url=row.get('video_url') or row.get('note_url') or row.get('content_url') or row.get('url') or row.get('aweme_url'), + publish_time=self._to_datetime(time_key), + engagement=self._extract_engagement(row), + source_keyword=row.get('source_keyword'), + source_table=table + )) + return DBResponse("search_topic_globally", params_for_log, results=all_results, results_count=len(all_results)) + + def search_topic_by_date(self, topic: str, start_date: str, end_date: str, limit_per_table: int = 10) -> DBResponse: + """ + 【工具】按日期搜索话题: 在明确的历史时间段内,搜索与特定话题相关的内容。 + + Args: + topic (str): 要搜索的话题关键词。 + start_date (str): 开始日期,格式 'YYYY-MM-DD'。 + end_date (str): 结束日期,格式 'YYYY-MM-DD'。 + limit_per_table (int): 从每个相关表中返回的最大记录数,默认为 10。 + + Returns: + DBResponse: 包含在指定日期范围内找到的结果的聚合列表。 + """ + params_for_log = {'topic': topic, 'start_date': start_date, 'end_date': end_date, 'limit_per_table': limit_per_table} + print(f"--- TOOL: 按日期搜索话题 (params: {params_for_log}) ---") + + try: + start_dt, end_dt = datetime.strptime(start_date, '%Y-%m-%d'), datetime.strptime(end_date, '%Y-%m-%d') + timedelta(days=1) + except ValueError: + return DBResponse("search_topic_by_date", params_for_log, error_message="日期格式错误,请使用 'YYYY-MM-DD' 格式。") + + search_term, all_results = f"%{topic}%", [] + search_configs = { + 'bilibili_video': {'fields': ['title', 'desc', 'source_keyword'], 'type': 'video', 'time_col': 'create_time', 'time_type': 'sec'}, 'douyin_aweme': {'fields': ['title', 'desc', 'source_keyword'], 'type': 'video', 'time_col': 'create_time', 'time_type': 'ms'}, + 'kuaishou_video': {'fields': ['title', 'desc', 'source_keyword'], 'type': 'video', 'time_col': 'create_time', 'time_type': 'ms'}, 'weibo_note': {'fields': ['content', 'source_keyword'], 'type': 'note', 'time_col': 'create_date_time', 'time_type': 'str'}, + 'xhs_note': {'fields': ['title', 'desc', 'tag_list', 'source_keyword'], 'type': 'note', 'time_col': 'time', 'time_type': 'ms'}, 'zhihu_content': {'fields': ['title', 'desc', 'content_text', 'source_keyword'], 'type': 'content', 'time_col': 'created_time', 'time_type': 'sec_str'}, + 'tieba_note': {'fields': ['title', 'desc', 'source_keyword'], 'type': 'note', 'time_col': 'publish_time', 'time_type': 'str'}, 'daily_news': {'fields': ['title'], 'type': 'news', 'time_col': 'crawl_date', 'time_type': 'date_str'}, + } + + for table, config in search_configs.items(): + topic_clause = " OR ".join([f"`{field}` LIKE %s" for field in config['fields']]) + time_col, time_type = config['time_col'], config['time_type'] + if time_type == 'sec': time_params = (int(start_dt.timestamp()), int(end_dt.timestamp())) + elif time_type == 'ms': time_params = (int(start_dt.timestamp() * 1000), int(end_dt.timestamp() * 1000)) + elif time_type in ['str', 'date_str']: time_params = (start_dt.strftime('%Y-%m-%d'), end_dt.strftime('%Y-%m-%d')) + else: time_params = (str(int(start_dt.timestamp())), str(int(end_dt.timestamp()))) + time_clause = f"`{time_col}` >= %s AND `{time_col}` < %s" + if table == 'zhihu_content': time_clause = f"CAST(`{time_col}` AS UNSIGNED) >= %s AND CAST(`{time_col}` AS UNSIGNED) < %s" + query = f"SELECT * FROM `{table}` WHERE ({topic_clause}) AND ({time_clause}) ORDER BY id DESC LIMIT %s" + params = (search_term,) * len(config['fields']) + time_params + (limit_per_table,) + raw_results = self._execute_query(query, params) + for row in raw_results: + content = (row.get('title') or row.get('content') or row.get('desc') or row.get('content_text', '')) + all_results.append(QueryResult( + platform=table.split('_')[0], content_type=config['type'], + title_or_content=content[:500] if content else '', + author_nickname=row.get('nickname') or row.get('user_nickname'), + url=row.get('video_url') or row.get('note_url') or row.get('content_url') or row.get('url') or row.get('aweme_url'), + publish_time=self._to_datetime(row.get(config['time_col'])), + engagement=self._extract_engagement(row), + source_keyword=row.get('source_keyword'), + source_table=table + )) + return DBResponse("search_topic_by_date", params_for_log, results=all_results, results_count=len(all_results)) + + def get_comments_for_topic(self, topic: str, limit: int = 50) -> DBResponse: + """ + 【工具】获取话题评论: 专门搜索并返回所有平台中与特定话题相关的公众评论数据。 + + Args: + topic (str): 要搜索的话题关键词。 + limit (int): 返回评论的总数量上限,默认为 50。 + + Returns: + DBResponse: 包含匹配的评论列表。 + """ + params_for_log = {'topic': topic, 'limit': limit} + print(f"--- TOOL: 获取话题评论 (params: {params_for_log}) ---") + + search_term = f"%{topic}%" + comment_tables = ['bilibili_video_comment', 'douyin_aweme_comment', 'kuaishou_video_comment', 'weibo_note_comment', 'xhs_note_comment', 'zhihu_comment', 'tieba_comment'] + + all_queries = [] + for table in comment_tables: + cols = self._get_table_columns(table) + author_col = 'user_nickname' if 'user_nickname' in cols else 'nickname' + like_col = 'comment_like_count' if 'comment_like_count' in cols else 'like_count' if 'like_count' in cols else None + time_col = 'publish_time' if 'publish_time' in cols else 'create_date_time' if 'create_date_time' in cols else 'create_time' + like_select = f"`{like_col}` as likes" if like_col else "'0' as likes" - image_results = [ImageResult(url=item.get('url'), description=item.get('description')) for item in response_dict.get('images', [])] + query = (f"SELECT '{table.split('_')[0]}' as platform, `content`, `{author_col}` as author, " + f"`{time_col}` as ts, {like_select}, '{table}' as source_table " + f"FROM `{table}` WHERE `content` LIKE %s") + all_queries.append(query) - return TavilyResponse( - query=response_dict.get('query'), answer=response_dict.get('answer'), - results=search_results, images=image_results, - response_time=response_dict.get('response_time') - ) - except Exception as e: - print(f"搜索时发生错误: {str(e)}") - return TavilyResponse(query=kwargs.get("query", "Unknown Query")) + final_query = f"({' ) UNION ALL ( '.join(all_queries)}) ORDER BY ts DESC LIMIT %s" + params = (search_term,) * len(comment_tables) + (limit,) + raw_results = self._execute_query(final_query, params) + + formatted = [QueryResult(platform=r['platform'], content_type='comment', title_or_content=r['content'], author_nickname=r['author'], publish_time=self._to_datetime(r['ts']), engagement={'likes': int(r['likes']) if str(r['likes']).isdigit() else 0}, source_table=r['source_table']) for r in raw_results] + return DBResponse("get_comments_for_topic", params_for_log, results=formatted, results_count=len(formatted)) - # --- Agent 可用的工具方法 --- + def search_topic_on_platform( + self, + platform: Literal['bilibili', 'weibo', 'douyin', 'kuaishou', 'xhs', 'zhihu', 'tieba'], + topic: str, + start_date: Optional[str] = None, + end_date: Optional[str] = None, + limit: int = 20 + ) -> DBResponse: + """ + 【工具】平台定向搜索: (新增) 在指定的单个社交媒体平台上搜索特定话题。 - def basic_search_news(self, query: str, max_results: int = 7) -> TavilyResponse: - """ - 【工具】基础新闻搜索: 执行一次标准、快速的新闻搜索。 - 这是最常用的通用搜索工具,适用于不确定需要何种特定搜索时。 - Agent可提供搜索查询(query)和可选的最大结果数(max_results)。 - """ - print(f"--- TOOL: 基础新闻搜索 (query: {query}) ---") - return self._search_internal( - query=query, - max_results=max_results, - search_depth="basic", - include_answer=False - ) + Args: + platform (Literal['bilibili', ...]): 要搜索的平台,必须是七个支持的平台之一。 + topic (str): 要搜索的话题关键词。 + start_date (Optional[str]): 开始日期,格式 'YYYY-MM-DD'。默认为None。 + end_date (Optional[str]): 结束日期,格式 'YYYY-MM-DD'。默认为None。 + limit (int): 返回结果的最大数量,默认为 20。 - def deep_search_news(self, query: str) -> TavilyResponse: + Returns: + DBResponse: 包含在该平台找到的结果列表。 """ - 【工具】深度新闻分析: 对一个主题进行最全面、最深入的搜索。 - 返回AI生成的“高级”详细摘要答案和最多20条最相关的新闻结果。适用于需要全面了解某个事件背景的场景。 - Agent只需提供搜索查询(query)。 - """ - print(f"--- TOOL: 深度新闻分析 (query: {query}) ---") - return self._search_internal( - query=query, search_depth="advanced", max_results=20, include_answer="advanced" - ) + params_for_log = {'platform': platform, 'topic': topic, 'start_date': start_date, 'end_date': end_date, 'limit': limit} + print(f"--- TOOL: 平台定向搜索 (params: {params_for_log}) ---") - def search_news_last_24_hours(self, query: str) -> TavilyResponse: - """ - 【工具】搜索24小时内新闻: 获取关于某个主题的最新动态。 - 此工具专门查找过去24小时内发布的新闻。适用于追踪突发事件或最新进展。 - Agent只需提供搜索查询(query)。 - """ - print(f"--- TOOL: 搜索24小时内新闻 (query: {query}) ---") - return self._search_internal(query=query, time_range='d', max_results=10) + all_configs = { 'bilibili': [{'table': 'bilibili_video', 'fields': ['title', 'desc', 'source_keyword'], 'type': 'video', 'time_col': 'create_time', 'time_type': 'sec'}, {'table': 'bilibili_video_comment', 'fields': ['content'], 'type': 'comment'}], 'douyin': [{'table': 'douyin_aweme', 'fields': ['title', 'desc', 'source_keyword'], 'type': 'video', 'time_col': 'create_time', 'time_type': 'ms'}, {'table': 'douyin_aweme_comment', 'fields': ['content'], 'type': 'comment'}], 'kuaishou': [{'table': 'kuaishou_video', 'fields': ['title', 'desc', 'source_keyword'], 'type': 'video', 'time_col': 'create_time', 'time_type': 'ms'}, {'table': 'kuaishou_video_comment', 'fields': ['content'], 'type': 'comment'}], 'weibo': [{'table': 'weibo_note', 'fields': ['content', 'source_keyword'], 'type': 'note', 'time_col': 'create_date_time', 'time_type': 'str'}, {'table': 'weibo_note_comment', 'fields': ['content'], 'type': 'comment'}], 'xhs': [{'table': 'xhs_note', 'fields': ['title', 'desc', 'tag_list', 'source_keyword'], 'type': 'note', 'time_col': 'time', 'time_type': 'ms'}, {'table': 'xhs_note_comment', 'fields': ['content'], 'type': 'comment'}], 'zhihu': [{'table': 'zhihu_content', 'fields': ['title', 'desc', 'content_text', 'source_keyword'], 'type': 'content', 'time_col': 'created_time', 'time_type': 'sec_str'}, {'table': 'zhihu_comment', 'fields': ['content'], 'type': 'comment'}], 'tieba': [{'table': 'tieba_note', 'fields': ['title', 'desc', 'source_keyword'], 'type': 'note', 'time_col': 'publish_time', 'time_type': 'str'}, {'table': 'tieba_comment', 'fields': ['content'], 'type': 'comment'}] } + + if platform not in all_configs: + return DBResponse("search_topic_on_platform", params_for_log, error_message=f"不支持的平台: {platform}") - def search_news_last_week(self, query: str) -> TavilyResponse: - """ - 【工具】搜索本周新闻: 获取关于某个主题过去一周内的主要新闻报道。 - 适用于进行周度舆情总结或回顾。 - Agent只需提供搜索查询(query)。 - """ - print(f"--- TOOL: 搜索本周新闻 (query: {query}) ---") - return self._search_internal(query=query, time_range='w', max_results=10) + search_term, all_results = f"%{topic}%", [] + platform_configs = all_configs[platform] - def search_images_for_news(self, query: str) -> TavilyResponse: - """ - 【工具】查找新闻图片: 搜索与某个新闻主题相关的图片。 - 此工具会返回图片链接及描述,适用于需要为报告或文章配图的场景。 - Agent只需提供搜索查询(query)。 - """ - print(f"--- TOOL: 查找新闻图片 (query: {query}) ---") - return self._search_internal( - query=query, include_images=True, include_image_descriptions=True, max_results=5 - ) + time_clause, time_params_tuple = "", () + if start_date and end_date: + try: + start_dt, end_dt = datetime.strptime(start_date, '%Y-%m-%d'), datetime.strptime(end_date, '%Y-%m-%d') + timedelta(days=1) + except ValueError: + return DBResponse("search_topic_on_platform", params_for_log, error_message="日期格式错误,请使用 'YYYY-MM-DD' 格式。") + else: + start_dt, end_dt = None, None - def search_news_by_date(self, query: str, start_date: str, end_date: str) -> TavilyResponse: - """ - 【工具】按指定日期范围搜索新闻: 在一个明确的历史时间段内搜索新闻。 - 这是唯一需要Agent提供详细时间参数的工具。适用于需要对特定历史事件进行分析的场景。 - Agent需要提供查询(query)、开始日期(start_date)和结束日期(end_date),格式均为 'YYYY-MM-DD'。 - """ - print(f"--- TOOL: 按指定日期范围搜索新闻 (query: {query}, from: {start_date}, to: {end_date}) ---") - return self._search_internal( - query=query, start_date=start_date, end_date=end_date, max_results=15 - ) + for config in platform_configs: + table = config['table'] + topic_clause = " OR ".join([f"`{field}` LIKE %s" for field in config['fields']]) + query = f"SELECT * FROM `{table}` WHERE {topic_clause}" + params = [search_term] * len(config['fields']) + if start_dt and end_dt and 'time_col' in config: + time_col, time_type = config['time_col'], config['time_type'] + if time_type == 'sec': t_params = (int(start_dt.timestamp()), int(end_dt.timestamp())) + elif time_type == 'ms': t_params = (int(start_dt.timestamp() * 1000), int(end_dt.timestamp() * 1000)) + elif time_type in ['str', 'date_str']: t_params = (start_dt.strftime('%Y-%m-%d'), end_dt.strftime('%Y-%m-%d')) + else: t_params = (str(int(start_dt.timestamp())), str(int(end_dt.timestamp()))) + + t_clause = f"`{time_col}` >= %s AND `{time_col}` < %s" + if table == 'zhihu_content': t_clause = f"CAST(`{time_col}` AS UNSIGNED) >= %s AND CAST(`{time_col}` AS UNSIGNED) < %s" + + query += f" AND ({t_clause})" + params.extend(t_params) + + query += f" ORDER BY id DESC LIMIT %s" + params.append(limit) + + raw_results = self._execute_query(query, tuple(params)) + for row in raw_results: + content = (row.get('title') or row.get('content') or row.get('desc') or row.get('content_text', '')) + time_key = config.get('time_col') and row.get(config.get('time_col')) + all_results.append(QueryResult(platform=platform, content_type=config['type'], title_or_content=content[:500] if content else '', author_nickname=row.get('nickname') or row.get('user_nickname'), url=row.get('video_url') or row.get('note_url') or row.get('content_url') or row.get('url') or row.get('aweme_url'), publish_time=self._to_datetime(time_key), engagement=self._extract_engagement(row), source_keyword=row.get('source_keyword'), source_table=table)) + + return DBResponse("search_topic_on_platform", params_for_log, results=all_results, results_count=len(all_results)) # --- 3. 测试与使用示例 --- - -def print_response_summary(response: TavilyResponse): - """简化的打印函数,用于展示测试结果,现在会显示发布日期""" - if not response or not response.query: - print("未能获取有效响应。") +def print_response_summary(response: DBResponse): + """简化的打印函数,用于展示测试结果""" + if response.error_message: + print(f"工具 '{response.tool_name}' 执行出错: {response.error_message}") + print("-" * 80) return - - print(f"\n查询: '{response.query}' | 耗时: {response.response_time}s") - if response.answer: - print(f"AI摘要: {response.answer[:120]}...") - print(f"找到 {len(response.results)} 条网页, {len(response.images)} 张图片。") - if response.results: - first_result = response.results[0] - date_info = f"(发布于: {first_result.published_date})" if first_result.published_date else "" - print(f"第一条结果: {first_result.title} {date_info}") - print("-" * 60) + params_str = ", ".join(f"{k}='{v}'" for k, v in response.parameters.items()) + print(f"查询: 工具='{response.tool_name}', 参数=[{params_str}]") + print(f"找到 {response.results_count} 条相关记录。") + + if response.results: + print("--- 前5条结果示例 ---") + for i, res in enumerate(response.results[:5]): + engagement_str = ", ".join(f"{k}: {v}" for k, v in res.engagement.items() if v) + content_preview = (res.title_or_content.replace('\n', ' ')[:70] + '...') if res.title_or_content and len(res.title_or_content) > 70 else res.title_or_content + hotness_str = f", hotness: {res.hotness_score:.2f}" if res.hotness_score > 0 else "" + print( + f"{i+1}. [{res.platform.upper()}/{res.content_type}] {content_preview}\n" + f" by: {res.author_nickname}, at: {res.publish_time.strftime('%Y-%m-%d %H:%M') if res.publish_time else 'N/A'}" + f", src_kw: '{res.source_keyword or 'N/A'}'{hotness_str}" + f", engagement: {{{engagement_str}}}" + ) + print("-" * 80) if __name__ == "__main__": - # 在运行前,请确保您已设置 TAVILY_API_KEY 环境变量 try: - # 初始化“新闻社”客户端,它内部包含了所有工具 - agency = TavilyNewsAgency() - - # 场景1: Agent 进行一次常规、快速的搜索 - response1 = agency.basic_search_news(query="奥运会最新赛况", max_results=5) + db_agent_tools = MediaCrawlerDB() + print("数据库工具初始化成功,开始执行测试场景...\n") + + # 场景1: (新) 查找过去一周综合热度最高的内容 (不再需要sort_by) + response1 = db_agent_tools.search_hot_content(time_period='week', limit=5) print_response_summary(response1) - # 场景2: Agent 需要全面了解“全球芯片技术竞争”的背景 - response2 = agency.deep_search_news(query="全球芯片技术竞争") + # 场景2: 查找过去24小时内综合热度最高的内容 + response2 = db_agent_tools.search_hot_content(time_period='24h', limit=5) print_response_summary(response2) - # 场景3: Agent 需要追踪“GTC大会”的最新消息 - response3 = agency.search_news_last_24_hours(query="Nvidia GTC大会 最新发布") + # 场景3: 全局搜索"罗永浩" + response3 = db_agent_tools.search_topic_globally(topic="罗永浩", limit_per_table=2) print_response_summary(response3) - - # 场景4: Agent 需要为一篇关于“自动驾驶”的周报查找素材 - response4 = agency.search_news_last_week(query="自动驾驶商业化落地") - print_response_summary(response4) - - # 场景5: Agent 需要查找“韦伯太空望远镜”的新闻图片 - response5 = agency.search_images_for_news(query="韦伯太空望远镜最新发现") - print_response_summary(response5) - # 场景6: Agent 需要研究2025年第一季度关于“人工智能法规”的新闻 - response6 = agency.search_news_by_date( - query="人工智能法规", - start_date="2025-01-01", - end_date="2025-03-31" - ) - print_response_summary(response6) + # 场景4: (新增) 在B站上精确搜索"论文" + response4 = db_agent_tools.search_topic_on_platform(platform='bilibili', topic="论文", limit=5) + print_response_summary(response4) + + # 场景5: (新增) 在微博上精确搜索 "许凯" 在特定一天内的内容 + response5 = db_agent_tools.search_topic_on_platform(platform='weibo', topic="许凯", start_date='2025-08-22', end_date='2025-08-22', limit=5) + print_response_summary(response5) except ValueError as e: print(f"初始化失败: {e}") - print("请确保 TAVILY_API_KEY 环境变量已正确设置。") + print("请确保相关的数据库环境变量已正确设置, 或在代码中直接提供连接信息。") except Exception as e: print(f"测试过程中发生未知错误: {e}") \ No newline at end of file