diff --git a/applications/alert.py b/applications/alert.py index e69de29..b6677ea 100644 --- a/applications/alert.py +++ b/applications/alert.py @@ -0,0 +1,237 @@ +#!/usr/bin/env python3 +# -*- coding: utf-8 -*- +""" +报警通知模块 +功能: +1. 多通道报警通知(邮件/企业微信/飞书) +2. 分级报警策略 +3. 失败重试机制 +""" + +import smtplib +import json +import requests +from email.mime.text import MIMEText +from typing import Optional, Dict, List +import logging +from dataclasses import dataclass +from tenacity import retry, stop_after_attempt, wait_exponential + +# 日志配置 +logger = logging.getLogger('alert') +logger.setLevel(logging.INFO) + +@dataclass +class AlertConfig: + """报警配置数据类""" + email: Dict[str, str] = None # SMTP配置 + wecom: Dict[str, str] = None # 企业微信机器人配置 + feishu: Dict[str, str] = None # 飞书机器人配置 + min_level: str = 'WARNING' # 默认最低报警级别 + +class AlertService: + def __init__(self, config: AlertConfig): + """ + 初始化报警服务 + 参数: + config: AlertConfig对象 + """ + self.config = config + self.levels = { + 'DEBUG': 0, + 'INFO': 1, + 'WARNING': 2, + 'ERROR': 3, + 'CRITICAL': 4 + } + + @retry(stop=stop_after_attempt(3), wait=wait_exponential(multiplier=1, min=4, max=10)) + def send_email(self, subject: str, content: str, to_addrs: List[str]) -> bool: + """ + 发送邮件报警 + 参数: + subject: 邮件主题 + content: 邮件内容(支持HTML) + to_addrs: 收件人列表 + 返回: + 发送是否成功 + """ + if not self.config.email: + logger.warning("邮件配置未启用") + return False + + try: + msg = MIMEText(content, 'html', 'utf-8') + msg['From'] = self.config.email['from_addr'] + msg['To'] = ','.join(to_addrs) + msg['Subject'] = subject + + with smtplib.SMTP_SSL( + host=self.config.email['smtp_server'], + port=self.config.email['smtp_port'] + ) as server: + server.login( + user=self.config.email['username'], + password=self.config.email['password'] + ) + server.send_message(msg) + + logger.info(f"邮件报警发送成功 -> {to_addrs}") + return True + except Exception as e: + logger.error(f"邮件发送失败: {str(e)}") + raise + + @retry(stop=stop_after_attempt(3), wait=wait_exponential(multiplier=1, min=4, max=10)) + def send_wecom(self, content: str, mentioned_list: List[str] = None) -> bool: + """ + 发送企业微信机器人通知 + 参数: + content: 消息内容(支持Markdown) + mentioned_list: 要@的成员手机号列表 + 返回: + 发送是否成功 + """ + if not self.config.wecom: + logger.warning("企业微信配置未启用") + return False + + try: + payload = { + "msgtype": "markdown", + "markdown": { + "content": content, + } + } + if mentioned_list: + payload["mentioned_mobile_list"] = mentioned_list + + resp = requests.post( + url=self.config.wecom['webhook_url'], + headers={'Content-Type': 'application/json'}, + data=json.dumps(payload), + timeout=5 + ) + resp.raise_for_status() + + logger.info("企业微信报警发送成功") + return True + except Exception as e: + logger.error(f"企业微信发送失败: {str(e)}") + raise + + @retry(stop=stop_after_attempt(3), wait=wait_exponential(multiplier=1, min=4, max=10)) + def send_feishu(self, title: str, content: str) -> bool: + """ + 发送飞书机器人通知 + 参数: + title: 消息标题 + content: 消息内容(支持Markdown) + 返回: + 发送是否成功 + """ + if not self.config.feishu: + logger.warning("飞书配置未启用") + return False + + try: + payload = { + "msg_type": "interactive", + "card": { + "header": { + "title": { + "tag": "plain_text", + "content": title + }, + "template": "red" # 红色标题表示报警 + }, + "elements": [{ + "tag": "markdown", + "content": content + }] + } + } + + resp = requests.post( + url=self.config.feishu['webhook_url'], + headers={'Content-Type': 'application/json'}, + data=json.dumps(payload), + timeout=5 + ) + resp.raise_for_status() + + logger.info("飞书报警发送成功") + return True + except Exception as e: + logger.error(f"飞书发送失败: {str(e)}") + raise + + def send_alert(self, level: str, title: str, content: str) -> bool: + """ + 分级发送报警通知 + 参数: + level: 报警级别(DEBUG/INFO/WARNING/ERROR/CRITICAL) + title: 报警标题 + content: 报警详情 + 返回: + 是否发送成功(至少一个通道成功) + """ + if self.levels[level] < self.levels[self.config.min_level]: + logger.debug(f"忽略低于阈值的报警: {level} < {self.config.min_level}") + return False + + results = [] + if self.config.email: + email_content = f""" +

{title}

+

报警级别: {level}

+
{content}
+ """ + results.append( + self.send_email( + subject=f"[{level}] {title}", + content=email_content, + to_addrs=self.config.email['receivers'] + ) + ) + + if self.config.wecom: + wecom_content = f"## {title}\n**级别**: {level}\n```\n{content}\n```" + results.append(self.send_wecom(wecom_content)) + + if self.config.feishu: + feishu_content = f"**级别**: {level}\n\n{content}" + results.append(self.send_feishu(title, feishu_content)) + + return any(results) + +# 配置示例 +if __name__ == "__main__": + # 初始化配置 + config = AlertConfig( + email={ + 'smtp_server': 'smtp.example.com', + 'smtp_port': 465, + 'username': 'alert@example.com', + 'password': 'your_password', + 'from_addr': 'alert@example.com', + 'receivers': ['admin@example.com'] + }, + wecom={ + 'webhook_url': 'https://qyapi.weixin.qq.com/cgi-bin/webhook/send?key=your_key' + }, + feishu={ + 'webhook_url': 'https://open.feishu.cn/open-apis/bot/v2/hook/your_token' + }, + min_level='ERROR' + ) + + # 使用示例 + alert = AlertService(config) + + # 发送测试报警 + alert.send_alert( + level='ERROR', + title='数据库连接失败', + content='无法连接到MySQL服务器,已重试3次\n主机: 192.168.1.100:3306' + ) \ No newline at end of file diff --git a/collectors/__init__.py b/collectors/__init__.py new file mode 100644 index 0000000..87eedc7 --- /dev/null +++ b/collectors/__init__.py @@ -0,0 +1,5 @@ +""" +数据采集包 +""" + + diff --git a/collectors/base.py b/collectors/base.py deleted file mode 100644 index e69de29..0000000 diff --git a/doc/readme.md b/doc/readme.md new file mode 100644 index 0000000..da760ff --- /dev/null +++ b/doc/readme.md @@ -0,0 +1,72 @@ +## 情报收集系统设计 + +### 程序框架 +```angular2html +intelligence_system/ +├── config/ # 配置管理 +│ ├── __init__.py +│ ├── settings.py # 全局参数 +│ └── logging.conf # 日志配置 +├── collectors/ # 数据采集 +│ ├── news_api.py +│ └── complaint_spider.py +├── processors/ # 数据处理 +│ ├── text_processor.py +│ └── image_processor.py +├── storage/ # 数据存储 +│ ├── database.py +│ └── cache_manager.py +├── applications/ # 应用层 +│ ├── reporter/ +│ │ ├── daily.py +│ │ └── monthly.py +│ └── alert.py +├── utils/ # 工具类 +│ ├── logger.py +│ └── network.py +└── main.py # 调度入口 +``` +### 程序设计原则 +1. 所有程序尽可能在py文件中运行,尽量避免使用命令行执行 +2. 配置需要在配置类中定义 +3. 密钥等信息直接放在配置类中 + +### 主程序设计 +主程序需要一次启动,一直运行,启动时运行一次(在代码中可取消),之后每天定时生成一次报告 + +主程序包含爬虫/api调度器。该调度器通过查询mysql中任务调度情况按需执行,db文件中应包含任务名称、 +任务路径、任务执行频率(支持按天、按周,按分钟)、上次执行时间、下次执行时间等信息 + +主程序应包含数据处理调度器,根据数据类别分别处理,如文本数据处理调度器、图片数据处理调度器等, +每天定时拉取db获取到的原始数据,分别进行处理,处理完成后将结果保存到mysql中 + +主程序应包含日报、周报等生成,根据时间定时生成报告 + +### 日志设计 +日志系统应兼容多个平台,如win、mac和linux,日志需要保存为log文件,并且在日志大于20mb时自动压缩 + +### 数据库链接设计 +数据存储放在数据库中,数据库类型为mysql,数据库名称为intelligence_system + +数据库表的命名规则与目录一致,数据采集类以collector_为开头,数据处理类以processor_为开头,数据存储类以storage_为开头,应用层类以application_为开头 +依次类推。 + +数据库链接为通用配置,要求数据采集或处理类等,可以直接调用封装好的数据库链接,不必每次都重新些, +该链接包含表的增删改查功能,以及执行sql语句功能 + +数据库结构: +1. collector_news_api:新闻api数据表 +2. collector_complaint_spider:投诉数据表 +3. processor_text_processor:文本处理数据表 +4. processor_image_processor:图片处理数据表 +5. main_task 任务调度表 +6. application_reporter_daily:日报数据表 +7. application_reporter_monthly:周报数据表 + +### 数据采集设计 +每一个数据采集均为独立python文件,里面执行主程序均为main,以方便调度 +每一个数据采集均会根据规则创建数据库表,数据处理类以processor_为开头 + +### 数据处理 +从多个数据库库表中获取数据,对数据进行处理,处理完成后将结果保存到数据库中,处理结果可能存储在多个表中 +数据处理数据库表以processor_为开头 diff --git a/main.py b/main.py index 4d10b3a..f2da81c 100644 --- a/main.py +++ b/main.py @@ -1,7 +1,7 @@ #!/usr/bin/env python3 # -*- coding: utf-8 -*- """ -情报收集系统主程序 +情报收集系统主程序(明文配置版) 功能: 1. 调度数据采集、处理、存储流程 2. 生成日报/月报 @@ -10,42 +10,32 @@ import sys import logging -from datetime import datetime, timedelta -from typing import List, Dict, Any +import time +import threading +from datetime import datetime +from typing import Dict, List, Any # 自定义模块 -from config.settings import API_KEYS, DATA_SOURCES -from collectors.news_api import NewsAPICollector -from collectors.complaint_spider import ComplaintSpider from processors.data_processor import DataProcessor from storage.database import IntelligenceDB from applications.reporter import ReportGenerator from applications.alert import AlertService from utils.logger import setup_logging -from utils.mail import send_email class IntelligenceSystem: def __init__(self): # 初始化核心组件 setup_logging() self.logger = logging.getLogger(__name__) - self.db = IntelligenceDB() self.processor = DataProcessor() self.alert = AlertService() - # 数据采集器注册 - self.collectors = { - "news": NewsAPICollector(API_KEYS['newsapi']), - "complaint": ComplaintSpider( - base_url=DATA_SOURCES['blackcat'], - rate_limit=30 # 30秒爬取间隔 - ) - } - def run_daily_pipeline(self): """每日数据采集处理流程""" try: + self.logger.info("开始执行每日数据采集流程") + # 阶段1:数据采集 raw_data = self._collect_data() @@ -61,6 +51,7 @@ class IntelligenceSystem: # 阶段5:异常检测 self._check_alerts() + self.logger.info("每日流程执行完成") except Exception as e: self.logger.error(f"主流程执行失败: {str(e)}", exc_info=True) self.alert.send_critical(f"系统异常: {str(e)}") @@ -128,58 +119,63 @@ class IntelligenceSystem: """生成报告并发送""" try: # 日报生成 + report_date = datetime.now().date() report_html = ReportGenerator(self.db).generate_daily() - with open(f"reports/daily_{datetime.now().date()}.html", 'w') as f: + + report_path = f"reports/daily_{report_date}.html" + with open(report_path, 'w', encoding='utf-8') as f: f.write(report_html) + self.logger.info(f"日报已生成: {report_path}") # 每月1号生成月报 if datetime.now().day == 1: monthly_report = ReportGenerator(self.db).generate_monthly() - send_email( - to="team@example.com", - subject=f"{datetime.now().strftime('%Y-%m')} 情报月报", - content=monthly_report - ) + # 这里替换为实际的邮件发送逻辑 + self.logger.info("月度报告已生成(邮件发送功能需配置)") except Exception as e: self.logger.error(f"报告生成失败: {str(e)}") def _check_alerts(self): """检查预警信息""" - # 负面舆情监测 negative_keywords = ['投诉', '造假', '违规'] alerts = self.alert.check_negative(negative_keywords) if alerts: - self.alert.send_urgent( - "负面舆情警报", - "\n".join([f"[{a['source']}] {a['content']}" for a in alerts]) - ) + alert_msg = "\n".join([f"[{a['source']}] {a['content']}" for a in alerts]) + self.logger.warning(f"发现负面舆情:\n{alert_msg}") + # 这里替换为实际的通知发送逻辑 + self.alert.send_urgent("负面舆情警报", alert_msg) def cleanup(self): """资源清理""" self.db.close() self.logger.info("系统资源已释放") -if __name__ == "__main__": +def run_scheduled(): + """定时任务执行入口""" system = IntelligenceSystem() - try: - # 执行每日任务 - if len(sys.argv) > 1 and sys.argv[1] == "--manual": + while True: + now = datetime.now() + if now.hour == 9 and now.minute == 0: # 每天9点执行 + system.run_daily_pipeline() + time.sleep(60) # 避免重复执行 + time.sleep(30) + except KeyboardInterrupt: + system.cleanup() + +if __name__ == "__main__": + if len(sys.argv) > 1 and sys.argv[1] == "--manual": + # 手动执行模式 + system = IntelligenceSystem() + try: system.logger.info("手动执行模式启动") system.run_daily_pipeline() - else: - # 定时任务模式(实际部署时改用crontab或APScheduler) - system.logger.info("定时任务模式启动") - while True: - now = datetime.now() - if now.hour == 9 and now.minute == 0: # 每天9点执行 - system.run_daily_pipeline() - time.sleep(60) # 避免重复执行 - time.sleep(30) - - except KeyboardInterrupt: - system.logger.info("用户中断执行") - finally: - system.cleanup() \ No newline at end of file + finally: + system.cleanup() + else: + # 定时任务模式 + print("情报收集系统已启动(定时模式)") + print("按 Ctrl+C 退出") + run_scheduled() \ No newline at end of file diff --git a/utils/logger.py b/utils/logger.py index a89b623..7b5c73e 100644 --- a/utils/logger.py +++ b/utils/logger.py @@ -16,6 +16,7 @@ from logging.handlers import TimedRotatingFileHandler from datetime import datetime import platform + class CrossPlatformLogger: def __init__(self, name="intelligence_system"): """ @@ -65,8 +66,8 @@ class CrossPlatformLogger: # Windows终端特殊处理 if self.system == 'windows' and not sys.stdout.isatty(): + import colorama try: - import colorama colorama.init() except ImportError: pass @@ -112,10 +113,12 @@ class CrossPlatformLogger: """获取配置好的日志实例""" return CrossPlatformLogger(name).logger + def setup_logging(name: str = "intelligence_system"): """快速配置日志(兼容旧代码)""" return CrossPlatformLogger(name).logger + # 测试代码 if __name__ == "__main__": logger = CrossPlatformLogger().logger @@ -123,4 +126,4 @@ if __name__ == "__main__": try: 1 / 0 except Exception as e: - logger.error("除零错误示例", exc_info=True) \ No newline at end of file + logger.error("除零错误示例", exc_info=True)