diff --git a/applications/reporter/README.md b/applications/reporter/README.md new file mode 100644 index 0000000..962bcdf --- /dev/null +++ b/applications/reporter/README.md @@ -0,0 +1,254 @@ +# 报告生成器使用说明 + +## 功能概述 + +本模块提供了日报和周报生成功能,主要特点: + +1. **AI智能筛选**:从AI分析结果表获取已筛选的相关内容(是否相关=1) +2. **多格式输出**:同时生成HTML和Markdown格式的报告 +3. **钉钉推送**:支持自动推送到钉钉群 +4. **可扩展数据源**:支持添加多个数据源(RSS、投诉、API等) +5. **灵活模板系统**:支持内置HTML模板和外部HTML模板 + +## 快速开始 + +### 生成日报(24小时内数据) + +```python +from applications.reporter.daily import DailyReporter + +reporter = DailyReporter() +result = reporter.generate() +print(f"日报已生成:") +print(f" HTML: {result.get('html_path')}") +print(f" Markdown: {result.get('markdown_path')}") +``` + +### 生成周报(7天内数据) + +```python +from applications.reporter.weekly import WeeklyReporter + +reporter = WeeklyReporter() +result = reporter.generate() +print(f"周报已生成:") +print(f" HTML: {result.get('html_path')}") +print(f" Markdown: {result.get('markdown_path')}") +``` + +## 钉钉推送配置 + +### 1. 获取钉钉Webhook地址 + +1. 在钉钉群中,点击"群设置" -> "智能群助手" -> "添加机器人" +2. 选择"自定义"机器人 +3. 设置机器人名称和头像 +4. 复制Webhook地址(格式:`https://oapi.dingtalk.com/robot/send?access_token=xxx`) + +### 2. 配置Webhook地址 + +**方式1:通过环境变量(推荐)** + +```bash +export DINGTALK_WEBHOOK="https://oapi.dingtalk.com/robot/send?access_token=xxx" +``` + +**方式2:在config.py中配置** + +```python +DINGTALK_WEBHOOK = "https://oapi.dingtalk.com/robot/send?access_token=xxx" +``` + +**方式3:在代码中指定** + +```python +from applications.reporter.daily import DailyReporter + +reporter = DailyReporter(dingtalk_webhook="https://oapi.dingtalk.com/robot/send?access_token=xxx") +result = reporter.generate() +``` + +### 3. 控制推送行为 + +```python +# 生成报告但不推送到钉钉 +reporter = DailyReporter() +result = reporter.generate(send_dingtalk=False) + +# 不保存Markdown文件 +result = reporter.generate(save_markdown=False) + +# 同时控制 +result = reporter.generate(save_markdown=True, send_dingtalk=True) +``` + +### 4. 
钉钉消息格式 + +- 自动使用Markdown格式发送 +- 报告内容超过4500字符时只发送摘要(前5条新闻);单条Markdown消息超过5000字符时会被截断 +- 包含报告文件路径提示 + +## 添加自定义数据源 + +### 1. 创建数据源类 + +数据源类需要继承 `DataSource` 基类并实现以下方法: + +```python +from applications.reporter.base_reporter import DataSource +from typing import List, Dict, Any +from datetime import datetime + +class MyCustomDataSource(DataSource): + def fetch_data(self, start_time: datetime, end_time: datetime) -> List[Dict[str, Any]]: + """获取指定时间范围内的数据""" + # 返回格式: + return [ + { + 'title': '标题', + 'link': '链接', + 'summary': '摘要', + 'publish_time': '发布时间', + 'source_url': '来源URL' + } + ] + + def get_source_name(self) -> str: + return "数据源名称" +``` + +### 2. 添加到报告生成器 + +```python +from applications.reporter.daily import DailyReporter +from my_module import MyCustomDataSource + +reporter = DailyReporter() +custom_source = MyCustomDataSource(...) +reporter.add_data_source(custom_source) + +# 生成报告(会自动包含新数据源的数据),返回包含文件路径的字典 +result = reporter.generate() +print(result.get('html_path')) +``` + +## 使用外部HTML模板 + +### 1. 创建HTML模板文件 + +创建外部HTML模板文件(如 `custom_template.html`),报告内容会替换其中的 `{{content}}` 占位符: + +```html +<!DOCTYPE html> +<html> +<head> + <meta charset="UTF-8"> + <title>自定义报告模板</title> + <style> + /* 自定义样式 */ + </style> +</head> +<body> + <!-- 报告内容会替换下面的占位符 --> + {{content}} +</body> +</html> +``` + +### 2. 使用外部模板生成报告 + +```python +from applications.reporter.daily import DailyReporter + +reporter = DailyReporter() +result = reporter.generate(template_path="path/to/custom_template.html") +print(result.get('html_path')) +``` + +## 配置说明 + +### AI配置 + +在 `config.py` 中配置百度AI API: + +```python +BAIDU_AI_CONFIG = { + 'api_key': 'your_api_key', + 'model': 'ernie-x1-turbo-32k', +} +``` + +### 数据库配置 + +确保 `config.py` 中的数据库配置正确: + +```python +MYSQL_CONFIG = { + 'host': 'your_host', + 'port': 3306, + 'user': 'your_user', + 'password': 'your_password', + 'database': 'intelligence_system', +} +``` + +## 输出目录 + +- 日报:`output/reports/daily/` +- 周报:`output/reports/weekly/` + +报告文件名格式: +- HTML:`daily_report_YYYYMMDD_HHMMSS.html` / `weekly_report_YYYYMMDD_HHMMSS.html` +- Markdown:`daily_report_YYYYMMDD_HHMMSS.md` / `weekly_report_YYYYMMDD_HHMMSS.md` + +## 报告内容 + +生成的报告包含: + +1. **报告时间信息**:生成时间和时间范围 +2. **数据统计**:相关文章数 +3. 
**相关新闻列表**(从AI分析结果表筛选,是否相关=1): + - 标题 + - 分类 + - 标签 + - 摘要 + - 链接 + - 发布时间 + - 相关度评分 + - 分析说明 + +如果没有相关数据,会显示: +- 日报:`昨日无汽车后市场相关的新闻` +- 周报:`上周无汽车后市场相关的新闻` + +## AI筛选说明 + +AI会根据以下定义筛选汽车后市场相关内容: + +- 汽车维修保养 +- 汽车配件 +- 汽车改装 +- 汽车美容 +- 汽车用品 +- 汽车金融 +- 汽车保险 +- 二手车交易 +- 汽车租赁 +- 汽车检测 +- 汽车报废回收 +- 汽车相关法律法规和政策 + +## 扩展示例 + +参考 `data_source_example.py` 查看如何: +- 添加数据库数据源 +- 添加外部API数据源 +- 实现自定义数据源 + +## 注意事项 + +1. 确保数据库连接正常 +2. 确保AI API配置正确且有足够配额 +3. 外部模板文件需要包含内容占位符 +4. 数据源返回的数据格式需要符合规范 + diff --git a/applications/reporter/base_reporter.py b/applications/reporter/base_reporter.py new file mode 100644 index 0000000..34f7f97 --- /dev/null +++ b/applications/reporter/base_reporter.py @@ -0,0 +1,459 @@ +""" +报告生成器基类 +提供数据源接口、AI处理接口等扩展能力 +""" +from abc import ABC, abstractmethod +from typing import List, Dict, Any, Optional +from datetime import datetime, timedelta +import os +import sys +from loguru import logger + +# 添加父目录到路径 +current_dir = os.path.dirname(os.path.abspath(__file__)) +parent_dir = os.path.dirname(os.path.dirname(current_dir)) +if parent_dir not in sys.path: + sys.path.insert(0, parent_dir) + +from utils.mysql_agent import MySQLAgent +from config import Config + + +class DataSource(ABC): + """数据源接口基类,用于后续扩展其他数据源""" + + @abstractmethod + def fetch_data(self, start_time: datetime, end_time: datetime) -> List[Dict[str, Any]]: + """ + 获取指定时间范围内的数据 + + Args: + start_time: 开始时间 + end_time: 结束时间 + + Returns: + 数据列表,每条数据应包含:标题、链接、摘要、发布时间等字段 + """ + pass + + @abstractmethod + def get_source_name(self) -> str: + """获取数据源名称""" + pass + + +class RSSDataSource(DataSource): + """RSS数据源实现""" + + def __init__(self, db_agent: MySQLAgent, table_name: str = "collector_rss_subscriptions"): + self.db_agent = db_agent + self.table_name = table_name + self.logger = logger.bind(module="RSSDataSource") + + def fetch_data(self, start_time: datetime, end_time: datetime) -> List[Dict[str, Any]]: + """从数据库获取RSS数据""" + try: + sql = f""" + SELECT + `文章标题` as title, + `文章链接` as link, + `文章摘要` as 
summary, + `发布时间` as publish_time, + `来源URL` as source_url, + `创建时间` as create_time + FROM `{self.table_name}` + WHERE `发布时间` >= %s AND `发布时间` < %s + ORDER BY `发布时间` DESC + """ + + params = ( + start_time.strftime('%Y-%m-%d %H:%M:%S'), + end_time.strftime('%Y-%m-%d %H:%M:%S') + ) + + df = self.db_agent.query_to_df(sql, params=params, is_print=False) + + if df.empty: + self.logger.info(f"时间范围 {start_time} 到 {end_time} 内没有RSS数据") + return [] + + # 转换为字典列表 + data_list = df.to_dict('records') + self.logger.info(f"获取到 {len(data_list)} 条RSS数据") + return data_list + + except Exception as e: + self.logger.error(f"获取RSS数据失败: {str(e)}", exc_info=True) + return [] + + def get_source_name(self) -> str: + return "RSS订阅" + + +class AIAnalysisDataSource(DataSource): + """AI分析结果数据源实现 - 从ai_processor_rss_analysis表获取已筛选的相关内容""" + + def __init__(self, db_agent: MySQLAgent, table_name: str = "ai_processor_rss_analysis"): + self.db_agent = db_agent + self.table_name = table_name + self.logger = logger.bind(module="AIAnalysisDataSource") + + def fetch_data(self, start_time: datetime, end_time: datetime) -> List[Dict[str, Any]]: + """从AI分析结果表获取相关数据(是否相关=1)""" + try: + sql = f""" + SELECT + `文章标题` as title, + `文章链接` as link, + `文章摘要` as summary, + `发布时间` as publish_time, + `来源URL` as source_url, + `分类` as category, + `标签` as tags, + `相关度评分` as relevance_score, + `分析说明` as analysis_note, + `处理时间` as process_time + FROM `{self.table_name}` + WHERE `发布时间` >= %s AND `发布时间` < %s + AND `是否相关` = 1 + ORDER BY `发布时间` DESC, `相关度评分` DESC + """ + + params = ( + start_time.strftime('%Y-%m-%d %H:%M:%S'), + end_time.strftime('%Y-%m-%d %H:%M:%S') + ) + + df = self.db_agent.query_to_df(sql, params=params, is_print=False) + + if df.empty: + self.logger.info(f"时间范围 {start_time} 到 {end_time} 内没有相关数据(是否相关=1)") + return [] + + # 转换为字典列表 + data_list = df.to_dict('records') + self.logger.info(f"获取到 {len(data_list)} 条相关数据(是否相关=1)") + return data_list + + except Exception as e: + self.logger.error(f"获取AI分析数据失败: 
def filter_automotive_content(
    self,
    articles: List[Dict[str, Any]],
    batch_size: int = 10,
    request_interval: float = 1.5,
) -> List[Dict[str, Any]]:
    """Return the articles the AI model judges related to the automotive aftermarket.

    Articles are sent to the chat model in batches. Inside each prompt the
    articles are numbered from 0 *within the batch*, and the model's answer
    is mapped back with the same batch-local number. (The previous version
    numbered them with the global offset but looked them up locally, so
    every batch after the first was silently dropped or mis-assigned.)

    Matching articles are annotated in place with ``ai_marked=True``,
    ``ai_category`` and ``ai_reason`` and collected in the result. When a
    batch fails, its articles are annotated with ``ai_marked=False`` and
    ``ai_error`` but are NOT added to the result.

    Args:
        articles: Article dicts; ``title`` and ``summary`` are read.
        batch_size: Number of articles per model request (must be >= 1).
        request_interval: Seconds to sleep between requests to avoid API
            rate limiting; ``0`` disables the pause.

    Returns:
        The subset of ``articles`` flagged as related, in request order.

    Raises:
        ValueError: If ``batch_size`` is smaller than 1.
    """
    import json
    import re
    import time

    if not articles:
        return []
    if batch_size < 1:
        raise ValueError("batch_size must be a positive integer")

    def parse_reply(reply_text: str) -> Dict[str, Any]:
        """Extract the JSON object from a model reply, trying three layouts."""
        candidates = []
        # 1) JSON wrapped in a markdown code fence.
        fenced = re.search(r'```(?:json)?\s*(\{.*?\})\s*```', reply_text, re.DOTALL)
        if fenced:
            candidates.append(fenced.group(1))
        # 2) The outermost {...} span anywhere in the reply.
        braced = re.search(r'\{.*\}', reply_text, re.DOTALL)
        if braced:
            candidates.append(braced.group())
        # 3) The reply as-is.
        candidates.append(reply_text)

        for candidate in candidates:
            try:
                parsed = json.loads(candidate)
            except ValueError:  # json.JSONDecodeError is a ValueError
                continue
            if isinstance(parsed, dict):
                return parsed

        self.logger.warning(f"无法解析AI返回的JSON: {reply_text[:200]}")
        return {'related_articles': []}

    total = len(articles)
    self.logger.info(f"开始AI筛选 {total} 篇文章")
    filtered_articles = []

    for start in range(0, total, batch_size):
        batch = articles[start:start + batch_size]
        try:
            # Number articles from 0 inside the batch so the indices the
            # model echoes back can be used directly against `batch`.
            articles_text = "".join(
                f"\n[{pos}] 标题: {article.get('title', '')}\n"
                f"摘要: {article.get('summary', '')}\n"
                for pos, article in enumerate(batch)
            )

            prompt = f"""请分析以下新闻文章,判断哪些与汽车后市场相关。

汽车后市场的定义:汽车销售以后,围绕汽车使用过程中的各种服务,包括:
- 汽车维修保养
- 汽车配件
- 汽车改装
- 汽车美容
- 汽车用品
- 汽车金融
- 汽车保险
- 二手车交易
- 汽车租赁
- 汽车检测
- 汽车报废回收
- 汽车相关法律法规和政策

文章列表:
{articles_text}

请按以下JSON格式返回结果:
{{
    "related_articles": [
        {{
            "index": 文章的序号(从0开始),
            "is_related": true/false,
            "reason": "判断理由",
            "category": "所属类别(如:维修保养、配件、政策等)"
        }}
    ]
}}

只返回JSON,不要其他文字说明。"""

            response = self.client.chat.completions.create(
                model=self.model,
                messages=[{
                    "role": "user",
                    "content": prompt
                }]
            )

            # The content can be None (e.g. a refused/empty completion).
            result_text = (response.choices[0].message.content or "").strip()
            verdicts = parse_reply(result_text).get('related_articles') or []

            seen = set()
            for item in verdicts:
                if not isinstance(item, dict):
                    continue
                try:
                    pos = int(item.get('index', -1))
                except (TypeError, ValueError):
                    continue
                if not 0 <= pos < len(batch) or pos in seen:
                    continue
                if not item.get('is_related', False):
                    continue
                seen.add(pos)
                article = batch[pos]
                article['ai_marked'] = True
                article['ai_category'] = item.get('category', '其他')
                article['ai_reason'] = item.get('reason', '')
                filtered_articles.append(article)

            # Pause between requests (not after the last one).
            if request_interval > 0 and start + batch_size < total:
                time.sleep(request_interval)

        except Exception as e:
            # loguru has no `exc_info` kwarg: extra kwargs make it run
            # str.format on the message, which breaks on braces in the
            # error text. `exception()` records the traceback instead.
            self.logger.exception(f"AI筛选批处理失败: {e}")
            # Annotate the failed batch; these articles are not returned.
            for article in batch:
                article['ai_marked'] = False
                article['ai_error'] = str(e)

    self.logger.info(f"AI筛选完成,找到 {len(filtered_articles)} 篇相关文章")
    return filtered_articles
class BaseReporter:
    """Base class for report generators.

    Collects articles from the registered data sources, renders them as a
    Markdown report, converts the Markdown to HTML and writes both formats
    to disk. Subclasses decide the time window and the output location.
    """

    def __init__(self, data_sources: Optional[List["DataSource"]] = None):
        """Create a reporter.

        Args:
            data_sources: Initial data sources; more can be added later with
                ``add_data_source``.
        """
        self.data_sources = data_sources or []
        self.ai_processor = AIProcessor()
        self.logger = logger.bind(module="BaseReporter")

    def add_data_source(self, data_source: "DataSource"):
        """Register an additional data source."""
        self.data_sources.append(data_source)
        self.logger.info(f"添加数据源: {data_source.get_source_name()}")

    @staticmethod
    def _is_missing(value: Any) -> bool:
        """Return True for None, NaN/NaT-like values and blank strings."""
        if value is None:
            return True
        if isinstance(value, str):
            return not value.strip()
        try:
            # NaN and NaT are the only common values unequal to themselves.
            return bool(value != value)
        except (TypeError, ValueError):
            return False

    @classmethod
    def _publish_time_sort_key(cls, item: Dict[str, Any]) -> str:
        """Build a sort key that is comparable across str/datetime/None values."""
        value = item.get('publish_time')
        if cls._is_missing(value):
            return ''
        # str() of a datetime is 'YYYY-MM-DD HH:MM:SS'; normalise the ISO 'T'
        # separator so both spellings order chronologically.
        return str(value).replace('T', ' ')

    def collect_data(self, start_time: datetime, end_time: datetime) -> List[Dict[str, Any]]:
        """Fetch data from every source and return it newest first.

        Each item is tagged with ``data_source`` (the source's name). A
        failing source is logged and skipped. Sorting uses a string key so
        that sources returning different ``publish_time`` types (datetime,
        str, None) can be mixed without raising ``TypeError``.
        """
        all_data = []
        for source in self.data_sources:
            source_name = type(source).__name__
            try:
                source_name = source.get_source_name()
                data = source.fetch_data(start_time, end_time)
                for item in data:
                    item['data_source'] = source_name
                all_data.extend(data)
            except Exception as e:
                self.logger.error(f"从 {source_name} 收集数据失败: {str(e)}")

        all_data.sort(key=self._publish_time_sort_key, reverse=True)
        return all_data

    def generate_report_content(self, articles: List[Dict[str, Any]], report_type: str = "日报") -> str:
        """Render the statistics and news sections as Markdown.

        Args:
            articles: Articles already filtered by the AI analysis table.
            report_type: "日报" or anything else (treated as weekly); only
                selects the message shown when there are no articles.
        """
        related_count = len(articles)

        if related_count == 0:
            if report_type == "日报":
                message = "昨日无汽车后市场相关的新闻"
            else:
                message = "上周无汽车后市场相关的新闻"
            return (
                "\n## 数据统计\n\n"
                "- **相关文章数**: 0\n\n"
                "## 相关新闻\n\n"
                f"{message}\n"
            )

        stats = f"\n## 数据统计\n\n- **相关文章数**: {related_count}\n\n"
        return stats + self._generate_news_summary_from_analysis(articles)

    def _generate_news_summary_from_analysis(self, articles: List[Dict[str, Any]]) -> str:
        """Render one Markdown entry per article from stored analysis fields.

        Missing values (absent key, None, NaN, blank string) fall back to a
        default instead of being printed as "None"/"nan"; a relevance score
        of 0 is shown rather than hidden.

        Args:
            articles: Dicts with title, link, summary, publish_time and the
                optional category, tags, relevance_score, analysis_note.

        Returns:
            Markdown text for the news section.
        """
        if not articles:
            return "## 相关新闻\n\n暂无相关新闻。\n"

        def field(article: Dict[str, Any], key: str, default: Any = '') -> Any:
            value = article.get(key)
            return default if self._is_missing(value) else value

        parts = []
        for idx, article in enumerate(articles, 1):
            link = field(article, 'link')
            tags = article.get('tags')
            relevance_score = article.get('relevance_score')
            analysis_note = article.get('analysis_note')

            parts.append(f"\n### {idx}. {field(article, 'title', '无标题')}\n")
            parts.append(f"- **分类**: {field(article, 'category', '其他')}\n")
            if not self._is_missing(tags):
                parts.append(f"- **标签**: {tags}\n")
            parts.append(f"- **摘要**: {field(article, 'summary', '无摘要')}\n")
            parts.append(f"- **链接**: [{link}]({link})\n")
            parts.append(f"- **发布时间**: {field(article, 'publish_time')}\n")
            if not self._is_missing(relevance_score):
                parts.append(f"- **相关度评分**: {relevance_score}\n")
            if not self._is_missing(analysis_note):
                parts.append(f"- **分析说明**: {analysis_note}\n")
            parts.append("\n")

        articles_text = "".join(parts)
        return f"## 汽车后市场相关新闻\n\n共找到 {len(articles)} 篇相关新闻:\n\n{articles_text}"

    def generate_html_report(self, markdown_content: str, template_path: str = None) -> str:
        """Convert the Markdown report to HTML.

        Uses the external template when ``template_path`` points to an
        existing file, otherwise the built-in template.
        """
        # Imported lazily (relative import) to avoid a circular dependency.
        from .html_template import HTMLTemplateManager

        template_manager = HTMLTemplateManager()

        if template_path and os.path.exists(template_path):
            return template_manager.render_external_template(template_path, markdown_content)

        if template_path:
            self.logger.warning(f"外部模板不存在,使用内置模板: {template_path}")
        return template_manager.render_builtin_template(markdown_content)

    @staticmethod
    def _write_text(content: str, output_path: str) -> None:
        """Write ``content`` as UTF-8, creating the parent directory if needed."""
        os.makedirs(os.path.dirname(output_path) or '.', exist_ok=True)
        with open(output_path, 'w', encoding='utf-8') as f:
            f.write(content)

    def save_report(self, html_content: str, output_path: str):
        """Save the HTML report to ``output_path``."""
        self._write_text(html_content, output_path)
        self.logger.info(f"HTML报告已保存到: {output_path}")

    def save_markdown_report(self, markdown_content: str, output_path: str):
        """Save the Markdown report to ``output_path``."""
        self._write_text(markdown_content, output_path)
        self.logger.info(f"Markdown报告已保存到: {output_path}")
save_markdown: bool = True, + send_dingtalk: bool = True) -> dict: + """ + 生成日报 + + Args: + output_dir: 输出目录 + template_path: 可选的外部HTML模板路径 + save_markdown: 是否保存Markdown文件 + send_dingtalk: 是否发送到钉钉 + + Returns: + 包含生成文件路径的字典 + """ + self.logger.info("开始生成日报") + + # 计算时间范围:24小时内 + end_time = datetime.now() + start_time = end_time - timedelta(hours=24) + + self.logger.info(f"时间范围: {start_time.strftime('%Y-%m-%d %H:%M:%S')} 至 {end_time.strftime('%Y-%m-%d %H:%M:%S')}") + + # 收集数据 + articles = self.collect_data(start_time, end_time) + + # 生成报告内容(generate_report_content会自动处理空数据情况) + markdown_content = f"""# 汽车后市场情报日报 + +## 报告时间 +**生成时间**: {end_time.strftime('%Y-%m-%d %H:%M:%S')} +**时间范围**: {start_time.strftime('%Y-%m-%d %H:%M:%S')} 至 {end_time.strftime('%Y-%m-%d %H:%M:%S')} + +{self.generate_report_content(articles, report_type="日报")} +""" + + # 生成HTML报告 + html_content = self.generate_html_report(markdown_content, template_path=template_path) + + # 保存报告 + os.makedirs(output_dir, exist_ok=True) + timestamp = end_time.strftime('%Y%m%d_%H%M%S') + + result = {} + + # 保存HTML报告 + html_filename = f"daily_report_{timestamp}.html" + html_path = os.path.join(output_dir, html_filename) + self.save_report(html_content, html_path) + result['html_path'] = html_path + self.logger.info(f"HTML报告已保存: {html_path}") + + # 保存Markdown报告 + markdown_path = None + if save_markdown: + markdown_filename = f"daily_report_{timestamp}.md" + markdown_path = os.path.join(output_dir, markdown_filename) + self.save_markdown_report(markdown_content, markdown_path) + result['markdown_path'] = markdown_path + self.logger.info(f"Markdown报告已保存: {markdown_path}") + + # 发送到钉钉 + if send_dingtalk and self.dingtalk_client: + title = f"汽车后市场情报日报 - {end_time.strftime('%Y-%m-%d')}" + success = self.dingtalk_client.send_report(title, markdown_content, markdown_path) + result['dingtalk_sent'] = success + if success: + self.logger.info("报告已推送到钉钉群") + else: + self.logger.warning("报告推送到钉钉群失败") + + 
self.logger.info(f"日报生成完成") + return result + + +def main(): + """主函数""" + try: + reporter = DailyReporter() + result = reporter.generate() + print(f"日报已生成:") + print(f" HTML: {result.get('html_path')}") + if 'markdown_path' in result: + print(f" Markdown: {result.get('markdown_path')}") + if 'dingtalk_sent' in result: + print(f" 钉钉推送: {'成功' if result.get('dingtalk_sent') else '失败'}") + except Exception as e: + logger.error(f"生成日报失败: {str(e)}", exc_info=True) + raise + + +if __name__ == "__main__": + main() + diff --git a/applications/reporter/data_source_example.py b/applications/reporter/data_source_example.py new file mode 100644 index 0000000..1b093ce --- /dev/null +++ b/applications/reporter/data_source_example.py @@ -0,0 +1,135 @@ +""" +数据源扩展示例 +演示如何添加新的数据源 +""" +from applications.reporter.base_reporter import DataSource +from typing import List, Dict, Any +from datetime import datetime +from loguru import logger + + +class ComplaintDataSource(DataSource): + """投诉数据源示例(可根据实际情况实现)""" + + def __init__(self, db_agent, table_name: str = "complaint_data"): + """ + Args: + db_agent: MySQLAgent实例 + table_name: 数据表名 + """ + self.db_agent = db_agent + self.table_name = table_name + self.logger = logger.bind(module="ComplaintDataSource") + + def fetch_data(self, start_time: datetime, end_time: datetime) -> List[Dict[str, Any]]: + """从投诉数据表获取数据""" + try: + sql = f""" + SELECT + `标题` as title, + `链接` as link, + `内容` as summary, + `发布时间` as publish_time, + `来源` as source_url + FROM `{self.table_name}` + WHERE `发布时间` >= %s AND `发布时间` < %s + ORDER BY `发布时间` DESC + """ + + params = ( + start_time.strftime('%Y-%m-%d %H:%M:%S'), + end_time.strftime('%Y-%m-%d %H:%M:%S') + ) + + df = self.db_agent.query_to_df(sql, params=params, is_print=False) + + if df.empty: + self.logger.info(f"时间范围 {start_time} 到 {end_time} 内没有投诉数据") + return [] + + data_list = df.to_dict('records') + self.logger.info(f"获取到 {len(data_list)} 条投诉数据") + return data_list + + except Exception as e: + 
self.logger.error(f"获取投诉数据失败: {str(e)}", exc_info=True) + return [] + + def get_source_name(self) -> str: + return "投诉数据" + + +class CustomAPIDataSource(DataSource): + """外部API数据源示例""" + + def __init__(self, api_url: str, api_key: str = None): + """ + Args: + api_url: API地址 + api_key: API密钥(如果需要) + """ + self.api_url = api_url + self.api_key = api_key + self.logger = logger.bind(module="CustomAPIDataSource") + + def fetch_data(self, start_time: datetime, end_time: datetime) -> List[Dict[str, Any]]: + """从外部API获取数据""" + import requests + + try: + headers = {} + if self.api_key: + headers['Authorization'] = f'Bearer {self.api_key}' + + params = { + 'start_time': start_time.isoformat(), + 'end_time': end_time.isoformat() + } + + response = requests.get(self.api_url, headers=headers, params=params, timeout=30) + response.raise_for_status() + + data = response.json() + + # 将API返回的数据转换为标准格式 + articles = [] + for item in data.get('articles', []): + articles.append({ + 'title': item.get('title', ''), + 'link': item.get('url', ''), + 'summary': item.get('description', ''), + 'publish_time': item.get('published_at', ''), + 'source_url': self.api_url + }) + + self.logger.info(f"从API获取到 {len(articles)} 条数据") + return articles + + except Exception as e: + self.logger.error(f"从API获取数据失败: {str(e)}", exc_info=True) + return [] + + def get_source_name(self) -> str: + return "外部API" + + +# 使用示例: +""" +from applications.reporter.daily import DailyReporter +from applications.reporter.data_source_example import ComplaintDataSource +from utils.mysql_agent import MySQLAgent +from config import Config + +# 创建日报生成器 +reporter = DailyReporter() + +# 添加投诉数据源 +db_agent = MySQLAgent(Config.MYSQL_CONFIG) +complaint_source = ComplaintDataSource(db_agent, table_name="complaint_data") +reporter.add_data_source(complaint_source) + +# 生成报告 +report_path = reporter.generate() +print(f"报告已生成: {report_path}") +""" + diff --git a/applications/reporter/dingtalk_example.py 
b/applications/reporter/dingtalk_example.py new file mode 100644 index 0000000..57f3fd5 --- /dev/null +++ b/applications/reporter/dingtalk_example.py @@ -0,0 +1,149 @@ +""" +钉钉推送使用示例 +演示如何配置和使用钉钉推送功能 +""" +import os +import sys + +# 添加父目录到路径 +current_dir = os.path.dirname(os.path.abspath(__file__)) +parent_dir = os.path.dirname(os.path.dirname(current_dir)) +if parent_dir not in sys.path: + sys.path.insert(0, parent_dir) + +from applications.reporter.daily import DailyReporter +from applications.reporter.weekly import WeeklyReporter +from applications.reporter.dingtalk_webhook import DingTalkWebhook + + +def example_with_config(): + """示例1:通过config.py配置""" + print("=" * 50) + print("示例1:使用config.py中的配置") + print("=" * 50) + + # 需要在config.py中设置 DINGTALK_WEBHOOK + reporter = DailyReporter() + result = reporter.generate() + print(f"✅ 报告已生成并推送\n") + + +def example_with_env_var(): + """示例2:通过环境变量配置""" + print("=" * 50) + print("示例2:使用环境变量配置") + print("=" * 50) + + # 设置环境变量 + webhook_url = os.getenv('DINGTALK_WEBHOOK', '') + if webhook_url: + reporter = DailyReporter(dingtalk_webhook=webhook_url) + result = reporter.generate() + print(f"✅ 报告已生成并推送\n") + else: + print("⚠️ 未设置环境变量 DINGTALK_WEBHOOK\n") + + +def example_with_direct_url(): + """示例3:直接指定Webhook地址""" + print("=" * 50) + print("示例3:直接指定Webhook地址") + print("=" * 50) + + # 直接指定webhook地址(请替换为实际的webhook地址) + webhook_url = "https://oapi.dingtalk.com/robot/send?access_token=YOUR_ACCESS_TOKEN" + + if webhook_url != "https://oapi.dingtalk.com/robot/send?access_token=YOUR_ACCESS_TOKEN": + reporter = DailyReporter(dingtalk_webhook=webhook_url) + result = reporter.generate() + print(f"✅ 报告已生成并推送\n") + else: + print("⚠️ 请先设置实际的webhook地址\n") + + +def example_without_push(): + """示例4:生成报告但不推送""" + print("=" * 50) + print("示例4:生成报告但不推送到钉钉") + print("=" * 50) + + reporter = DailyReporter() + result = reporter.generate(send_dingtalk=False) + print(f"✅ 报告已生成(未推送)\n") + + +def example_weekly_report(): + """示例5:生成周报并推送""" + 
class DingTalkWebhook:
    """Push text and Markdown messages to a DingTalk group via a robot webhook."""

    # Longest Markdown body that send_markdown forwards without truncating.
    MARKDOWN_MAX_LENGTH = 5000
    # send_report switches to a summary above this size (headroom below the cap).
    REPORT_MAX_LENGTH = 4500
    # Number of "### " news entries kept when a report is summarised.
    REPORT_MAX_NEWS_ITEMS = 5

    def __init__(self, webhook_url: str):
        """Store the robot webhook URL.

        Args:
            webhook_url: DingTalk robot webhook address.
        """
        self.webhook_url = webhook_url
        self.logger = logger.bind(module="DingTalkWebhook")

    @staticmethod
    def _build_at(at_mobiles: Optional[list], at_all: bool) -> Dict[str, Any]:
        """Build the optional ``at`` section; empty dict when nobody is mentioned."""
        at = {}
        if at_mobiles:
            at["atMobiles"] = at_mobiles
        if at_all:
            at["isAtAll"] = True
        return at

    def send_text(self, content: str, at_mobiles: list = None, at_all: bool = False) -> bool:
        """Send a plain-text message.

        Args:
            content: Message text.
            at_mobiles: Phone numbers to @-mention.
            at_all: Whether to @-mention everyone.

        Returns:
            True when DingTalk accepted the message.
        """
        data = {
            "msgtype": "text",
            "text": {
                "content": content
            }
        }
        at = self._build_at(at_mobiles, at_all)
        if at:
            data["at"] = at
        return self._send(data)

    def send_markdown(self, title: str, text: str, at_mobiles: list = None, at_all: bool = False) -> bool:
        """Send a Markdown message, truncating bodies longer than 5000 characters.

        Args:
            title: Message title.
            text: Markdown body.
            at_mobiles: Phone numbers to @-mention.
            at_all: Whether to @-mention everyone.

        Returns:
            True when DingTalk accepted the message.
        """
        max_length = self.MARKDOWN_MAX_LENGTH
        if len(text) > max_length:
            # Keep 100 characters of headroom for the truncation notice.
            text = text[:max_length - 100] + "\n\n...(内容已截断,完整内容请查看附件)"
            self.logger.warning(f"Markdown内容过长,已截断至{max_length}字符")

        data = {
            "msgtype": "markdown",
            "markdown": {
                "title": title,
                "text": text
            }
        }
        at = self._build_at(at_mobiles, at_all)
        if at:
            data["at"] = at
        return self._send(data)

    def send_markdown_from_file(self, title: str, markdown_file: str,
                                max_length: int = 5000, at_mobiles: list = None,
                                at_all: bool = False) -> bool:
        """Read a Markdown file and send its (possibly truncated) content.

        Args:
            title: Message title.
            markdown_file: Path of the Markdown file.
            max_length: Maximum body length before truncation.
            at_mobiles: Phone numbers to @-mention.
            at_all: Whether to @-mention everyone.

        Returns:
            True when the message was sent; False on read or send failure.
        """
        try:
            with open(markdown_file, 'r', encoding='utf-8') as f:
                content = f.read()

            text = self._convert_to_dingtalk_markdown(content, max_length)
            return self.send_markdown(title, text, at_mobiles, at_all)

        except Exception as e:
            # loguru has no `exc_info` kwarg; exception() records the traceback.
            self.logger.exception(f"读取Markdown文件失败: {e}")
            return False

    def _convert_to_dingtalk_markdown(self, content: str, max_length: int = 5000) -> str:
        """Prepare standard Markdown for DingTalk.

        Currently this only truncates over-long content and appends a notice;
        the Markdown itself is passed through unchanged.
        """
        if len(content) > max_length:
            content = content[:max_length - 200] + "\n\n---\n\n**提示**: 内容已截断,完整内容请查看报告文件。"
        return content

    def _send(self, data: Dict[str, Any]) -> bool:
        """POST a message payload to the webhook.

        Args:
            data: Message payload.

        Returns:
            True when the HTTP call succeeded and DingTalk returned errcode 0.
        """
        try:
            headers = {
                'Content-Type': 'application/json'
            }

            response = requests.post(
                self.webhook_url,
                headers=headers,
                data=json.dumps(data),
                timeout=10
            )

            response.raise_for_status()
            result = response.json()

            if result.get('errcode') == 0:
                self.logger.info("消息发送成功")
                return True

            self.logger.error(f"消息发送失败: {result.get('errmsg', '未知错误')}")
            return False

        except requests.exceptions.RequestException as e:
            self.logger.exception(f"发送消息请求失败: {e}")
            return False
        except Exception as e:
            self.logger.exception(f"发送消息失败: {e}")
            return False

    def send_report(self, title: str, markdown_content: str, markdown_file: str = None) -> bool:
        """Send a report, falling back to a summary when it is too long.

        Reports up to 4500 characters are sent whole. Longer reports are cut
        after the first five news entries (lines starting with ``### ``). The
        limit is checked BEFORE a headline is copied, so the sixth headline
        no longer leaks into a summary that claims to show five.

        Args:
            title: Message title.
            markdown_content: Full Markdown report.
            markdown_file: Optional path of the saved report, mentioned in
                the message.

        Returns:
            True when DingTalk accepted the message.
        """
        if len(markdown_content) <= self.REPORT_MAX_LENGTH:
            text = markdown_content
            if markdown_file:
                text += f"\n\n---\n\n**完整报告**: 已保存到 `{markdown_file}`"
            return self.send_markdown(title, text)

        lines = markdown_content.split('\n')
        summary_lines = []
        news_count = 0
        for line in lines:
            if line.startswith('### '):
                if news_count >= self.REPORT_MAX_NEWS_ITEMS:
                    break
                news_count += 1
            summary_lines.append(line)

        summary = '\n'.join(summary_lines)

        if len(markdown_content) > len(summary):
            # Count headings by line start so "### " inside body text is ignored.
            total_news = sum(1 for line in lines if line.startswith('### '))
            remaining_count = total_news - news_count
            summary += f"\n\n---\n\n**提示**: 报告内容较长,已显示前{news_count}条新闻。"
            if remaining_count > 0:
                summary += f" 还有{remaining_count}条新闻未显示。"
            if markdown_file:
                summary += f"\n\n**完整报告**: 已保存到 `{markdown_file}`"

        return self.send_markdown(title, summary)
{template_path}\n") + + +def example_custom_output_dir(): + """示例4:指定输出目录""" + print("=" * 50) + print("示例4:指定自定义输出目录") + print("=" * 50) + + reporter = DailyReporter() + custom_dir = "output/reports/custom" + report_path = reporter.generate(output_dir=custom_dir) + print(f"✅ 报告已保存到自定义目录: {report_path}\n") + + +def example_add_data_source(): + """示例5:添加自定义数据源""" + print("=" * 50) + print("示例5:添加自定义数据源") + print("=" * 50) + + try: + from applications.reporter.data_source_example import ComplaintDataSource + from utils.mysql_agent import MySQLAgent + from config import Config + + reporter = DailyReporter() + + # 添加投诉数据源(如果数据库中有该表) + db_agent = MySQLAgent(Config.MYSQL_CONFIG) + complaint_source = ComplaintDataSource(db_agent, table_name="complaint_data") + reporter.add_data_source(complaint_source) + + report_path = reporter.generate() + print(f"✅ 包含自定义数据源的报告已生成: {report_path}\n") + except Exception as e: + print(f"⚠️ 添加自定义数据源失败(可能是表不存在): {str(e)}\n") + + +def main(): + """主函数""" + print("\n" + "=" * 50) + print("汽车后市场情报报告生成器 - 使用示例") + print("=" * 50 + "\n") + + # 运行各种示例 + try: + example_daily_report() + except Exception as e: + print(f"❌ 示例1失败: {str(e)}\n") + + try: + example_weekly_report() + except Exception as e: + print(f"❌ 示例2失败: {str(e)}\n") + + try: + example_custom_template() + except Exception as e: + print(f"❌ 示例3失败: {str(e)}\n") + + try: + example_custom_output_dir() + except Exception as e: + print(f"❌ 示例4失败: {str(e)}\n") + + try: + example_add_data_source() + except Exception as e: + print(f"❌ 示例5失败: {str(e)}\n") + + print("=" * 50) + print("所有示例运行完成!") + print("=" * 50 + "\n") + + +if __name__ == "__main__": + main() + diff --git a/applications/reporter/html_template.py b/applications/reporter/html_template.py new file mode 100644 index 0000000..3a77da3 --- /dev/null +++ b/applications/reporter/html_template.py @@ -0,0 +1,399 @@ +""" +HTML模板管理器 +支持内置模板和外部HTML模板 +""" +import os +import markdown +from bs4 import BeautifulSoup +import re +from typing 
"""
HTML template manager.

Supports the built-in template and external HTML templates.
"""
import os
import re
from typing import Optional

import markdown
from bs4 import BeautifulSoup
from loguru import logger


class HTMLTemplateManager:
    """Render Markdown report content into a complete HTML page."""

    # Placeholder spellings recognised in external templates, tried in order.
    # NOTE(review): the third pattern appears as an empty regex in the reviewed
    # source (which matches at every position); an HTML-comment placeholder is
    # presumably what was intended - confirm against the original file.
    _PLACEHOLDER_PATTERNS = (
        r'\{\{\s*content\s*\}\}',
        r'\{content\}',
        r'<!--\s*content\s*-->',
    )

    def __init__(self):
        self.logger = logger.bind(module="HTMLTemplateManager")

    def markdown_to_html(self, markdown_content: str) -> str:
        """Convert Markdown text to an HTML fragment."""
        return markdown.markdown(
            markdown_content,
            extensions=['tables', 'fenced_code', 'codehilite']
        )

    def render_builtin_template(self, markdown_content: str) -> str:
        """Render *markdown_content* into the built-in HTML page.

        Args:
            markdown_content: Report body in Markdown.

        Returns:
            A complete HTML document as a string.
        """
        html_body = self.markdown_to_html(markdown_content)

        # Add CSS classes / link attributes before embedding the fragment.
        soup = BeautifulSoup(html_body, 'html.parser')
        self._enhance_html_structure(soup)

        # NOTE(review): the markup and stylesheet of the built-in template are
        # not visible in the reviewed source; this is a minimal valid skeleton.
        # Restore the original <style> block inside <head> if one exists.
        html_template = f"""<!DOCTYPE html>
<html lang="zh-CN">
<head>
    <meta charset="UTF-8">
    <meta name="viewport" content="width=device-width, initial-scale=1.0">
    <title>汽车后市场情报报告</title>
</head>
<body>
    <div class="container">
        {str(soup)}
    </div>
</body>
</html>"""

        return html_template

    def render_external_template(self, template_path: str, markdown_content: str) -> str:
        """Render *markdown_content* into an external HTML template.

        Args:
            template_path: Path of the external template file (read as UTF-8).
            markdown_content: Report body in Markdown.

        Returns:
            The rendered HTML. If anything fails (unreadable file, conversion
            error), the error is logged with its traceback and the built-in
            template is used instead.
        """
        try:
            with open(template_path, 'r', encoding='utf-8') as f:
                template = f.read()

            html_body = self.markdown_to_html(markdown_content)
            rendered = self._inject_content(template, html_body)

            self.logger.info(f"使用外部模板渲染: {template_path}")
            return rendered

        except Exception as e:
            # Deliberate best-effort fallback. loguru has no ``exc_info``
            # keyword (it would only be stored as an extra field), so the
            # traceback is requested through ``opt(exception=True)``.
            self.logger.opt(exception=True).error(
                f"使用外部模板失败: {str(e)},回退到内置模板"
            )
            return self.render_builtin_template(markdown_content)

    def _inject_content(self, template: str, html_body: str) -> str:
        """Insert *html_body* into *template* and return the result.

        The first placeholder pattern that occurs in the template is replaced
        (case-insensitively, every occurrence). Without a placeholder the body
        is appended inside ``<body>``, or to the end of the template text when
        there is no ``<body>`` element.
        """
        for pattern in self._PLACEHOLDER_PATTERNS:
            if re.search(pattern, template, re.IGNORECASE):
                # A function replacement is inserted literally; a replacement
                # *string* would have its backslash escapes interpreted, which
                # corrupts (or rejects) report HTML containing backslashes.
                return re.sub(pattern, lambda _match: html_body, template,
                              flags=re.IGNORECASE)

        soup = BeautifulSoup(template, 'html.parser')
        body = soup.find('body')
        if body is None:
            # No <body>: keep the template text untouched and append the body.
            return template + html_body

        body.append(BeautifulSoup(html_body, 'html.parser'))
        return str(soup)

    def _enhance_html_structure(self, soup: BeautifulSoup):
        """Decorate the parsed fragment in place with classes and link attributes."""
        # Tables without a class get the default table style.
        for table in soup.find_all('table'):
            if not table.get('class'):
                table['class'] = 'data-table'

        # Lists whose items mention "新闻" are marked as news lists.
        for ul in soup.find_all('ul'):
            if any('新闻' in str(item) for item in ul.find_all('li')):
                ul['class'] = 'news-list'

        # Links without an explicit target open in a new tab.
        for a in soup.find_all('a'):
            if not a.get('target'):
                a['target'] = '_blank'
                a['rel'] = 'noopener noreferrer'
+ + --> + {{content}} +
"""
Weekly reporter - builds the automotive-aftermarket intelligence report for the last 7 days.
"""
import os
import sys
from datetime import datetime, timedelta
from typing import Optional

from loguru import logger

# Put the directory two levels above this file's directory on sys.path so the
# ``applications`` / ``utils`` / ``config`` imports resolve when run as a script.
current_dir = os.path.dirname(os.path.abspath(__file__))
parent_dir = os.path.dirname(os.path.dirname(current_dir))
if parent_dir not in sys.path:
    sys.path.insert(0, parent_dir)

from applications.reporter.base_reporter import BaseReporter, AIAnalysisDataSource
from applications.reporter.dingtalk_webhook import DingTalkWebhook
from utils.mysql_agent import MySQLAgent
from config import Config


class WeeklyReporter(BaseReporter):
    """Weekly report generator."""

    def __init__(self, dingtalk_webhook: Optional[str] = None):
        """
        Set up the data source and, when a webhook is available, DingTalk push.

        Args:
            dingtalk_webhook: DingTalk webhook URL (optional). Falls back to
                ``Config.DINGTALK_WEBHOOK`` when not given.
        """
        super().__init__()
        # Database connection used by the AI-analysis data source.
        db_agent = MySQLAgent(Config.MYSQL_CONFIG)
        self.add_data_source(AIAnalysisDataSource(db_agent))
        self.logger = logger.bind(module="WeeklyReporter")

        # DingTalk push is enabled only when a non-empty webhook is configured.
        self.dingtalk_webhook = dingtalk_webhook or getattr(Config, 'DINGTALK_WEBHOOK', None)
        self.dingtalk_client = None
        if self.dingtalk_webhook:
            self.dingtalk_client = DingTalkWebhook(self.dingtalk_webhook)
            self.logger.info("已启用钉钉推送功能")

    def generate(self, output_dir: str = "output/reports/weekly",
                 template_path: Optional[str] = None,
                 save_markdown: bool = True,
                 send_dingtalk: bool = True) -> dict:
        """
        Generate the weekly report.

        Args:
            output_dir: Directory the report files are written to (created if missing).
            template_path: Optional external HTML template path.
            save_markdown: Whether to also save the Markdown source.
            send_dingtalk: Whether to push the report to DingTalk (only
                happens when a DingTalk client was configured).

        Returns:
            Dict with ``html_path``; ``markdown_path`` when the Markdown file
            was saved; ``dingtalk_sent`` (bool) when a push was attempted.
        """
        self.logger.info("开始生成周报")

        # Reporting window: the 7 days ending now.
        end_time = datetime.now()
        start_time = end_time - timedelta(days=7)

        self.logger.info(f"时间范围: {start_time.strftime('%Y-%m-%d %H:%M:%S')} 至 {end_time.strftime('%Y-%m-%d %H:%M:%S')}")

        articles = self.collect_data(start_time, end_time)

        # generate_report_content is also called when no articles were collected.
        markdown_content = f"""# 汽车后市场情报周报

## 报告时间
**生成时间**: {end_time.strftime('%Y-%m-%d %H:%M:%S')}
**时间范围**: {start_time.strftime('%Y-%m-%d %H:%M:%S')} 至 {end_time.strftime('%Y-%m-%d %H:%M:%S')}

{self.generate_report_content(articles, report_type="周报")}
"""

        html_content = self.generate_html_report(markdown_content, template_path=template_path)

        os.makedirs(output_dir, exist_ok=True)
        timestamp = end_time.strftime('%Y%m%d_%H%M%S')

        result = {}

        # HTML report (always written).
        html_path = os.path.join(output_dir, f"weekly_report_{timestamp}.html")
        self.save_report(html_content, html_path)
        result['html_path'] = html_path
        self.logger.info(f"HTML报告已保存: {html_path}")

        # Markdown report (optional); the path stays None when not saved.
        markdown_path = None
        if save_markdown:
            markdown_path = os.path.join(output_dir, f"weekly_report_{timestamp}.md")
            self.save_markdown_report(markdown_content, markdown_path)
            result['markdown_path'] = markdown_path
            self.logger.info(f"Markdown报告已保存: {markdown_path}")

        # DingTalk push (optional).
        if send_dingtalk and self.dingtalk_client:
            title = f"汽车后市场情报周报 - {start_time.strftime('%Y-%m-%d')} 至 {end_time.strftime('%Y-%m-%d')}"
            success = self.dingtalk_client.send_report(title, markdown_content, markdown_path)
            result['dingtalk_sent'] = success
            if success:
                self.logger.info("报告已推送到钉钉群")
            else:
                self.logger.warning("报告推送到钉钉群失败")

        self.logger.info("周报生成完成")
        return result


def main():
    """Generate a weekly report and print where the outputs went."""
    try:
        reporter = WeeklyReporter()
        result = reporter.generate()
        print(f"周报已生成:")
        print(f"  HTML: {result.get('html_path')}")
        if 'markdown_path' in result:
            print(f"  Markdown: {result.get('markdown_path')}")
        if 'dingtalk_sent' in result:
            print(f"  钉钉推送: {'成功' if result.get('dingtalk_sent') else '失败'}")
    except Exception as e:
        # loguru has no ``exc_info`` keyword; ``exception()`` logs at ERROR
        # level and attaches the traceback of the active exception.
        logger.exception(f"生成周报失败: {str(e)}")
        raise


if __name__ == "__main__":
    main()
import json
import os
import re

# Fenced blocks that carry a chart specification.
_CHART_FENCE_RE = re.compile(r'```(?:mermaid|vega-lite|chart)\b.*?```', re.DOTALL)

# A response that consists of exactly one fence wrapping the whole document.
_WRAPPING_FENCE_RE = re.compile(
    r'\A\s*(?P<fence>```|~~~)(?P<label>markdown|md)?[ \t]*\n'
    r'(?P<body>.*?)\n?(?P=fence)\s*\Z',
    re.DOTALL | re.IGNORECASE,
)


def process_markdown_content(raw_content):
    """Normalise model output into Markdown ready for HTML conversion.

    * If the whole response is wrapped in a single fence (```` ```markdown ````,
      ```` ```md ````, or an unlabelled ```` ``` ```` / ``~~~`` fence with no
      further fence of the same kind inside), the wrapper is removed.
    * Chart fences (``mermaid``, ``vega-lite``, ``chart``) are wrapped in
      ``<pre class="chart-block">`` so ``enhance_html_structure`` can find them.
    * Everything else - prose and ordinary code blocks - is kept. Returning
      only the first fenced block would discard the rest of the report.

    Args:
        raw_content: Text returned by the model.

    Returns:
        The processed Markdown, stripped of surrounding whitespace.
    """
    content = raw_content

    wrapper = _WRAPPING_FENCE_RE.match(content)
    if wrapper:
        body = wrapper.group('body')
        # An unlabelled fence is only treated as a wrapper when the text
        # between its ends holds no fence of the same kind; otherwise the
        # document merely starts and ends with separate code blocks.
        if wrapper.group('label') or wrapper.group('fence') not in body:
            content = body

    content = _CHART_FENCE_RE.sub(
        lambda m: f'<pre class="chart-block">{m.group(0)}</pre>',
        content,
    )
    return content.strip()


def enhance_html_structure(soup):
    """Decorate parsed report HTML in place and return it.

    Chart blocks get container classes plus a sibling placeholder ``div``,
    tables get a ``data-table`` class and a header section, and remaining
    ``pre`` blocks are normalised to ``<pre class="code-block"><code>``.

    Args:
        soup: Parsed document (a BeautifulSoup object).

    Returns:
        The same ``soup`` object.
    """
    # Chart blocks: tag the type and add a placeholder for the rendered chart.
    for pre in soup.find_all('pre', class_='chart-block'):
        spec = pre.get_text()
        if 'mermaid' in spec:
            chart_type = 'mermaid'
        elif 'vega-lite' in spec:
            chart_type = 'vega-lite'
        else:
            chart_type = 'chart'
        # A list keeps each class a separate token for later membership tests.
        pre['class'] = ['chart-container', f'{chart_type}-container']
        pre['data-chart-type'] = chart_type

        placeholder = soup.new_tag('div', **{
            'class': 'rendered-chart',
            'data-chart-spec': spec
        })
        pre.insert_after(placeholder)

    # Tables: default class, and promote the first row to a real <thead>.
    for table in soup.find_all('table'):
        table['class'] = 'data-table'
        if table.find('thead') is None:
            first_row = table.find('tr')
            if first_row is not None:
                for cell in first_row.find_all('td'):
                    cell.name = 'th'
                # Renaming the <tr> itself to <thead> would leave cells
                # directly under <thead>; move the row into a new <thead>.
                thead = soup.new_tag('thead')
                thead.append(first_row.extract())
                table.insert(0, thead)

    # Plain code blocks: everything that is not a chart container.
    chart_classes = {'chart-container', 'mermaid-container', 'vega-lite-container'}
    for pre in soup.find_all('pre'):
        classes = pre.get('class') or []
        if isinstance(classes, str):
            classes = classes.split()
        if chart_classes.intersection(classes):
            continue
        if not pre.find('code'):
            code = soup.new_tag('code')
            code.string = pre.get_text()
            pre.clear()
            pre.append(code)
        pre['class'] = 'code-block'

    return soup


def generate_analysis_report(markdown_file, api_key=None,
                             output_file='chart_analysis_report.html'):
    """Generate an HTML analysis report from a Markdown input file.

    Reads *markdown_file*, asks the model for an analysis report, converts
    the answer to HTML and writes it to *output_file*.

    Args:
        markdown_file: Path of the Markdown input (read as UTF-8).
        api_key: API key for the endpoint. Defaults to the ``QIANFAN_API_KEY``
            environment variable. Credentials must not be committed to source;
            the key previously embedded here should be treated as leaked and
            revoked.
        output_file: Path of the HTML file to write.

    Returns:
        The path of the written HTML file.

    Raises:
        ValueError: If no API key is supplied or configured.
    """
    # Imported here so the pure helpers above stay importable without the
    # third-party packages this entry point needs.
    import markdown
    from bs4 import BeautifulSoup
    from openai import OpenAI

    api_key = api_key or os.environ.get('QIANFAN_API_KEY')
    if not api_key:
        raise ValueError(
            "No API key: pass api_key=... or set the QIANFAN_API_KEY environment variable"
        )

    # 1. Read the Markdown input.
    with open(markdown_file, 'r', encoding='utf-8') as file:
        input_content = file.read()

    # 2. Call the API.
    client = OpenAI(
        base_url='https://qianfan.baidubce.com/v2',
        api_key=api_key
    )

    response = client.chat.completions.create(
        model="ernie-x1-turbo-32k",
        messages=[{
            "role": "user",
            "content": f"{input_content}\n\n请生成专业的数据分析报告,要求:\n"
                       "1. 使用规范的Markdown格式\n"
                       "2. 包含业务表现分析、产品结构洞察、优化实施方案\n"
                       "3. 增加门店维度的数据分析\n"
                       "4.每个分析下需要展示对应的明细数据"
        }]
    )

    # 3. Process the returned content (it may be None for an empty answer).
    raw_content = response.choices[0].message.content or ""
    processed_md = process_markdown_content(raw_content)

    # 4. Convert to HTML and enhance the structure.
    html_content = markdown.markdown(processed_md, extensions=['tables', 'fenced_code', 'codehilite'])
    soup = BeautifulSoup(html_content, 'html.parser')
    enhanced_soup = enhance_html_structure(soup)

    # 5. Build the full HTML document.
    # NOTE(review): the markup, stylesheet and chart-rendering <script>
    # includes of the original template are not visible in the reviewed
    # source; this is a minimal valid skeleton - restore them here.
    html_template = f"""<!DOCTYPE html>
<html lang="zh-CN">
<head>
    <meta charset="UTF-8">
    <meta name="viewport" content="width=device-width, initial-scale=1.0">
    <title>AI数据分析报告(含图表)</title>
</head>
<body>
    <div class="container">
        {enhanced_soup}
    </div>
</body>
</html>
"""

    # 6. Save the report.
    with open(output_file, 'w', encoding='utf-8') as f:
        f.write(html_template)

    return output_file


# Usage example
if __name__ == "__main__":
    report_file = generate_analysis_report('analysis_report.md')
    print(f"带图表的分析报告已生成: {report_file}")
+ +REM 检查项目路径 +if not exist "%PROJECT_PATH%\main.py" ( + echo %RED%错误: 项目路径不存在或main.py文件未找到%RESET% + echo %YELLOW%当前路径: %PROJECT_PATH%%RESET% + pause + exit /b 1 +) + +echo %GREEN%✓ 项目路径检查通过%RESET% + +REM 检查Python是否安装 +echo %CYAN%检查Python环境...%RESET% +python --version >nul 2>&1 +if %errorLevel% neq 0 ( + echo %RED%Python未安装或未添加到PATH%RESET% + echo %YELLOW%正在尝试检测Anaconda...%RESET% + + REM 检查Anaconda + where conda >nul 2>&1 + if %errorLevel% neq 0 ( + echo %RED%Anaconda未安装%RESET% + echo %YELLOW%请安装Python或Anaconda后重试%RESET% + echo %CYAN%下载地址: https://www.python.org/downloads/%RESET% + echo %CYAN%或: https://www.anaconda.com/products/distribution%RESET% + pause + exit /b 1 + ) else ( + echo %GREEN%✓ 检测到Anaconda%RESET% + conda --version + ) +) else ( + echo %GREEN%✓ Python已安装%RESET% + python --version +) + +REM 检查Conda环境 +echo %CYAN%检查Conda环境: %CONDA_ENV_NAME%%RESET% +conda env list | findstr /i "%CONDA_ENV_NAME%" >nul 2>&1 +if %errorLevel% neq 0 ( + echo %YELLOW%环境不存在,正在创建...%RESET% + conda create -n %CONDA_ENV_NAME% python=%PYTHON_VERSION% -y + if %errorLevel% neq 0 ( + echo %RED%环境创建失败%RESET% + pause + exit /b 1 + ) + echo %GREEN%✓ 环境创建成功%RESET% +) else ( + echo %GREEN%✓ 环境已存在%RESET% +) + +REM 激活环境 +echo %CYAN%激活Conda环境...%RESET% +call conda activate %CONDA_ENV_NAME% +if %errorLevel% neq 0 ( + echo %RED%环境激活失败%RESET% + pause + exit /b 1 +) +echo %GREEN%✓ 环境激活成功%RESET% + +REM 检查依赖 +echo %CYAN%检查Python依赖...%RESET% +if exist "%PROJECT_PATH%\requirements.txt" ( + echo %YELLOW%安装/更新依赖包...%RESET% + pip install -r "%PROJECT_PATH%\requirements.txt" --quiet + if %errorLevel% neq 0 ( + echo %YELLOW%依赖安装失败,尝试继续运行...%RESET% + ) else ( + echo %GREEN%✓ 依赖安装完成%RESET% + ) +) else ( + echo %YELLOW%未找到requirements.txt,跳过依赖安装%RESET% +) + +REM 切换到项目目录 +echo %CYAN%切换到项目目录: %PROJECT_PATH%%RESET% +cd /d "%PROJECT_PATH%" + +REM 检查配置文件 +if not exist "config.py" ( + echo %YELLOW%警告: 未找到config.py配置文件%RESET% + echo %CYAN%将使用默认配置运行%RESET% +) + +REM 显示启动信息 +echo. 
+echo %GREEN%===============================================%RESET% +echo %GREEN% 启动情报数据处理系统%RESET% +echo %GREEN%===============================================%RESET% +echo. +echo %CYAN%环境信息:%RESET% +echo Conda环境: %CONDA_ENV_NAME% +echo 项目路径: %PROJECT_PATH% +echo Python版本: +python --version +echo. +echo %YELLOW%按 Ctrl+C 停止系统%RESET% +echo. + +REM 启动系统 +echo %CYAN%启动情报数据处理系统主程序...%RESET% +python main.py + +echo. +echo %CYAN%情报数据处理系统已停止%RESET% +pause diff --git a/logs/application.log b/logs/application.log index 199ec4d..6cb59df 100644 --- a/logs/application.log +++ b/logs/application.log @@ -133927,3 +133927,46 @@ → module: 'RSSDataAIProcessor' 2025-10-29 10:30:40.130 | DEBUG | ai_processor_rss_data:112 - 处理记录 182 (2/3) → module: 'RSSDataAIProcessor' +2025-10-29 17:34:56.620 | INFO | base_reporter:329 - 添加数据源: AI分析结果 + → module: 'BaseReporter' +2025-10-29 17:34:56.621 | INFO | daily:42 - 开始生成日报 + → module: 'DailyReporter' +2025-10-29 17:34:56.621 | INFO | daily:48 - 时间范围: 2025-10-28 17:34:56 至 2025-10-29 17:34:56 + → module: 'DailyReporter' +2025-10-29 17:34:56.621 | DEBUG | mysql_agent:116 - 执行SQL查询 + → module: 'MySQLAgent(Windows)' + → sql: '\n SELECT \n `文章标题` as title,\n `文章链接` as link,\n `文章摘要` as summary,\n `发布时间` as publish_time,\n `来源URL` as source_url,\n `分类` as category,\n `标签` as tags,\n `相关度评分` as relevance_score,\n `分析说明` as analysis_note,\n `处理时间` as process_time\n FROM `ai_processor_rss_analysis`\n ... 
+2025-10-29 17:34:57.326 | INFO | base_reporter:130 - 时间范围 2025-10-28 17:34:56.621364 到 2025-10-29 17:34:56.621364 内没有相关数据(是否相关=1) + → module: 'AIAnalysisDataSource' +2025-10-29 17:34:57.329 | ERROR | daily:83 - 生成日报失败: No module named 'markdown' + → exc_info: True +2025-10-29 17:35:46.546 | INFO | base_reporter:329 - 添加数据源: AI分析结果 + → module: 'BaseReporter' +2025-10-29 17:35:46.547 | INFO | daily:42 - 开始生成日报 + → module: 'DailyReporter' +2025-10-29 17:35:46.547 | INFO | daily:48 - 时间范围: 2025-10-28 17:35:46 至 2025-10-29 17:35:46 + → module: 'DailyReporter' +2025-10-29 17:35:46.547 | DEBUG | mysql_agent:116 - 执行SQL查询 + → module: 'MySQLAgent(Windows)' + → sql: '\n SELECT \n `文章标题` as title,\n `文章链接` as link,\n `文章摘要` as summary,\n `发布时间` as publish_time,\n `来源URL` as source_url,\n `分类` as category,\n `标签` as tags,\n `相关度评分` as relevance_score,\n `分析说明` as analysis_note,\n `处理时间` as process_time\n FROM `ai_processor_rss_analysis`\n ... +2025-10-29 17:35:47.054 | INFO | base_reporter:130 - 时间范围 2025-10-28 17:35:46.547444 到 2025-10-29 17:35:46.547444 内没有相关数据(是否相关=1) + → module: 'AIAnalysisDataSource' +2025-10-29 17:35:47.358 | INFO | base_reporter:449 - 报告已保存到: output/reports/daily\daily_report_20251029_173546.html + → module: 'DailyReporter' +2025-10-29 17:35:47.360 | INFO | daily:72 - 日报生成完成: output/reports/daily\daily_report_20251029_173546.html + → module: 'DailyReporter' +2025-10-29 17:36:37.665 | INFO | base_reporter:329 - 添加数据源: AI分析结果 + → module: 'BaseReporter' +2025-10-29 17:36:37.666 | INFO | weekly:42 - 开始生成周报 + → module: 'WeeklyReporter' +2025-10-29 17:36:37.666 | INFO | weekly:48 - 时间范围: 2025-10-22 17:36:37 至 2025-10-29 17:36:37 + → module: 'WeeklyReporter' +2025-10-29 17:36:37.667 | DEBUG | mysql_agent:116 - 执行SQL查询 + → module: 'MySQLAgent(Windows)' + → sql: '\n SELECT \n `文章标题` as title,\n `文章链接` as link,\n `文章摘要` as summary,\n `发布时间` as publish_time,\n `来源URL` as source_url,\n `分类` as category,\n `标签` as tags,\n `相关度评分` as relevance_score,\n `分析说明` as 
analysis_note,\n `处理时间` as process_time\n FROM `ai_processor_rss_analysis`\n ... +2025-10-29 17:36:38.112 | INFO | base_reporter:135 - 获取到 1 条相关数据(是否相关=1) + → module: 'AIAnalysisDataSource' +2025-10-29 17:36:38.234 | INFO | base_reporter:449 - 报告已保存到: output/reports/weekly\weekly_report_20251029_173637.html + → module: 'WeeklyReporter' +2025-10-29 17:36:38.235 | INFO | weekly:72 - 周报生成完成: output/reports/weekly\weekly_report_20251029_173637.html + → module: 'WeeklyReporter' diff --git a/logs/errors.log b/logs/errors.log index bf49e72..555bea5 100644 --- a/logs/errors.log +++ b/logs/errors.log @@ -71071,3 +71071,6 @@ Traceback (most recent call last): └ 'RSSDataProcessor' AttributeError: 类 RSSDataProcessor 中未找到方法 main +2025-10-29 17:34:57.329 | ERROR | daily:83 - 生成日报失败: No module named 'markdown' + → exc_info: True + diff --git a/output/reports/daily/daily_report_20251029_173546.html b/output/reports/daily/daily_report_20251029_173546.html new file mode 100644 index 0000000..3186229 --- /dev/null +++ b/output/reports/daily/daily_report_20251029_173546.html @@ -0,0 +1,301 @@ + + + + + + 汽车后市场情报报告 + + + + +
+

汽车后市场情报日报

+

报告时间

+

生成时间: 2025-10-29 17:35:46 +时间范围: 2025-10-28 17:35:46 至 2025-10-29 17:35:46

+

数据统计

+ +

相关新闻

+

昨日无汽车后市场相关的新闻

+
+ + \ No newline at end of file diff --git a/output/reports/weekly/weekly_report_20251029_173637.html b/output/reports/weekly/weekly_report_20251029_173637.html new file mode 100644 index 0000000..db08af1 --- /dev/null +++ b/output/reports/weekly/weekly_report_20251029_173637.html @@ -0,0 +1,311 @@ + + + + + + 汽车后市场情报报告 + + + + +
+

汽车后市场情报周报

+

报告时间

+

生成时间: 2025-10-29 17:36:37 +时间范围: 2025-10-22 17:36:37 至 2025-10-29 17:36:37

+

数据统计

+ +

汽车后市场相关新闻

+

共找到 1 篇相关新闻:

+

1. 2025年全国汽车以旧换新补贴申请量突破1000万份

+ +
+ + \ No newline at end of file