The Insight Engine agent has been basically completed.

This commit is contained in:
戒酒的李白
2025-08-23 15:11:51 +08:00
parent c35a6baf05
commit 4e33224633
7 changed files with 437 additions and 54 deletions
+9 -1
View File
@@ -9,10 +9,18 @@ from .search import (
DBResponse,
print_response_summary
)
from .keyword_optimizer import (
KeywordOptimizer,
KeywordOptimizationResponse,
keyword_optimizer
)
__all__ = [
"MediaCrawlerDB",
"QueryResult",
"DBResponse",
"print_response_summary"
"print_response_summary",
"KeywordOptimizer",
"KeywordOptimizationResponse",
"keyword_optimizer"
]
+286
View File
@@ -0,0 +1,286 @@
"""
关键词优化中间件
使用Qwen AI将Agent生成的搜索词优化为更适合舆情数据库查询的关键词
"""
import requests
import json
import sys
import os
from typing import List, Dict, Any
from dataclasses import dataclass
# 添加项目根目录到Python路径以导入config
sys.path.append(os.path.dirname(os.path.dirname(os.path.dirname(__file__))))
from config import GUIJI_QWEN3_API_KEY
@dataclass
class KeywordOptimizationResponse:
"""关键词优化响应"""
original_query: str
optimized_keywords: List[str]
reasoning: str
success: bool
error_message: str = ""
class KeywordOptimizer:
"""
关键词优化器
使用硅基流动的Qwen3模型将Agent生成的搜索词优化为更贴近真实舆情的关键词
"""
def __init__(self, api_key: str = None):
"""
初始化关键词优化器
Args:
api_key: 硅基流动API密钥,如果不提供则从配置文件读取
"""
self.api_key = api_key or GUIJI_QWEN3_API_KEY
self.base_url = "https://api.siliconflow.cn/v1/chat/completions"
self.model = "Qwen/Qwen3-30B-A3B-Instruct-2507"
if not self.api_key:
raise ValueError("未找到硅基流动API密钥,请在config.py中设置GUIJI_QWEN3_API_KEY")
def optimize_keywords(self, original_query: str, context: str = "") -> KeywordOptimizationResponse:
"""
优化搜索关键词
Args:
original_query: Agent生成的原始搜索查询
context: 额外的上下文信息(如段落标题、内容描述等)
Returns:
KeywordOptimizationResponse: 优化后的关键词列表
"""
print(f"🔍 关键词优化中间件: 处理查询 '{original_query}'")
try:
# 构建优化prompt
system_prompt = self._build_system_prompt()
user_prompt = self._build_user_prompt(original_query, context)
# 调用Qwen API
response = self._call_qwen_api(system_prompt, user_prompt)
if response["success"]:
# 解析响应
content = response["content"]
try:
# 尝试解析JSON格式的响应
if content.strip().startswith('{'):
parsed = json.loads(content)
keywords = parsed.get("keywords", [])
reasoning = parsed.get("reasoning", "")
else:
# 如果不是JSON格式,尝试从文本中提取关键词
keywords = self._extract_keywords_from_text(content)
reasoning = content
# 验证关键词质量
validated_keywords = self._validate_keywords(keywords)
print(f"✅ 优化成功: {len(validated_keywords)}个关键词")
for i, keyword in enumerate(validated_keywords, 1):
print(f" {i}. '{keyword}'")
return KeywordOptimizationResponse(
original_query=original_query,
optimized_keywords=validated_keywords,
reasoning=reasoning,
success=True
)
except Exception as e:
print(f"⚠️ 解析响应失败,使用备用方案: {str(e)}")
# 备用方案:从原始查询中提取关键词
fallback_keywords = self._fallback_keyword_extraction(original_query)
return KeywordOptimizationResponse(
original_query=original_query,
optimized_keywords=fallback_keywords,
reasoning="API响应解析失败,使用备用关键词提取",
success=True
)
else:
print(f"❌ API调用失败: {response['error']}")
# 使用备用方案
fallback_keywords = self._fallback_keyword_extraction(original_query)
return KeywordOptimizationResponse(
original_query=original_query,
optimized_keywords=fallback_keywords,
reasoning="API调用失败,使用备用关键词提取",
success=True,
error_message=response['error']
)
except Exception as e:
print(f"❌ 关键词优化失败: {str(e)}")
# 最终备用方案
fallback_keywords = self._fallback_keyword_extraction(original_query)
return KeywordOptimizationResponse(
original_query=original_query,
optimized_keywords=fallback_keywords,
reasoning="系统错误,使用备用关键词提取",
success=False,
error_message=str(e)
)
def _build_system_prompt(self) -> str:
"""构建系统prompt"""
return """你是一位专业的舆情数据挖掘专家。你的任务是将用户提供的搜索查询优化为更适合在社交媒体舆情数据库中查找的关键词。
**核心原则**
1. **贴近网民语言**:使用普通网友在社交媒体上会使用的词汇
2. **避免专业术语**:不使用"舆情""传播""倾向""展望"等官方词汇
3. **简洁具体**:每个关键词要非常简洁明了,便于数据库匹配
4. **情感丰富**:包含网民常用的情感表达词汇
5. **数量控制**:最少提供10个关键词,最多提供20个关键词
6. **避免重复**:不要脱离初始查询的主题
**输出格式**
请以JSON格式返回结果:
{
"keywords": ["关键词1", "关键词2", "关键词3"],
"reasoning": "选择这些关键词的理由"
}
**示例**
输入:"武汉大学舆情管理 未来展望 发展趋势"
输出:
{
"keywords": ["武大", "武汉大学", "学校管理", "大学", "教育"],
"reasoning": "选择'武大''武汉大学'作为核心词汇,这是网民最常使用的称呼;'学校管理''舆情管理'更贴近日常表达;避免使用'未来展望''发展趋势'等网民很少使用的专业术语"
}"""
def _build_user_prompt(self, original_query: str, context: str) -> str:
"""构建用户prompt"""
prompt = f"请将以下搜索查询优化为适合舆情数据库查询的关键词:\n\n原始查询:{original_query}"
if context:
prompt += f"\n\n上下文信息:{context}"
prompt += "\n\n请记住:要使用网民在社交媒体上真实使用的词汇,避免官方术语和专业词汇。"
return prompt
def _call_qwen_api(self, system_prompt: str, user_prompt: str) -> Dict[str, Any]:
"""调用Qwen API"""
headers = {
"Authorization": f"Bearer {self.api_key}",
"Content-Type": "application/json"
}
data = {
"model": self.model,
"messages": [
{"role": "system", "content": system_prompt},
{"role": "user", "content": user_prompt}
],
"max_tokens": 10000,
"temperature": 0.7
}
try:
response = requests.post(self.base_url, headers=headers, json=data, timeout=30)
response.raise_for_status()
result = response.json()
if "choices" in result and len(result["choices"]) > 0:
content = result["choices"][0]["message"]["content"]
return {"success": True, "content": content}
else:
return {"success": False, "error": "API返回格式异常"}
except requests.exceptions.RequestException as e:
return {"success": False, "error": f"网络请求错误: {str(e)}"}
except Exception as e:
return {"success": False, "error": f"API调用异常: {str(e)}"}
def _extract_keywords_from_text(self, text: str) -> List[str]:
"""从文本中提取关键词(当JSON解析失败时使用)"""
# 简单的关键词提取逻辑
lines = text.split('\n')
keywords = []
for line in lines:
line = line.strip()
# 查找可能的关键词
if '' in line or ':' in line:
parts = line.split('') if '' in line else line.split(':')
if len(parts) > 1:
potential_keywords = parts[1].strip()
# 尝试分割关键词
if '' in potential_keywords:
keywords.extend([k.strip() for k in potential_keywords.split('')])
elif ',' in potential_keywords:
keywords.extend([k.strip() for k in potential_keywords.split(',')])
else:
keywords.append(potential_keywords)
# 如果没有找到,尝试其他方法
if not keywords:
# 查找引号中的内容
import re
quoted_content = re.findall(r'["""\'](.*?)["""\']', text)
keywords.extend(quoted_content)
# 清理和验证关键词
cleaned_keywords = []
for keyword in keywords[:20]: # 最多5个
keyword = keyword.strip().strip('"\'""''')
if keyword and len(keyword) <= 20: # 合理长度
cleaned_keywords.append(keyword)
return cleaned_keywords[:20]
def _validate_keywords(self, keywords: List[str]) -> List[str]:
"""验证和清理关键词"""
validated = []
# 不良关键词(过于专业或官方)
bad_keywords = {
'态度分析', '公众反应', '情绪倾向',
'未来展望', '发展趋势', '战略规划', '政策导向', '管理机制'
}
for keyword in keywords:
if isinstance(keyword, str):
keyword = keyword.strip().strip('"\'""''')
# 基本验证
if (keyword and
len(keyword) <= 20 and
len(keyword) >= 1 and
not any(bad_word in keyword for bad_word in bad_keywords)):
validated.append(keyword)
return validated[:20] # 最多返回20个关键词
def _fallback_keyword_extraction(self, original_query: str) -> List[str]:
"""备用关键词提取方案"""
# 简单的关键词提取逻辑
# 移除常见的无用词汇
stop_words = {''}
# 分割查询
import re
# 按空格、标点分割
tokens = re.split(r'[\s,。!?;:、]+', original_query)
keywords = []
for token in tokens:
token = token.strip()
if token and token not in stop_words and len(token) >= 2:
keywords.append(token)
# 如果没有有效关键词,使用原始查询的第一个词
if not keywords:
first_word = original_query.split()[0] if original_query.split() else original_query
keywords = [first_word] if first_word else ["热门"]
return keywords[:20]
# 全局实例
keyword_optimizer = KeywordOptimizer()
+11 -11
View File
@@ -2,7 +2,7 @@
专为 AI Agent 设计的本地舆情数据库查询工具集 (MediaCrawlerDB)
版本: 3.0
最后更新: 2025-08-22
最后更新: 2025-08-23
此脚本将复杂的本地MySQL数据库查询功能封装成一系列目标明确、参数清晰的独立工具,
专为AI Agent调用而设计。Agent只需根据任务意图(如搜索热点、全局搜索话题、
@@ -44,7 +44,7 @@ class QueryResult:
publish_time: Optional[datetime] = None
engagement: Dict[str, int] = field(default_factory=dict)
source_keyword: Optional[str] = None
hotness_score: float = 0.0 # 新增:综合热度分
hotness_score: float = 0.0
source_table: str = ""
@dataclass
@@ -136,14 +136,14 @@ class MediaCrawlerDB:
def search_hot_content(
self,
time_period: Literal['24h', 'week', 'year'] = 'week',
limit: int = 10
limit: int = 50
) -> DBResponse:
"""
【工具】查找热点内容: (已简化) 获取最近一段时间内综合热度最高的内容。
【工具】查找热点内容: 获取最近一段时间内综合热度最高的内容。
Args:
time_period (Literal['24h', 'week', 'year']): 时间范围,默认为 'week'
limit (int): 返回结果的最大数量,默认为 10。
limit (int): 返回结果的最大数量,默认为 50。
Returns:
DBResponse: 包含按综合热度排序后的内容列表。
@@ -190,13 +190,13 @@ class MediaCrawlerDB:
formatted_results = [QueryResult(platform=r['p'], content_type=r['t'], title_or_content=r['title'], author_nickname=r.get('author'), url=r['url'], publish_time=self._to_datetime(r['ts']), engagement=self._extract_engagement(r), hotness_score=r.get('hotness_score', 0.0), source_keyword=r.get('source_keyword'), source_table=r['tbl']) for r in raw_results]
return DBResponse("search_hot_content", params_for_log, results=formatted_results, results_count=len(formatted_results))
def search_topic_globally(self, topic: str, limit_per_table: int = 5) -> DBResponse:
def search_topic_globally(self, topic: str, limit_per_table: int = 100) -> DBResponse:
"""
【工具】全局话题搜索: 在数据库中(内容、评论、标签、来源关键字)全面搜索指定话题。
Args:
topic (str): 要搜索的话题关键词。
limit_per_table (int): 从每个相关表中返回的最大记录数,默认为 5
limit_per_table (int): 从每个相关表中返回的最大记录数,默认为 100
Returns:
DBResponse: 包含所有匹配结果的聚合列表。
@@ -227,7 +227,7 @@ class MediaCrawlerDB:
))
return DBResponse("search_topic_globally", params_for_log, results=all_results, results_count=len(all_results))
def search_topic_by_date(self, topic: str, start_date: str, end_date: str, limit_per_table: int = 10) -> DBResponse:
def search_topic_by_date(self, topic: str, start_date: str, end_date: str, limit_per_table: int = 100) -> DBResponse:
"""
【工具】按日期搜索话题: 在明确的历史时间段内,搜索与特定话题相关的内容。
@@ -235,7 +235,7 @@ class MediaCrawlerDB:
topic (str): 要搜索的话题关键词。
start_date (str): 开始日期,格式 'YYYY-MM-DD'
end_date (str): 结束日期,格式 'YYYY-MM-DD'
limit_per_table (int): 从每个相关表中返回的最大记录数,默认为 10。
limit_per_table (int): 从每个相关表中返回的最大记录数,默认为 100
Returns:
DBResponse: 包含在指定日期范围内找到的结果的聚合列表。
@@ -282,13 +282,13 @@ class MediaCrawlerDB:
))
return DBResponse("search_topic_by_date", params_for_log, results=all_results, results_count=len(all_results))
def get_comments_for_topic(self, topic: str, limit: int = 50) -> DBResponse:
def get_comments_for_topic(self, topic: str, limit: int = 500) -> DBResponse:
"""
【工具】获取话题评论: 专门搜索并返回所有平台中与特定话题相关的公众评论数据。
Args:
topic (str): 要搜索的话题关键词。
limit (int): 返回评论的总数量上限,默认为 50。
limit (int): 返回评论的总数量上限,默认为 500
Returns:
DBResponse: 包含匹配的评论列表。