Files
intelligence_system/applications/reporter/weekly.py
T
2025-10-30 09:54:47 +08:00

140 lines
5.0 KiB
Python

"""
周报生成器 - 生成7天内的汽车后市场情报报告
"""
import os
import sys
from datetime import datetime, timedelta
from loguru import logger
# 添加父目录到路径
current_dir = os.path.dirname(os.path.abspath(__file__))
parent_dir = os.path.dirname(os.path.dirname(current_dir))
if parent_dir not in sys.path:
sys.path.insert(0, parent_dir)
from applications.reporter.base_reporter import BaseReporter, AIAnalysisDataSource
from applications.reporter.dingtalk_webhook import DingTalkWebhook
from utils.mysql_agent import MySQLAgent
from config import Config
class WeeklyReporter(BaseReporter):
"""周报生成器"""
def __init__(self, dingtalk_webhook: str = None):
"""
初始化周报生成器
Args:
dingtalk_webhook: 钉钉Webhook地址(可选)
"""
super().__init__()
# 初始化数据库连接
db_agent = MySQLAgent(Config.MYSQL_CONFIG)
# 添加AI分析结果数据源(已筛选是否相关=1)
self.add_data_source(AIAnalysisDataSource(db_agent))
self.logger = logger.bind(module="WeeklyReporter")
# 初始化钉钉推送(如果提供了webhook)
self.dingtalk_webhook = dingtalk_webhook or getattr(Config, 'DINGTALK_WEBHOOK', None)
self.dingtalk_client = None
if self.dingtalk_webhook:
self.dingtalk_client = DingTalkWebhook(self.dingtalk_webhook)
self.logger.info("已启用钉钉推送功能")
def generate(self, output_dir: str = "output/reports/weekly",
template_path: str = None,
save_markdown: bool = True,
send_dingtalk: bool = True) -> dict:
"""
生成周报
Args:
output_dir: 输出目录
template_path: 可选的外部HTML模板路径
save_markdown: 是否保存Markdown文件
send_dingtalk: 是否发送到钉钉
Returns:
包含生成文件路径的字典
"""
self.logger.info("开始生成周报")
# 计算时间范围:7天内
end_time = datetime.now()
start_time = end_time - timedelta(days=7)
self.logger.info(f"时间范围: {start_time.strftime('%Y-%m-%d %H:%M:%S')}{end_time.strftime('%Y-%m-%d %H:%M:%S')}")
# 收集数据
articles = self.collect_data(start_time, end_time)
# 生成报告内容(generate_report_content会自动处理空数据情况)
markdown_content = f"""# 汽车后市场情报周报
## 报告时间
**生成时间**: {end_time.strftime('%Y-%m-%d %H:%M:%S')}
**时间范围**: {start_time.strftime('%Y-%m-%d %H:%M:%S')}{end_time.strftime('%Y-%m-%d %H:%M:%S')}
{self.generate_report_content(articles, report_type="周报")}
"""
# 生成HTML报告
html_content = self.generate_html_report(markdown_content, template_path=template_path)
# 保存报告
os.makedirs(output_dir, exist_ok=True)
timestamp = end_time.strftime('%Y%m%d_%H%M%S')
result = {}
# 保存HTML报告
html_filename = f"weekly_report_{timestamp}.html"
html_path = os.path.join(output_dir, html_filename)
self.save_report(html_content, html_path)
result['html_path'] = html_path
self.logger.info(f"HTML报告已保存: {html_path}")
# 保存Markdown报告
markdown_path = None
if save_markdown:
markdown_filename = f"weekly_report_{timestamp}.md"
markdown_path = os.path.join(output_dir, markdown_filename)
self.save_markdown_report(markdown_content, markdown_path)
result['markdown_path'] = markdown_path
self.logger.info(f"Markdown报告已保存: {markdown_path}")
# 发送到钉钉
if send_dingtalk and self.dingtalk_client:
title = f"汽车后市场情报周报 - {start_time.strftime('%Y-%m-%d')}{end_time.strftime('%Y-%m-%d')}"
success = self.dingtalk_client.send_report(title, markdown_content, markdown_path)
result['dingtalk_sent'] = success
if success:
self.logger.info("报告已推送到钉钉群")
else:
self.logger.warning("报告推送到钉钉群失败")
self.logger.info(f"周报生成完成")
return result
def main():
"""主函数"""
try:
reporter = WeeklyReporter()
result = reporter.generate()
print(f"周报已生成:")
print(f" HTML: {result.get('html_path')}")
if 'markdown_path' in result:
print(f" Markdown: {result.get('markdown_path')}")
if 'dingtalk_sent' in result:
print(f" 钉钉推送: {'成功' if result.get('dingtalk_sent') else '失败'}")
except Exception as e:
logger.error(f"生成周报失败: {str(e)}", exc_info=True)
raise
if __name__ == "__main__":
main()