Compare commits

..

7 Commits

Author SHA1 Message Date
panda b0bf0fa9bc 优化连接不上时创建表 2025-11-05 09:50:55 +08:00
panda 4154eb452f 钉钉api 2025-10-30 17:24:28 +08:00
panda c5a5a0a99c 生成日报、周报 2025-10-30 09:54:47 +08:00
panda c894e344aa test 2025-10-29 10:45:08 +08:00
panda 5d1155bd20 test 2025-10-29 10:44:53 +08:00
panda fc18fa74c3 娣诲姞RSS鏁版嵁澶勭悊鍣ㄥ拰浠诲姟璋冨害鍔熻兘锛屾洿鏂伴厤缃拰鏃ュ織鏂囦欢 2025-10-29 10:39:13 +08:00
panda c5f6e8288d ai提取rss相关数据 2025-10-28 13:43:06 +08:00
44 changed files with 59830 additions and 170 deletions
Binary file not shown.
Binary file not shown.
+459
View File
@@ -0,0 +1,459 @@
"""
报告生成器基类
提供数据源接口、AI处理接口等扩展能力
"""
from abc import ABC, abstractmethod
from typing import List, Dict, Any, Optional
from datetime import datetime, timedelta
import os
import sys
from loguru import logger
# 添加父目录到路径
current_dir = os.path.dirname(os.path.abspath(__file__))
parent_dir = os.path.dirname(os.path.dirname(current_dir))
if parent_dir not in sys.path:
sys.path.insert(0, parent_dir)
from utils.mysql_agent import MySQLAgent
from config import Config
class DataSource(ABC):
"""数据源接口基类,用于后续扩展其他数据源"""
@abstractmethod
def fetch_data(self, start_time: datetime, end_time: datetime) -> List[Dict[str, Any]]:
"""
获取指定时间范围内的数据
Args:
start_time: 开始时间
end_time: 结束时间
Returns:
数据列表,每条数据应包含:标题、链接、摘要、发布时间等字段
"""
pass
@abstractmethod
def get_source_name(self) -> str:
"""获取数据源名称"""
pass
class RSSDataSource(DataSource):
"""RSS数据源实现"""
def __init__(self, db_agent: MySQLAgent, table_name: str = "collector_rss_subscriptions"):
self.db_agent = db_agent
self.table_name = table_name
self.logger = logger.bind(module="RSSDataSource")
def fetch_data(self, start_time: datetime, end_time: datetime) -> List[Dict[str, Any]]:
"""从数据库获取RSS数据"""
try:
sql = f"""
SELECT
`文章标题` as title,
`文章链接` as link,
`文章摘要` as summary,
`发布时间` as publish_time,
`来源URL` as source_url,
`创建时间` as create_time
FROM `{self.table_name}`
WHERE `发布时间` >= %s AND `发布时间` < %s
ORDER BY `发布时间` DESC
"""
params = (
start_time.strftime('%Y-%m-%d %H:%M:%S'),
end_time.strftime('%Y-%m-%d %H:%M:%S')
)
df = self.db_agent.query_to_df(sql, params=params, is_print=False)
if df.empty:
self.logger.info(f"时间范围 {start_time}{end_time} 内没有RSS数据")
return []
# 转换为字典列表
data_list = df.to_dict('records')
self.logger.info(f"获取到 {len(data_list)} 条RSS数据")
return data_list
except Exception as e:
self.logger.error(f"获取RSS数据失败: {str(e)}", exc_info=True)
return []
def get_source_name(self) -> str:
return "RSS订阅"
class AIAnalysisDataSource(DataSource):
"""AI分析结果数据源实现 - 从ai_processor_rss_analysis表获取已筛选的相关内容"""
def __init__(self, db_agent: MySQLAgent, table_name: str = "ai_processor_rss_analysis"):
self.db_agent = db_agent
self.table_name = table_name
self.logger = logger.bind(module="AIAnalysisDataSource")
def fetch_data(self, start_time: datetime, end_time: datetime) -> List[Dict[str, Any]]:
"""从AI分析结果表获取相关数据(是否相关=1)"""
try:
sql = f"""
SELECT
`文章标题` as title,
`文章链接` as link,
`文章摘要` as summary,
`发布时间` as publish_time,
`来源URL` as source_url,
`分类` as category,
`标签` as tags,
`相关度评分` as relevance_score,
`分析说明` as analysis_note,
`处理时间` as process_time
FROM `{self.table_name}`
WHERE `发布时间` >= %s AND `发布时间` < %s
AND `是否相关` = 1
ORDER BY `发布时间` DESC, `相关度评分` DESC
"""
params = (
start_time.strftime('%Y-%m-%d %H:%M:%S'),
end_time.strftime('%Y-%m-%d %H:%M:%S')
)
df = self.db_agent.query_to_df(sql, params=params, is_print=False)
if df.empty:
self.logger.info(f"时间范围 {start_time}{end_time} 内没有相关数据(是否相关=1")
return []
# 转换为字典列表
data_list = df.to_dict('records')
self.logger.info(f"获取到 {len(data_list)} 条相关数据(是否相关=1")
return data_list
except Exception as e:
self.logger.error(f"获取AI分析数据失败: {str(e)}", exc_info=True)
return []
def get_source_name(self) -> str:
return "AI分析结果"
class AIProcessor:
"""AI处理器,用于筛选和分析内容"""
def __init__(self, api_key: str = None, model: str = None):
from openai import OpenAI
self.base_url = 'https://qianfan.baidubce.com/v2'
self.api_key = api_key or Config.BAIDU_AI_CONFIG.get('api_key')
self.model = model or Config.BAIDU_AI_CONFIG.get('model', 'ernie-x1-turbo-32k')
self.client = OpenAI(
base_url=self.base_url,
api_key=self.api_key
)
self.logger = logger.bind(module="AIProcessor")
def filter_automotive_content(self, articles: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
"""
筛选与汽车后市场相关的内容
Args:
articles: 文章列表
Returns:
筛选后的文章列表(包含AI标记信息)
"""
if not articles:
return []
self.logger.info(f"开始AI筛选 {len(articles)} 篇文章")
# 批量处理,避免API限流
batch_size = 10
filtered_articles = []
for i in range(0, len(articles), batch_size):
batch = articles[i:i + batch_size]
try:
# 构建批量分析的prompt
articles_text = ""
for idx, article in enumerate(batch):
articles_text += f"\n[{idx + i}] 标题: {article.get('title', '')}\n"
articles_text += f"摘要: {article.get('summary', '')}\n"
prompt = f"""请分析以下新闻文章,判断哪些与汽车后市场相关。
汽车后市场的定义:汽车销售以后,围绕汽车使用过程中的各种服务,包括:
- 汽车维修保养
- 汽车配件
- 汽车改装
- 汽车美容
- 汽车用品
- 汽车金融
- 汽车保险
- 二手车交易
- 汽车租赁
- 汽车检测
- 汽车报废回收
- 汽车相关法律法规和政策
文章列表:
{articles_text}
请按以下JSON格式返回结果:
{{
"related_articles": [
{{
"index": 文章的序号(从0开始),
"is_related": true/false,
"reason": "判断理由",
"category": "所属类别(如:维修保养、配件、政策等)"
}}
]
}}
只返回JSON,不要其他文字说明。"""
response = self.client.chat.completions.create(
model=self.model,
messages=[{
"role": "user",
"content": prompt
}]
)
result_text = response.choices[0].message.content.strip()
# 尝试解析JSON(去除可能的markdown代码块标记)
import json
import re
# 提取JSON部分(尝试多种方式)
result_json = None
# 方式1:查找markdown代码块中的JSON
json_match = re.search(r'```(?:json)?\s*(\{.*?\})\s*```', result_text, re.DOTALL)
if json_match:
try:
result_json = json.loads(json_match.group(1))
except:
pass
# 方式2:直接查找JSON对象
if result_json is None:
json_match = re.search(r'\{.*\}', result_text, re.DOTALL)
if json_match:
try:
result_json = json.loads(json_match.group())
except:
pass
# 方式3:尝试直接解析
if result_json is None:
try:
result_json = json.loads(result_text)
except:
self.logger.warning(f"无法解析AI返回的JSON: {result_text[:200]}")
result_json = {'related_articles': []}
# 处理结果
for item in result_json.get('related_articles', []):
idx = item.get('index', -1)
if 0 <= idx < len(batch):
article = batch[idx]
if item.get('is_related', False):
article['ai_marked'] = True
article['ai_category'] = item.get('category', '其他')
article['ai_reason'] = item.get('reason', '')
filtered_articles.append(article)
# 避免API限流
import time
if i + batch_size < len(articles):
time.sleep(1.5)
except Exception as e:
self.logger.error(f"AI筛选批处理失败: {str(e)}", exc_info=True)
# 如果AI处理失败,保留所有文章但标记为未筛选
for article in batch:
article['ai_marked'] = False
article['ai_error'] = str(e)
self.logger.info(f"AI筛选完成,找到 {len(filtered_articles)} 篇相关文章")
return filtered_articles
def generate_news_summary(self, articles: List[Dict[str, Any]]) -> str:
"""
生成新闻摘要
Args:
articles: 筛选后的文章列表
Returns:
Markdown格式的新闻摘要
"""
if not articles:
return "## 相关新闻\n\n暂无相关新闻。\n"
articles_text = ""
for idx, article in enumerate(articles, 1):
category = article.get('ai_category', '其他')
reason = article.get('ai_reason', '')
articles_text += f"\n### {idx}. {article.get('title', '无标题')}\n"
articles_text += f"- **类别**: {category}\n"
articles_text += f"- **摘要**: {article.get('summary', '无摘要')}\n"
articles_text += f"- **链接**: [{article.get('link', '')}]({article.get('link', '')})\n"
articles_text += f"- **发布时间**: {article.get('publish_time', '')}\n"
if reason:
articles_text += f"- **相关性说明**: {reason}\n"
articles_text += "\n"
return f"## 汽车后市场相关新闻\n\n共找到 {len(articles)} 篇相关新闻:\n\n{articles_text}"
class BaseReporter:
"""报告生成器基类"""
def __init__(self, data_sources: List[DataSource] = None):
self.data_sources = data_sources or []
self.ai_processor = AIProcessor()
self.logger = logger.bind(module="BaseReporter")
def add_data_source(self, data_source: DataSource):
"""添加数据源"""
self.data_sources.append(data_source)
self.logger.info(f"添加数据源: {data_source.get_source_name()}")
def collect_data(self, start_time: datetime, end_time: datetime) -> List[Dict[str, Any]]:
"""从所有数据源收集数据"""
all_data = []
for source in self.data_sources:
try:
data = source.fetch_data(start_time, end_time)
# 标记数据来源
for item in data:
item['data_source'] = source.get_source_name()
all_data.extend(data)
except Exception as e:
self.logger.error(f"{source.get_source_name()} 收集数据失败: {str(e)}")
# 按发布时间排序
all_data.sort(key=lambda x: x.get('publish_time', ''), reverse=True)
return all_data
def generate_report_content(self, articles: List[Dict[str, Any]], report_type: str = "日报") -> str:
"""
生成报告内容(Markdown格式)
Args:
articles: 文章列表(已从AI分析结果表筛选,是否相关=1)
report_type: 报告类型("日报""周报"),用于无数据时的提示
"""
# 数据已经是从AI分析结果表筛选过的(是否相关=1),直接使用
related_articles = articles
# 生成统计信息
related_count = len(related_articles)
# 如果没有相关数据,返回提示信息
if related_count == 0:
if report_type == "日报":
message = "昨日无汽车后市场相关的新闻"
else:
message = "上周无汽车后市场相关的新闻"
return f"""
## 数据统计
- **相关文章数**: 0
## 相关新闻
{message}
"""
# 生成新闻摘要
news_summary = self._generate_news_summary_from_analysis(related_articles)
stats = f"""
## 数据统计
- **相关文章数**: {related_count}
"""
return stats + news_summary
def _generate_news_summary_from_analysis(self, articles: List[Dict[str, Any]]) -> str:
"""
从AI分析结果生成新闻摘要(使用数据库中已有的分类和分析说明)
Args:
articles: 文章列表(包含category、tags、analysis_note等字段)
Returns:
Markdown格式的新闻摘要
"""
if not articles:
return "## 相关新闻\n\n暂无相关新闻。\n"
articles_text = ""
for idx, article in enumerate(articles, 1):
category = article.get('category', '其他')
tags = article.get('tags', '')
analysis_note = article.get('analysis_note', '')
relevance_score = article.get('relevance_score', '')
articles_text += f"\n### {idx}. {article.get('title', '无标题')}\n"
articles_text += f"- **分类**: {category}\n"
if tags:
articles_text += f"- **标签**: {tags}\n"
articles_text += f"- **摘要**: {article.get('summary', '无摘要')}\n"
articles_text += f"- **链接**: [{article.get('link', '')}]({article.get('link', '')})\n"
articles_text += f"- **发布时间**: {article.get('publish_time', '')}\n"
if relevance_score:
articles_text += f"- **相关度评分**: {relevance_score}\n"
if analysis_note:
articles_text += f"- **分析说明**: {analysis_note}\n"
articles_text += "\n"
return f"## 汽车后市场相关新闻\n\n共找到 {len(articles)} 篇相关新闻:\n\n{articles_text}"
def generate_html_report(self, markdown_content: str, template_path: str = None) -> str:
"""生成HTML报告"""
# 使用相对导入避免循环依赖
from .html_template import HTMLTemplateManager
template_manager = HTMLTemplateManager()
if template_path and os.path.exists(template_path):
# 使用外部模板
html_content = template_manager.render_external_template(template_path, markdown_content)
else:
# 使用内置模板
html_content = template_manager.render_builtin_template(markdown_content)
return html_content
def save_report(self, html_content: str, output_path: str):
"""保存HTML报告到文件"""
os.makedirs(os.path.dirname(output_path) if os.path.dirname(output_path) else '.', exist_ok=True)
with open(output_path, 'w', encoding='utf-8') as f:
f.write(html_content)
self.logger.info(f"HTML报告已保存到: {output_path}")
def save_markdown_report(self, markdown_content: str, output_path: str):
"""保存Markdown报告到文件"""
os.makedirs(os.path.dirname(output_path) if os.path.dirname(output_path) else '.', exist_ok=True)
with open(output_path, 'w', encoding='utf-8') as f:
f.write(markdown_content)
self.logger.info(f"Markdown报告已保存到: {output_path}")
+139
View File
@@ -0,0 +1,139 @@
"""
日报生成器 - 生成24小时内的汽车后市场情报报告
"""
import os
import sys
from datetime import datetime, timedelta
from loguru import logger
# 添加父目录到路径
current_dir = os.path.dirname(os.path.abspath(__file__))
parent_dir = os.path.dirname(os.path.dirname(current_dir))
if parent_dir not in sys.path:
sys.path.insert(0, parent_dir)
from applications.reporter.base_reporter import BaseReporter, AIAnalysisDataSource
from applications.reporter.dingtalk_webhook import DingTalkWebhook
from utils.mysql_agent import MySQLAgent
from config import Config
class DailyReporter(BaseReporter):
"""日报生成器"""
def __init__(self, dingtalk_webhook: str = None):
"""
初始化日报生成器
Args:
dingtalk_webhook: 钉钉Webhook地址(可选)
"""
super().__init__()
# 初始化数据库连接
db_agent = MySQLAgent(Config.MYSQL_CONFIG)
# 添加AI分析结果数据源(已筛选是否相关=1)
self.add_data_source(AIAnalysisDataSource(db_agent))
self.logger = logger.bind(module="DailyReporter")
# 初始化钉钉推送(如果提供了webhook)
self.dingtalk_webhook = dingtalk_webhook or getattr(Config, 'DINGTALK_WEBHOOK', None)
self.dingtalk_client = None
if self.dingtalk_webhook:
self.dingtalk_client = DingTalkWebhook(self.dingtalk_webhook)
self.logger.info("已启用钉钉推送功能")
def generate(self, output_dir: str = "output/reports/daily",
template_path: str = None,
save_markdown: bool = True,
send_dingtalk: bool = True) -> dict:
"""
生成日报
Args:
output_dir: 输出目录
template_path: 可选的外部HTML模板路径
save_markdown: 是否保存Markdown文件
send_dingtalk: 是否发送到钉钉
Returns:
包含生成文件路径的字典
"""
self.logger.info("开始生成日报")
# 计算时间范围:24小时内
end_time = datetime.now()
start_time = end_time - timedelta(hours=24)
self.logger.info(f"时间范围: {start_time.strftime('%Y-%m-%d %H:%M:%S')}{end_time.strftime('%Y-%m-%d %H:%M:%S')}")
# 收集数据
articles = self.collect_data(start_time, end_time)
# 生成报告内容(generate_report_content会自动处理空数据情况)
markdown_content = f"""# 汽车后市场情报日报
## 报告时间
**生成时间**: {end_time.strftime('%Y-%m-%d %H:%M:%S')}
**时间范围**: {start_time.strftime('%Y-%m-%d %H:%M:%S')}{end_time.strftime('%Y-%m-%d %H:%M:%S')}
{self.generate_report_content(articles, report_type="日报")}
"""
# 生成HTML报告
html_content = self.generate_html_report(markdown_content, template_path=template_path)
# 保存报告
os.makedirs(output_dir, exist_ok=True)
timestamp = end_time.strftime('%Y%m%d_%H%M%S')
result = {}
# 保存HTML报告
html_filename = f"daily_report_{timestamp}.html"
html_path = os.path.join(output_dir, html_filename)
self.save_report(html_content, html_path)
result['html_path'] = html_path
self.logger.info(f"HTML报告已保存: {html_path}")
# 保存Markdown报告
markdown_path = None
if save_markdown:
markdown_filename = f"daily_report_{timestamp}.md"
markdown_path = os.path.join(output_dir, markdown_filename)
self.save_markdown_report(markdown_content, markdown_path)
result['markdown_path'] = markdown_path
self.logger.info(f"Markdown报告已保存: {markdown_path}")
# 发送到钉钉
if send_dingtalk and self.dingtalk_client:
title = f"汽车后市场情报日报 - {end_time.strftime('%Y-%m-%d')}"
success = self.dingtalk_client.send_report(title, markdown_content, markdown_path)
result['dingtalk_sent'] = success
if success:
self.logger.info("报告已推送到钉钉群")
else:
self.logger.warning("报告推送到钉钉群失败")
self.logger.info(f"日报生成完成")
return result
def main():
"""主函数"""
try:
reporter = DailyReporter()
result = reporter.generate()
print(f"日报已生成:")
print(f" HTML: {result.get('html_path')}")
if 'markdown_path' in result:
print(f" Markdown: {result.get('markdown_path')}")
if 'dingtalk_sent' in result:
print(f" 钉钉推送: {'成功' if result.get('dingtalk_sent') else '失败'}")
except Exception as e:
logger.error(f"生成日报失败: {str(e)}", exc_info=True)
raise
if __name__ == "__main__":
main()
+399
View File
@@ -0,0 +1,399 @@
"""
HTML模板管理器
支持内置模板和外部HTML模板
"""
import os
import markdown
from bs4 import BeautifulSoup
import re
from typing import Optional
from loguru import logger
class HTMLTemplateManager:
"""HTML模板管理器"""
def __init__(self):
self.logger = logger.bind(module="HTMLTemplateManager")
def markdown_to_html(self, markdown_content: str) -> str:
"""将Markdown转换为HTML"""
html = markdown.markdown(
markdown_content,
extensions=['tables', 'fenced_code', 'codehilite']
)
return html
def render_builtin_template(self, markdown_content: str) -> str:
"""使用内置模板渲染HTML"""
html_body = self.markdown_to_html(markdown_content)
# 增强HTML结构
soup = BeautifulSoup(html_body, 'html.parser')
self._enhance_html_structure(soup)
# 生成完整HTML
html_template = f"""<!DOCTYPE html>
<html lang="zh-CN">
<head>
<meta charset="UTF-8">
<meta name="viewport" content="width=device-width, initial-scale=1.0">
<title>汽车后市场情报报告</title>
<link href="https://fonts.googleapis.com/css2?family=Noto+Sans+SC:wght@300;400;500;700&display=swap" rel="stylesheet">
<style>
:root {{
--primary: #3498db;
--secondary: #2ecc71;
--accent: #e74c3c;
--dark: #2c3e50;
--light: #f8f9fa;
--border: #e0e0e0;
}}
* {{
margin: 0;
padding: 0;
box-sizing: border-box;
}}
body {{
font-family: 'Noto Sans SC', -apple-system, BlinkMacSystemFont, 'Segoe UI', Roboto, sans-serif;
line-height: 1.8;
color: #333;
background: linear-gradient(135deg, #f5f7fa 0%, #c3cfe2 100%);
padding: 20px;
}}
.report-container {{
max-width: 1200px;
margin: 0 auto;
padding: 40px;
background: white;
box-shadow: 0 10px 40px rgba(0,0,0,0.1);
border-radius: 12px;
}}
.report-header {{
text-align: center;
padding-bottom: 30px;
border-bottom: 3px solid var(--primary);
margin-bottom: 40px;
}}
.report-header h1 {{
color: var(--dark);
font-size: 2.5em;
margin-bottom: 10px;
}}
.report-header .report-date {{
color: #666;
font-size: 1.1em;
}}
h1 {{
color: var(--dark);
font-size: 2em;
margin: 30px 0 20px 0;
padding-bottom: 10px;
border-bottom: 2px solid var(--primary);
}}
h2 {{
color: var(--dark);
font-size: 1.6em;
margin: 25px 0 15px 0;
padding-left: 10px;
border-left: 4px solid var(--primary);
}}
h3 {{
color: var(--dark);
font-size: 1.3em;
margin: 20px 0 10px 0;
}}
h4 {{
color: #555;
font-size: 1.1em;
margin: 15px 0 8px 0;
}}
p {{
margin: 12px 0;
text-align: justify;
}}
ul, ol {{
margin: 15px 0;
padding-left: 30px;
}}
li {{
margin: 8px 0;
}}
/* 表格样式 */
table {{
width: 100%;
border-collapse: collapse;
margin: 25px 0;
box-shadow: 0 2px 15px rgba(0,0,0,0.1);
border-radius: 8px;
overflow: hidden;
}}
table thead {{
background: linear-gradient(135deg, var(--primary) 0%, #2980b9 100%);
color: white;
}}
table th {{
padding: 15px;
text-align: left;
font-weight: 600;
}}
table td {{
padding: 12px 15px;
border-bottom: 1px solid var(--border);
}}
table tbody tr:hover {{
background-color: #f5f5f5;
}}
table tbody tr:last-child td {{
border-bottom: none;
}}
/* 代码块样式 */
pre {{
background: #f4f4f4;
border: 1px solid var(--border);
border-radius: 6px;
padding: 15px;
overflow-x: auto;
margin: 20px 0;
}}
code {{
background: #f4f4f4;
padding: 2px 6px;
border-radius: 3px;
font-family: 'Courier New', monospace;
font-size: 0.9em;
}}
pre code {{
background: none;
padding: 0;
}}
/* 链接样式 */
a {{
color: var(--primary);
text-decoration: none;
border-bottom: 1px dotted var(--primary);
transition: all 0.3s;
}}
a:hover {{
color: var(--accent);
border-bottom-color: var(--accent);
}}
/* 新闻列表样式 */
.news-item {{
background: #f9f9f9;
border-left: 4px solid var(--secondary);
padding: 15px 20px;
margin: 15px 0;
border-radius: 6px;
transition: all 0.3s;
}}
.news-item:hover {{
background: #f0f0f0;
box-shadow: 0 2px 8px rgba(0,0,0,0.1);
}}
.news-item h3 {{
margin-top: 0;
color: var(--dark);
}}
.news-item .news-meta {{
color: #666;
font-size: 0.9em;
margin-top: 10px;
}}
.news-item .news-category {{
display: inline-block;
background: var(--secondary);
color: white;
padding: 3px 10px;
border-radius: 12px;
font-size: 0.85em;
margin-right: 10px;
}}
/* 统计信息样式 */
.stats-box {{
background: linear-gradient(135deg, #667eea 0%, #764ba2 100%);
color: white;
padding: 25px;
border-radius: 10px;
margin: 25px 0;
}}
.stats-box h2 {{
color: white;
border: none;
padding: 0;
margin: 0 0 15px 0;
}}
.stats-grid {{
display: grid;
grid-template-columns: repeat(auto-fit, minmax(200px, 1fr));
gap: 20px;
margin-top: 20px;
}}
.stat-item {{
text-align: center;
}}
.stat-number {{
font-size: 2.5em;
font-weight: bold;
margin-bottom: 5px;
}}
.stat-label {{
font-size: 0.9em;
opacity: 0.9;
}}
/* 响应式设计 */
@media (max-width: 768px) {{
.report-container {{
padding: 20px;
}}
.report-header h1 {{
font-size: 1.8em;
}}
h1 {{
font-size: 1.6em;
}}
h2 {{
font-size: 1.3em;
}}
table {{
font-size: 0.9em;
}}
table th,
table td {{
padding: 8px;
}}
}}
/* 打印样式 */
@media print {{
body {{
background: white;
padding: 0;
}}
.report-container {{
box-shadow: none;
padding: 0;
}}
}}
</style>
</head>
<body>
<div class="report-container">
{str(soup)}
</div>
</body>
</html>"""
return html_template
def render_external_template(self, template_path: str, markdown_content: str) -> str:
"""
使用外部HTML模板渲染
Args:
template_path: 外部模板文件路径
markdown_content: Markdown内容
Returns:
渲染后的HTML内容
"""
try:
with open(template_path, 'r', encoding='utf-8') as f:
template = f.read()
html_body = self.markdown_to_html(markdown_content)
# 查找模板中的占位符并替换
# 支持 {{content}} 或 {content} 等格式
patterns = [
r'\{\{content\}\}',
r'\{content\}',
r'<!--\s*content\s*-->',
]
replaced = False
for pattern in patterns:
if re.search(pattern, template, re.IGNORECASE):
template = re.sub(pattern, html_body, template, flags=re.IGNORECASE)
replaced = True
break
if not replaced:
# 如果没有找到占位符,在body标签内追加内容
soup = BeautifulSoup(template, 'html.parser')
body = soup.find('body')
if body:
body.append(BeautifulSoup(html_body, 'html.parser'))
else:
# 如果没有body标签,在html末尾追加
template += html_body
template = str(soup) if soup else template
self.logger.info(f"使用外部模板渲染: {template_path}")
return template
except Exception as e:
self.logger.error(f"使用外部模板失败: {str(e)},回退到内置模板", exc_info=True)
return self.render_builtin_template(markdown_content)
def _enhance_html_structure(self, soup: BeautifulSoup):
"""增强HTML结构"""
# 增强表格
for table in soup.find_all('table'):
if not table.get('class'):
table['class'] = 'data-table'
# 增强列表项
for ul in soup.find_all('ul'):
# 检查是否是新闻列表
if any('新闻' in str(item) for item in ul.find_all('li')):
ul['class'] = 'news-list'
# 增强链接
for a in soup.find_all('a'):
if not a.get('target'):
a['target'] = '_blank'
a['rel'] = 'noopener noreferrer'
@@ -0,0 +1,50 @@
<!DOCTYPE html>
<html lang="zh-CN">
<head>
<meta charset="UTF-8">
<meta name="viewport" content="width=device-width, initial-scale=1.0">
<title>自定义报告模板示例</title>
<style>
/* 自定义样式示例 */
body {
font-family: 'Microsoft YaHei', Arial, sans-serif;
background: #f0f2f5;
padding: 20px;
margin: 0;
}
.container {
max-width: 1200px;
margin: 0 auto;
background: white;
padding: 30px;
border-radius: 8px;
box-shadow: 0 2px 8px rgba(0,0,0,0.1);
}
/* 内容区域样式 */
#content {
line-height: 1.8;
}
h1 {
color: #1890ff;
border-bottom: 2px solid #1890ff;
padding-bottom: 10px;
}
h2 {
color: #333;
margin-top: 30px;
}
</style>
</head>
<body>
<div class="container">
<!-- 占位符:内容将在这里插入 -->
<!-- 支持以下格式之一:{{content}} 或 {content} 或 <!-- content --> -->
{{content}}
</div>
</body>
</html>
+139
View File
@@ -0,0 +1,139 @@
"""
周报生成器 - 生成7天内的汽车后市场情报报告
"""
import os
import sys
from datetime import datetime, timedelta
from loguru import logger
# 添加父目录到路径
current_dir = os.path.dirname(os.path.abspath(__file__))
parent_dir = os.path.dirname(os.path.dirname(current_dir))
if parent_dir not in sys.path:
sys.path.insert(0, parent_dir)
from applications.reporter.base_reporter import BaseReporter, AIAnalysisDataSource
from applications.reporter.dingtalk_webhook import DingTalkWebhook
from utils.mysql_agent import MySQLAgent
from config import Config
class WeeklyReporter(BaseReporter):
"""周报生成器"""
def __init__(self, dingtalk_webhook: str = None):
"""
初始化周报生成器
Args:
dingtalk_webhook: 钉钉Webhook地址(可选)
"""
super().__init__()
# 初始化数据库连接
db_agent = MySQLAgent(Config.MYSQL_CONFIG)
# 添加AI分析结果数据源(已筛选是否相关=1)
self.add_data_source(AIAnalysisDataSource(db_agent))
self.logger = logger.bind(module="WeeklyReporter")
# 初始化钉钉推送(如果提供了webhook)
self.dingtalk_webhook = dingtalk_webhook or getattr(Config, 'DINGTALK_WEBHOOK', None)
self.dingtalk_client = None
if self.dingtalk_webhook:
self.dingtalk_client = DingTalkWebhook(self.dingtalk_webhook)
self.logger.info("已启用钉钉推送功能")
def generate(self, output_dir: str = "output/reports/weekly",
template_path: str = None,
save_markdown: bool = True,
send_dingtalk: bool = True) -> dict:
"""
生成周报
Args:
output_dir: 输出目录
template_path: 可选的外部HTML模板路径
save_markdown: 是否保存Markdown文件
send_dingtalk: 是否发送到钉钉
Returns:
包含生成文件路径的字典
"""
self.logger.info("开始生成周报")
# 计算时间范围:7天内
end_time = datetime.now()
start_time = end_time - timedelta(days=7)
self.logger.info(f"时间范围: {start_time.strftime('%Y-%m-%d %H:%M:%S')}{end_time.strftime('%Y-%m-%d %H:%M:%S')}")
# 收集数据
articles = self.collect_data(start_time, end_time)
# 生成报告内容(generate_report_content会自动处理空数据情况)
markdown_content = f"""# 汽车后市场情报周报
## 报告时间
**生成时间**: {end_time.strftime('%Y-%m-%d %H:%M:%S')}
**时间范围**: {start_time.strftime('%Y-%m-%d %H:%M:%S')}{end_time.strftime('%Y-%m-%d %H:%M:%S')}
{self.generate_report_content(articles, report_type="周报")}
"""
# 生成HTML报告
html_content = self.generate_html_report(markdown_content, template_path=template_path)
# 保存报告
os.makedirs(output_dir, exist_ok=True)
timestamp = end_time.strftime('%Y%m%d_%H%M%S')
result = {}
# 保存HTML报告
html_filename = f"weekly_report_{timestamp}.html"
html_path = os.path.join(output_dir, html_filename)
self.save_report(html_content, html_path)
result['html_path'] = html_path
self.logger.info(f"HTML报告已保存: {html_path}")
# 保存Markdown报告
markdown_path = None
if save_markdown:
markdown_filename = f"weekly_report_{timestamp}.md"
markdown_path = os.path.join(output_dir, markdown_filename)
self.save_markdown_report(markdown_content, markdown_path)
result['markdown_path'] = markdown_path
self.logger.info(f"Markdown报告已保存: {markdown_path}")
# 发送到钉钉
if send_dingtalk and self.dingtalk_client:
title = f"汽车后市场情报周报 - {start_time.strftime('%Y-%m-%d')}{end_time.strftime('%Y-%m-%d')}"
success = self.dingtalk_client.send_report(title, markdown_content, markdown_path)
result['dingtalk_sent'] = success
if success:
self.logger.info("报告已推送到钉钉群")
else:
self.logger.warning("报告推送到钉钉群失败")
self.logger.info(f"周报生成完成")
return result
def main():
"""主函数"""
try:
reporter = WeeklyReporter()
result = reporter.generate()
print(f"周报已生成:")
print(f" HTML: {result.get('html_path')}")
if 'markdown_path' in result:
print(f" Markdown: {result.get('markdown_path')}")
if 'dingtalk_sent' in result:
print(f" 钉钉推送: {'成功' if result.get('dingtalk_sent') else '失败'}")
except Exception as e:
logger.error(f"生成周报失败: {str(e)}", exc_info=True)
raise
if __name__ == "__main__":
main()
Binary file not shown.
+35 -2
View File
@@ -1,11 +1,23 @@
import os
class Config:
MYSQL_CONFIG = {
'host': '123.60.167.249',
'port': 3306,
'user': 'intelligence',
'password': '123123',
'database': "intelligence_system",
'max_connections': 10
}
OFFLINE_MYSQL_CONFIG = {
'host': 'localhost',
'port': 3306,
'user': 'root',
'password': '123123',
'database':"intelligence_system",
'database': "intelligence_system",
'max_connections': 10
}
@@ -14,4 +26,25 @@ class Config:
'access_key': 'admin',
'secret_key': 'abc88888888',
'secure': False # 社区版默认不启用SSL
}
}
# 百度AI API配置(千帆平台)
# 优先从环境变量读取,如果没有则使用默认值(需要用户自行配置)
BAIDU_AI_CONFIG = {
'api_key': os.getenv('BAIDU_API_KEY', 'bce-v3/ALTAK-SFA4vEP3uBYLsyqCZcERg/1f43596d40d9a2c8318b13d5888a5e8e4e7a7f30'), # 百度千帆API Key
'model': 'ernie-x1-turbo-32k', # 使用的模型
}
# AI处理器配置
AI_PROCESSOR_CONFIG = {
'batch_size': 10, # 批量处理的默认大小
'delay': 1.5, # 每条记录之间的延迟(秒),避免API限流
'source_table': 'processed_rss_data', # 源数据表
'result_table': 'ai_processor_rss_analysis', # AI分析结果表
}
# 钉钉Webhook配置
# 优先从环境变量读取,如果没有则使用下面的默认值(需要用户自行配置)
# 请将下面的空字符串替换为你的钉钉Webhook地址,格式:https://oapi.dingtalk.com/robot/send?access_token=xxx
DINGTALK_WEBHOOK = os.getenv('DINGTALK_WEBHOOK', '') # 钉钉机器人Webhook地址
# 例如:DINGTALK_WEBHOOK = os.getenv('DINGTALK_WEBHOOK', 'https://oapi.dingtalk.com/robot/send?access_token=your_token_here')
+135
View File
@@ -0,0 +1,135 @@
@echo off
REM 情报数据处理系统 - 简化启动脚本
REM 功能: Python环境检测 + 系统启动
REM 作者: AI Assistant
REM 版本: 1.0
REM 日期: 2025-10-29
chcp 65001 >nul
setlocal enabledelayedexpansion
REM 设置颜色
for /f %%a in ('echo prompt $E ^| cmd') do set "ESC=%%a"
set "GREEN=%ESC%[32m"
set "RED=%ESC%[31m"
set "YELLOW=%ESC%[33m"
set "CYAN=%ESC%[36m"
set "RESET=%ESC%[0m"
REM 配置变量
set "CONDA_ENV_NAME=intelligence_env"
set "PROJECT_PATH=%~dp0.."
set "PYTHON_VERSION=3.13"
echo %CYAN%===============================================%RESET%
echo %CYAN% 情报数据处理系统启动器%RESET%
echo %CYAN%===============================================%RESET%
echo.
REM 检查项目路径
if not exist "%PROJECT_PATH%\main.py" (
echo %RED%错误: 项目路径不存在或main.py文件未找到%RESET%
echo %YELLOW%当前路径: %PROJECT_PATH%%RESET%
pause
exit /b 1
)
echo %GREEN%✓ 项目路径检查通过%RESET%
REM 检查Python是否安装
echo %CYAN%检查Python环境...%RESET%
python --version >nul 2>&1
if %errorLevel% neq 0 (
echo %RED%Python未安装或未添加到PATH%RESET%
echo %YELLOW%正在尝试检测Anaconda...%RESET%
REM 检查Anaconda
where conda >nul 2>&1
if %errorLevel% neq 0 (
echo %RED%Anaconda未安装%RESET%
echo %YELLOW%请安装Python或Anaconda后重试%RESET%
echo %CYAN%下载地址: https://www.python.org/downloads/%RESET%
echo %CYAN%或: https://www.anaconda.com/products/distribution%RESET%
pause
exit /b 1
) else (
echo %GREEN%✓ 检测到Anaconda%RESET%
conda --version
)
) else (
echo %GREEN%✓ Python已安装%RESET%
python --version
)
REM 检查Conda环境
echo %CYAN%检查Conda环境: %CONDA_ENV_NAME%%RESET%
conda env list | findstr /i "%CONDA_ENV_NAME%" >nul 2>&1
if %errorLevel% neq 0 (
echo %YELLOW%环境不存在,正在创建...%RESET%
conda create -n %CONDA_ENV_NAME% python=%PYTHON_VERSION% -y
if %errorLevel% neq 0 (
echo %RED%环境创建失败%RESET%
pause
exit /b 1
)
echo %GREEN%✓ 环境创建成功%RESET%
) else (
echo %GREEN%✓ 环境已存在%RESET%
)
REM 激活环境
echo %CYAN%激活Conda环境...%RESET%
call conda activate %CONDA_ENV_NAME%
if %errorLevel% neq 0 (
echo %RED%环境激活失败%RESET%
pause
exit /b 1
)
echo %GREEN%✓ 环境激活成功%RESET%
REM 检查依赖
echo %CYAN%检查Python依赖...%RESET%
if exist "%PROJECT_PATH%\requirements.txt" (
echo %YELLOW%安装/更新依赖包...%RESET%
pip install -r "%PROJECT_PATH%\requirements.txt" --quiet
if %errorLevel% neq 0 (
echo %YELLOW%依赖安装失败,尝试继续运行...%RESET%
) else (
echo %GREEN%✓ 依赖安装完成%RESET%
)
) else (
echo %YELLOW%未找到requirements.txt,跳过依赖安装%RESET%
)
REM 切换到项目目录
echo %CYAN%切换到项目目录: %PROJECT_PATH%%RESET%
cd /d "%PROJECT_PATH%"
REM 检查配置文件
if not exist "config.py" (
echo %YELLOW%警告: 未找到config.py配置文件%RESET%
echo %CYAN%将使用默认配置运行%RESET%
)
REM 显示启动信息
echo.
echo %GREEN%===============================================%RESET%
echo %GREEN% 启动情报数据处理系统%RESET%
echo %GREEN%===============================================%RESET%
echo.
echo %CYAN%环境信息:%RESET%
echo Conda环境: %CONDA_ENV_NAME%
echo 项目路径: %PROJECT_PATH%
echo Python版本:
python --version
echo.
echo %YELLOW%按 Ctrl+C 停止系统%RESET%
echo.
REM 启动系统
echo %CYAN%启动情报数据处理系统主程序...%RESET%
python main.py
echo.
echo %CYAN%情报数据处理系统已停止%RESET%
pause
+53550
View File
File diff suppressed because it is too large Load Diff
+2834
View File
File diff suppressed because it is too large Load Diff
+21 -4
View File
@@ -11,17 +11,32 @@ log = CrossPlatformLog.get_logger("Main")
class IntelligenceSystem:
def __init__(self, db_config=None):
"""初始化系统(仅作为容器,不包含业务逻辑)"""
def __init__(self, db_config=None, run_all_on_startup=False):
"""初始化系统(仅作为容器,不包含业务逻辑)
Args:
db_config: 数据库配置
run_all_on_startup: 启动时是否立即执行所有到期任务(默认False)
"""
self.scheduler = TaskScheduler(Config.MYSQL_CONFIG, max_workers=5)
self._running = False
log.info("情报系统已初始化(Cron模式)")
self.run_all_on_startup = run_all_on_startup
log.info(f"情报系统已初始化(Cron模式),启动时执行任务: {run_all_on_startup}")
def start(self):
"""启动系统主入口"""
self._running = True
self._setup_signal_handlers()
log.info("系统启动 - 运行在Cron调度模式")
# 启动时执行所有到期任务(如果开关开启)
if self.run_all_on_startup:
print(f"\n{'='*60}")
print("🚀 启动时执行所有到期任务...")
print(f"{'='*60}\n")
log.info("启动时执行所有到期任务")
result = self.scheduler.check_and_run_tasks(print_empty_status=True)
print(f"\n启动任务执行完成: 总数={result['总任务数']}, 成功={result['成功']}, 失败={result['失败']}\n")
# 时间追踪变量
last_status_print_time = time.time() # 上次打印状态的时间
@@ -110,7 +125,9 @@ class IntelligenceSystem:
if __name__ == "__main__":
try:
# 启动系统 - 仅作为入口,不包含调度逻辑
system = IntelligenceSystem()
# run_all_on_startup=True: 启动时立即执行所有到期任务
# run_all_on_startup=False: 启动时不执行任务,等待下次调度周期
system = IntelligenceSystem(run_all_on_startup=False)
system.start()
except Exception as e:
log.critical("情报系统启动失败", exc_info=True)
Binary file not shown.
@@ -0,0 +1,301 @@
<!DOCTYPE html>
<html lang="zh-CN">
<head>
<meta charset="UTF-8">
<meta name="viewport" content="width=device-width, initial-scale=1.0">
<title>汽车后市场情报报告</title>
<link href="https://fonts.googleapis.com/css2?family=Noto+Sans+SC:wght@300;400;500;700&display=swap" rel="stylesheet">
<style>
:root {
--primary: #3498db;
--secondary: #2ecc71;
--accent: #e74c3c;
--dark: #2c3e50;
--light: #f8f9fa;
--border: #e0e0e0;
}
* {
margin: 0;
padding: 0;
box-sizing: border-box;
}
body {
font-family: 'Noto Sans SC', -apple-system, BlinkMacSystemFont, 'Segoe UI', Roboto, sans-serif;
line-height: 1.8;
color: #333;
background: linear-gradient(135deg, #f5f7fa 0%, #c3cfe2 100%);
padding: 20px;
}
.report-container {
max-width: 1200px;
margin: 0 auto;
padding: 40px;
background: white;
box-shadow: 0 10px 40px rgba(0,0,0,0.1);
border-radius: 12px;
}
.report-header {
text-align: center;
padding-bottom: 30px;
border-bottom: 3px solid var(--primary);
margin-bottom: 40px;
}
.report-header h1 {
color: var(--dark);
font-size: 2.5em;
margin-bottom: 10px;
}
.report-header .report-date {
color: #666;
font-size: 1.1em;
}
h1 {
color: var(--dark);
font-size: 2em;
margin: 30px 0 20px 0;
padding-bottom: 10px;
border-bottom: 2px solid var(--primary);
}
h2 {
color: var(--dark);
font-size: 1.6em;
margin: 25px 0 15px 0;
padding-left: 10px;
border-left: 4px solid var(--primary);
}
h3 {
color: var(--dark);
font-size: 1.3em;
margin: 20px 0 10px 0;
}
h4 {
color: #555;
font-size: 1.1em;
margin: 15px 0 8px 0;
}
p {
margin: 12px 0;
text-align: justify;
}
ul, ol {
margin: 15px 0;
padding-left: 30px;
}
li {
margin: 8px 0;
}
/* 表格样式 */
table {
width: 100%;
border-collapse: collapse;
margin: 25px 0;
box-shadow: 0 2px 15px rgba(0,0,0,0.1);
border-radius: 8px;
overflow: hidden;
}
table thead {
background: linear-gradient(135deg, var(--primary) 0%, #2980b9 100%);
color: white;
}
table th {
padding: 15px;
text-align: left;
font-weight: 600;
}
table td {
padding: 12px 15px;
border-bottom: 1px solid var(--border);
}
table tbody tr:hover {
background-color: #f5f5f5;
}
table tbody tr:last-child td {
border-bottom: none;
}
/* 代码块样式 */
pre {
background: #f4f4f4;
border: 1px solid var(--border);
border-radius: 6px;
padding: 15px;
overflow-x: auto;
margin: 20px 0;
}
code {
background: #f4f4f4;
padding: 2px 6px;
border-radius: 3px;
font-family: 'Courier New', monospace;
font-size: 0.9em;
}
pre code {
background: none;
padding: 0;
}
/* 链接样式 */
a {
color: var(--primary);
text-decoration: none;
border-bottom: 1px dotted var(--primary);
transition: all 0.3s;
}
a:hover {
color: var(--accent);
border-bottom-color: var(--accent);
}
/* 新闻列表样式 */
.news-item {
background: #f9f9f9;
border-left: 4px solid var(--secondary);
padding: 15px 20px;
margin: 15px 0;
border-radius: 6px;
transition: all 0.3s;
}
.news-item:hover {
background: #f0f0f0;
box-shadow: 0 2px 8px rgba(0,0,0,0.1);
}
.news-item h3 {
margin-top: 0;
color: var(--dark);
}
.news-item .news-meta {
color: #666;
font-size: 0.9em;
margin-top: 10px;
}
.news-item .news-category {
display: inline-block;
background: var(--secondary);
color: white;
padding: 3px 10px;
border-radius: 12px;
font-size: 0.85em;
margin-right: 10px;
}
/* 统计信息样式 */
.stats-box {
background: linear-gradient(135deg, #667eea 0%, #764ba2 100%);
color: white;
padding: 25px;
border-radius: 10px;
margin: 25px 0;
}
.stats-box h2 {
color: white;
border: none;
padding: 0;
margin: 0 0 15px 0;
}
.stats-grid {
display: grid;
grid-template-columns: repeat(auto-fit, minmax(200px, 1fr));
gap: 20px;
margin-top: 20px;
}
.stat-item {
text-align: center;
}
.stat-number {
font-size: 2.5em;
font-weight: bold;
margin-bottom: 5px;
}
.stat-label {
font-size: 0.9em;
opacity: 0.9;
}
/* 响应式设计 */
@media (max-width: 768px) {
.report-container {
padding: 20px;
}
.report-header h1 {
font-size: 1.8em;
}
h1 {
font-size: 1.6em;
}
h2 {
font-size: 1.3em;
}
table {
font-size: 0.9em;
}
table th,
table td {
padding: 8px;
}
}
/* 打印样式 */
@media print {
body {
background: white;
padding: 0;
}
.report-container {
box-shadow: none;
padding: 0;
}
}
</style>
</head>
<body>
<div class="report-container">
<h1>汽车后市场情报日报</h1>
<h2>报告时间</h2>
<p><strong>生成时间</strong>: 2025-10-29 17:35:46
<strong>时间范围</strong>: 2025-10-28 17:35:46 至 2025-10-29 17:35:46</p>
<h2>数据统计</h2>
<ul>
<li><strong>相关文章数</strong>: 0</li>
</ul>
<h2>相关新闻</h2>
<p>昨日无汽车后市场相关的新闻</p>
</div>
</body>
</html>
@@ -0,0 +1,311 @@
<!DOCTYPE html>
<html lang="zh-CN">
<head>
<meta charset="UTF-8">
<meta name="viewport" content="width=device-width, initial-scale=1.0">
<title>汽车后市场情报报告</title>
<link href="https://fonts.googleapis.com/css2?family=Noto+Sans+SC:wght@300;400;500;700&display=swap" rel="stylesheet">
<style>
:root {
--primary: #3498db;
--secondary: #2ecc71;
--accent: #e74c3c;
--dark: #2c3e50;
--light: #f8f9fa;
--border: #e0e0e0;
}
* {
margin: 0;
padding: 0;
box-sizing: border-box;
}
body {
font-family: 'Noto Sans SC', -apple-system, BlinkMacSystemFont, 'Segoe UI', Roboto, sans-serif;
line-height: 1.8;
color: #333;
background: linear-gradient(135deg, #f5f7fa 0%, #c3cfe2 100%);
padding: 20px;
}
.report-container {
max-width: 1200px;
margin: 0 auto;
padding: 40px;
background: white;
box-shadow: 0 10px 40px rgba(0,0,0,0.1);
border-radius: 12px;
}
.report-header {
text-align: center;
padding-bottom: 30px;
border-bottom: 3px solid var(--primary);
margin-bottom: 40px;
}
.report-header h1 {
color: var(--dark);
font-size: 2.5em;
margin-bottom: 10px;
}
.report-header .report-date {
color: #666;
font-size: 1.1em;
}
h1 {
color: var(--dark);
font-size: 2em;
margin: 30px 0 20px 0;
padding-bottom: 10px;
border-bottom: 2px solid var(--primary);
}
h2 {
color: var(--dark);
font-size: 1.6em;
margin: 25px 0 15px 0;
padding-left: 10px;
border-left: 4px solid var(--primary);
}
h3 {
color: var(--dark);
font-size: 1.3em;
margin: 20px 0 10px 0;
}
h4 {
color: #555;
font-size: 1.1em;
margin: 15px 0 8px 0;
}
p {
margin: 12px 0;
text-align: justify;
}
ul, ol {
margin: 15px 0;
padding-left: 30px;
}
li {
margin: 8px 0;
}
/* 表格样式 */
table {
width: 100%;
border-collapse: collapse;
margin: 25px 0;
box-shadow: 0 2px 15px rgba(0,0,0,0.1);
border-radius: 8px;
overflow: hidden;
}
table thead {
background: linear-gradient(135deg, var(--primary) 0%, #2980b9 100%);
color: white;
}
table th {
padding: 15px;
text-align: left;
font-weight: 600;
}
table td {
padding: 12px 15px;
border-bottom: 1px solid var(--border);
}
table tbody tr:hover {
background-color: #f5f5f5;
}
table tbody tr:last-child td {
border-bottom: none;
}
/* 代码块样式 */
pre {
background: #f4f4f4;
border: 1px solid var(--border);
border-radius: 6px;
padding: 15px;
overflow-x: auto;
margin: 20px 0;
}
code {
background: #f4f4f4;
padding: 2px 6px;
border-radius: 3px;
font-family: 'Courier New', monospace;
font-size: 0.9em;
}
pre code {
background: none;
padding: 0;
}
/* 链接样式 */
a {
color: var(--primary);
text-decoration: none;
border-bottom: 1px dotted var(--primary);
transition: all 0.3s;
}
a:hover {
color: var(--accent);
border-bottom-color: var(--accent);
}
/* 新闻列表样式 */
.news-item {
background: #f9f9f9;
border-left: 4px solid var(--secondary);
padding: 15px 20px;
margin: 15px 0;
border-radius: 6px;
transition: all 0.3s;
}
.news-item:hover {
background: #f0f0f0;
box-shadow: 0 2px 8px rgba(0,0,0,0.1);
}
.news-item h3 {
margin-top: 0;
color: var(--dark);
}
.news-item .news-meta {
color: #666;
font-size: 0.9em;
margin-top: 10px;
}
.news-item .news-category {
display: inline-block;
background: var(--secondary);
color: white;
padding: 3px 10px;
border-radius: 12px;
font-size: 0.85em;
margin-right: 10px;
}
/* 统计信息样式 */
.stats-box {
background: linear-gradient(135deg, #667eea 0%, #764ba2 100%);
color: white;
padding: 25px;
border-radius: 10px;
margin: 25px 0;
}
.stats-box h2 {
color: white;
border: none;
padding: 0;
margin: 0 0 15px 0;
}
.stats-grid {
display: grid;
grid-template-columns: repeat(auto-fit, minmax(200px, 1fr));
gap: 20px;
margin-top: 20px;
}
.stat-item {
text-align: center;
}
.stat-number {
font-size: 2.5em;
font-weight: bold;
margin-bottom: 5px;
}
.stat-label {
font-size: 0.9em;
opacity: 0.9;
}
/* 响应式设计 */
@media (max-width: 768px) {
.report-container {
padding: 20px;
}
.report-header h1 {
font-size: 1.8em;
}
h1 {
font-size: 1.6em;
}
h2 {
font-size: 1.3em;
}
table {
font-size: 0.9em;
}
table th,
table td {
padding: 8px;
}
}
/* 打印样式 */
@media print {
body {
background: white;
padding: 0;
}
.report-container {
box-shadow: none;
padding: 0;
}
}
</style>
</head>
<body>
<div class="report-container">
<h1>汽车后市场情报周报</h1>
<h2>报告时间</h2>
<p><strong>生成时间</strong>: 2025-10-29 17:36:37
<strong>时间范围</strong>: 2025-10-22 17:36:37 至 2025-10-29 17:36:37</p>
<h2>数据统计</h2>
<ul>
<li><strong>相关文章数</strong>: 1</li>
</ul>
<h2>汽车后市场相关新闻</h2>
<p>共找到 1 篇相关新闻:</p>
<h3>1. 2025年全国汽车以旧换新补贴申请量突破1000万份</h3>
<ul class="news-list">
<li><strong>分类</strong>: 二手车</li>
<li><strong>标签</strong>: ["二手车", "政策补贴"]</li>
<li><strong>摘要</strong>: 记者从商务部了解到,截至10月22日,2025年汽车以旧换新补贴申请量突破1000万份,其中汽车报废更新超340万份,置换更新超660万份。</li>
<li><strong>链接</strong>: <a href="http://www.chinanews.com/cj/2025/10-23/10503300.shtml" rel="noopener noreferrer" target="_blank">http://www.chinanews.com/cj/2025/10-23/10503300.shtml</a></li>
<li><strong>发布时间</strong>: 2025-10-23 08:35:31</li>
<li><strong>相关度评分</strong>: 70</li>
<li><strong>分析说明</strong>: 新闻涉及汽车以旧换新补贴申请量,其中包含置换更新超660万份,直接关联二手车流通环节,属于汽车后市场中二手车领域的政策动态。</li>
</ul>
</div>
</body>
</html>
@@ -0,0 +1,456 @@
# RSS数据AI处理模块
import os
import sys
import json
import time
import pandas as pd
from typing import List, Dict, Any, Optional
from datetime import datetime
from openai import OpenAI
# 添加项目根目录到路径
current_dir = os.path.dirname(os.path.abspath(__file__))
parent_dir = os.path.dirname(os.path.dirname(current_dir))
if parent_dir not in sys.path:
sys.path.insert(0, parent_dir)
from utils.mysql_agent import MySQLAgent
from utils.logger import log
from config import Config
class RSSDataAIProcessor:
"""RSS数据AI处理主类
负责:
- 从数据库加载未处理的RSS数据
- 调用AI进行分析
- 保存分析结果
- 更新处理状态
"""
def __init__(self):
"""初始化AI处理器"""
self.log = log.bind(module="RSSDataAIProcessor")
self.db_agent = MySQLAgent(Config.MYSQL_CONFIG)
# 从Config读取配置
self.source_table = Config.AI_PROCESSOR_CONFIG['source_table']
self.ai_table = Config.AI_PROCESSOR_CONFIG['result_table']
self.default_batch_size = Config.AI_PROCESSOR_CONFIG['batch_size']
self.default_delay = Config.AI_PROCESSOR_CONFIG['delay']
# 初始化百度千帆API客户端
self.api_key = Config.BAIDU_AI_CONFIG.get('api_key')
if self.api_key:
self.ai_client = OpenAI(
base_url='https://qianfan.baidubce.com/v2',
api_key=self.api_key
)
self.model = Config.BAIDU_AI_CONFIG.get('model', 'ernie-x1-turbo-32k')
self.log.info("RSS数据AI处理器初始化完成")
else:
self.ai_client = None
self.log.warning("百度AI未配置,AI处理功能将不可用")
self.log.warning("请在config.py中配置 BAIDU_AI_CONFIG['api_key']")
def is_configured(self) -> bool:
"""检查是否已配置API"""
return self.ai_client is not None
def main(self, batch_size: Optional[int] = 200, delay: Optional[float] = None) -> Dict[str, Any]:
"""主程序:批量处理RSS数据的完整流程
Args:
batch_size: 批量处理的记录数,None则使用配置的默认值
delay: 每条记录之间的延迟(秒),None则使用配置的默认值
Returns:
dict: 处理结果统计信息
"""
# 使用传入参数或默认配置
batch_size = batch_size or self.default_batch_size
delay = delay or self.default_delay
try:
# 1. 检查配置
if not self.is_configured():
error_msg = "百度AI未配置,请在config.py中配置 BAIDU_AI_CONFIG['api_key']"
self.log.error(error_msg)
return {
'success': False,
'message': error_msg,
'processed_count': 0,
'failed_count': 0
}
self.log.info(f"开始批量处理数据,批次大小: {batch_size}, 延迟: {delay}")
# 2. 准备数据库表结构
self.ensure_ai_processed_column()
if not self.db_agent.table_exists(self.ai_table):
self.create_ai_result_table()
# 3. 加载未处理的数据
df = self.load_unprocessed_data(batch_size)
if df.empty:
self.log.info("没有需要处理的数据")
return {
'success': True,
'message': '没有需要处理的数据',
'processed_count': 0,
'failed_count': 0
}
# 4. 处理每条记录
results = []
processed_ids = []
failed_count = 0
for idx, record in df.iterrows():
try:
self.log.debug(f"处理记录 {record['id']} ({idx + 1}/{len(df)})")
result = self.process_single_record(record.to_dict())
if result:
results.append(result)
processed_ids.append(record['id'])
else:
failed_count += 1
# 延迟,避免API限流
if delay > 0 and idx < len(df) - 1:
time.sleep(delay)
except Exception as e:
self.log.error(f"处理记录 {record['id']} 异常: {str(e)}", exc_info=True)
failed_count += 1
# 5. 保存结果
saved_count = 0
if results:
saved_count = self.save_ai_results(results)
# 6. 标记为已处理
if processed_ids:
self.mark_as_processed(processed_ids)
# 7. 返回统计信息
stats = {
'success': True,
'message': 'AI处理完成',
'total_count': len(df),
'processed_count': len(processed_ids),
'saved_count': saved_count,
'failed_count': failed_count,
'relevant_count': sum(1 for r in results if r.get('是否相关')),
'processing_time': datetime.now().strftime('%Y-%m-%d %H:%M:%S')
}
self.log.info("批量处理完成", **stats)
return stats
except Exception as e:
error_msg = f"批量处理失败: {str(e)}"
self.log.error(error_msg, exc_info=True)
return {
'success': False,
'message': error_msg,
'processed_count': 0,
'failed_count': 0
}
def ensure_ai_processed_column(self):
"""确保processed_rss_data表有"是否ai处理"字段"""
try:
# 检查字段是否存在
check_sql = """
SELECT COUNT(*) as count
FROM information_schema.COLUMNS
WHERE TABLE_SCHEMA = %s
AND TABLE_NAME = %s
AND COLUMN_NAME = '是否ai处理'
"""
result = self.db_agent.execute_sql(
check_sql,
params=(Config.MYSQL_CONFIG['database'], self.source_table),
fetch=True
)
if result[0][0] == 0:
# 字段不存在,添加字段
alter_sql = f"""
ALTER TABLE {self.source_table}
ADD COLUMN 是否ai处理 TINYINT(1) DEFAULT 0 COMMENT 'AI处理标记:0-未处理,1-已处理'
"""
self.db_agent.execute_sql(alter_sql)
self.log.info(f"成功为表 {self.source_table} 添加 '是否ai处理' 字段")
else:
self.log.debug(f"{self.source_table} 已存在 '是否ai处理' 字段")
except Exception as e:
self.log.error(f"检查/添加字段失败: {str(e)}", exc_info=True)
raise
def create_ai_result_table(self):
"""创建AI处理结果表(使用安全方法,确保不会删除现有数据)"""
create_sql = f"""
CREATE TABLE IF NOT EXISTS {self.ai_table} (
id INT AUTO_INCREMENT PRIMARY KEY COMMENT '主键ID',
source_id INT NOT NULL COMMENT '来源数据IDprocessed_rss_data.id',
文章标题 TEXT COMMENT '文章标题',
文章摘要 TEXT COMMENT '文章摘要',
发布时间 DATETIME COMMENT '发布时间',
来源URL VARCHAR(1024) COMMENT '来源URL',
文章链接 VARCHAR(1024) COMMENT '文章链接',
是否相关 BOOLEAN COMMENT 'AI判断是否与汽车后市场相关',
相关度评分 INT COMMENT '相关度评分(0-100',
标签 TEXT COMMENT 'AI生成的标签(JSON数组)',
分类 VARCHAR(100) COMMENT 'AI判断的主要分类',
分析说明 TEXT COMMENT 'AI分析说明',
处理时间 DATETIME COMMENT 'AI处理时间',
创建时间 TIMESTAMP DEFAULT CURRENT_TIMESTAMP COMMENT '记录创建时间',
更新时间 TIMESTAMP DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '记录更新时间',
INDEX idx_source_id (source_id),
INDEX idx_是否相关 (是否相关),
INDEX idx_分类 (分类),
INDEX idx_处理时间 (处理时间)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_unicode_ci COMMENT='RSS数据AI分析结果表'
"""
try:
# 使用安全方法创建表(如果不存在),确保不会删除现有数据
self.db_agent.create_table_if_not_exists(
table_name=self.ai_table,
create_sql=create_sql
)
except Exception as e:
self.log.error(f"创建AI结果表失败(可能是数据库连接问题): {str(e)}", exc_info=True)
raise
def load_unprocessed_data(self, limit: int = 100) -> pd.DataFrame:
"""加载未经AI处理的数据
Args:
limit: 每次处理的记录数量
Returns:
未处理的数据DataFrame
"""
try:
sql = f"""
SELECT id, 文章标题, 文章摘要, 发布时间, 来源URL, 文章链接
FROM {self.source_table}
WHERE 是否ai处理 = 0 OR 是否ai处理 IS NULL
ORDER BY 创建时间 DESC
LIMIT %s
"""
df = self.db_agent.query_to_df(sql, params=(limit,), is_print=False)
self.log.info(f"成功加载 {len(df)} 条未处理的数据")
return df
except Exception as e:
self.log.error(f"加载未处理数据失败: {str(e)}", exc_info=True)
return pd.DataFrame()
def analyze_news(self, title: str, summary: str) -> Dict[str, Any]:
"""调用AI分析新闻(保留原有提示词)"""
# 构建提示词(保留原有格式)
prompt = f"""分析以下新闻是否与汽车后市场相关,返回JSON格式:
标题:{title}
摘要:{summary}
返回格式:
{{
"is_relevant": true/false,
"relevance_score": 0-100,
"tags": ["标签1", "标签2"],
"category": "分类(配件/维修/保养/改装/美容/装饰/二手车/金融/保险/其他)",
"analysis": "简要说明"
}}
注意:只返回JSON格式的结果,不要包含其他说明文字。"""
try:
# 调用百度千帆API
response = self.ai_client.chat.completions.create(
model=self.model,
messages=[{
"role": "user",
"content": prompt
}]
)
# 获取响应内容
raw_content = response.choices[0].message.content
# 解析JSON(处理markdown包裹)
if '```json' in raw_content:
json_str = raw_content.split('```json')[1].split('```')[0].strip()
elif '```' in raw_content:
json_str = raw_content.split('```')[1].split('```')[0].strip()
else:
json_str = raw_content.strip()
result = json.loads(json_str)
# 补充缺失字段
return {
'is_relevant': result.get('is_relevant', False),
'relevance_score': result.get('relevance_score', 0),
'tags': result.get('tags', []),
'category': result.get('category', '其他'),
'analysis': result.get('analysis', '')
}
except json.JSONDecodeError as e:
self.log.warning(f"JSON解析失败: {str(e)}, 原始响应: {raw_content[:200]}")
return {
'is_relevant': False,
'relevance_score': 0,
'tags': [],
'category': '其他',
'analysis': f"解析失败: {raw_content[:100]}"
}
except Exception as e:
self.log.error(f"AI调用异常: {str(e)}", exc_info=True)
return {
'is_relevant': False,
'relevance_score': 0,
'tags': [],
'category': '其他',
'analysis': f"处理异常: {str(e)}"
}
def process_single_record(self, record: Dict[str, Any]) -> Optional[Dict[str, Any]]:
"""处理单条记录
Args:
record: 记录字典
Returns:
处理结果字典
"""
if not self.is_configured():
self.log.error("AI客户端未配置,无法处理数据")
return None
try:
title = str(record.get('文章标题', '')).strip()
summary = str(record.get('文章摘要', '')).strip()
if not title and not summary:
self.log.warning(f"记录 {record.get('id')} 标题和摘要均为空,跳过处理")
return None
# 调用AI分析
analysis_result = self.analyze_news(title, summary)
# 构建结果记录
result = {
'source_id': record['id'],
'文章标题': title,
'文章摘要': summary,
'发布时间': record.get('发布时间'),
'来源URL': record.get('来源URL'),
'文章链接': record.get('文章链接'),
'是否相关': analysis_result.get('is_relevant', False),
'相关度评分': analysis_result.get('relevance_score', 0),
'标签': json.dumps(analysis_result.get('tags', []), ensure_ascii=False),
'分类': analysis_result.get('category', '其他'),
'分析说明': analysis_result.get('analysis', ''),
'处理时间': datetime.now()
}
return result
except Exception as e:
self.log.error(f"处理记录 {record.get('id')} 失败: {str(e)}", exc_info=True)
return None
def save_ai_results(self, results: List[Dict[str, Any]]) -> int:
"""保存AI处理结果
Args:
results: 处理结果列表
Returns:
成功保存的记录数
"""
if not results:
return 0
try:
df = pd.DataFrame(results)
inserted = self.db_agent.insert_from_df(
table_name=self.ai_table,
df=df,
ignore_duplicates=True
)
self.log.info(f"成功保存 {inserted} 条AI处理结果")
return inserted
except Exception as e:
self.log.error(f"保存AI处理结果失败: {str(e)}", exc_info=True)
return 0
def mark_as_processed(self, ids: List[int]) -> bool:
"""标记记录为已处理
Args:
ids: 记录ID列表
Returns:
是否成功
"""
if not ids:
return True
try:
id_placeholders = ','.join(['%s'] * len(ids))
sql = f"""
UPDATE {self.source_table}
SET 是否ai处理 = 1
WHERE id IN ({id_placeholders})
"""
self.db_agent.execute_sql(sql, params=ids)
self.log.info(f"成功标记 {len(ids)} 条记录为已处理")
return True
except Exception as e:
self.log.error(f"标记记录为已处理失败: {str(e)}", exc_info=True)
return False
if __name__ == "__main__":
"""命令行直接运行"""
# 实例化处理器并调用main方法
processor = RSSDataAIProcessor()
result = processor.main()
# 输出结果
if result['success']:
print("\n" + "=" * 60)
print("✓ AI处理完成")
print("=" * 60)
print(f"总记录数: {result.get('total_count', 0)}")
print(f"成功处理: {result.get('processed_count', 0)}")
print(f"保存记录: {result.get('saved_count', 0)}")
print(f"失败记录: {result.get('failed_count', 0)}")
print(f"相关记录: {result.get('relevant_count', 0)}")
print(f"处理时间: {result.get('processing_time', '')}")
print("=" * 60 + "\n")
else:
print("\n" + "=" * 60)
print("✗ 处理失败")
print("=" * 60)
print(f"错误信息: {result['message']}")
print("\n提示: 请设置环境变量")
print(" Windows: $env:BAIDU_API_KEY = 'your_key'")
print(" Linux/Mac: export BAIDU_API_KEY='your_key'")
print("=" * 60 + "\n")
View File
+37
View File
@@ -0,0 +1,37 @@
汽车配件
汽车维修
汽车保养
汽车改装
汽车美容
汽车装饰
轮胎
机油
刹车片
火花塞
滤清器
蓄电池
车灯
保险杠
车门
座椅
方向盘
仪表盘
音响
导航
汽车用品
车载设备
汽车电子
汽车安全
汽车保险
二手车
汽车交易
汽车金融
汽车租赁
汽车服务
4S店
汽修店
汽车后市场
汽车产业链
汽车供应链
汽车
+431
View File
@@ -0,0 +1,431 @@
# RSS数据处理模块 - 汽车后市场新闻分词和过滤
import pandas as pd
import jieba
import jieba.posseg as pseg
import os
import sys
from typing import List, Dict, Any, Optional
from datetime import datetime
# 添加项目根目录到路径
current_dir = os.path.dirname(os.path.abspath(__file__))
parent_dir = os.path.dirname(current_dir)
if parent_dir not in sys.path:
sys.path.insert(0, parent_dir)
from utils.mysql_agent import MySQLAgent
from utils.logger import log
from config import Config
class RSSDataProcessor:
"""RSS数据处理器 - 专门处理汽车后市场相关新闻"""
def __init__(self):
"""初始化处理器"""
self.log = log.bind(module="RSSDataProcessor")
self.db_agent = MySQLAgent(Config.MYSQL_CONFIG)
self.table_name = "collector_rss_subscriptions"
self.processed_table_name = "processed_rss_data"
# 获取项目根目录
current_dir = os.path.dirname(os.path.abspath(__file__))
self.project_root = os.path.dirname(current_dir)
# 设置文件路径(相对于项目根目录)
self.keywords_file = os.path.join(self.project_root, "processors", "keywords.txt")
self.stopwords_file = os.path.join(self.project_root, "processors", "stopwords.txt")
# 汽车后市场相关关键词(默认值,实际从文件加载)
self.auto_aftermarket_keywords = {
'汽车配件', '汽车维修', '汽车保养', '汽车改装', '汽车美容', '汽车装饰',
'轮胎', '机油', '刹车片', '火花塞', '滤清器', '蓄电池', '车灯',
'保险杠', '车门', '座椅', '方向盘', '仪表盘', '音响', '导航',
'汽车用品', '车载设备', '汽车电子', '汽车安全', '汽车保险',
'二手车', '汽车交易', '汽车金融', '汽车租赁', '汽车服务',
'4S店', '汽修店', '汽车后市场', '汽车产业链', '汽车供应链', '汽车', ''
}
# 停用词表(默认值,实际从文件加载)
self.stopwords = {
'', '', '', '', '', '', '', '', '', '', '', '', '一个',
'', '', '', '', '', '', '', '', '', '', '没有', '', '',
'自己', '', '', '', '', '', '我们', '你们', '他们', '什么', '怎么',
'为什么', '因为', '所以', '但是', '然后', '如果', '虽然', '而且', '或者',
'可以', '应该', '必须', '需要', '想要', '希望', '觉得', '认为', '知道',
'了解', '明白', '清楚', '简单', '容易', '困难', '重要', '主要', '基本',
'一般', '特别', '非常', '十分', '相当', '比较', '更加', '', '',
'已经', '正在', '将要', '可能', '也许', '大概', '大约', '左右', '上下',
'今天', '明天', '昨天', '现在', '以前', '以后', '时候', '时间', '地方',
'这里', '那里', '这样', '那样', '如此', '这样', '那样', '如何', '怎样'
}
# 缓存关键词,避免重复加载
self._cached_keywords = None
self.log.info("RSS数据处理器初始化完成")
def load_keywords(self, keywords_file: Optional[str] = None) -> set:
"""从文件加载汽车后市场关键词(带缓存)"""
# 如果已经缓存,直接返回
if self._cached_keywords is not None:
return self._cached_keywords
# 使用默认路径(项目根目录下的文件)
if keywords_file is None:
keywords_file = self.keywords_file
keywords = set()
try:
if os.path.exists(keywords_file):
with open(keywords_file, 'r', encoding='utf-8') as f:
keywords = set(line.strip() for line in f if line.strip())
self.log.info(f"成功加载汽车后市场关键词,共 {len(keywords)}")
else:
self.log.warning(f"关键词文件不存在: {keywords_file}")
# 使用默认关键词
keywords = self.auto_aftermarket_keywords
except Exception as e:
self.log.error(f"加载关键词失败: {str(e)}")
keywords = self.auto_aftermarket_keywords
# 缓存关键词
self._cached_keywords = keywords
return keywords
def load_rss_data(self, limit: int = 1000) -> pd.DataFrame:
"""从数据库加载未处理的RSS数据"""
try:
sql = f"""
SELECT id, 文章标题, 文章摘要, 发布时间, 来源URL, 文章链接
FROM {self.table_name}
WHERE 是否已处理 = 0
ORDER BY 发布时间 DESC
LIMIT %s
"""
df = self.db_agent.query_to_df(sql, params=(limit,), is_print=False)
self.log.info(f"成功加载 {len(df)} 条未处理的RSS数据")
return df
except Exception as e:
self.log.error(f"加载RSS数据失败: {str(e)}", exc_info=True)
return pd.DataFrame()
def mark_as_processed(self, ids: List[int]) -> bool:
"""标记指定ID的数据为已处理"""
if not ids:
return True
try:
# 将ID列表转换为字符串格式用于SQL IN语句
id_placeholders = ','.join(['%s'] * len(ids))
sql = f"""
UPDATE {self.table_name}
SET 是否已处理 = 1
WHERE id IN ({id_placeholders})
"""
result = self.db_agent.execute_sql(sql, params=ids)
self.log.info(f"成功标记 {len(ids)} 条数据为已处理")
return True
except Exception as e:
self.log.error(f"标记数据为已处理失败: {str(e)}", exc_info=True)
return False
def load_stopwords(self, stopwords_file: Optional[str] = None) -> set:
"""加载停用词表"""
# 使用默认路径(项目根目录下的文件)
if stopwords_file is None:
stopwords_file = self.stopwords_file
try:
if os.path.exists(stopwords_file):
with open(stopwords_file, 'r', encoding='utf-8') as f:
stopwords = set(line.strip() for line in f if line.strip())
self.log.info(f"成功加载停用词表,共 {len(stopwords)} 个词")
return stopwords
else:
self.log.warning(f"停用词文件不存在: {stopwords_file},使用默认停用词")
return self.stopwords
except Exception as e:
self.log.error(f"加载停用词表失败: {str(e)}")
return self.stopwords
def add_custom_dict(self, custom_dict_file: Optional[str] = None):
"""添加自定义词典"""
if custom_dict_file and os.path.exists(custom_dict_file):
try:
jieba.load_userdict(custom_dict_file)
self.log.info("成功加载自定义词典")
except Exception as e:
self.log.warning(f"加载自定义词典失败: {str(e)}")
# 从文件加载汽车后市场关键词并添加到jieba词典
keywords = self.load_keywords()
for keyword in keywords:
jieba.add_word(keyword, freq=1000, tag='n')
def segment_and_pos(self, text: str, stopwords: set) -> List[str]:
"""分词并标注词性,过滤停用词"""
if not text or pd.isna(text):
return []
words = pseg.cut(str(text))
result = []
# 汽车后市场相关的词性标签
allowed_flags = {'n', 'vn', 'np', 'ns', 'nr', 'nt'} # 名词、动词、动名词、名词短语、处所词、人名、机构名
for word, flag in words:
word = word.strip()
if (len(word) >= 1 and
word not in stopwords and
flag in allowed_flags and
not word.isdigit()): # 过滤纯数字
result.append(word)
return result
def is_auto_aftermarket_related(self, text: str) -> bool:
"""判断文本是否与汽车后市场相关"""
if not text:
return False
text_lower = str(text).lower()
# 从文件加载关键词
keywords = self.load_keywords()
# 检查是否包含汽车后市场关键词
for keyword in keywords:
if keyword in text_lower:
return True
# 检查分词结果中是否包含相关词汇
words = self.segment_and_pos(text, self.stopwords)
for word in words:
if word in keywords:
return True
return False
def process_dataframe(self, df: pd.DataFrame, stopwords: set) -> pd.DataFrame:
"""处理整个DataFrame,进行分词和过滤"""
if df.empty:
self.log.warning("输入的DataFrame为空")
return df
# 确保所有文本都是字符串,并处理NaN值
df['文章标题'] = df['文章标题'].fillna('').astype(str)
df['文章摘要'] = df['文章摘要'].fillna('').astype(str)
# 合并标题和摘要进行分词
df['combined_text'] = df['文章标题'] + ' ' + df['文章摘要']
# 分词处理
df['segmented_words'] = df['combined_text'].apply(lambda x: self.segment_and_pos(x, stopwords))
# 判断是否与汽车后市场相关(只要出现关键词就入库)
df['is_auto_related'] = df['combined_text'].apply(self.is_auto_aftermarket_related)
df['is_filtered'] = df['is_auto_related']
# 添加处理时间
df['processed_time'] = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
self.log.info(f"数据处理完成,共处理 {len(df)} 条记录")
return df
def filter_auto_aftermarket_news(self, df: pd.DataFrame) -> pd.DataFrame:
"""过滤出汽车后市场相关的新闻"""
if df.empty:
return df
# 过滤出包含关键词的文章
filtered_df = df[df['is_filtered'] == True].copy()
self.log.info(f"过滤出 {len(filtered_df)} 条汽车后市场相关新闻")
return filtered_df
def save_to_database(self, df: pd.DataFrame) -> bool:
"""保存处理结果到数据库"""
if df.empty:
self.log.warning("没有数据需要保存")
return False
try:
# 准备保存的数据
save_df = df[['文章标题', '文章摘要', '发布时间', '来源URL', '文章链接',
'segmented_words', 'is_auto_related', 'processed_time']].copy()
# 将分词结果转换为字符串
save_df['分词结果'] = save_df['segmented_words'].apply(lambda x: ' '.join(x))
# 重命名列名为中文
save_df = save_df.rename(columns={
'is_auto_related': '是否汽车相关',
'processed_time': '处理时间'
})
# 删除不需要的列
save_df = save_df.drop('segmented_words', axis=1)
# 检查目标表是否存在,不存在则创建
# 注意:如果连接失败,table_exists可能返回False,需要捕获异常
try:
table_exists = self.db_agent.table_exists(self.processed_table_name)
if not table_exists:
self.log.warning(f"{self.processed_table_name} 不存在,正在创建...")
self.create_processed_table()
else:
# 表存在时,也确保有唯一索引(安全操作,不会删除数据)
self.create_processed_table() # 这个方法会检查并添加索引,不会删除数据
except Exception as table_check_error:
# 如果检查表存在性时连接失败,记录错误但不中断
# 因为后续的插入操作会再次尝试连接
self.log.warning(f"检查表存在性时出错(可能是连接问题): {str(table_check_error)}")
# 尝试创建表(如果表已存在,CREATE TABLE IF NOT EXISTS不会报错)
try:
self.create_processed_table()
except Exception as create_error:
# 如果创建表也失败(可能是连接问题),记录错误
self.log.error(f"创建表时出错(可能是连接问题): {str(create_error)}")
# 继续尝试插入,如果表存在,插入会成功;如果表不存在,插入会失败并抛出异常
# 插入数据(ignore_duplicates=True 会跳过重复的文章链接)
# 注意:INSERT INTO + ignore_duplicates 只会跳过重复记录,不会覆盖或删除现有数据
# 如果数据库连接失败,此操作会抛出异常,不会部分成功
inserted_rows = self.db_agent.insert_from_df(
table_name=self.processed_table_name,
df=save_df,
ignore_duplicates=True # 跳过重复的文章链接,不会删除或覆盖现有数据
)
self.log.info(f"成功保存 {inserted_rows} 条处理结果到数据库")
return True
except Exception as e:
self.log.error(f"保存到数据库失败: {str(e)}", exc_info=True)
return False
def create_processed_table(self):
"""
创建处理结果表(带唯一索引保护,防止重复插入)
使用 MySQLAgent 的安全方法,确保不会删除现有数据
"""
create_sql = f"""
CREATE TABLE IF NOT EXISTS {self.processed_table_name} (
id INT AUTO_INCREMENT PRIMARY KEY,
文章标题 TEXT,
文章摘要 TEXT,
发布时间 DATETIME,
来源URL VARCHAR(1024),
文章链接 VARCHAR(1024),
分词结果 TEXT,
是否汽车相关 BOOLEAN,
处理时间 DATETIME,
创建时间 TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
更新时间 TIMESTAMP DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_unicode_ci
"""
try:
# 使用安全方法创建表(如果不存在)
self.db_agent.create_table_if_not_exists(
table_name=self.processed_table_name,
create_sql=create_sql
)
# 使用安全方法添加唯一索引(如果不存在)
# 注意:唯一索引在创建表时不能直接包含,因为如果表已存在会报错
# 所以先创建表,再单独添加索引
self.db_agent.add_unique_index_if_not_exists(
table_name=self.processed_table_name,
index_name='uk_article_link',
column_name='文章链接',
column_length=500,
check_duplicates=True
)
except Exception as e:
# 如果创建表或添加索引失败(可能是连接问题),抛出异常
# 这样上层调用可以知道操作失败,不会误以为成功
self.log.error(f"创建/检查表失败(可能是数据库连接问题): {str(e)}", exc_info=True)
raise
def get_processing_statistics(self, df: pd.DataFrame) -> Dict[str, Any]:
"""获取处理统计信息"""
if df.empty:
return {}
total_count = len(df)
filtered_count = len(df[df['is_filtered'] == True])
stats = {
'total_articles': total_count,
'filtered_articles': filtered_count,
'filter_rate': filtered_count / total_count if total_count > 0 else 0,
'processing_time': datetime.now().strftime('%Y-%m-%d %H:%M:%S')
}
return stats
def process_rss_data(self, limit: int = 1000, save_to_db: bool = True) -> Dict[str, Any]:
"""处理RSS数据的主函数"""
try:
self.log.info("开始处理RSS数据...")
# 1. 加载RSS数据
df = self.load_rss_data(limit)
if df.empty:
self.log.warning("没有加载到RSS数据")
return {'success': False, 'message': '没有数据可处理'}
# 2. 加载停用词表
stopwords = self.load_stopwords()
# 3. 添加自定义词典
self.add_custom_dict()
# 4. 处理数据
processed_df = self.process_dataframe(df, stopwords)
# 5. 过滤汽车后市场相关新闻
filtered_df = self.filter_auto_aftermarket_news(processed_df)
# 6. 获取统计信息
stats = self.get_processing_statistics(processed_df)
# 7. 保存到数据库
if save_to_db and not filtered_df.empty:
save_success = self.save_to_database(filtered_df)
stats['save_success'] = save_success
# 8. 标记数据为已处理
if not df.empty and 'id' in df.columns:
processed_ids = df['id'].tolist()
mark_success = self.mark_as_processed(processed_ids)
stats['mark_success'] = mark_success
if not mark_success:
self.log.warning("部分数据标记为已处理失败")
# 9. 输出结果
self.log.info("RSS数据处理完成", **stats)
return {
'success': True,
'message': 'RSS数据处理完成',
'statistics': stats,
'filtered_data': filtered_df
}
except Exception as e:
self.log.error(f"RSS数据处理失败: {str(e)}", exc_info=True)
return {'success': False, 'message': f'处理失败: {str(e)}'}
def main(self, limit: int = 1000, save_to_db: bool = True) -> Dict[str, Any]:
"""主函数入口(实例方法),对外统一调用"""
return self.process_rss_data(limit=limit, save_to_db=save_to_db)
if __name__ == "__main__":
RSSDataProcessor().main(limit=5000, save_to_db=True)
+100
View File
@@ -0,0 +1,100 @@
一个
没有
自己
我们
你们
他们
什么
怎么
为什么
因为
所以
但是
然后
如果
虽然
而且
或者
可以
应该
必须
需要
想要
希望
觉得
认为
知道
了解
明白
清楚
简单
容易
困难
重要
主要
基本
一般
特别
非常
十分
相当
比较
更加
已经
正在
将要
可能
也许
大概
大约
左右
上下
今天
明天
昨天
现在
以前
以后
时候
时间
地方
这里
那里
这样
那样
如此
这样
那样
如何
怎样
View File
+81 -42
View File
@@ -1,6 +1,5 @@
import importlib
import threading
import time
from datetime import datetime
from typing import Dict, List, Optional, Any
import croniter
@@ -246,50 +245,90 @@ class TaskScheduler:
except Exception:
pass
def _execute_task_logic(self, task: Dict[str, Any]) -> None:
"""执行任务的具体逻辑(动态导入模块)"""
start_time = time.time()
task_id = task['task_id']
module_path = task['module_path']
task_log = log.bind(task_id=task_id, module=module_path)
def _execute_task_logic(self, task):
"""
执行任务逻辑的核心方法
支持类方法、静态方法和实例方法的调用
"""
module_path = task.get('module_path')
if not module_path:
raise ValueError("任务缺少 module_path 配置")
# 解析模块路径和类名
try:
path_parts = module_path.split('.')
if len(path_parts) < 2:
raise ValueError(f"无效的模块路径: {module_path}")
module_name = '.'.join(path_parts[:-1])
class_name = path_parts[-1]
method_name = 'main' # 默认方法名
except Exception as e:
raise ValueError(f"解析模块路径失败: {str(e)}")
# 动态导入模块
try:
import importlib
module = importlib.import_module(module_name)
except ImportError as e:
raise ImportError(f"无法导入模块 {module_name}: {str(e)}")
# 获取类和方法
if not hasattr(module, class_name):
raise AttributeError(f"模块 {module_name} 中未找到类 {class_name}")
cls = getattr(module, class_name)
# 检查是否存在指定方法
if not hasattr(cls, method_name):
raise AttributeError(f"{class_name} 中未找到方法 {method_name}")
method = getattr(cls, method_name)
# 根据方法类型决定如何调用
import inspect
callable_entry = None
# 判断是否为静态方法或类方法
if isinstance(method, staticmethod):
# 静态方法可以直接调用
callable_entry = method
elif isinstance(method, classmethod):
# 类方法需要传入类作为第一个参数
callable_entry = method
else:
# 实例方法或普通函数
try:
# 尝试检查方法签名
sig = inspect.signature(method)
params = list(sig.parameters.values())
# 如果第一个参数是self且没有默认值,则认为是实例方法
if params and params[0].name == 'self' and params[0].default == inspect.Parameter.empty:
# 创建实例并获取绑定方法
instance = cls()
callable_entry = getattr(instance, method_name)
else:
# 可能是普通函数或者是带有默认self参数的方法
callable_entry = method
except Exception:
# 如果检查签名失败,默认尝试创建实例
try:
instance = cls()
callable_entry = getattr(instance, method_name)
except Exception:
# 如果创建实例也失败,则直接调用方法(适用于不需要self的特殊情况)
callable_entry = method
# 执行任务
if not callable(callable_entry):
raise TypeError(f"{module_path}.{method_name} 不是可调用对象")
try:
# 解析可调用入口(支持模块/类/函数路径)
# 若路径最终为类,先实例化再调 main;否则直接调用
target_obj = None
parts = module_path.split('.') if isinstance(module_path, str) else []
resolved = None
try:
# 尝试导入尽可能深的模块
for i in range(len(parts), 0, -1):
mod = importlib.import_module('.'.join(parts[:i]))
attr_chain = parts[i:]
obj = mod
for attr in attr_chain:
obj = getattr(obj, attr)
resolved = obj
break
except Exception:
resolved = None
if isinstance(resolved, type):
try:
target_obj = resolved() # 触发 __init__ 日志
if hasattr(target_obj, 'main') and callable(getattr(target_obj, 'main')):
task_log.debug("开始执行实例的 main()")
getattr(target_obj, 'main')()
else:
raise AttributeError(f"{resolved.__name__} 未提供可调用的 main()")
except Exception as e:
raise
else:
callable_entry = self._resolve_callable(module_path)
task_log.debug("开始执行任务入口函数")
callable_entry()
task_log.info(f"任务执行完成,耗时: {time.time() - start_time:.2f}")
# 执行任务逻辑
callable_entry()
except Exception as e:
task_log.error("任务逻辑执行失败", exc_info=True)
self.logger.error(f"任务逻辑执行失败: {str(e)}")
raise
def _calculate_next_run_time(self, cron_expr: str, time_zone: str = 'Asia/Shanghai') -> datetime:
+1
View File
@@ -0,0 +1 @@
print("Hello, World!")
+67
View File
@@ -0,0 +1,67 @@
{
"cells": [
{
"metadata": {},
"cell_type": "markdown",
"source": "## 获取钉钉token",
"id": "4a7d18176711daad"
},
{
"cell_type": "code",
"id": "initial_id",
"metadata": {
"collapsed": true,
"ExecuteTime": {
"end_time": "2025-10-30T02:59:09.458462Z",
"start_time": "2025-10-30T02:59:09.015765Z"
}
},
"source": [
"from utils.Ding_api import DingAPI\n",
"\n",
"api_instance = DingAPI()\n",
"token = api_instance.get_token()\n",
"print(token)"
],
"outputs": [
{
"name": "stderr",
"output_type": "stream",
"text": [
"D:\\ProgramTools\\anaconda3\\envs\\intelligence_system\\Lib\\site-packages\\requests\\__init__.py:86: RequestsDependencyWarning: Unable to find acceptable character detection dependency (chardet or charset_normalizer).\n",
" warnings.warn(\n"
]
},
{
"name": "stdout",
"output_type": "stream",
"text": [
"2b166a1c8e683ee38f8d2112a7de5e05\n"
]
}
],
"execution_count": 1
}
],
"metadata": {
"kernelspec": {
"display_name": "Python 3",
"language": "python",
"name": "python3"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 2
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython2",
"version": "2.7.6"
}
},
"nbformat": 4,
"nbformat_minor": 5
}
Binary file not shown.
+118 -122
View File
@@ -14,8 +14,8 @@
"id": "initial_id",
"metadata": {
"ExecuteTime": {
"end_time": "2025-10-17T05:43:18.381936Z",
"start_time": "2025-10-17T05:43:15.265036Z"
"end_time": "2025-10-29T02:25:08.582541Z",
"start_time": "2025-10-29T02:25:08.473381Z"
},
"collapsed": true
},
@@ -25,7 +25,7 @@
"output_type": "stream",
"text": [
"PROJECT_ROOT = d:\\Idea Project\\intelligence_system\n",
"\u001b[32m2025-10-23 16:56:55\u001b[0m | \u001b[1mINFO \u001b[0m | \u001b[36mtask_scheduler\u001b[0m - \u001b[1m任务调度器已初始化,最大工作线程数: 5\u001b[0m\n"
"\u001b[32m2025-10-30 13:57:07\u001b[0m | \u001b[1mINFO \u001b[0m | \u001b[36mtask_scheduler\u001b[0m - \u001b[1m任务调度器已初始化,最大工作线程数: 5\u001b[0m\n"
]
}
],
@@ -242,7 +242,7 @@
"name": "stdout",
"output_type": "stream",
"text": [
"\u001b[32m2025-10-17 13:43:18\u001b[0m | \u001b[1mINFO \u001b[0m | \u001b[36mmysql_agent\u001b[0m - \u001b[1m查询执行成功\u001b[0m\n"
"\u001b[32m2025-10-29 09:54:09\u001b[0m | \u001b[1mINFO \u001b[0m | \u001b[36mmysql_agent\u001b[0m - \u001b[1m查询执行成功\u001b[0m\n"
]
},
{
@@ -281,20 +281,36 @@
" </thead>\n",
" <tbody>\n",
" <tr>\n",
" <td>2</td>\n",
" <td>RSS基于规则数据处理</td>\n",
" <td>processor</td>\n",
" <td>processors.processor_rss_data</td>\n",
" <td>0 8,20 * * *</td>\n",
" <td>Asia/Shanghai</td>\n",
" <td>2025-10-28 20:00:00</td>\n",
" <td>2025-10-28 13:34:49</td>\n",
" <td>success</td>\n",
" <td>10</td>\n",
" <td>1</td>\n",
" <td>0</td>\n",
" <td>2025-10-22 16:06:42</td>\n",
" <td>2025-10-28 13:34:50</td>\n",
" </tr>\n",
" <tr>\n",
" <td>1</td>\n",
" <td>RSS新闻订阅</td>\n",
" <td>collector</td>\n",
" <td>collectors.rss_subscriptions.NewsAPIClient</td>\n",
" <td>5 0 * * *</td>\n",
" <td>*/5 * * * *</td>\n",
" <td>Asia/Shanghai</td>\n",
" <td>2025-10-18 00:05:00</td>\n",
" <td>2025-10-17 00:05:07</td>\n",
" <td>2025-10-28 13:40:00</td>\n",
" <td>2025-10-28 13:35:09</td>\n",
" <td>success</td>\n",
" <td>4</td>\n",
" <td>495</td>\n",
" <td>1</td>\n",
" <td>0</td>\n",
" <td>2025-10-16 15:47:34</td>\n",
" <td>2025-10-17 00:05:08</td>\n",
" <td>2025-10-28 13:35:09</td>\n",
" </tr>\n",
" </tbody>\n",
"</table>"
@@ -346,34 +362,54 @@
" <tbody>\n",
" <tr>\n",
" <th>0</th>\n",
" <td>2</td>\n",
" <td>RSS基于规则数据处理</td>\n",
" <td>processor</td>\n",
" <td>processors.processor_rss_data</td>\n",
" <td>0 8,20 * * *</td>\n",
" <td>Asia/Shanghai</td>\n",
" <td>2025-10-28 20:00:00</td>\n",
" <td>2025-10-28 13:34:49</td>\n",
" <td>success</td>\n",
" <td>10</td>\n",
" <td>1</td>\n",
" <td>0</td>\n",
" <td>2025-10-22 16:06:42</td>\n",
" <td>2025-10-28 13:34:50</td>\n",
" </tr>\n",
" <tr>\n",
" <th>1</th>\n",
" <td>1</td>\n",
" <td>RSS新闻订阅</td>\n",
" <td>collector</td>\n",
" <td>collectors.rss_subscriptions.NewsAPIClient</td>\n",
" <td>5 0 * * *</td>\n",
" <td>*/5 * * * *</td>\n",
" <td>Asia/Shanghai</td>\n",
" <td>2025-10-18 00:05:00</td>\n",
" <td>2025-10-17 00:05:07</td>\n",
" <td>2025-10-28 13:40:00</td>\n",
" <td>2025-10-28 13:35:09</td>\n",
" <td>success</td>\n",
" <td>4</td>\n",
" <td>495</td>\n",
" <td>1</td>\n",
" <td>0</td>\n",
" <td>2025-10-16 15:47:34</td>\n",
" <td>2025-10-17 00:05:08</td>\n",
" <td>2025-10-28 13:35:09</td>\n",
" </tr>\n",
" </tbody>\n",
"</table>\n",
"</div>"
],
"text/plain": [
" 任务ID 任务名称 任务类型 模块路径 \\\n",
"0 1 RSS新闻订阅 collector collectors.rss_subscriptions.NewsAPIClient \n",
" 任务ID 任务名称 任务类型 模块路径 \\\n",
"0 2 RSS基于规则数据处理 processor processors.processor_rss_data \n",
"1 1 RSS新闻订阅 collector collectors.rss_subscriptions.NewsAPIClient \n",
"\n",
" Cron表达式 时区 下次运行时间 最后运行时间 \\\n",
"0 5 0 * * * Asia/Shanghai 2025-10-18 00:05:00 2025-10-17 00:05:07 \n",
" Cron表达式 时区 下次运行时间 最后运行时间 \\\n",
"0 0 8,20 * * * Asia/Shanghai 2025-10-28 20:00:00 2025-10-28 13:34:49 \n",
"1 */5 * * * * Asia/Shanghai 2025-10-28 13:40:00 2025-10-28 13:35:09 \n",
"\n",
" 运行状态 运行次数 是否活跃 is_running created_at updated_at \n",
"0 success 4 1 0 2025-10-16 15:47:34 2025-10-17 00:05:08 "
"0 success 10 1 0 2025-10-22 16:06:42 2025-10-28 13:34:50 \n",
"1 success 495 1 0 2025-10-16 15:47:34 2025-10-28 13:35:09 "
]
},
"execution_count": 2,
@@ -433,12 +469,12 @@
},
{
"cell_type": "code",
"execution_count": 3,
"execution_count": 10,
"id": "eab90de72c35429e",
"metadata": {
"ExecuteTime": {
"end_time": "2025-10-17T05:43:26.113877Z",
"start_time": "2025-10-17T05:43:26.071398Z"
"end_time": "2025-10-29T02:26:12.873536Z",
"start_time": "2025-10-29T02:26:12.648420Z"
}
},
"outputs": [
@@ -446,7 +482,7 @@
"name": "stdout",
"output_type": "stream",
"text": [
"\u001b[32m2025-10-17 13:43:26\u001b[0m | \u001b[1mINFO \u001b[0m | \u001b[36mmysql_agent\u001b[0m - \u001b[1m查询执行成功\u001b[0m\n"
"\u001b[32m2025-10-29 10:26:12\u001b[0m | \u001b[1mINFO \u001b[0m | \u001b[36mmysql_agent\u001b[0m - \u001b[1m查询执行成功\u001b[0m\n"
]
},
{
@@ -456,14 +492,14 @@
"**任务ID**: 1\n",
"**任务名称**: RSS新闻订阅\n",
"**任务类型**: collector\n",
"**模块路径**: collectors.rss_subscriptions.NewsAPIClient\n",
"**Cron表达式**: 5 0 * * *\n",
"**模块路径**: processors.processor_rss_data.RSSDataProcessor\n",
"**Cron表达式**: */5 * * * *\n",
"**时区**: Asia/Shanghai\n",
"**最后运行时间**: 2025-10-17 00:05:07\n",
"**下次运行时间**: 2025-10-18 00:05:00\n",
"**最后运行时间**: 2025-10-28 13:35:09\n",
"**下次运行时间**: 2025-10-29 10:25:00\n",
"**运行状态**: success\n",
"**是否活跃**: 是\n",
"**运行次数**: 4\n",
"**运行次数**: 496\n",
"**创建时间**: 2025-10-16 15:47:34"
],
"text/plain": [
@@ -479,20 +515,20 @@
"{'task_id': 1,\n",
" 'task_name': 'RSS新闻订阅',\n",
" 'task_type': 'collector',\n",
" 'module_path': 'collectors.rss_subscriptions.NewsAPIClient',\n",
" 'cron_expression': '5 0 * * *',\n",
" 'module_path': 'processors.processor_rss_data.RSSDataProcessor',\n",
" 'cron_expression': '*/5 * * * *',\n",
" 'time_zone': 'Asia/Shanghai',\n",
" 'next_run_time': Timestamp('2025-10-18 00:05:00'),\n",
" 'last_run_time': Timestamp('2025-10-17 00:05:07'),\n",
" 'next_run_time': Timestamp('2025-10-29 10:25:00'),\n",
" 'last_run_time': Timestamp('2025-10-28 13:35:09'),\n",
" 'last_run_status': 'success',\n",
" 'run_count': 4,\n",
" 'run_count': 496,\n",
" 'is_active': 1,\n",
" 'is_running': 0,\n",
" 'created_at': Timestamp('2025-10-16 15:47:34'),\n",
" 'updated_at': Timestamp('2025-10-17 00:05:08')}"
" 'updated_at': Timestamp('2025-10-29 10:24:49')}"
]
},
"execution_count": 3,
"execution_count": 10,
"metadata": {},
"output_type": "execute_result"
}
@@ -536,24 +572,16 @@
},
{
"cell_type": "code",
"execution_count": 14,
"execution_count": 4,
"id": "2b2d723bb8e2784f",
"metadata": {},
"outputs": [
{
"name": "stderr",
"output_type": "stream",
"text": [
"d:\\ProgramTools\\anaconda3\\envs\\intelligence_system\\Lib\\site-packages\\requests\\__init__.py:86: RequestsDependencyWarning: Unable to find acceptable character detection dependency (chardet or charset_normalizer).\n",
" warnings.warn(\n"
]
},
{
"name": "stdout",
"output_type": "stream",
"text": [
"\u001b[32m2025-10-16 15:47:34\u001b[0m | \u001b[1mINFO \u001b[0m | \u001b[36mmysql_agent\u001b[0m - \u001b[1m查询执行成功\u001b[0m\n",
"\u001b[32m2025-10-16 15:47:34\u001b[0m | \u001b[1mINFO \u001b[0m | \u001b[36mtask_scheduler\u001b[0m - \u001b[1m新任务添加成功\u001b[0m\n"
"\u001b[32m2025-10-29 09:56:52\u001b[0m | \u001b[1mINFO \u001b[0m | \u001b[36mmysql_agent\u001b[0m - \u001b[1m查询执行成功\u001b[0m\n",
"\u001b[32m2025-10-29 09:56:52\u001b[0m | \u001b[1mINFO \u001b[0m | \u001b[36mtask_scheduler\u001b[0m - \u001b[1m新任务添加成功\u001b[0m\n"
]
},
{
@@ -571,7 +599,7 @@
{
"data": {
"text/markdown": [
"新任务ID: 0,任务名称: RSS新闻订阅"
"新任务ID: 0,任务名称: AI处理RSS新闻"
],
"text/plain": [
"<IPython.core.display.Markdown object>"
@@ -586,7 +614,7 @@
"np.int64(0)"
]
},
"execution_count": 14,
"execution_count": 4,
"metadata": {},
"output_type": "execute_result"
}
@@ -611,9 +639,9 @@
"\n",
"# 执行:添加一个新闻采集任务\n",
"add_new_task(\n",
" name=\"RSS新闻订阅\",\n",
" task_type=\"collector\",\n",
" module_path=\"collectors.rss_subscriptions\",\n",
" name=\"AI处理RSS新闻\",\n",
" task_type=\"processor\",\n",
" module_path=\"processors.ai_processors.ai_processor_rss_data.RSSDataAIProcessor\",\n",
" cron_expression=\"5 0 * * *\", # 每5分钟执行1次\n",
" timezone=\"Asia/Shanghai\"\n",
")"
@@ -629,19 +657,19 @@
},
{
"cell_type": "code",
"execution_count": 4,
"execution_count": 21,
"id": "c892fd8ad2f0dd9d",
"metadata": {
"ExecuteTime": {
"end_time": "2025-10-17T05:44:19.046308Z",
"start_time": "2025-10-17T05:44:18.980345Z"
"end_time": "2025-10-29T02:29:56.088085Z",
"start_time": "2025-10-29T02:29:55.754298Z"
}
},
"outputs": [
{
"data": {
"text/markdown": [
"### 任务ID 1 更新成功"
"### 任务ID 2 更新成功"
],
"text/plain": [
"<IPython.core.display.Markdown object>"
@@ -654,25 +682,25 @@
"name": "stdout",
"output_type": "stream",
"text": [
"\u001b[32m2025-10-17 13:44:19\u001b[0m | \u001b[1mINFO \u001b[0m | \u001b[36mmysql_agent\u001b[0m - \u001b[1m查询执行成功\u001b[0m\n"
"\u001b[32m2025-10-29 10:29:56\u001b[0m | \u001b[1mINFO \u001b[0m | \u001b[36mmysql_agent\u001b[0m - \u001b[1m查询执行成功\u001b[0m\n"
]
},
{
"data": {
"text/markdown": [
"### 任务详情\n",
"**任务ID**: 1\n",
"**任务名称**: RSS新闻订阅\n",
"**任务类型**: collector\n",
"**模块路径**: collectors.rss_subscriptions.NewsAPIClient\n",
"**Cron表达式**: 5 * * * *\n",
"**任务ID**: 2\n",
"**任务名称**: RSS基于规则数据处理\n",
"**任务类型**: processor\n",
"**模块路径**: processors.processor_rss_data\n",
"**Cron表达式**: 0 8,20 * * *\n",
"**时区**: Asia/Shanghai\n",
"**最后运行时间**: 2025-10-17 00:05:07\n",
"**下次运行时间**: 2025-10-18 00:05:00\n",
"**最后运行时间**: 2025-10-28 13:34:49\n",
"**下次运行时间**: 2025-10-28 20:00:00\n",
"**运行状态**: success\n",
"**是否活跃**: 是\n",
"**运行次数**: 4\n",
"**创建时间**: 2025-10-16 15:47:34"
"**运行次数**: 10\n",
"**创建时间**: 2025-10-22 16:06:42"
],
"text/plain": [
"<IPython.core.display.Markdown object>"
@@ -687,7 +715,7 @@
"True"
]
},
"execution_count": 4,
"execution_count": 21,
"metadata": {},
"output_type": "execute_result"
}
@@ -720,7 +748,7 @@
" return success\n",
"\n",
"# 执行:更新任务(示例:修改任务1的Cron表达式为每天10点)\n",
"update_task(1, cron = \"5 * * * *\")\n",
"update_task(2, module = \"processors.processor_rss_data\")\n",
"\n",
"# 执行:同时更新多个属性(名称和Cron表达式)\n",
"# update_task(1, name=\"每日早间新闻采集\", cron=\"0 8 * * *\")"
@@ -786,17 +814,21 @@
"id": "c554c748169d5ac8",
"metadata": {},
"source": [
"## 7. 手动执行任务(对应命令行 run)"
"## 7. 手动执行任务(对应命令行 run)\n",
"\n",
"自动识别main,即main的上一级"
]
},
{
"cell_type": "code",
"execution_count": 2,
"execution_count": 3,
"id": "94892f4134316f8e",
"metadata": {
"ExecuteTime": {
"end_time": "2025-10-17T05:44:37.714559Z",
"start_time": "2025-10-17T05:44:35.084369Z"
"start_time": "2025-10-29T02:30:10.298891Z"
},
"jupyter": {
"is_executing": true
}
},
"outputs": [
@@ -828,12 +860,12 @@
"name": "stdout",
"output_type": "stream",
"text": [
"\u001b[32m2025-10-23 16:57:20\u001b[0m | \u001b[1mINFO \u001b[0m | \u001b[36mmysql_agent\u001b[0m - \u001b[1m查询执行成功\u001b[0m\n",
"\u001b[32m2025-10-23 16:57:20\u001b[0m | \u001b[1mINFO \u001b[0m | \u001b[36mprocessor_rss_data\u001b[0m - \u001b[1mRSS数据处理器初始化完成\u001b[0m\n",
"\u001b[32m2025-10-23 16:57:20\u001b[0m | \u001b[1mINFO \u001b[0m | \u001b[36mprocessor_rss_data\u001b[0m - \u001b[1m开始处理RSS数据...\u001b[0m\n",
"\u001b[32m2025-10-23 16:57:21\u001b[0m | \u001b[1mINFO \u001b[0m | \u001b[36mprocessor_rss_data\u001b[0m - \u001b[1m成功加载 8 条未处理的RSS数据\u001b[0m\n",
"\u001b[32m2025-10-23 16:57:21\u001b[0m | \u001b[33m\u001b[1mWARNING \u001b[0m | \u001b[36mprocessor_rss_data\u001b[0m - \u001b[33m\u001b[1m停用词文件不存在: processors/stopwords.txt,使用默认停用词\u001b[0m\n",
"\u001b[32m2025-10-23 16:57:21\u001b[0m | \u001b[33m\u001b[1mWARNING \u001b[0m | \u001b[36mprocessor_rss_data\u001b[0m - \u001b[33m\u001b[1m关键词文件不存在: processors/keywords.txt\u001b[0m\n"
"\u001b[32m2025-10-30 13:57:49\u001b[0m | \u001b[1mINFO \u001b[0m | \u001b[36mmysql_agent\u001b[0m - \u001b[1m查询执行成功\u001b[0m\n",
"\u001b[32m2025-10-30 13:57:49\u001b[0m | \u001b[1mINFO \u001b[0m | \u001b[36mprocessor_rss_data\u001b[0m - \u001b[1mRSS数据处理器初始化完成\u001b[0m\n",
"\u001b[32m2025-10-30 13:57:49\u001b[0m | \u001b[1mINFO \u001b[0m | \u001b[36mprocessor_rss_data\u001b[0m - \u001b[1m开始处理RSS数据...\u001b[0m\n",
"\u001b[32m2025-10-30 13:57:50\u001b[0m | \u001b[1mINFO \u001b[0m | \u001b[36mprocessor_rss_data\u001b[0m - \u001b[1m成功加载 6 条未处理的RSS数据\u001b[0m\n",
"\u001b[32m2025-10-30 13:57:50\u001b[0m | \u001b[1mINFO \u001b[0m | \u001b[36mprocessor_rss_data\u001b[0m - \u001b[1m成功加载停用词表,共 98 个词\u001b[0m\n",
"\u001b[32m2025-10-30 13:57:50\u001b[0m | \u001b[1mINFO \u001b[0m | \u001b[36mprocessor_rss_data\u001b[0m - \u001b[1m成功加载汽车后市场关键词,共 37 个\u001b[0m\n"
]
},
{
@@ -842,7 +874,7 @@
"text": [
"Building prefix dict from the default dictionary ...\n",
"Loading model from cache C:\\Users\\zy187\\AppData\\Local\\Temp\\jieba.cache\n",
"Loading model cost 0.609 seconds.\n",
"Loading model cost 0.839 seconds.\n",
"Prefix dict has been built successfully.\n"
]
},
@@ -850,13 +882,10 @@
"name": "stdout",
"output_type": "stream",
"text": [
"\u001b[32m2025-10-23 16:57:21\u001b[0m | \u001b[1mINFO \u001b[0m | \u001b[36mprocessor_rss_data\u001b[0m - \u001b[1m数据处理完成,共处理 8 条记录\u001b[0m\n",
"\u001b[32m2025-10-23 16:57:21\u001b[0m | \u001b[1mINFO \u001b[0m | \u001b[36mprocessor_rss_data\u001b[0m - \u001b[1m过滤出 1 条汽车后市场相关新闻\u001b[0m\n",
"\u001b[32m2025-10-23 16:57:21\u001b[0m | \u001b[1mINFO \u001b[0m | \u001b[36mmysql_agent\u001b[0m - \u001b[1m表 processed_rss_data 插入结果汇总\u001b[0m\n",
"\u001b[32m2025-10-23 16:57:21\u001b[0m | \u001b[1mINFO \u001b[0m | \u001b[36mprocessor_rss_data\u001b[0m - \u001b[1m成功保存 1 条处理结果到数据库\u001b[0m\n",
"\u001b[32m2025-10-23 16:57:21\u001b[0m | \u001b[1mINFO \u001b[0m | \u001b[36mprocessor_rss_data\u001b[0m - \u001b[1m成功标记 8 条数据为已处理\u001b[0m\n",
"\u001b[32m2025-10-23 16:57:21\u001b[0m | \u001b[1mINFO \u001b[0m | \u001b[36mprocessor_rss_data\u001b[0m - \u001b[1mRSS数据处理完成\u001b[0m\n",
"\u001b[32m2025-10-23 16:57:21\u001b[0m | \u001b[1mINFO \u001b[0m | \u001b[36mtask_scheduler\u001b[0m - \u001b[1m任务执行完成,耗时: 1.19秒\u001b[0m\n"
"\u001b[32m2025-10-30 13:57:50\u001b[0m | \u001b[1mINFO \u001b[0m | \u001b[36mprocessor_rss_data\u001b[0m - \u001b[1m数据处理完成,共处理 6 条记录\u001b[0m\n",
"\u001b[32m2025-10-30 13:57:50\u001b[0m | \u001b[1mINFO \u001b[0m | \u001b[36mprocessor_rss_data\u001b[0m - \u001b[1m过滤出 0 条汽车后市场相关新闻\u001b[0m\n",
"\u001b[32m2025-10-30 13:57:51\u001b[0m | \u001b[1mINFO \u001b[0m | \u001b[36mprocessor_rss_data\u001b[0m - \u001b[1m成功标记 6 条数据为已处理\u001b[0m\n",
"\u001b[32m2025-10-30 13:57:51\u001b[0m | \u001b[1mINFO \u001b[0m | \u001b[36mprocessor_rss_data\u001b[0m - \u001b[1mRSS数据处理完成\u001b[0m\n"
]
},
{
@@ -886,7 +915,7 @@
{
"data": {
"text/markdown": [
"**执行时长**: 1.26 秒"
"**执行时长**: 4.41 秒"
],
"text/plain": [
"<IPython.core.display.Markdown object>"
@@ -907,39 +936,6 @@
"metadata": {},
"output_type": "display_data"
},
{
"data": {
"text/markdown": [
"### 📋 执行输出:"
],
"text/plain": [
"<IPython.core.display.Markdown object>"
]
},
"metadata": {},
"output_type": "display_data"
},
{
"name": "stdout",
"output_type": "stream",
"text": [
"RSS数据处理完成!\n",
"处理统计: {'total_articles': 8, 'filtered_articles': 1, 'filter_rate': 0.125, 'processing_time': '2025-10-23 16:57:21', 'save_success': True, 'mark_success': True}\n",
"\n"
]
},
{
"data": {
"text/markdown": [
"---"
],
"text/plain": [
"<IPython.core.display.Markdown object>"
]
},
"metadata": {},
"output_type": "display_data"
},
{
"data": {
"text/markdown": [
@@ -958,12 +954,12 @@
"{'success': True,\n",
" 'task_name': 'RSS基于规则数据处理',\n",
" 'task_id': 2,\n",
" 'execution_time': 1.2610254287719727,\n",
" 'output': \"RSS数据处理完成!\\n处理统计: {'total_articles': 8, 'filtered_articles': 1, 'filter_rate': 0.125, 'processing_time': '2025-10-23 16:57:21', 'save_success': True, 'mark_success': True}\\n\",\n",
" 'execution_time': 4.414557695388794,\n",
" 'output': '',\n",
" 'error': None}"
]
},
"execution_count": 2,
"execution_count": 3,
"metadata": {},
"output_type": "execute_result"
}
+52
View File
@@ -0,0 +1,52 @@
import requests
from typing import Optional
class DingAPI():
def __init__(self):
self.token = None
self.url = ''
def get_token(self) -> Optional:
"""
获取Access Token
return: token(str)
"""
url = 'https://api.dingtalk.com/v1.0/oauth2/dinga88e3d35525b86ca/token'
payload = {
"client_id": "dingn3de1pyuwkymohhe",
"client_secret": "qv__egWJnLVXh14_R1rfD_vBi7M8Gzhnk94EJN6puMzsqqpBCP8U7Ow-zA7SV8Rx",
"grant_type": "client_credentials"
}
response = requests.post(url, json=payload)
token = response.json().get('access_token')
return token
def card_create(self, data):
"""
创建并投放卡片
return: response(dict)
"""
url = 'https://api.dingtalk.com/v1.0/card/instances/createAndDeliver'
headers = {
'x-acs-dingtalk-access-token': data["token"],
'Content-Type': 'application/json'
}
data = {
"cardTemplateId": "cee2715f-001d-41cb-8fcd-3be18be9fbf5.schema",
"outTrackId": "",
"cardData":"",
"openSpaceId":"dtv1.card//IM_GROUP.4210192048793363",# 场域id
}
response = requests.post(url, json=data, headers=headers)
return response.json()
def get_
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
+114
View File
@@ -153,6 +153,12 @@ class MySQLAgent:
"""
兼容旧接口的通用插入方法:保留replace参数,同时支持新的ignore_duplicates
自动处理重复数据,对所有数据源通用,插入失败的数据会通过日志记录
安全性说明:
- 使用 INSERT INTO(不是 REPLACE INTO 或 INSERT ... ON DUPLICATE KEY UPDATE
- 当 ignore_duplicates=True 时,重复记录会被跳过,不会覆盖或删除现有数据
- 如果数据库连接失败,操作会抛出异常,不会部分成功
- 所有操作都是安全的,不会导致数据丢失或覆盖
"""
# 【兼容性处理】如果未指定ignore_duplicates,用replace参数推导
if ignore_duplicates is None:
@@ -592,6 +598,114 @@ class MySQLAgent:
exc_info=True)
return False
def create_table_if_not_exists(self, table_name: str, create_sql: str) -> bool:
"""
创建表(如果不存在)
使用 CREATE TABLE IF NOT EXISTS,不会删除已存在的表和数据
参数:
table_name: 表名
create_sql: 完整的 CREATE TABLE SQL 语句(必须包含 IF NOT EXISTS
返回:
bool: 是否成功(表已存在也会返回True)
注意:
- 此方法使用 CREATE TABLE IF NOT EXISTS,是安全的,不会删除现有数据
- 如果连接失败,会抛出异常
"""
if "IF NOT EXISTS" not in create_sql.upper():
self.log.warning(f"CREATE TABLE 语句建议使用 IF NOT EXISTS 以保证安全性")
try:
self.execute_sql(create_sql)
self.log.info(f"成功创建/检查表(表已存在时不会删除数据): {table_name}")
return True
except Exception as e:
self.log.error(f"创建/检查表失败(可能是数据库连接问题): {str(e)}",
table=table_name, exc_info=True)
raise
def add_unique_index_if_not_exists(self, table_name: str, index_name: str,
column_name: str, column_length: int = 500,
check_duplicates: bool = True) -> bool:
"""
添加唯一索引(如果不存在)
不会删除数据,只添加索引
参数:
table_name: 表名
index_name: 索引名称
column_name: 要添加索引的列名
column_length: 索引长度(对于VARCHAR/TEXT类型)
check_duplicates: 是否在添加索引前检查重复数据
返回:
bool: 是否成功添加索引(索引已存在也会返回True)
注意:
- 此方法是安全的,不会删除数据
- 如果表中存在重复数据,会跳过添加索引(不会删除数据)
- 如果连接失败,会抛出异常
"""
try:
# 1. 检查索引是否已存在
check_index_sql = f"""
SELECT COUNT(*) as cnt
FROM INFORMATION_SCHEMA.STATISTICS
WHERE TABLE_SCHEMA = %s
AND TABLE_NAME = %s
AND INDEX_NAME = %s
"""
result = self.query_to_df(
check_index_sql,
params=(self.config['database'], table_name, index_name),
is_print=False
)
if not result.empty and result['cnt'].iloc[0] > 0:
self.log.debug(f"唯一索引 {index_name} 已存在,跳过添加")
return True
# 2. 如果启用重复检查,先检查是否有重复数据
if check_duplicates:
check_duplicates_sql = f"""
SELECT {column_name}, COUNT(*) as cnt
FROM `{table_name}`
WHERE {column_name} IS NOT NULL AND {column_name} != ''
GROUP BY {column_name}
HAVING cnt > 1
LIMIT 1
"""
duplicates = self.query_to_df(check_duplicates_sql, is_print=False)
if not duplicates.empty:
self.log.warning(
f"{table_name} 中存在重复的 {column_name} 数据,无法添加唯一索引。"
"现有数据不会被删除。",
duplicate_count=len(duplicates)
)
return False
# 3. 添加唯一索引
add_index_sql = f"""
ALTER TABLE `{table_name}`
ADD UNIQUE KEY `{index_name}` ({column_name}({column_length}))
"""
self.execute_sql(add_index_sql)
self.log.info(f"成功添加唯一索引 {index_name}(现有数据不受影响)")
return True
except Exception as e:
error_msg = str(e)
# 如果索引已存在,不报错
if "Duplicate key name" in error_msg or "already exists" in error_msg.lower():
self.log.debug(f"唯一索引 {index_name} 已存在,跳过添加")
return True
else:
self.log.warning(f"添加唯一索引时出现问题(不影响现有数据): {error_msg}")
raise
def execute_sql(self, sql: str, params: Union[tuple, dict, None] = None,
fetch: bool = False) -> Union[int, List[Dict[str, Any]]]:
"""执行SQL语句(原有逻辑完全保留)"""