460 lines
17 KiB
Python
460 lines
17 KiB
Python
"""
|
||
报告生成器基类
|
||
提供数据源接口、AI处理接口等扩展能力
|
||
"""
|
||
from abc import ABC, abstractmethod
|
||
from typing import List, Dict, Any, Optional
|
||
from datetime import datetime, timedelta
|
||
import os
|
||
import sys
|
||
from loguru import logger
|
||
|
||
# Make the directory two levels above this file's own directory importable.
# NOTE(review): presumably this is what lets the `utils` and `config` imports
# that follow resolve when the module is run outside an installed package.
current_dir = os.path.dirname(os.path.abspath(__file__))
parent_dir = os.path.dirname(os.path.dirname(current_dir))
if sys.path.count(parent_dir) == 0:
    # Prepend so this location wins over same-named modules later on the path.
    sys.path.insert(0, parent_dir)
|
||
|
||
from utils.mysql_agent import MySQLAgent
|
||
from config import Config
|
||
|
||
|
||
class DataSource(ABC):
    """Abstract interface for report data sources.

    This is the extension point for adding further data sources later on:
    a concrete source must be able to return the records of a time range and
    to report its own name.
    """

    @abstractmethod
    def fetch_data(self, start_time: datetime, end_time: datetime) -> List[Dict[str, Any]]:
        """Return the records that fall inside the given time range.

        Args:
            start_time: Start of the time range.
            end_time: End of the time range.

        Returns:
            A list of records; each record should contain fields such as the
            title, link, summary and publish time.
        """

    @abstractmethod
    def get_source_name(self) -> str:
        """Return the name of this data source."""
|
||
|
||
|
||
class RSSDataSource(DataSource):
    """Data source that reads collected RSS articles from a database table."""

    def __init__(self, db_agent: MySQLAgent, table_name: str = "collector_rss_subscriptions"):
        """Remember the database agent and the table to query.

        Args:
            db_agent: Agent used to run the query; only its ``query_to_df``
                method is called by this class.
            table_name: Name of the table holding the RSS articles. It is
                interpolated into the SQL text as a backtick-quoted
                identifier, so it must come from trusted configuration and
                never from user input.
        """
        self.db_agent = db_agent
        self.table_name = table_name
        self.logger = logger.bind(module="RSSDataSource")

    def fetch_data(self, start_time: datetime, end_time: datetime) -> List[Dict[str, Any]]:
        """Fetch RSS articles published in ``[start_time, end_time)``.

        Args:
            start_time: Inclusive lower bound on the publish time.
            end_time: Exclusive upper bound on the publish time.

        Returns:
            One dict per row, newest first, with the keys ``title``,
            ``link``, ``summary``, ``publish_time``, ``source_url`` and
            ``create_time``. An empty list is returned both when no row
            matches and when the query fails; failures are logged and never
            raised to the caller.
        """
        try:
            # The time bounds are bound as parameters; only the table name is
            # formatted into the statement.
            sql = f"""
                SELECT
                    `文章标题` as title,
                    `文章链接` as link,
                    `文章摘要` as summary,
                    `发布时间` as publish_time,
                    `来源URL` as source_url,
                    `创建时间` as create_time
                FROM `{self.table_name}`
                WHERE `发布时间` >= %s AND `发布时间` < %s
                ORDER BY `发布时间` DESC
            """

            params = (
                start_time.strftime('%Y-%m-%d %H:%M:%S'),
                end_time.strftime('%Y-%m-%d %H:%M:%S'),
            )

            df = self.db_agent.query_to_df(sql, params=params, is_print=False)

            if df.empty:
                self.logger.info(f"时间范围 {start_time} 到 {end_time} 内没有RSS数据")
                return []

            # Convert the result rows to a list of dicts.
            data_list = df.to_dict('records')
            self.logger.info(f"获取到 {len(data_list)} 条RSS数据")
            return data_list

        except Exception as e:
            # loguru has no `exc_info` keyword: passing any keyword makes it
            # run str.format() on the message, which itself fails when the
            # error text contains braces, and no traceback is recorded.
            # `exception()` logs at ERROR level with the traceback attached.
            self.logger.exception(f"获取RSS数据失败: {str(e)}")
            return []

    def get_source_name(self) -> str:
        """Return the display name of this data source."""
        return "RSS订阅"
|
||
|
||
|
||
class AIAnalysisDataSource(DataSource):
    """Data source backed by the AI analysis result table.

    Only rows already flagged as relevant (``是否相关`` = 1) are returned; the
    default table is ``ai_processor_rss_analysis``.
    """

    def __init__(self, db_agent: MySQLAgent, table_name: str = "ai_processor_rss_analysis"):
        """Remember the database agent and the table to query.

        Args:
            db_agent: Agent used to run the query; only its ``query_to_df``
                method is called by this class.
            table_name: Name of the analysis result table. It is interpolated
                into the SQL text as a backtick-quoted identifier, so it must
                come from trusted configuration and never from user input.
        """
        self.db_agent = db_agent
        self.table_name = table_name
        self.logger = logger.bind(module="AIAnalysisDataSource")

    def fetch_data(self, start_time: datetime, end_time: datetime) -> List[Dict[str, Any]]:
        """Fetch relevant analysed articles published in ``[start_time, end_time)``.

        Args:
            start_time: Inclusive lower bound on the publish time.
            end_time: Exclusive upper bound on the publish time.

        Returns:
            One dict per row, ordered by publish time and then relevance
            score (both descending), with the keys ``title``, ``link``,
            ``summary``, ``publish_time``, ``source_url``, ``category``,
            ``tags``, ``relevance_score``, ``analysis_note`` and
            ``process_time``. An empty list is returned both when no row
            matches and when the query fails; failures are logged and never
            raised to the caller.
        """
        try:
            # The time bounds are bound as parameters; only the table name is
            # formatted into the statement.
            sql = f"""
                SELECT
                    `文章标题` as title,
                    `文章链接` as link,
                    `文章摘要` as summary,
                    `发布时间` as publish_time,
                    `来源URL` as source_url,
                    `分类` as category,
                    `标签` as tags,
                    `相关度评分` as relevance_score,
                    `分析说明` as analysis_note,
                    `处理时间` as process_time
                FROM `{self.table_name}`
                WHERE `发布时间` >= %s AND `发布时间` < %s
                AND `是否相关` = 1
                ORDER BY `发布时间` DESC, `相关度评分` DESC
            """

            params = (
                start_time.strftime('%Y-%m-%d %H:%M:%S'),
                end_time.strftime('%Y-%m-%d %H:%M:%S'),
            )

            df = self.db_agent.query_to_df(sql, params=params, is_print=False)

            if df.empty:
                self.logger.info(f"时间范围 {start_time} 到 {end_time} 内没有相关数据(是否相关=1)")
                return []

            # Convert the result rows to a list of dicts.
            data_list = df.to_dict('records')
            self.logger.info(f"获取到 {len(data_list)} 条相关数据(是否相关=1)")
            return data_list

        except Exception as e:
            # loguru has no `exc_info` keyword: passing any keyword makes it
            # run str.format() on the message, which itself fails when the
            # error text contains braces, and no traceback is recorded.
            # `exception()` logs at ERROR level with the traceback attached.
            self.logger.exception(f"获取AI分析数据失败: {str(e)}")
            return []

    def get_source_name(self) -> str:
        """Return the display name of this data source."""
        return "AI分析结果"
|
||
|
||
|
||
class AIProcessor:
    """AI processor used to filter and summarise articles.

    Relevance filtering is delegated to a chat-completion model reached
    through the OpenAI-compatible client created in ``__init__``.
    """

    # Number of articles sent to the model in one request.
    BATCH_SIZE = 10
    # Pause between two consecutive batches, in seconds, to avoid API rate limits.
    BATCH_PAUSE_SECONDS = 1.5

    def __init__(self, api_key: str = None, model: str = None):
        """Create the API client.

        Args:
            api_key: API key; falls back to ``Config.BAIDU_AI_CONFIG['api_key']``
                when not given.
            model: Model name; falls back to ``Config.BAIDU_AI_CONFIG['model']``
                and then to ``'ernie-x1-turbo-32k'``.
        """
        # Imported here so the dependency is only needed when a processor is
        # actually constructed.
        from openai import OpenAI

        self.base_url = 'https://qianfan.baidubce.com/v2'
        self.api_key = api_key or Config.BAIDU_AI_CONFIG.get('api_key')
        self.model = model or Config.BAIDU_AI_CONFIG.get('model', 'ernie-x1-turbo-32k')
        self.client = OpenAI(
            base_url=self.base_url,
            api_key=self.api_key,
        )
        self.logger = logger.bind(module="AIProcessor")

    def filter_automotive_content(self, articles: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
        """Keep only the articles the model judges related to the automotive aftermarket.

        Articles are sent in batches of ``BATCH_SIZE``. Every selected article
        is mutated in place and gains ``ai_marked`` (True), ``ai_category``
        and ``ai_reason``.

        Args:
            articles: Articles to screen; ``title`` and ``summary`` are read.

        Returns:
            The selected articles, in the order the model reported them
            within each batch. Articles of a batch whose request or parsing
            failed are NOT part of the result; they are only tagged with
            ``ai_marked = False`` and ``ai_error``.
        """
        if not articles:
            return []

        import time

        self.logger.info(f"开始AI筛选 {len(articles)} 篇文章")

        batch_size = self.BATCH_SIZE
        filtered_articles = []

        for start in range(0, len(articles), batch_size):
            batch = articles[start:start + batch_size]
            try:
                response = self.client.chat.completions.create(
                    model=self.model,
                    messages=[{
                        "role": "user",
                        "content": self._build_filter_prompt(batch),
                    }],
                )

                # The content may be None; treat that like an unparsable answer.
                result_text = (response.choices[0].message.content or "").strip()

                result_json = self._parse_json_response(result_text)
                if result_json is None:
                    self.logger.warning(f"无法解析AI返回的JSON: {result_text[:200]}")
                    result_json = {'related_articles': []}

                filtered_articles.extend(self._select_related(batch, result_json))

                # Avoid API rate limiting; no pause is needed after the last batch.
                if start + batch_size < len(articles):
                    time.sleep(self.BATCH_PAUSE_SECONDS)

            except Exception as e:
                # loguru has no `exc_info` keyword (any keyword triggers
                # str.format() on the message); `exception()` logs at ERROR
                # level with the traceback attached.
                self.logger.exception(f"AI筛选批处理失败: {str(e)}")
                # Tag the batch as unscreened so callers can see what
                # happened; these articles are not added to the result.
                for article in batch:
                    article['ai_marked'] = False
                    article['ai_error'] = str(e)

        self.logger.info(f"AI筛选完成,找到 {len(filtered_articles)} 篇相关文章")
        return filtered_articles

    @staticmethod
    def _build_filter_prompt(batch: List[Dict[str, Any]]) -> str:
        """Build the screening prompt for one batch of articles.

        Articles are numbered from 0 *within the batch*, matching both the
        "index starts at 0" instruction in the prompt and the bounds check
        applied to the model's answer.
        """
        articles_text = "".join(
            f"\n[{idx}] 标题: {article.get('title', '')}\n"
            f"摘要: {article.get('summary', '')}\n"
            for idx, article in enumerate(batch)
        )

        header = (
            "请分析以下新闻文章,判断哪些与汽车后市场相关。\n"
            "\n"
            "汽车后市场的定义:汽车销售以后,围绕汽车使用过程中的各种服务,包括:\n"
            "- 汽车维修保养\n"
            "- 汽车配件\n"
            "- 汽车改装\n"
            "- 汽车美容\n"
            "- 汽车用品\n"
            "- 汽车金融\n"
            "- 汽车保险\n"
            "- 二手车交易\n"
            "- 汽车租赁\n"
            "- 汽车检测\n"
            "- 汽车报废回收\n"
            "- 汽车相关法律法规和政策\n"
            "\n"
            "文章列表:\n"
        )
        footer = (
            "\n"
            "\n"
            "请按以下JSON格式返回结果:\n"
            "{\n"
            '  "related_articles": [\n'
            "    {\n"
            '      "index": 文章的序号(从0开始),\n'
            '      "is_related": true/false,\n'
            '      "reason": "判断理由",\n'
            '      "category": "所属类别(如:维修保养、配件、政策等)"\n'
            "    }\n"
            "  ]\n"
            "}\n"
            "\n"
            "只返回JSON,不要其他文字说明。"
        )
        return header + articles_text + footer

    @staticmethod
    def _parse_json_response(result_text: str) -> Optional[Dict[str, Any]]:
        """Extract the JSON object from the model's reply.

        Tried in order: a JSON object inside a markdown code fence, the
        outermost ``{...}`` span of the text, and finally the whole text.

        Returns:
            The first candidate that decodes to a dict, or None when none does.
        """
        import json
        import re

        candidates = []
        fenced = re.search(r'```(?:json)?\s*(\{.*?\})\s*```', result_text, re.DOTALL)
        if fenced:
            candidates.append(fenced.group(1))
        braced = re.search(r'\{.*\}', result_text, re.DOTALL)
        if braced:
            candidates.append(braced.group())
        candidates.append(result_text)

        for candidate in candidates:
            try:
                parsed = json.loads(candidate)
            except (TypeError, ValueError):
                # json.JSONDecodeError is a ValueError; try the next candidate.
                continue
            if isinstance(parsed, dict):
                return parsed
        return None

    @staticmethod
    def _select_related(batch: List[Dict[str, Any]], result_json: Dict[str, Any]) -> List[Dict[str, Any]]:
        """Tag and return the batch articles the model marked as related.

        Malformed verdicts (non-dict entries, non-integer, out-of-range or
        repeated indices) are skipped instead of failing the whole batch.
        """
        verdicts = result_json.get('related_articles', [])
        if not isinstance(verdicts, list):
            return []

        selected = []
        seen = set()
        for item in verdicts:
            if not isinstance(item, dict):
                continue
            try:
                # Models sometimes return the index as a string.
                idx = int(item.get('index', -1))
            except (TypeError, ValueError):
                continue
            if not 0 <= idx < len(batch) or idx in seen:
                continue
            seen.add(idx)
            if item.get('is_related', False):
                article = batch[idx]
                article['ai_marked'] = True
                article['ai_category'] = item.get('category', '其他')
                article['ai_reason'] = item.get('reason', '')
                selected.append(article)
        return selected

    def generate_news_summary(self, articles: List[Dict[str, Any]]) -> str:
        """Render the screened articles as a Markdown news summary.

        Args:
            articles: Screened articles; ``title``, ``summary``, ``link``,
                ``publish_time``, ``ai_category`` and ``ai_reason`` are read.

        Returns:
            The Markdown text, or a short "no related news" section when the
            list is empty.
        """
        if not articles:
            return "## 相关新闻\n\n暂无相关新闻。\n"

        sections = []
        for idx, article in enumerate(articles, 1):
            link = article.get('link', '')
            lines = [
                f"\n### {idx}. {article.get('title', '无标题')}\n",
                f"- **类别**: {article.get('ai_category', '其他')}\n",
                f"- **摘要**: {article.get('summary', '无摘要')}\n",
                f"- **链接**: [{link}]({link})\n",
                f"- **发布时间**: {article.get('publish_time', '')}\n",
            ]
            reason = article.get('ai_reason', '')
            if reason:
                lines.append(f"- **相关性说明**: {reason}\n")
            lines.append("\n")
            sections.append("".join(lines))

        articles_text = "".join(sections)
        return f"## 汽车后市场相关新闻\n\n共找到 {len(articles)} 篇相关新闻:\n\n{articles_text}"
|
||
|
||
|
||
class BaseReporter:
    """Base class for report generators.

    Collects articles from the registered data sources, renders them as
    Markdown, converts the Markdown to HTML and writes reports to disk.
    """

    def __init__(self, data_sources: List["DataSource"] = None):
        """Set up the reporter.

        Args:
            data_sources: Initial data sources; defaults to none. A non-empty
                list is stored as-is (not copied).

        Note:
            An ``AIProcessor`` is constructed unconditionally here.
        """
        self.data_sources = data_sources or []
        self.ai_processor = AIProcessor()
        self.logger = logger.bind(module="BaseReporter")

    def add_data_source(self, data_source: "DataSource"):
        """Register an additional data source."""
        self.data_sources.append(data_source)
        self.logger.info(f"添加数据源: {data_source.get_source_name()}")

    @staticmethod
    def _is_missing(value: Any) -> bool:
        """Return True for None, an empty string, or a NaN/NaT-like value."""
        if value is None:
            return True
        if isinstance(value, str):
            return value == ''
        try:
            # NaN and NaT are the values that compare unequal to themselves.
            return bool(value != value)
        except (TypeError, ValueError):
            return False

    @classmethod
    def _text_or(cls, value: Any, default: Any) -> Any:
        """Return ``value``, or ``default`` when the value is missing."""
        return default if cls._is_missing(value) else value

    @classmethod
    def _has_content(cls, value: Any) -> bool:
        """Return True when ``value`` is present and truthy."""
        return not cls._is_missing(value) and bool(value)

    @classmethod
    def _publish_sort_key(cls, item: Dict[str, Any]):
        """Sort key on ``publish_time`` that tolerates missing values.

        The leading flag keeps items without a publish time from ever being
        compared with real timestamps (a str/None vs. datetime comparison
        raises TypeError); with ``reverse=True`` they end up last.
        """
        value = item.get('publish_time')
        if cls._is_missing(value):
            return (False, 0)
        return (True, value)

    def collect_data(self, start_time: datetime, end_time: datetime) -> List[Dict[str, Any]]:
        """Collect the records of every registered data source.

        Each record is tagged in place with a ``data_source`` key holding the
        source's name. A source that raises is logged and skipped.

        Args:
            start_time: Start of the time range passed to each source.
            end_time: End of the time range passed to each source.

        Returns:
            All records, newest publish time first; records without a
            publish time come last.
        """
        all_data = []
        for source in self.data_sources:
            try:
                data = source.fetch_data(start_time, end_time)
                # Tag every record with where it came from.
                for item in data:
                    item['data_source'] = source.get_source_name()
                all_data.extend(data)
            except Exception as e:
                self.logger.error(f"从 {source.get_source_name()} 收集数据失败: {str(e)}")

        # Sort by publish time, newest first.
        all_data.sort(key=self._publish_sort_key, reverse=True)
        return all_data

    def generate_report_content(self, articles: List[Dict[str, Any]], report_type: str = "日报") -> str:
        """Build the report body in Markdown.

        Args:
            articles: Articles to report. They are used as-is: the caller is
                expected to pass rows already filtered for relevance (e.g.
                from the AI analysis result table).
            report_type: ``"日报"`` (daily) or anything else (treated as
                weekly); only affects the message shown when there is no
                article.

        Returns:
            A statistics section followed by the news section.
        """
        related_articles = articles or []
        related_count = len(related_articles)

        # Nothing to report: return the statistics plus a notice.
        if related_count == 0:
            if report_type == "日报":
                message = "昨日无汽车后市场相关的新闻"
            else:
                message = "上周无汽车后市场相关的新闻"

            return (
                "\n"
                "## 数据统计\n"
                "\n"
                "- **相关文章数**: 0\n"
                "\n"
                "## 相关新闻\n"
                "\n"
                f"{message}\n"
            )

        news_summary = self._generate_news_summary_from_analysis(related_articles)

        stats = (
            "\n"
            "## 数据统计\n"
            "\n"
            f"- **相关文章数**: {related_count}\n"
            "\n"
        )
        return stats + news_summary

    def _generate_news_summary_from_analysis(self, articles: List[Dict[str, Any]]) -> str:
        """Render analysed articles as a Markdown news section.

        Uses the classification stored with each article (``category``,
        ``tags``, ``relevance_score``, ``analysis_note``) rather than calling
        the model again. Missing values (None, empty string, NaN) fall back
        to the same defaults as an absent key, because rows loaded from SQL
        always carry the key even when the column is NULL.

        Args:
            articles: Articles to render.

        Returns:
            The Markdown text, or a short "no related news" section when the
            list is empty.
        """
        if not articles:
            return "## 相关新闻\n\n暂无相关新闻。\n"

        sections = []
        for idx, article in enumerate(articles, 1):
            title = self._text_or(article.get('title'), '无标题')
            category = self._text_or(article.get('category'), '其他')
            summary = self._text_or(article.get('summary'), '无摘要')
            link = self._text_or(article.get('link'), '')
            publish_time = self._text_or(article.get('publish_time'), '')
            tags = article.get('tags', '')
            relevance_score = article.get('relevance_score', '')
            analysis_note = article.get('analysis_note', '')

            lines = [
                f"\n### {idx}. {title}\n",
                f"- **分类**: {category}\n",
            ]
            if self._has_content(tags):
                lines.append(f"- **标签**: {tags}\n")
            lines.append(f"- **摘要**: {summary}\n")
            lines.append(f"- **链接**: [{link}]({link})\n")
            lines.append(f"- **发布时间**: {publish_time}\n")
            if self._has_content(relevance_score):
                lines.append(f"- **相关度评分**: {relevance_score}\n")
            if self._has_content(analysis_note):
                lines.append(f"- **分析说明**: {analysis_note}\n")
            lines.append("\n")
            sections.append("".join(lines))

        articles_text = "".join(sections)
        return f"## 汽车后市场相关新闻\n\n共找到 {len(articles)} 篇相关新闻:\n\n{articles_text}"

    def generate_html_report(self, markdown_content: str, template_path: str = None) -> str:
        """Render Markdown content to HTML.

        Args:
            markdown_content: Report body in Markdown.
            template_path: Optional external template; used only when the
                path exists, otherwise the built-in template is used.

        Returns:
            The rendered HTML.
        """
        # Imported here (relative import) to avoid a circular dependency.
        from .html_template import HTMLTemplateManager

        template_manager = HTMLTemplateManager()

        if template_path and os.path.exists(template_path):
            # External template.
            return template_manager.render_external_template(template_path, markdown_content)

        # Built-in template.
        return template_manager.render_builtin_template(markdown_content)

    @staticmethod
    def _write_text_file(output_path: str, content: str) -> None:
        """Write ``content`` to ``output_path`` as UTF-8, creating the directory."""
        # A bare file name has no directory part; fall back to the current one.
        os.makedirs(os.path.dirname(output_path) or '.', exist_ok=True)
        with open(output_path, 'w', encoding='utf-8') as f:
            f.write(content)

    def save_report(self, html_content: str, output_path: str):
        """Save the HTML report to ``output_path`` (UTF-8)."""
        self._write_text_file(output_path, html_content)
        self.logger.info(f"HTML报告已保存到: {output_path}")

    def save_markdown_report(self, markdown_content: str, output_path: str):
        """Save the Markdown report to ``output_path`` (UTF-8)."""
        self._write_text_file(output_path, markdown_content)
        self.logger.info(f"Markdown报告已保存到: {output_path}")
|
||
|