The final report agent has been largely completed.

This commit is contained in:
戒酒的李白
2025-08-26 17:34:36 +08:00
parent 197e68f7ba
commit f0788b64f3
52 changed files with 7853 additions and 825 deletions
+13
View File
@@ -0,0 +1,13 @@
"""
Report Engine
一个智能报告生成AI代理实现
基于三个子agent的输出和论坛日志生成综合HTML报告
"""
from .agent import ReportAgent, create_agent
from .utils.config import Config, load_config
__version__ = "1.0.0"
__author__ = "Report Engine Team"
__all__ = ["ReportAgent", "create_agent", "Config", "load_config"]
+521
View File
@@ -0,0 +1,521 @@
"""
Report Agent主类
整合所有模块,实现完整的报告生成流程
"""
import json
import os
import logging
from datetime import datetime
from typing import Optional, Dict, Any, List
from .llms import GeminiLLM, BaseLLM
from .nodes import (
TemplateSelectionNode,
HTMLGenerationNode
)
from .state import ReportState
from .utils import Config, load_config
class FileCountBaseline:
"""文件数量基准管理器"""
def __init__(self):
self.baseline_file = 'logs/report_baseline.json'
self.baseline_data = self._load_baseline()
def _load_baseline(self) -> Dict[str, int]:
"""加载基准数据"""
try:
if os.path.exists(self.baseline_file):
with open(self.baseline_file, 'r', encoding='utf-8') as f:
return json.load(f)
except Exception as e:
print(f"加载基准数据失败: {e}")
return {}
def _save_baseline(self):
"""保存基准数据"""
try:
os.makedirs(os.path.dirname(self.baseline_file), exist_ok=True)
with open(self.baseline_file, 'w', encoding='utf-8') as f:
json.dump(self.baseline_data, f, ensure_ascii=False, indent=2)
except Exception as e:
print(f"保存基准数据失败: {e}")
def initialize_baseline(self, directories: Dict[str, str]) -> Dict[str, int]:
"""初始化文件数量基准"""
current_counts = {}
for engine, directory in directories.items():
if os.path.exists(directory):
md_files = [f for f in os.listdir(directory) if f.endswith('.md')]
current_counts[engine] = len(md_files)
else:
current_counts[engine] = 0
# 保存基准数据
self.baseline_data = current_counts.copy()
self._save_baseline()
print(f"文件数量基准已初始化: {current_counts}")
return current_counts
def check_new_files(self, directories: Dict[str, str]) -> Dict[str, Any]:
"""检查是否有新文件"""
current_counts = {}
new_files_found = {}
all_have_new = True
for engine, directory in directories.items():
if os.path.exists(directory):
md_files = [f for f in os.listdir(directory) if f.endswith('.md')]
current_counts[engine] = len(md_files)
baseline_count = self.baseline_data.get(engine, 0)
if current_counts[engine] > baseline_count:
new_files_found[engine] = current_counts[engine] - baseline_count
else:
new_files_found[engine] = 0
all_have_new = False
else:
current_counts[engine] = 0
new_files_found[engine] = 0
all_have_new = False
return {
'ready': all_have_new,
'baseline_counts': self.baseline_data,
'current_counts': current_counts,
'new_files_found': new_files_found,
'missing_engines': [engine for engine, count in new_files_found.items() if count == 0]
}
def get_latest_files(self, directories: Dict[str, str]) -> Dict[str, str]:
"""获取每个目录的最新文件"""
latest_files = {}
for engine, directory in directories.items():
if os.path.exists(directory):
md_files = [f for f in os.listdir(directory) if f.endswith('.md')]
if md_files:
latest_file = max(md_files, key=lambda x: os.path.getmtime(os.path.join(directory, x)))
latest_files[engine] = os.path.join(directory, latest_file)
return latest_files
class ReportAgent:
"""Report Agent主类"""
def __init__(self, config: Optional[Config] = None):
"""
初始化Report Agent
Args:
config: 配置对象,如果不提供则自动加载
"""
# 加载配置
self.config = config or load_config()
# 初始化文件基准管理器
self.file_baseline = FileCountBaseline()
# 初始化日志
self._setup_logging()
# 初始化LLM客户端
self.llm_client = self._initialize_llm()
# 初始化节点
self._initialize_nodes()
# 初始化文件数量基准
self._initialize_file_baseline()
# 状态
self.state = ReportState()
# 确保输出目录存在
os.makedirs(self.config.output_dir, exist_ok=True)
self.logger.info("Report Agent已初始化")
self.logger.info(f"使用LLM: {self.llm_client.get_model_info()}")
def _setup_logging(self):
"""设置日志"""
# 确保日志目录存在
log_dir = os.path.dirname(self.config.log_file)
os.makedirs(log_dir, exist_ok=True)
# 创建专用的logger,避免与其他模块冲突
self.logger = logging.getLogger('ReportEngine')
self.logger.setLevel(logging.INFO)
# 清除已有的handlers
if self.logger.handlers:
self.logger.handlers.clear()
# 创建文件handler
file_handler = logging.FileHandler(self.config.log_file, encoding='utf-8')
file_handler.setLevel(logging.INFO)
# 创建控制台handler
console_handler = logging.StreamHandler()
console_handler.setLevel(logging.INFO)
# 设置格式
formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
file_handler.setFormatter(formatter)
console_handler.setFormatter(formatter)
# 添加handlers
self.logger.addHandler(file_handler)
self.logger.addHandler(console_handler)
# 防止日志向上传播
self.logger.propagate = False
def _initialize_file_baseline(self):
"""初始化文件数量基准"""
directories = {
'insight': 'insight_engine_streamlit_reports',
'media': 'media_engine_streamlit_reports',
'query': 'query_engine_streamlit_reports'
}
self.file_baseline.initialize_baseline(directories)
def _initialize_llm(self) -> BaseLLM:
"""初始化LLM客户端"""
if self.config.default_llm_provider == "gemini":
return GeminiLLM(
api_key=self.config.gemini_api_key,
model_name=self.config.gemini_model
)
else:
raise ValueError(f"不支持的LLM提供商: {self.config.default_llm_provider}")
def _initialize_nodes(self):
"""初始化处理节点"""
self.template_selection_node = TemplateSelectionNode(
self.llm_client,
self.config.template_dir
)
self.html_generation_node = HTMLGenerationNode(self.llm_client)
def generate_report(self, query: str, reports: List[Any], forum_logs: str = "",
custom_template: str = "", save_report: bool = True) -> str:
"""
生成综合报告
Args:
query: 原始查询
reports: 三个子agent的报告列表(按顺序:QueryEngine, MediaEngine, InsightEngine
forum_logs: 论坛日志内容
custom_template: 用户自定义模板(可选)
save_report: 是否保存报告到文件
Returns:
最终HTML报告内容
"""
start_time = datetime.now()
self.logger.info(f"开始生成报告: {query}")
self.logger.info(f"输入数据 - 报告数量: {len(reports)}, 论坛日志长度: {len(forum_logs)}")
try:
# Step 1: 模板选择
template_result = self._select_template(query, reports, forum_logs, custom_template)
# Step 2: 直接生成HTML报告
html_report = self._generate_html_report(query, reports, forum_logs, template_result)
# Step 3: 保存报告
if save_report:
self._save_report(html_report)
# 更新生成时间
end_time = datetime.now()
generation_time = (end_time - start_time).total_seconds()
self.state.metadata.generation_time = generation_time
self.logger.info(f"报告生成完成,耗时: {generation_time:.2f}")
return html_report
except Exception as e:
self.logger.error(f"报告生成过程中发生错误: {str(e)}")
raise e
def _select_template(self, query: str, reports: List[Any], forum_logs: str, custom_template: str):
"""选择报告模板"""
self.logger.info("选择报告模板...")
# 如果用户提供了自定义模板,直接使用
if custom_template:
self.logger.info("使用用户自定义模板")
return {
'template_name': 'custom',
'template_content': custom_template,
'selection_reason': '用户指定的自定义模板'
}
template_input = {
'query': query,
'reports': reports,
'forum_logs': forum_logs
}
try:
template_result = self.template_selection_node.run(template_input)
# 更新状态
self.state.metadata.template_used = template_result['template_name']
self.logger.info(f"选择模板: {template_result['template_name']}")
self.logger.info(f"选择理由: {template_result['selection_reason']}")
return template_result
except Exception as e:
self.logger.error(f"模板选择失败,使用默认模板: {str(e)}")
# 直接使用备用模板
fallback_template = {
'template_name': '社会公共热点事件分析报告模板',
'template_content': self._get_fallback_template_content(),
'selection_reason': '模板选择失败,使用默认社会热点事件分析模板'
}
self.state.metadata.template_used = fallback_template['template_name']
return fallback_template
def _generate_html_report(self, query: str, reports: List[Any], forum_logs: str, template_result: Dict[str, Any]) -> str:
"""生成HTML报告"""
self.logger.info("多轮生成HTML报告...")
# 准备报告内容,确保有3个报告
query_report = reports[0] if len(reports) > 0 else ""
media_report = reports[1] if len(reports) > 1 else ""
insight_report = reports[2] if len(reports) > 2 else ""
# 转换为字符串格式
query_report = str(query_report) if query_report else ""
media_report = str(media_report) if media_report else ""
insight_report = str(insight_report) if insight_report else ""
html_input = {
'query': query,
'query_engine_report': query_report,
'media_engine_report': media_report,
'insight_engine_report': insight_report,
'forum_logs': forum_logs,
'selected_template': template_result.get('template_content', '')
}
# 使用HTML生成节点生成报告
html_content = self.html_generation_node.run(html_input)
# 更新状态
self.state.html_content = html_content
self.state.mark_completed()
self.logger.info("HTML报告生成完成")
return html_content
def _get_fallback_template_content(self) -> str:
"""获取备用模板内容"""
return """# 社会公共热点事件分析报告
## 执行摘要
本报告针对当前社会热点事件进行综合分析,整合了多方信息源的观点和数据。
## 事件概况
### 基本信息
- 事件性质:{event_nature}
- 发生时间:{event_time}
- 涉及范围:{event_scope}
## 舆情态势分析
### 整体趋势
{sentiment_analysis}
### 主要观点分布
{opinion_distribution}
## 媒体报道分析
### 主流媒体态度
{media_analysis}
### 报道重点
{report_focus}
## 社会影响评估
### 直接影响
{direct_impact}
### 潜在影响
{potential_impact}
## 应对建议
### 即时措施
{immediate_actions}
### 长期策略
{long_term_strategy}
## 结论与展望
{conclusion}
---
*报告类型:社会公共热点事件分析*
*生成时间:{generation_time}*
"""
def _save_report(self, html_content: str):
"""保存报告到文件"""
# 生成文件名
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
query_safe = "".join(c for c in self.state.metadata.query if c.isalnum() or c in (' ', '-', '_')).rstrip()
query_safe = query_safe.replace(' ', '_')[:30]
filename = f"final_report_{query_safe}_{timestamp}.html"
filepath = os.path.join(self.config.output_dir, filename)
# 保存HTML报告
with open(filepath, 'w', encoding='utf-8') as f:
f.write(html_content)
self.logger.info(f"报告已保存到: {filepath}")
# 保存状态
state_filename = f"report_state_{query_safe}_{timestamp}.json"
state_filepath = os.path.join(self.config.output_dir, state_filename)
self.state.save_to_file(state_filepath)
self.logger.info(f"状态已保存到: {state_filepath}")
def get_progress_summary(self) -> Dict[str, Any]:
"""获取进度摘要"""
return self.state.to_dict()
def load_state(self, filepath: str):
"""从文件加载状态"""
self.state = ReportState.load_from_file(filepath)
self.logger.info(f"状态已从 {filepath} 加载")
def save_state(self, filepath: str):
"""保存状态到文件"""
self.state.save_to_file(filepath)
self.logger.info(f"状态已保存到 {filepath}")
def check_input_files(self, insight_dir: str, media_dir: str, query_dir: str, forum_log_path: str) -> Dict[str, Any]:
"""
检查输入文件是否准备就绪(基于文件数量增加)
Args:
insight_dir: InsightEngine报告目录
media_dir: MediaEngine报告目录
query_dir: QueryEngine报告目录
forum_log_path: 论坛日志文件路径
Returns:
检查结果字典
"""
# 检查各个报告目录的文件数量变化
directories = {
'insight': insight_dir,
'media': media_dir,
'query': query_dir
}
# 使用文件基准管理器检查新文件
check_result = self.file_baseline.check_new_files(directories)
# 检查论坛日志
forum_ready = os.path.exists(forum_log_path)
# 构建返回结果
result = {
'ready': check_result['ready'] and forum_ready,
'baseline_counts': check_result['baseline_counts'],
'current_counts': check_result['current_counts'],
'new_files_found': check_result['new_files_found'],
'missing_files': [],
'files_found': [],
'latest_files': {}
}
# 构建详细信息
for engine, new_count in check_result['new_files_found'].items():
current_count = check_result['current_counts'][engine]
baseline_count = check_result['baseline_counts'].get(engine, 0)
if new_count > 0:
result['files_found'].append(f"{engine}: {current_count}个文件 (新增{new_count}个)")
else:
result['missing_files'].append(f"{engine}: {current_count}个文件 (基准{baseline_count}个,无新增)")
# 检查论坛日志
if forum_ready:
result['files_found'].append(f"forum: {os.path.basename(forum_log_path)}")
else:
result['missing_files'].append("forum: 日志文件不存在")
# 获取最新文件路径(用于实际报告生成)
if result['ready']:
result['latest_files'] = self.file_baseline.get_latest_files(directories)
if forum_ready:
result['latest_files']['forum'] = forum_log_path
return result
def load_input_files(self, file_paths: Dict[str, str]) -> Dict[str, Any]:
"""
加载输入文件内容
Args:
file_paths: 文件路径字典
Returns:
加载的内容字典
"""
content = {
'reports': [],
'forum_logs': ''
}
# 加载报告文件
engines = ['query', 'media', 'insight']
for engine in engines:
if engine in file_paths:
try:
with open(file_paths[engine], 'r', encoding='utf-8') as f:
report_content = f.read()
content['reports'].append(report_content)
self.logger.info(f"已加载 {engine} 报告: {len(report_content)} 字符")
except Exception as e:
self.logger.error(f"加载 {engine} 报告失败: {str(e)}")
content['reports'].append("")
# 加载论坛日志
if 'forum' in file_paths:
try:
with open(file_paths['forum'], 'r', encoding='utf-8') as f:
content['forum_logs'] = f.read()
self.logger.info(f"已加载论坛日志: {len(content['forum_logs'])} 字符")
except Exception as e:
self.logger.error(f"加载论坛日志失败: {str(e)}")
return content
def create_agent(config_file: Optional[str] = None) -> ReportAgent:
"""
创建Report Agent实例的便捷函数
Args:
config_file: 配置文件路径
Returns:
ReportAgent实例
"""
config = load_config(config_file)
return ReportAgent(config)
+469
View File
@@ -0,0 +1,469 @@
"""
Report Engine Flask接口
提供HTTP API用于报告生成
"""
import os
import json
import threading
import time
from datetime import datetime
from flask import Blueprint, request, jsonify, Response
from typing import Dict, Any
from .agent import ReportAgent, create_agent
from .utils.config import load_config
# 创建Blueprint
report_bp = Blueprint('report_engine', __name__)
# 全局变量
report_agent = None
current_task = None
task_lock = threading.Lock()
def initialize_report_engine():
"""初始化Report Engine"""
global report_agent
try:
config = load_config()
report_agent = create_agent()
print("Report Engine初始化成功")
return True
except Exception as e:
print(f"Report Engine初始化失败: {str(e)}")
return False
class ReportTask:
"""报告生成任务"""
def __init__(self, query: str, task_id: str):
self.task_id = task_id
self.query = query
self.status = "pending" # pending, running, completed, error
self.progress = 0
self.result = None
self.error_message = ""
self.created_at = datetime.now()
self.updated_at = datetime.now()
self.html_content = ""
def update_status(self, status: str, progress: int = None, error_message: str = ""):
"""更新任务状态"""
self.status = status
if progress is not None:
self.progress = progress
if error_message:
self.error_message = error_message
self.updated_at = datetime.now()
def to_dict(self) -> Dict[str, Any]:
"""转换为字典格式"""
return {
'task_id': self.task_id,
'query': self.query,
'status': self.status,
'progress': self.progress,
'error_message': self.error_message,
'created_at': self.created_at.isoformat(),
'updated_at': self.updated_at.isoformat(),
'has_result': bool(self.html_content)
}
def check_engines_ready() -> Dict[str, Any]:
"""检查三个子引擎是否都有新文件"""
directories = {
'insight': 'insight_engine_streamlit_reports',
'media': 'media_engine_streamlit_reports',
'query': 'query_engine_streamlit_reports'
}
forum_log_path = 'logs/forum.log'
if not report_agent:
return {
'ready': False,
'error': 'Report Engine未初始化'
}
return report_agent.check_input_files(
directories['insight'],
directories['media'],
directories['query'],
forum_log_path
)
def run_report_generation(task: ReportTask, query: str):
"""在后台线程中运行报告生成"""
global current_task
try:
task.update_status("running", 10)
# 检查输入文件
check_result = check_engines_ready()
if not check_result['ready']:
task.update_status("error", 0, f"输入文件未准备就绪: {check_result.get('missing_files', [])}")
return
task.update_status("running", 30)
# 加载输入文件
content = report_agent.load_input_files(check_result['latest_files'])
task.update_status("running", 50)
# 生成报告
html_report = report_agent.generate_report(
query=query,
reports=content['reports'],
forum_logs=content['forum_logs'],
save_report=True
)
task.update_status("running", 90)
# 保存结果
task.html_content = html_report
task.update_status("completed", 100)
except Exception as e:
task.update_status("error", 0, str(e))
# 只在出错时清理任务
with task_lock:
if current_task and current_task.task_id == task.task_id:
current_task = None
@report_bp.route('/status', methods=['GET'])
def get_status():
"""获取Report Engine状态"""
try:
engines_status = check_engines_ready()
return jsonify({
'success': True,
'initialized': report_agent is not None,
'engines_ready': engines_status['ready'],
'files_found': engines_status.get('files_found', []),
'missing_files': engines_status.get('missing_files', []),
'current_task': current_task.to_dict() if current_task else None
})
except Exception as e:
return jsonify({
'success': False,
'error': str(e)
}), 500
@report_bp.route('/generate', methods=['POST'])
def generate_report():
"""开始生成报告"""
global current_task
try:
# 检查是否有任务在运行
with task_lock:
if current_task and current_task.status == "running":
return jsonify({
'success': False,
'error': '已有报告生成任务在运行中',
'current_task': current_task.to_dict()
}), 400
# 如果有已完成的任务,清理它
if current_task and current_task.status in ["completed", "error"]:
current_task = None
# 获取请求参数
data = request.get_json() or {}
query = data.get('query', '智能舆情分析报告')
# 清空日志文件
clear_report_log()
# 检查Report Engine是否初始化
if not report_agent:
return jsonify({
'success': False,
'error': 'Report Engine未初始化'
}), 500
# 检查输入文件是否准备就绪
engines_status = check_engines_ready()
if not engines_status['ready']:
return jsonify({
'success': False,
'error': '输入文件未准备就绪',
'missing_files': engines_status.get('missing_files', [])
}), 400
# 创建新任务
task_id = f"report_{int(time.time())}"
task = ReportTask(query, task_id)
with task_lock:
current_task = task
# 在后台线程中运行报告生成
thread = threading.Thread(
target=run_report_generation,
args=(task, query),
daemon=True
)
thread.start()
return jsonify({
'success': True,
'task_id': task_id,
'message': '报告生成已启动',
'task': task.to_dict()
})
except Exception as e:
return jsonify({
'success': False,
'error': str(e)
}), 500
@report_bp.route('/progress/<task_id>', methods=['GET'])
def get_progress(task_id: str):
"""获取报告生成进度"""
try:
if not current_task or current_task.task_id != task_id:
# 如果任务不存在,可能是已经完成并被清理了
# 返回一个默认的完成状态而不是404
return jsonify({
'success': True,
'task': {
'task_id': task_id,
'status': 'completed',
'progress': 100,
'error_message': '',
'has_result': True
}
})
return jsonify({
'success': True,
'task': current_task.to_dict()
})
except Exception as e:
return jsonify({
'success': False,
'error': str(e)
}), 500
@report_bp.route('/result/<task_id>', methods=['GET'])
def get_result(task_id: str):
"""获取报告生成结果"""
try:
if not current_task or current_task.task_id != task_id:
return jsonify({
'success': False,
'error': '任务不存在'
}), 404
if current_task.status != "completed":
return jsonify({
'success': False,
'error': '报告尚未完成',
'task': current_task.to_dict()
}), 400
return Response(
current_task.html_content,
mimetype='text/html'
)
except Exception as e:
return jsonify({
'success': False,
'error': str(e)
}), 500
@report_bp.route('/result/<task_id>/json', methods=['GET'])
def get_result_json(task_id: str):
"""获取报告生成结果(JSON格式)"""
try:
if not current_task or current_task.task_id != task_id:
return jsonify({
'success': False,
'error': '任务不存在'
}), 404
if current_task.status != "completed":
return jsonify({
'success': False,
'error': '报告尚未完成',
'task': current_task.to_dict()
}), 400
return jsonify({
'success': True,
'task': current_task.to_dict(),
'html_content': current_task.html_content
})
except Exception as e:
return jsonify({
'success': False,
'error': str(e)
}), 500
@report_bp.route('/cancel/<task_id>', methods=['POST'])
def cancel_task(task_id: str):
"""取消报告生成任务"""
global current_task
try:
with task_lock:
if current_task and current_task.task_id == task_id:
if current_task.status == "running":
current_task.update_status("cancelled", 0, "用户取消任务")
current_task = None
return jsonify({
'success': True,
'message': '任务已取消'
})
else:
return jsonify({
'success': False,
'error': '任务不存在或无法取消'
}), 404
except Exception as e:
return jsonify({
'success': False,
'error': str(e)
}), 500
@report_bp.route('/templates', methods=['GET'])
def get_templates():
"""获取可用模板列表"""
try:
if not report_agent:
return jsonify({
'success': False,
'error': 'Report Engine未初始化'
}), 500
template_dir = report_agent.config.template_dir
templates = []
if os.path.exists(template_dir):
for filename in os.listdir(template_dir):
if filename.endswith('.md'):
template_path = os.path.join(template_dir, filename)
try:
with open(template_path, 'r', encoding='utf-8') as f:
content = f.read()
templates.append({
'name': filename.replace('.md', ''),
'filename': filename,
'description': content.split('\n')[0] if content else '无描述',
'size': len(content)
})
except Exception as e:
print(f"读取模板失败 {filename}: {str(e)}")
return jsonify({
'success': True,
'templates': templates,
'template_dir': template_dir
})
except Exception as e:
return jsonify({
'success': False,
'error': str(e)
}), 500
# 错误处理
@report_bp.errorhandler(404)
def not_found(error):
return jsonify({
'success': False,
'error': 'API端点不存在'
}), 404
@report_bp.errorhandler(500)
def internal_error(error):
return jsonify({
'success': False,
'error': '服务器内部错误'
}), 500
def clear_report_log():
"""清空report.log文件"""
try:
config = load_config()
log_file = config.log_file
with open(log_file, 'w', encoding='utf-8') as f:
f.write('')
print(f"已清空日志文件: {log_file}")
except Exception as e:
print(f"清空日志文件失败: {str(e)}")
@report_bp.route('/log', methods=['GET'])
def get_report_log():
"""获取report.log内容"""
try:
config = load_config()
log_file = config.log_file
if not os.path.exists(log_file):
return jsonify({
'success': True,
'log_lines': []
})
with open(log_file, 'r', encoding='utf-8') as f:
lines = f.readlines()
# 清理行尾的换行符
log_lines = [line.rstrip('\n\r') for line in lines if line.strip()]
return jsonify({
'success': True,
'log_lines': log_lines
})
except Exception as e:
return jsonify({
'success': False,
'error': f'读取日志失败: {str(e)}'
}), 500
@report_bp.route('/log/clear', methods=['POST'])
def clear_log():
"""手动清空日志"""
try:
clear_report_log()
return jsonify({
'success': True,
'message': '日志已清空'
})
except Exception as e:
return jsonify({
'success': False,
'error': f'清空日志失败: {str(e)}'
}), 500
+9
View File
@@ -0,0 +1,9 @@
"""
Report Engine LLM模块
包含各种大语言模型的接口实现
"""
from .base import BaseLLM
from .gemini_llm import GeminiLLM
__all__ = ["BaseLLM", "GeminiLLM"]
+95
View File
@@ -0,0 +1,95 @@
"""
Report Engine LLM基类
定义所有LLM实现的基础接口
"""
from abc import ABC, abstractmethod
from typing import Dict, Any, Optional
class BaseLLM(ABC):
"""LLM基类"""
def __init__(self, api_key: str, model_name: Optional[str] = None):
"""
初始化LLM客户端
Args:
api_key: API密钥
model_name: 模型名称
"""
self.api_key = api_key
self.model_name = model_name
@abstractmethod
def invoke(self, system_prompt: str, user_prompt: str, **kwargs) -> str:
"""
调用LLM生成回复
Args:
system_prompt: 系统提示词
user_prompt: 用户输入
**kwargs: 其他参数
Returns:
生成的回复文本
"""
pass
@abstractmethod
def get_model_info(self) -> Dict[str, Any]:
"""
获取当前模型信息
Returns:
模型信息字典
"""
pass
@abstractmethod
def get_default_model(self) -> str:
"""
获取默认模型名称
Returns:
默认模型名称
"""
pass
def validate_response(self, response: str) -> str:
"""
验证和清理响应内容
Args:
response: 原始响应
Returns:
清理后的响应
"""
if not response:
return ""
# 移除多余的空白字符
response = response.strip()
# 确保响应不为空
if not response:
return "抱歉,生成的内容为空。"
return response
def estimate_tokens(self, text: str) -> int:
"""
估算文本的token数量(简单实现)
Args:
text: 输入文本
Returns:
估算的token数量
"""
# 简单估算:中文字符按1.5个token计算,英文单词按1个token计算
chinese_chars = len([c for c in text if '\u4e00' <= c <= '\u9fff'])
english_words = len(text.split()) - chinese_chars
return int(chinese_chars * 1.5 + english_words)
+129
View File
@@ -0,0 +1,129 @@
"""
Report Engine Gemini LLM实现
使用Gemini 2.5-pro中转API进行文本生成
"""
import os
import sys
from typing import Optional, Dict, Any
from openai import OpenAI
from .base import BaseLLM
# 导入根目录的config
try:
current_dir = os.path.dirname(os.path.abspath(__file__))
root_dir = os.path.dirname(os.path.dirname(current_dir))
if root_dir not in sys.path:
sys.path.append(root_dir)
import config
except ImportError:
config = None
# 添加utils目录到Python路径并导入重试模块
try:
if root_dir:
utils_dir = os.path.join(root_dir, 'utils')
if utils_dir not in sys.path:
sys.path.append(utils_dir)
from retry_helper import with_retry, with_graceful_retry, LLM_RETRY_CONFIG
except ImportError:
# 如果无法导入重试模块,使用空装饰器避免报错
def with_retry(config):
def decorator(func):
return func
return decorator
LLM_RETRY_CONFIG = None
class GeminiLLM(BaseLLM):
"""Report Engine Gemini LLM实现类"""
def __init__(self, api_key: Optional[str] = None, model_name: Optional[str] = None):
"""
初始化Gemini客户端
Args:
api_key: Gemini API密钥,如果不提供则从config或环境变量读取
model_name: 模型名称,默认使用gemini-2.5-pro
"""
if api_key is None:
# 优先从根目录config读取
if config and hasattr(config, 'GEMINI_API_KEY'):
api_key = config.GEMINI_API_KEY
else:
# 备选方案:从环境变量读取
api_key = os.getenv("GEMINI_API_KEY")
if not api_key:
raise ValueError("Gemini API Key未找到!请在config.py中设置GEMINI_API_KEY或设置环境变量")
super().__init__(api_key, model_name)
# 初始化OpenAI客户端,使用Gemini的中转endpoint
self.client = OpenAI(
api_key=self.api_key,
base_url="https://www.chataiapi.com/v1"
)
self.default_model = model_name or self.get_default_model()
def get_default_model(self) -> str:
"""获取默认模型名称"""
return "gemini-2.5-pro"
@with_retry(LLM_RETRY_CONFIG)
def invoke(self, system_prompt: str, user_prompt: str, **kwargs) -> str:
"""
调用Gemini API生成回复
Args:
system_prompt: 系统提示词
user_prompt: 用户输入
**kwargs: 其他参数,如temperature、max_tokens等
Returns:
Gemini生成的回复文本
"""
try:
# 构建消息
messages = [
{"role": "system", "content": system_prompt},
{"role": "user", "content": user_prompt}
]
# 设置默认参数
params = {
"model": self.default_model,
"messages": messages,
"temperature": kwargs.get("temperature", 0.7),
"max_tokens": kwargs.get("max_tokens", 8000),
"stream": False
}
# 调用API
response = self.client.chat.completions.create(**params)
# 提取回复内容
if response.choices and response.choices[0].message:
content = response.choices[0].message.content
return self.validate_response(content)
else:
return ""
except Exception as e:
print(f"Report Engine Gemini API调用错误: {str(e)}")
raise e
def get_model_info(self) -> Dict[str, Any]:
"""
获取当前模型信息
Returns:
模型信息字典
"""
return {
"provider": "Gemini",
"model": self.default_model,
"api_base": "https://www.chataiapi.com/v1",
"purpose": "Report Generation"
}
+15
View File
@@ -0,0 +1,15 @@
"""
Report Engine节点处理模块
实现报告生成的各个处理步骤
"""
from .base_node import BaseNode, StateMutationNode
from .template_selection_node import TemplateSelectionNode
from .html_generation_node import HTMLGenerationNode
__all__ = [
"BaseNode",
"StateMutationNode",
"TemplateSelectionNode",
"HTMLGenerationNode"
]
+93
View File
@@ -0,0 +1,93 @@
"""
Report Engine节点基类
定义所有处理节点的基础接口
"""
import logging
from abc import ABC, abstractmethod
from typing import Any, Dict, Optional
from ..llms.base import BaseLLM
from ..state.state import ReportState
class BaseNode(ABC):
"""节点基类"""
def __init__(self, llm_client: BaseLLM, node_name: str = ""):
"""
初始化节点
Args:
llm_client: LLM客户端
node_name: 节点名称
"""
self.llm_client = llm_client
self.node_name = node_name or self.__class__.__name__
self.logger = logging.getLogger('ReportEngine')
@abstractmethod
def run(self, input_data: Any, **kwargs) -> Any:
"""
执行节点处理逻辑
Args:
input_data: 输入数据
**kwargs: 额外参数
Returns:
处理结果
"""
pass
def validate_input(self, input_data: Any) -> bool:
"""
验证输入数据
Args:
input_data: 输入数据
Returns:
验证是否通过
"""
return True
def process_output(self, output: Any) -> Any:
"""
处理输出数据
Args:
output: 原始输出
Returns:
处理后的输出
"""
return output
def log_info(self, message: str):
"""记录信息日志"""
formatted_message = f"[{self.node_name}] {message}"
self.logger.info(formatted_message)
def log_error(self, message: str):
"""记录错误日志"""
formatted_message = f"[{self.node_name}] {message}"
self.logger.error(formatted_message)
class StateMutationNode(BaseNode):
"""带状态修改功能的节点基类"""
@abstractmethod
def mutate_state(self, input_data: Any, state: ReportState, **kwargs) -> ReportState:
"""
修改状态
Args:
input_data: 输入数据
state: 当前状态
**kwargs: 额外参数
Returns:
修改后的状态
"""
pass
+340
View File
@@ -0,0 +1,340 @@
"""
HTML生成节点
将整合后的内容转换为美观的HTML报告
"""
import json
from datetime import datetime
from typing import Dict, Any
from .base_node import StateMutationNode
from ..llms.base import BaseLLM
from ..state.state import ReportState
from ..prompts import SYSTEM_PROMPT_HTML_GENERATION
# 不再需要text_processing依赖
class HTMLGenerationNode(StateMutationNode):
"""HTML生成处理节点"""
def __init__(self, llm_client: BaseLLM):
"""
初始化HTML生成节点
Args:
llm_client: LLM客户端
"""
super().__init__(llm_client, "HTMLGenerationNode")
def run(self, input_data: Dict[str, Any], **kwargs) -> str:
"""
执行HTML生成
Args:
input_data: 包含报告数据的字典
- query: 原始查询
- query_engine_report: QueryEngine报告内容
- media_engine_report: MediaEngine报告内容
- insight_engine_report: InsightEngine报告内容
- forum_logs: 论坛日志内容
- selected_template: 选择的模板内容
Returns:
生成的HTML内容
"""
self.log_info("开始生成HTML报告...")
try:
# 准备LLM输入数据
llm_input = {
"query": input_data.get('query', ''),
"query_engine_report": input_data.get('query_engine_report', ''),
"media_engine_report": input_data.get('media_engine_report', ''),
"insight_engine_report": input_data.get('insight_engine_report', ''),
"forum_logs": input_data.get('forum_logs', ''),
"selected_template": input_data.get('selected_template', '')
}
# 转换为JSON格式
message = json.dumps(llm_input, ensure_ascii=False, indent=2)
# 调用LLM生成HTML
response = self.llm_client.invoke(SYSTEM_PROMPT_HTML_GENERATION, message)
# 处理响应
processed_response = self.process_output(response)
self.log_info("HTML报告生成完成")
return processed_response
except Exception as e:
self.log_error(f"HTML生成失败: {str(e)}")
# 返回备用HTML
return self._generate_fallback_html(input_data)
def mutate_state(self, input_data: Dict[str, Any], state: ReportState, **kwargs) -> ReportState:
"""
修改报告状态,添加生成的HTML内容
Args:
input_data: 输入数据
state: 当前报告状态
**kwargs: 额外参数
Returns:
更新后的报告状态
"""
# 生成HTML
html_content = self.run(input_data, **kwargs)
# 更新状态
state.html_content = html_content
state.mark_completed()
return state
def process_output(self, output: str) -> str:
"""
处理LLM输出,提取HTML内容
Args:
output: LLM原始输出
Returns:
清理后的HTML内容
"""
try:
self.log_info(f"处理LLM原始输出,长度: {len(output)} 字符")
html_content = ""
# 尝试解析JSON响应
try:
result = json.loads(output)
html_content = result.get('html_content', '')
self.log_info("成功从JSON中提取html_content")
except json.JSONDecodeError:
self.log_info("不是JSON格式,直接使用原始输出")
html_content = output
# 如果还是没有内容,尝试其他提取方法
if not html_content.strip():
# 查找HTML标记
if '<!DOCTYPE html>' in output:
start_idx = output.find('<!DOCTYPE html>')
html_content = output[start_idx:]
elif '<html' in output:
start_idx = output.find('<html')
html_content = output[start_idx:]
else:
html_content = output
# 清理markdown代码块标记
if html_content.startswith('```html'):
html_content = html_content.replace('```html', '').replace('```', '').strip()
elif html_content.startswith('```'):
html_content = html_content.replace('```', '').strip()
# 处理转义字符
html_content = html_content.replace('\\n', '\n')
html_content = html_content.replace('\\t', '\t')
html_content = html_content.replace('\\r', '\r')
html_content = html_content.replace('\\"', '"')
html_content = html_content.replace("\\'", "'")
# 验证HTML内容
if not html_content.strip():
raise ValueError("生成的HTML内容为空")
# 确保HTML有基本结构
if not html_content.strip().startswith('<!DOCTYPE') and not html_content.strip().startswith('<html'):
self.log_info("HTML缺少基本结构,添加包装")
html_content = f"""<!DOCTYPE html>
<html lang="zh-CN">
<head>
<meta charset="UTF-8">
<meta name="viewport" content="width=device-width, initial-scale=1.0">
<title>智能舆情分析报告</title>
</head>
<body>
{html_content}
</body>
</html>"""
self.log_info(f"HTML处理完成,最终长度: {len(html_content)} 字符")
return html_content.strip()
except Exception as e:
self.log_error(f"处理HTML输出失败: {str(e)}")
return self._generate_error_html(str(e))
def _generate_fallback_html(self, input_data: Dict[str, Any]) -> str:
"""
生成备用HTML报告(当LLM失败时使用)
Args:
input_data: 输入数据
Returns:
备用HTML内容
"""
self.log_info("使用备用HTML生成方法")
query = input_data.get('query', '智能舆情分析报告')
query_report = input_data.get('query_engine_report', '')
media_report = input_data.get('media_engine_report', '')
insight_report = input_data.get('insight_engine_report', '')
forum_logs = input_data.get('forum_logs', '')
generation_time = datetime.now().strftime("%Y年%m月%d%H:%M:%S")
html_content = f"""<!DOCTYPE html>
<html lang="zh-CN">
<head>
<meta charset="UTF-8">
<meta name="viewport" content="width=device-width, initial-scale=1.0">
<title>{query} - 智能舆情分析报告</title>
<style>
body {{
font-family: -apple-system, BlinkMacSystemFont, 'Segoe UI', sans-serif;
line-height: 1.6;
color: #333;
max-width: 1200px;
margin: 0 auto;
padding: 20px;
background: #f5f5f5;
}}
.container {{
background: white;
padding: 40px;
border-radius: 8px;
box-shadow: 0 2px 10px rgba(0,0,0,0.1);
}}
h1 {{
color: #2c3e50;
border-bottom: 3px solid #3498db;
padding-bottom: 10px;
}}
h2 {{
color: #34495e;
margin-top: 30px;
margin-bottom: 15px;
}}
.section {{
margin-bottom: 30px;
padding: 20px;
border-left: 4px solid #3498db;
background: #f8f9fa;
}}
.meta {{
background: #e9ecef;
padding: 15px;
border-radius: 5px;
margin-bottom: 20px;
}}
.footer {{
margin-top: 40px;
padding-top: 20px;
border-top: 1px solid #eee;
text-align: center;
color: #666;
}}
pre {{
background: #f4f4f4;
padding: 15px;
border-radius: 5px;
overflow-x: auto;
white-space: pre-wrap;
}}
</style>
</head>
<body>
<div class="container">
<h1>{query}</h1>
<div class="meta">
<strong>报告生成时间:</strong> {generation_time}<br>
<strong>数据来源:</strong> QueryEngine、MediaEngine、InsightEngine、ForumEngine<br>
<strong>报告类型:</strong> 综合舆情分析报告
</div>
<h2>执行摘要</h2>
<div class="section">
本报告整合了多个分析引擎的研究结果,为您提供全面的舆情分析洞察。
通过对查询主题"{query}"的深度分析,我们从多个维度展现了当前的舆情态势。
</div>
{f'<h2>QueryEngine分析结果</h2><div class="section"><pre>{query_report}</pre></div>' if query_report else ''}
{f'<h2>MediaEngine分析结果</h2><div class="section"><pre>{media_report}</pre></div>' if media_report else ''}
{f'<h2>InsightEngine分析结果</h2><div class="section"><pre>{insight_report}</pre></div>' if insight_report else ''}
{f'<h2>论坛监控数据</h2><div class="section"><pre>{forum_logs[:2000]}{"..." if len(forum_logs) > 2000 else ""}</pre></div>' if forum_logs else ''}
<h2>综合结论</h2>
<div class="section">
基于多个分析引擎的综合研究,我们对"{query}"主题进行了全面分析。
各引擎从不同角度提供了深入洞察,为决策提供了重要参考。
</div>
<div class="footer">
<p>本报告由智能舆情分析平台自动生成</p>
<p>ReportEngine v1.0 | 生成时间: {generation_time}</p>
</div>
</div>
</body>
</html>"""
return html_content
def _generate_error_html(self, error_message: str) -> str:
"""
生成错误HTML页面
Args:
error_message: 错误信息
Returns:
错误HTML内容
"""
return f"""<!DOCTYPE html>
<html lang="zh-CN">
<head>
<meta charset="UTF-8">
<meta name="viewport" content="width=device-width, initial-scale=1.0">
<title>报告生成失败</title>
<style>
body {{
font-family: Arial, sans-serif;
text-align: center;
padding: 50px;
background: #f8f9fa;
}}
.error-container {{
background: white;
padding: 40px;
border-radius: 8px;
box-shadow: 0 2px 10px rgba(0,0,0,0.1);
max-width: 600px;
margin: 0 auto;
}}
.error-title {{
color: #e74c3c;
font-size: 24px;
margin-bottom: 20px;
}}
.error-message {{
color: #666;
margin-bottom: 20px;
}}
</style>
</head>
<body>
<div class="error-container">
<div class="error-title">报告生成失败</div>
<div class="error-message">错误信息: {error_message}</div>
<p>请检查输入数据或稍后重试。</p>
</div>
</body>
</html>"""
@@ -0,0 +1,274 @@
"""
模板选择节点
根据查询内容和可用模板选择最合适的报告模板
"""
import os
import json
from typing import Dict, Any, List, Optional
from .base_node import BaseNode
from ..prompts import SYSTEM_PROMPT_TEMPLATE_SELECTION
class TemplateSelectionNode(BaseNode):
"""模板选择处理节点"""
def __init__(self, llm_client, template_dir: str = "ReportEngine/report_template"):
"""
初始化模板选择节点
Args:
llm_client: LLM客户端
template_dir: 模板目录路径
"""
super().__init__(llm_client, "TemplateSelectionNode")
self.template_dir = template_dir
def run(self, input_data: Dict[str, Any], **kwargs) -> Dict[str, Any]:
"""
执行模板选择
Args:
input_data: 包含查询和报告内容的字典
- query: 原始查询
- reports: 三个子agent的报告列表
- forum_logs: 论坛日志内容
Returns:
选择的模板信息
"""
self.log_info("开始模板选择...")
query = input_data.get('query', '')
reports = input_data.get('reports', [])
forum_logs = input_data.get('forum_logs', '')
# 获取可用模板
available_templates = self._get_available_templates()
if not available_templates:
self.log_info("未找到预设模板,使用内置默认模板")
return self._get_fallback_template()
# 首先尝试简单关键词匹配
simple_match = self._simple_keyword_matching(query, available_templates)
if simple_match:
self.log_info(f"通过关键词匹配选择模板: {simple_match['template_name']}")
return simple_match
# 如果关键词匹配失败,尝试LLM选择
try:
llm_result = self._llm_template_selection(query, reports, forum_logs, available_templates)
if llm_result:
return llm_result
except Exception as e:
self.log_error(f"LLM模板选择失败: {str(e)}")
# 所有方法都失败,使用默认的社会热点事件模板
default_template = self._get_default_social_event_template(available_templates)
if default_template:
return default_template
# 最后备选方案
return self._get_fallback_template()
def _simple_keyword_matching(self, query: str, available_templates: List[Dict[str, Any]]) -> Optional[Dict[str, Any]]:
"""基于关键词的简单模板匹配"""
query_lower = query.lower()
# 关键词映射
keyword_mapping = {
'企业': ['企业品牌'],
'品牌': ['企业品牌'],
'声誉': ['企业品牌'],
'市场': ['市场竞争'],
'竞争': ['市场竞争'],
'格局': ['市场竞争'],
'政策': ['政策', '行业'],
'行业': ['政策', '行业'],
'动态': ['政策', '行业'],
'突发': ['突发事件', '危机'],
'危机': ['突发事件', '危机'],
'公关': ['突发事件', '危机'],
'日常': ['日常', '定期'],
'定期': ['日常', '定期'],
'监测': ['日常', '定期'],
'热点': ['社会公共热点'],
'社会': ['社会公共热点'],
'事件': ['社会公共热点'],
}
# 检查查询中的关键词
for keyword, template_keywords in keyword_mapping.items():
if keyword in query_lower:
# 查找匹配的模板
for template in available_templates:
for template_keyword in template_keywords:
if template_keyword in template['name']:
return {
'template_name': template['name'],
'template_content': template['content'],
'selection_reason': f'基于关键词"{keyword}"匹配选择'
}
return None
def _llm_template_selection(self, query: str, reports: List[Any], forum_logs: str,
available_templates: List[Dict[str, Any]]) -> Optional[Dict[str, Any]]:
"""使用LLM进行模板选择"""
self.log_info("尝试使用LLM进行模板选择...")
# 构建模板列表
template_list = "\n".join([f"- {t['name']}: {t['description']}" for t in available_templates])
user_message = f"""查询内容: {query}
报告数量: {len(reports)} 个分析引擎报告
论坛日志: {'' if forum_logs else ''}
可用模板:
{template_list}
请选择最合适的模板。"""
# 调用LLM
response = self.llm_client.invoke(SYSTEM_PROMPT_TEMPLATE_SELECTION, user_message)
# 检查响应是否为空
if not response or not response.strip():
self.log_error("LLM返回空响应")
return None
self.log_info(f"LLM原始响应: {response[:200]}...")
# 尝试解析JSON响应
try:
# 清理响应文本
cleaned_response = self._clean_llm_response(response)
result = json.loads(cleaned_response)
# 验证选择的模板是否存在
selected_template_name = result.get('template_name', '')
for template in available_templates:
if template['name'] == selected_template_name or selected_template_name in template['name']:
self.log_info(f"LLM选择模板: {selected_template_name}")
return {
'template_name': template['name'],
'template_content': template['content'],
'selection_reason': result.get('selection_reason', 'LLM智能选择')
}
self.log_error(f"LLM选择的模板不存在: {selected_template_name}")
return None
except json.JSONDecodeError as e:
self.log_error(f"JSON解析失败: {str(e)}")
# 尝试从文本响应中提取模板信息
return self._extract_template_from_text(response, available_templates)
def _clean_llm_response(self, response: str) -> str:
"""清理LLM响应"""
# 移除可能的markdown代码块标记
if '```json' in response:
response = response.split('```json')[1].split('```')[0]
elif '```' in response:
response = response.split('```')[1].split('```')[0]
# 移除前后空白
response = response.strip()
return response
def _extract_template_from_text(self, response: str, available_templates: List[Dict[str, Any]]) -> Optional[Dict[str, Any]]:
"""从文本响应中提取模板信息"""
self.log_info("尝试从文本响应中提取模板信息")
# 查找响应中是否包含模板名称
for template in available_templates:
template_name_variants = [
template['name'],
template['name'].replace('.md', ''),
template['name'].replace('模板', ''),
]
for variant in template_name_variants:
if variant in response:
self.log_info(f"在响应中找到模板: {template['name']}")
return {
'template_name': template['name'],
'template_content': template['content'],
'selection_reason': '从文本响应中提取'
}
return None
def _get_available_templates(self) -> List[Dict[str, Any]]:
"""获取可用的模板列表"""
templates = []
if not os.path.exists(self.template_dir):
self.log_error(f"模板目录不存在: {self.template_dir}")
return templates
# 查找所有markdown模板文件
for filename in os.listdir(self.template_dir):
if filename.endswith('.md'):
template_path = os.path.join(self.template_dir, filename)
try:
with open(template_path, 'r', encoding='utf-8') as f:
content = f.read()
template_name = filename.replace('.md', '')
description = self._extract_template_description(template_name)
templates.append({
'name': template_name,
'path': template_path,
'content': content,
'description': description
})
except Exception as e:
self.log_error(f"读取模板文件失败 {filename}: {str(e)}")
return templates
def _extract_template_description(self, template_name: str) -> str:
"""根据模板名称生成描述"""
if '企业品牌' in template_name:
return "适用于企业品牌声誉和形象分析"
elif '市场竞争' in template_name:
return "适用于市场竞争格局和对手分析"
elif '日常' in template_name or '定期' in template_name:
return "适用于日常监测和定期汇报"
elif '政策' in template_name or '行业' in template_name:
return "适用于政策影响和行业动态分析"
elif '热点' in template_name or '社会' in template_name:
return "适用于社会热点和公共事件分析"
elif '突发' in template_name or '危机' in template_name:
return "适用于突发事件和危机公关"
return "通用报告模板"
def _get_default_social_event_template(self, available_templates: List[Dict[str, Any]]) -> Optional[Dict[str, Any]]:
"""获取默认的社会热点事件分析模板"""
# 查找社会热点事件分析模板
for template in available_templates:
if '社会公共热点事件' in template['name'] or '热点' in template['name']:
self.log_info(f"使用默认模板: {template['name']}")
return {
'template_name': template['name'],
'template_content': template['content'],
'selection_reason': '默认使用社会热点事件分析模板'
}
return None
def _get_fallback_template(self) -> Dict[str, Any]:
"""获取备用默认模板(空模板,让LLM自行发挥)"""
self.log_info("未找到合适模板,使用空模板让LLM自行发挥")
return {
'template_name': '自由发挥模板',
'template_content': '',
'selection_reason': '未找到合适的预设模板,让LLM根据内容自行设计报告结构'
}
+20
View File
@@ -0,0 +1,20 @@
"""
Report Engine提示词模块
定义报告生成各个阶段使用的系统提示词
"""
from .prompts import (
SYSTEM_PROMPT_TEMPLATE_SELECTION,
SYSTEM_PROMPT_HTML_GENERATION,
output_schema_template_selection,
output_schema_html_generation,
input_schema_html_generation
)
__all__ = [
"SYSTEM_PROMPT_TEMPLATE_SELECTION",
"SYSTEM_PROMPT_HTML_GENERATION",
"output_schema_template_selection",
"output_schema_html_generation",
"input_schema_html_generation"
]
+140
View File
@@ -0,0 +1,140 @@
"""
Report Engine 的所有提示词定义
参考MediaEngine的结构,专门用于报告生成
"""
import json
# ===== JSON Schema 定义 =====
# 模板选择输出Schema
output_schema_template_selection = {
"type": "object",
"properties": {
"template_name": {"type": "string"},
"selection_reason": {"type": "string"}
},
"required": ["template_name", "selection_reason"]
}
# HTML报告生成输入Schema
input_schema_html_generation = {
"type": "object",
"properties": {
"query": {"type": "string"},
"query_engine_report": {"type": "string"},
"media_engine_report": {"type": "string"},
"insight_engine_report": {"type": "string"},
"forum_logs": {"type": "string"},
"selected_template": {"type": "string"}
}
}
# HTML报告生成输出Schema
output_schema_html_generation = {
"type": "object",
"properties": {
"html_content": {"type": "string"}
},
"required": ["html_content"]
}
# ===== 系统提示词定义 =====
# 模板选择的系统提示词
SYSTEM_PROMPT_TEMPLATE_SELECTION = f"""
你是一个智能报告模板选择助手。根据用户的查询内容和报告特征,从可用模板中选择最合适的一个。
选择标准:
1. 查询内容的主题类型(企业品牌、市场竞争、政策分析等)
2. 报告的紧急程度和时效性
3. 分析的深度和广度要求
4. 目标受众和使用场景
可用模板类型:
- 企业品牌声誉分析报告模板:适用于品牌形象、声誉管理分析
- 市场竞争格局舆情分析报告模板:适用于竞争对手、市场份额分析
- 日常或定期舆情监测报告模板:适用于常规监控、定期汇报
- 特定政策或行业动态舆情分析报告:适用于政策影响、行业变化分析
- 社会公共热点事件分析报告模板:适用于热点事件、社会话题分析
- 突发事件与危机公关舆情报告模板:适用于危机管理、应急响应
请按照以下JSON模式定义格式化输出:
<OUTPUT JSON SCHEMA>
{json.dumps(output_schema_template_selection, indent=2, ensure_ascii=False)}
</OUTPUT JSON SCHEMA>
确保输出是一个符合上述输出JSON模式定义的JSON对象。
只返回JSON对象,不要有解释或额外文本。
"""
# HTML报告生成的系统提示词
SYSTEM_PROMPT_HTML_GENERATION = f"""
你是一位专业的HTML报告生成专家。你将接收来自三个分析引擎的报告内容、论坛监控日志以及选定的报告模板,需要生成一份完整的HTML格式分析报告。
<INPUT JSON SCHEMA>
{json.dumps(input_schema_html_generation, indent=2, ensure_ascii=False)}
</INPUT JSON SCHEMA>
**你的任务:**
1. 整合三个引擎的分析结果,避免重复内容
2. 结合论坛日志数据,提供用户行为洞察
3. 按照选定模板的结构组织内容
4. 生成包含数据可视化的完整HTML报告
**HTML报告要求:**
1. **完整的HTML结构**
- 包含DOCTYPE、html、head、body标签
- 响应式CSS样式
- JavaScript交互功能
2. **美观的设计**
- 现代化的UI设计
- 合理的色彩搭配
- 清晰的排版布局
- 适配移动设备
3. **数据可视化**
- 使用Chart.js生成图表
- 情感分析饼图
- 趋势分析折线图
- 数据源分布图
- 论坛活动统计图
4. **内容结构**
- 报告标题和摘要
- 各引擎分析结果整合
- 论坛数据分析
- 综合结论和建议
- 数据附录
5. **交互功能**
- 目录导航
- 章节折叠展开
- 图表交互
- 打印和PDF导出按钮
- 暗色模式切换
**CSS样式要求:**
- 使用现代CSS特性(Flexbox、Grid
- 响应式设计,支持各种屏幕尺寸
- 优雅的动画效果
- 专业的配色方案
**JavaScript功能要求:**
- Chart.js图表渲染
- 页面交互逻辑
- 导出功能
- 主题切换
请按照以下JSON模式定义格式化输出:
<OUTPUT JSON SCHEMA>
{json.dumps(output_schema_html_generation, indent=2, ensure_ascii=False)}
</OUTPUT JSON SCHEMA>
确保生成的HTML是完整可用的,包含所有必要的样式和脚本。
只返回JSON对象,不要有解释或额外文本。
"""
@@ -0,0 +1,30 @@
### **企业品牌声誉分析报告模板**
- **1.0 摘要与核心发现**
- 1.1 品牌声誉总览
- 1.2 关键指标表现
- 1.3 主要结论与战略启示
- **2.0 品牌声量与影响力分析**
- 2.1 整体声量趋势
- 2.2 渠道声量分布
- 2.3 区域声量分布
- **3.0 本周期关键事件回顾**
- 3.1 关键营销活动时间线
- 3.2 重大舆情事件时间线
- **4.0 品牌形象与用户认知**
- 4.1 情感态度分析
- 4.2 品牌联想词云
- 4.3 核心议题分析
- **5.0 用户画像分析**
- 5.1 人群属性
- 5.2 兴趣标签
- 5.3 核心触媒习惯
- **6.0 声誉风险与机遇洞察**
- 6.1 主要负面议题追踪
- 6.2 潜在风险预警
- 6.3 正面机遇挖掘
- **7.0 结论与战略建议**
- 7.1 品牌SWOT分析总结
- 7.2 品牌沟通优化建议
- 7.3 产品与服务提升建议
- 7.4 下一周期监测重点
@@ -0,0 +1,28 @@
### **市场竞争格局舆情分析报告模板**
- **1.0 市场竞争态势摘要**
- 1.1 核心结论
- 1.2 关键数据对比
- 1.3 核心策略建议
- **2.0 市场声量对比分析**
- 2.1 总体声量份额(SOV
- 2.2 趋势对比
- **3.0 本周期市场关键动态时间线**
- 3.1 我方关键动作
- 3.2 竞争对手关键动作
- **4.0 产品口碑与用户反馈对比**
- 4.1 情感分布对比
- 4.2 优缺点对比
- 4.3 核心功能/卖点讨论热度对比
- **5.0 营销与传播策略对比**
- 5.1 重大营销战役分析
- 5.2 核心传播议题对比
- 5.3 KOL/媒体合作策略分析
- **6.0 竞争机会与威胁识别**
- 6.1 市场机会点
- 6.2 潜在威胁
- 6.3 差异化定位建议
- **7.0 总结与行动建议**
- 7.1 竞争格局总结
- 7.2 学习与借鉴
- 7.3 应对与反制
@@ -0,0 +1,23 @@
### **日常/定期舆情监测报告**模板
- **1.0 本周/月舆情概览**
- 1.1 核心数据看板
- 1.2 本期舆情热度TOP 3
- 1.3 重点预警
- **2.0 关键数据趋势**
- 2.1 声量走势
- 2.2 情感趋势
- **3.0 本周期舆情动态时间轴**
- 3.1 每日/周舆情大事记
- **4.0 热点话题追踪**
- 4.1 本期热点事件/话题详情
- 4.2 新增/突发话题
- **5.0 重点渠道表现**
- 5.1 核心社交媒体表现
- 5.2 核心内容平台表现
- **6.0 负面与风险监测**
- 6.1 负面信息汇总
- 6.2 潜在风险提示
- **7.0 简报与关注点**
- 7.1 本期小结
- 7.2 下期关注重点
@@ -0,0 +1,25 @@
### **特定政策/行业动态舆情分析报告模板**
- **1.0 摘要:政策/动态的核心影响与舆论反应**
- 1.1 核心内容解读
- 1.2 舆论场核心观点
- 1.3 关键影响预判
- **2.0 政策/动态背景与传播分析**
- 2.1 发布背景与解读
- 2.2 发展与发酵时间线
- 2.3 传播声量分析
- 2.4 权威解读与媒体关注点
- **3.0 公众态度与社会情绪**
- 3.1 舆论情绪分布
- 3.2 各方观点聚焦
- **4.0 潜在影响与机遇挑战分析**
- 4.1 对行业格局的影响
- 4.2 对本企业的影响(机遇与挑战)
- 4.3 对关联产业的影响
- **5.0 行业标杆案例与反应**
- 5.1 竞争对手/头部企业的反应
- 5.2 行业协会/组织的观点
- **6.0 结论与应对建议**
- 6.1 趋势研判
- 6.2 战略应对建议
- 6.3 公关沟通建议
@@ -0,0 +1,25 @@
### **社会公共热点事件分析报告模板**
- **1.0 报告摘要**
- 1.1 事件定性
- 1.2 核心洞察
- 1.3 关联性与建议
- **2.0 事件全景与演变脉络**
- 2.1 事件背景与起源
- 2.2 舆论发酵时间线
- 2.3 当前态势
- **3.0 传播路径与引爆点分析**
- 3.1 核心传播链条
- 3.2 引爆点分析
- 3.3 关键传播角色
- **4.0 舆论场多方观点与情绪光谱**
- 4.1 核心议题与讨论焦点
- 4.2 多元观点呈现
- 4.3 社会情绪分析
- **5.0 深层动因与价值观探讨**
- 5.1 事件背后的社会心态
- 5.2 衍生文化与网络Meme
- **6.0 关联性评估与行动建议**
- 6.1 风险评估
- 6.2 机遇评估
- 6.3 最终行动建议(介入/关注/规避)
@@ -0,0 +1,27 @@
### **突发事件与危机公关舆情报告模板**
- **1.0 报告摘要**
- 1.1 事件定性
- 1.2 核心结论
- 1.3 关键建议
- **2.0 事件溯源与发展脉络**
- 2.1 事件背景与首发
- 2.2 关键发展时间线
- 2.3 当前态势
- **3.0 舆情传播分析**
- 3.1 传播声量趋势
- 3.2 核心传播渠道
- 3.3 关键传播节点(KOL/媒体)
- **4.0 舆论场核心焦点与公众态度**
- 4.1 舆论焦点分析
- 4.2 网民情绪分布
- 4.3 主要观点摘录
- **5.0 风险研判**
- 5.1 短期风险
- 5.2 长期风险
- 5.3 次生/衍生风险
- **6.0 应对策略与处置建议**
- 6.1 黄金应对期建议
- 6.2 口径与声明建议
- 6.3 内外部沟通策略
- 6.4 后续行动规划
+8
View File
@@ -0,0 +1,8 @@
"""
Report Engine状态管理模块
定义报告生成过程中的简化状态数据结构
"""
from .state import ReportState, ReportMetadata
__all__ = ["ReportState", "ReportMetadata"]
+138
View File
@@ -0,0 +1,138 @@
"""
Report Engine状态管理
定义报告生成过程中的简化状态数据结构
"""
from dataclasses import dataclass, field
from typing import Dict, Any, Optional
import json
from datetime import datetime
@dataclass
class ReportMetadata:
"""简化的报告元数据"""
query: str = "" # 原始查询
template_used: str = "" # 使用的模板名称
generation_time: float = 0.0 # 生成耗时(秒)
timestamp: str = field(default_factory=lambda: datetime.now().isoformat())
def to_dict(self) -> Dict[str, Any]:
"""转换为字典格式"""
return {
"query": self.query,
"template_used": self.template_used,
"generation_time": self.generation_time,
"timestamp": self.timestamp
}
@dataclass
class ReportState:
"""简化的报告状态管理"""
# 基本信息
task_id: str = "" # 任务ID
query: str = "" # 原始查询
status: str = "pending" # 状态: pending, processing, completed, failed
# 输入数据
query_engine_report: str = "" # QueryEngine报告
media_engine_report: str = "" # MediaEngine报告
insight_engine_report: str = "" # InsightEngine报告
forum_logs: str = "" # 论坛日志
# 处理结果
selected_template: str = "" # 选择的模板
html_content: str = "" # 最终HTML内容
# 元数据
metadata: ReportMetadata = field(default_factory=ReportMetadata)
def __post_init__(self):
"""初始化后处理"""
if not self.task_id:
self.task_id = f"report_{datetime.now().strftime('%Y%m%d_%H%M%S')}"
self.metadata.query = self.query
def mark_processing(self):
"""标记为处理中"""
self.status = "processing"
def mark_completed(self):
"""标记为完成"""
self.status = "completed"
def mark_failed(self, error_message: str = ""):
"""标记为失败"""
self.status = "failed"
self.error_message = error_message
def is_completed(self) -> bool:
"""检查是否完成"""
return self.status == "completed" and bool(self.html_content)
def get_progress(self) -> float:
"""获取进度百分比"""
if self.status == "completed":
return 100.0
elif self.status == "processing":
# 简单的进度计算
progress = 0.0
if self.selected_template:
progress += 30.0
if self.html_content:
progress += 70.0
return progress
else:
return 0.0
def to_dict(self) -> Dict[str, Any]:
"""转换为字典格式"""
return {
"task_id": self.task_id,
"query": self.query,
"status": self.status,
"progress": self.get_progress(),
"selected_template": self.selected_template,
"has_html_content": bool(self.html_content),
"html_content_length": len(self.html_content) if self.html_content else 0,
"metadata": self.metadata.to_dict()
}
def save_to_file(self, file_path: str):
"""保存状态到文件"""
try:
state_data = self.to_dict()
# 不保存完整的HTML内容到状态文件(太大)
state_data.pop("html_content", None)
with open(file_path, 'w', encoding='utf-8') as f:
json.dump(state_data, f, ensure_ascii=False, indent=2)
except Exception as e:
print(f"保存状态文件失败: {str(e)}")
@classmethod
def load_from_file(cls, file_path: str) -> Optional["ReportState"]:
"""从文件加载状态"""
try:
with open(file_path, 'r', encoding='utf-8') as f:
data = json.load(f)
# 创建ReportState对象
state = cls(
task_id=data.get("task_id", ""),
query=data.get("query", ""),
status=data.get("status", "pending"),
selected_template=data.get("selected_template", "")
)
# 设置元数据
metadata_data = data.get("metadata", {})
state.metadata.template_used = metadata_data.get("template_used", "")
state.metadata.generation_time = metadata_data.get("generation_time", 0.0)
return state
except Exception as e:
print(f"加载状态文件失败: {str(e)}")
return None
+11
View File
@@ -0,0 +1,11 @@
"""
Report Engine工具模块
包含配置管理
"""
from .config import Config, load_config
__all__ = [
"Config",
"load_config"
]
+134
View File
@@ -0,0 +1,134 @@
"""
Report Engine配置管理模块
处理环境变量和配置参数
"""
import os
from dataclasses import dataclass
from typing import Optional
@dataclass
class Config:
"""Report Engine配置类"""
# API密钥
gemini_api_key: Optional[str] = None
# 模型配置
default_llm_provider: str = "gemini"
gemini_model: str = "gemini-2.5-pro"
# 报告配置
max_content_length: int = 50000
output_dir: str = "final_reports"
template_dir: str = "ReportEngine/report_template"
# 日志配置
log_file: str = "logs/report.log"
# HTML导出配置
enable_pdf_export: bool = True
chart_style: str = "modern" # modern, classic, minimal
def validate(self) -> bool:
"""验证配置"""
if not self.gemini_api_key:
print("错误: Gemini API Key未设置")
return False
return True
@classmethod
def from_file(cls, config_file: str) -> "Config":
"""从配置文件创建配置"""
if config_file.endswith('.py'):
# Python配置文件
import importlib.util
# 动态导入配置文件
spec = importlib.util.spec_from_file_location("config", config_file)
config_module = importlib.util.module_from_spec(spec)
spec.loader.exec_module(config_module)
return cls(
gemini_api_key=getattr(config_module, "GEMINI_API_KEY", None),
default_llm_provider=getattr(config_module, "DEFAULT_LLM_PROVIDER", "gemini"),
gemini_model=getattr(config_module, "GEMINI_MODEL", "gemini-2.5-pro"),
max_content_length=getattr(config_module, "MAX_CONTENT_LENGTH", 50000),
output_dir=getattr(config_module, "REPORT_OUTPUT_DIR", "final_reports"),
template_dir=getattr(config_module, "TEMPLATE_DIR", "ReportEngine/report_template"),
log_file=getattr(config_module, "REPORT_LOG_FILE", "logs/report.log"),
enable_pdf_export=getattr(config_module, "ENABLE_PDF_EXPORT", True),
chart_style=getattr(config_module, "CHART_STYLE", "modern")
)
else:
# .env格式配置文件
config_dict = {}
if os.path.exists(config_file):
with open(config_file, 'r', encoding='utf-8') as f:
for line in f:
line = line.strip()
if line and not line.startswith('#') and '=' in line:
key, value = line.split('=', 1)
config_dict[key.strip()] = value.strip()
return cls(
gemini_api_key=config_dict.get("GEMINI_API_KEY"),
default_llm_provider=config_dict.get("DEFAULT_LLM_PROVIDER", "gemini"),
gemini_model=config_dict.get("GEMINI_MODEL", "gemini-2.5-pro"),
max_content_length=int(config_dict.get("MAX_CONTENT_LENGTH", "50000")),
output_dir=config_dict.get("REPORT_OUTPUT_DIR", "final_reports"),
template_dir=config_dict.get("TEMPLATE_DIR", "ReportEngine/report_template"),
log_file=config_dict.get("REPORT_LOG_FILE", "logs/report.log"),
enable_pdf_export=config_dict.get("ENABLE_PDF_EXPORT", "true").lower() == "true",
chart_style=config_dict.get("CHART_STYLE", "modern")
)
def load_config(config_file: Optional[str] = None) -> Config:
"""
加载配置
Args:
config_file: 配置文件路径,如果不指定则使用默认路径
Returns:
配置对象
"""
# 确定配置文件路径
if config_file:
if not os.path.exists(config_file):
raise FileNotFoundError(f"配置文件不存在: {config_file}")
file_to_load = config_file
else:
# 尝试加载常见的配置文件
for config_path in ["config.py", "config.env", ".env"]:
if os.path.exists(config_path):
file_to_load = config_path
break
else:
raise FileNotFoundError("未找到配置文件,请创建 config.py 文件")
# 创建配置对象
config = Config.from_file(file_to_load)
# 验证配置
if not config.validate():
raise ValueError("Report Engine配置验证失败,请检查配置文件中的API密钥")
return config
def print_config(config: Config):
"""打印配置信息(隐藏敏感信息)"""
print("\n=== Report Engine配置 ===")
print(f"LLM提供商: {config.default_llm_provider}")
print(f"Gemini模型: {config.gemini_model}")
print(f"最大内容长度: {config.max_content_length}")
print(f"输出目录: {config.output_dir}")
print(f"模板目录: {config.template_dir}")
print(f"日志文件: {config.log_file}")
print(f"PDF导出: {config.enable_pdf_export}")
print(f"图表样式: {config.chart_style}")
print(f"Gemini API Key: {'已设置' if config.gemini_api_key else '未设置'}")
print("========================\n")