md文档更新

This commit is contained in:
2025-08-05 17:13:19 +08:00
parent e5da1203c0
commit fad2b2d1c8
6 changed files with 361 additions and 48 deletions
+237
View File
@@ -0,0 +1,237 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
报警通知模块
功能:
1. 多通道报警通知(邮件/企业微信/飞书)
2. 分级报警策略
3. 失败重试机制
"""
import smtplib
import json
import requests
from email.mime.text import MIMEText
from typing import Optional, Dict, List
import logging
from dataclasses import dataclass
from tenacity import retry, stop_after_attempt, wait_exponential
# 日志配置
logger = logging.getLogger('alert')
logger.setLevel(logging.INFO)
@dataclass
class AlertConfig:
"""报警配置数据类"""
email: Dict[str, str] = None # SMTP配置
wecom: Dict[str, str] = None # 企业微信机器人配置
feishu: Dict[str, str] = None # 飞书机器人配置
min_level: str = 'WARNING' # 默认最低报警级别
class AlertService:
def __init__(self, config: AlertConfig):
"""
初始化报警服务
参数:
config: AlertConfig对象
"""
self.config = config
self.levels = {
'DEBUG': 0,
'INFO': 1,
'WARNING': 2,
'ERROR': 3,
'CRITICAL': 4
}
@retry(stop=stop_after_attempt(3), wait=wait_exponential(multiplier=1, min=4, max=10))
def send_email(self, subject: str, content: str, to_addrs: List[str]) -> bool:
"""
发送邮件报警
参数:
subject: 邮件主题
content: 邮件内容(支持HTML)
to_addrs: 收件人列表
返回:
发送是否成功
"""
if not self.config.email:
logger.warning("邮件配置未启用")
return False
try:
msg = MIMEText(content, 'html', 'utf-8')
msg['From'] = self.config.email['from_addr']
msg['To'] = ','.join(to_addrs)
msg['Subject'] = subject
with smtplib.SMTP_SSL(
host=self.config.email['smtp_server'],
port=self.config.email['smtp_port']
) as server:
server.login(
user=self.config.email['username'],
password=self.config.email['password']
)
server.send_message(msg)
logger.info(f"邮件报警发送成功 -> {to_addrs}")
return True
except Exception as e:
logger.error(f"邮件发送失败: {str(e)}")
raise
@retry(stop=stop_after_attempt(3), wait=wait_exponential(multiplier=1, min=4, max=10))
def send_wecom(self, content: str, mentioned_list: List[str] = None) -> bool:
"""
发送企业微信机器人通知
参数:
content: 消息内容(支持Markdown)
mentioned_list: 要@的成员手机号列表
返回:
发送是否成功
"""
if not self.config.wecom:
logger.warning("企业微信配置未启用")
return False
try:
payload = {
"msgtype": "markdown",
"markdown": {
"content": content,
}
}
if mentioned_list:
payload["mentioned_mobile_list"] = mentioned_list
resp = requests.post(
url=self.config.wecom['webhook_url'],
headers={'Content-Type': 'application/json'},
data=json.dumps(payload),
timeout=5
)
resp.raise_for_status()
logger.info("企业微信报警发送成功")
return True
except Exception as e:
logger.error(f"企业微信发送失败: {str(e)}")
raise
@retry(stop=stop_after_attempt(3), wait=wait_exponential(multiplier=1, min=4, max=10))
def send_feishu(self, title: str, content: str) -> bool:
"""
发送飞书机器人通知
参数:
title: 消息标题
content: 消息内容(支持Markdown)
返回:
发送是否成功
"""
if not self.config.feishu:
logger.warning("飞书配置未启用")
return False
try:
payload = {
"msg_type": "interactive",
"card": {
"header": {
"title": {
"tag": "plain_text",
"content": title
},
"template": "red" # 红色标题表示报警
},
"elements": [{
"tag": "markdown",
"content": content
}]
}
}
resp = requests.post(
url=self.config.feishu['webhook_url'],
headers={'Content-Type': 'application/json'},
data=json.dumps(payload),
timeout=5
)
resp.raise_for_status()
logger.info("飞书报警发送成功")
return True
except Exception as e:
logger.error(f"飞书发送失败: {str(e)}")
raise
def send_alert(self, level: str, title: str, content: str) -> bool:
"""
分级发送报警通知
参数:
level: 报警级别(DEBUG/INFO/WARNING/ERROR/CRITICAL)
title: 报警标题
content: 报警详情
返回:
是否发送成功(至少一个通道成功)
"""
if self.levels[level] < self.levels[self.config.min_level]:
logger.debug(f"忽略低于阈值的报警: {level} < {self.config.min_level}")
return False
results = []
if self.config.email:
email_content = f"""
<h2>{title}</h2>
<p>报警级别: <strong>{level}</strong></p>
<pre>{content}</pre>
"""
results.append(
self.send_email(
subject=f"[{level}] {title}",
content=email_content,
to_addrs=self.config.email['receivers']
)
)
if self.config.wecom:
wecom_content = f"## {title}\n**级别**: {level}\n```\n{content}\n```"
results.append(self.send_wecom(wecom_content))
if self.config.feishu:
feishu_content = f"**级别**: {level}\n\n{content}"
results.append(self.send_feishu(title, feishu_content))
return any(results)
# 配置示例
if __name__ == "__main__":
# 初始化配置
config = AlertConfig(
email={
'smtp_server': 'smtp.example.com',
'smtp_port': 465,
'username': 'alert@example.com',
'password': 'your_password',
'from_addr': 'alert@example.com',
'receivers': ['admin@example.com']
},
wecom={
'webhook_url': 'https://qyapi.weixin.qq.com/cgi-bin/webhook/send?key=your_key'
},
feishu={
'webhook_url': 'https://open.feishu.cn/open-apis/bot/v2/hook/your_token'
},
min_level='ERROR'
)
# 使用示例
alert = AlertService(config)
# 发送测试报警
alert.send_alert(
level='ERROR',
title='数据库连接失败',
content='无法连接到MySQL服务器,已重试3次\n主机: 192.168.1.100:3306'
)
+5
View File
@@ -0,0 +1,5 @@
"""
数据采集包
"""
View File
+72
View File
@@ -0,0 +1,72 @@
## 情报收集系统设计
### 程序框架
```angular2html
intelligence_system/
├── config/ # 配置管理
│ ├── __init__.py
│ ├── settings.py # 全局参数
│ └── logging.conf # 日志配置
├── collectors/ # 数据采集
│ ├── news_api.py
│ └── complaint_spider.py
├── processors/ # 数据处理
│ ├── text_processor.py
│ └── image_processor.py
├── storage/ # 数据存储
│ ├── database.py
│ └── cache_manager.py
├── applications/ # 应用层
│ ├── reporter/
│ │ ├── daily.py
│ │ └── monthly.py
│ └── alert.py
├── utils/ # 工具类
│ ├── logger.py
│ └── network.py
└── main.py # 调度入口
```
### 程序设计原则
1. 所有程序尽可能在py文件中运行,尽量避免使用命令行执行
2. 配置需要在配置类中定义
3. 密钥等信息直接放在配置类中
### 主程序设计
主程序需要一次启动,一直运行,启动时运行一次(在代码中可取消),之后每天定时生成一次报告
主程序包含爬虫/api调度器。该调度器通过查询mysql中任务调度情况按需执行,db文件中应包含任务名称、
任务路径、任务执行频率(支持按天、按周,按分钟)、上次执行时间、下次执行时间等信息
主程序应包含数据处理调度器,根据数据类别分别处理,如文本数据处理调度器、图片数据处理调度器等,
每天定时拉取db获取到的原始数据,分别进行处理,处理完成后将结果保存到mysql中
主程序应包含日报、周报等生成,根据时间定时生成报告
### 日志设计
日志系统应兼容多个平台,如win、mac和linux,日志需要保存为log文件,并且在日志大于20mb时自动压缩
### 数据库链接设计
数据存储放在数据库中,数据库类型为mysql,数据库名称为intelligence_system
数据库表的命名规则与目录一致,数据采集类以collector_为开头,数据处理类以processor_为开头,数据存储类以storage_为开头,应用层类以application_为开头
依次类推。
数据库链接为通用配置,要求数据采集或处理类等,可以直接调用封装好的数据库链接,不必每次都重新些,
该链接包含表的增删改查功能,以及执行sql语句功能
数据库结构:
1. collector_news_api:新闻api数据表
2. collector_complaint_spider:投诉数据表
3. processor_text_processor:文本处理数据表
4. processor_image_processor:图片处理数据表
5. main_task 任务调度表
6. application_reporter_daily:日报数据表
7. application_reporter_monthly:周报数据表
### 数据采集设计
每一个数据采集均为独立python文件,里面执行主程序均为main,以方便调度
每一个数据采集均会根据规则创建数据库表,数据处理类以processor_为开头
### 数据处理
从多个数据库库表中获取数据,对数据进行处理,处理完成后将结果保存到数据库中,处理结果可能存储在多个表中
数据处理数据库表以processor_为开头
+42 -46
View File
@@ -1,7 +1,7 @@
#!/usr/bin/env python3 #!/usr/bin/env python3
# -*- coding: utf-8 -*- # -*- coding: utf-8 -*-
""" """
情报收集系统主程序 情报收集系统主程序(明文配置版)
功能: 功能:
1. 调度数据采集、处理、存储流程 1. 调度数据采集、处理、存储流程
2. 生成日报/月报 2. 生成日报/月报
@@ -10,42 +10,32 @@
import sys import sys
import logging import logging
from datetime import datetime, timedelta import time
from typing import List, Dict, Any import threading
from datetime import datetime
from typing import Dict, List, Any
# 自定义模块 # 自定义模块
from config.settings import API_KEYS, DATA_SOURCES
from collectors.news_api import NewsAPICollector
from collectors.complaint_spider import ComplaintSpider
from processors.data_processor import DataProcessor from processors.data_processor import DataProcessor
from storage.database import IntelligenceDB from storage.database import IntelligenceDB
from applications.reporter import ReportGenerator from applications.reporter import ReportGenerator
from applications.alert import AlertService from applications.alert import AlertService
from utils.logger import setup_logging from utils.logger import setup_logging
from utils.mail import send_email
class IntelligenceSystem: class IntelligenceSystem:
def __init__(self): def __init__(self):
# 初始化核心组件 # 初始化核心组件
setup_logging() setup_logging()
self.logger = logging.getLogger(__name__) self.logger = logging.getLogger(__name__)
self.db = IntelligenceDB() self.db = IntelligenceDB()
self.processor = DataProcessor() self.processor = DataProcessor()
self.alert = AlertService() self.alert = AlertService()
# 数据采集器注册
self.collectors = {
"news": NewsAPICollector(API_KEYS['newsapi']),
"complaint": ComplaintSpider(
base_url=DATA_SOURCES['blackcat'],
rate_limit=30 # 30秒爬取间隔
)
}
def run_daily_pipeline(self): def run_daily_pipeline(self):
"""每日数据采集处理流程""" """每日数据采集处理流程"""
try: try:
self.logger.info("开始执行每日数据采集流程")
# 阶段1:数据采集 # 阶段1:数据采集
raw_data = self._collect_data() raw_data = self._collect_data()
@@ -61,6 +51,7 @@ class IntelligenceSystem:
# 阶段5:异常检测 # 阶段5:异常检测
self._check_alerts() self._check_alerts()
self.logger.info("每日流程执行完成")
except Exception as e: except Exception as e:
self.logger.error(f"主流程执行失败: {str(e)}", exc_info=True) self.logger.error(f"主流程执行失败: {str(e)}", exc_info=True)
self.alert.send_critical(f"系统异常: {str(e)}") self.alert.send_critical(f"系统异常: {str(e)}")
@@ -128,58 +119,63 @@ class IntelligenceSystem:
"""生成报告并发送""" """生成报告并发送"""
try: try:
# 日报生成 # 日报生成
report_date = datetime.now().date()
report_html = ReportGenerator(self.db).generate_daily() report_html = ReportGenerator(self.db).generate_daily()
with open(f"reports/daily_{datetime.now().date()}.html", 'w') as f:
report_path = f"reports/daily_{report_date}.html"
with open(report_path, 'w', encoding='utf-8') as f:
f.write(report_html) f.write(report_html)
self.logger.info(f"日报已生成: {report_path}")
# 每月1号生成月报 # 每月1号生成月报
if datetime.now().day == 1: if datetime.now().day == 1:
monthly_report = ReportGenerator(self.db).generate_monthly() monthly_report = ReportGenerator(self.db).generate_monthly()
send_email( # 这里替换为实际的邮件发送逻辑
to="team@example.com", self.logger.info("月度报告已生成(邮件发送功能需配置)")
subject=f"{datetime.now().strftime('%Y-%m')} 情报月报",
content=monthly_report
)
except Exception as e: except Exception as e:
self.logger.error(f"报告生成失败: {str(e)}") self.logger.error(f"报告生成失败: {str(e)}")
def _check_alerts(self): def _check_alerts(self):
"""检查预警信息""" """检查预警信息"""
# 负面舆情监测
negative_keywords = ['投诉', '造假', '违规'] negative_keywords = ['投诉', '造假', '违规']
alerts = self.alert.check_negative(negative_keywords) alerts = self.alert.check_negative(negative_keywords)
if alerts: if alerts:
self.alert.send_urgent( alert_msg = "\n".join([f"[{a['source']}] {a['content']}" for a in alerts])
"负面舆情警报", self.logger.warning(f"发现负面舆情:\n{alert_msg}")
"\n".join([f"[{a['source']}] {a['content']}" for a in alerts]) # 这里替换为实际的通知发送逻辑
) self.alert.send_urgent("负面舆情警报", alert_msg)
def cleanup(self): def cleanup(self):
"""资源清理""" """资源清理"""
self.db.close() self.db.close()
self.logger.info("系统资源已释放") self.logger.info("系统资源已释放")
if __name__ == "__main__": def run_scheduled():
"""定时任务执行入口"""
system = IntelligenceSystem() system = IntelligenceSystem()
try: try:
# 执行每日任务 while True:
if len(sys.argv) > 1 and sys.argv[1] == "--manual": now = datetime.now()
if now.hour == 9 and now.minute == 0: # 每天9点执行
system.run_daily_pipeline()
time.sleep(60) # 避免重复执行
time.sleep(30)
except KeyboardInterrupt:
system.cleanup()
if __name__ == "__main__":
if len(sys.argv) > 1 and sys.argv[1] == "--manual":
# 手动执行模式
system = IntelligenceSystem()
try:
system.logger.info("手动执行模式启动") system.logger.info("手动执行模式启动")
system.run_daily_pipeline() system.run_daily_pipeline()
else: finally:
# 定时任务模式(实际部署时改用crontab或APScheduler system.cleanup()
system.logger.info("定时任务模式启动") else:
while True: # 定时任务模式
now = datetime.now() print("情报收集系统已启动(定时模式)")
if now.hour == 9 and now.minute == 0: # 每天9点执行 print("按 Ctrl+C 退出")
system.run_daily_pipeline() run_scheduled()
time.sleep(60) # 避免重复执行
time.sleep(30)
except KeyboardInterrupt:
system.logger.info("用户中断执行")
finally:
system.cleanup()
+5 -2
View File
@@ -16,6 +16,7 @@ from logging.handlers import TimedRotatingFileHandler
from datetime import datetime from datetime import datetime
import platform import platform
class CrossPlatformLogger: class CrossPlatformLogger:
def __init__(self, name="intelligence_system"): def __init__(self, name="intelligence_system"):
""" """
@@ -65,8 +66,8 @@ class CrossPlatformLogger:
# Windows终端特殊处理 # Windows终端特殊处理
if self.system == 'windows' and not sys.stdout.isatty(): if self.system == 'windows' and not sys.stdout.isatty():
import colorama
try: try:
import colorama
colorama.init() colorama.init()
except ImportError: except ImportError:
pass pass
@@ -112,10 +113,12 @@ class CrossPlatformLogger:
"""获取配置好的日志实例""" """获取配置好的日志实例"""
return CrossPlatformLogger(name).logger return CrossPlatformLogger(name).logger
def setup_logging(name: str = "intelligence_system"): def setup_logging(name: str = "intelligence_system"):
"""快速配置日志(兼容旧代码)""" """快速配置日志(兼容旧代码)"""
return CrossPlatformLogger(name).logger return CrossPlatformLogger(name).logger
# 测试代码 # 测试代码
if __name__ == "__main__": if __name__ == "__main__":
logger = CrossPlatformLogger().logger logger = CrossPlatformLogger().logger
@@ -123,4 +126,4 @@ if __name__ == "__main__":
try: try:
1 / 0 1 / 0
except Exception as e: except Exception as e:
logger.error("除零错误示例", exc_info=True) logger.error("除零错误示例", exc_info=True)