From fad2b2d1c870294078a22281098cbe4d44ae102e Mon Sep 17 00:00:00 2001 From: Administrator <1415243231@qq.com> Date: Tue, 5 Aug 2025 17:13:19 +0800 Subject: [PATCH] =?UTF-8?q?md=E6=96=87=E6=A1=A3=E6=9B=B4=E6=96=B0?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- applications/alert.py | 237 +++++++++++++++++++++++++++++++++++++++++ collectors/__init__.py | 5 + collectors/base.py | 0 doc/readme.md | 72 +++++++++++++ main.py | 88 ++++++++------- utils/logger.py | 7 +- 6 files changed, 361 insertions(+), 48 deletions(-) create mode 100644 collectors/__init__.py delete mode 100644 collectors/base.py create mode 100644 doc/readme.md diff --git a/applications/alert.py b/applications/alert.py index e69de29..b6677ea 100644 --- a/applications/alert.py +++ b/applications/alert.py @@ -0,0 +1,237 @@ +#!/usr/bin/env python3 +# -*- coding: utf-8 -*- +""" +报警通知模块 +功能: +1. 多通道报警通知(邮件/企业微信/飞书) +2. 分级报警策略 +3. 失败重试机制 +""" + +import smtplib +import json +import requests +from email.mime.text import MIMEText +from typing import Optional, Dict, List +import logging +from dataclasses import dataclass +from tenacity import retry, stop_after_attempt, wait_exponential + +# 日志配置 +logger = logging.getLogger('alert') +logger.setLevel(logging.INFO) + +@dataclass +class AlertConfig: + """报警配置数据类""" + email: Dict[str, str] = None # SMTP配置 + wecom: Dict[str, str] = None # 企业微信机器人配置 + feishu: Dict[str, str] = None # 飞书机器人配置 + min_level: str = 'WARNING' # 默认最低报警级别 + +class AlertService: + def __init__(self, config: AlertConfig): + """ + 初始化报警服务 + 参数: + config: AlertConfig对象 + """ + self.config = config + self.levels = { + 'DEBUG': 0, + 'INFO': 1, + 'WARNING': 2, + 'ERROR': 3, + 'CRITICAL': 4 + } + + @retry(stop=stop_after_attempt(3), wait=wait_exponential(multiplier=1, min=4, max=10)) + def send_email(self, subject: str, content: str, to_addrs: List[str]) -> bool: + """ + 发送邮件报警 + 参数: + subject: 邮件主题 + content: 邮件内容(支持HTML) + to_addrs: 收件人列表 + 返回: + 
发送是否成功 + """ + if not self.config.email: + logger.warning("邮件配置未启用") + return False + + try: + msg = MIMEText(content, 'html', 'utf-8') + msg['From'] = self.config.email['from_addr'] + msg['To'] = ','.join(to_addrs) + msg['Subject'] = subject + + with smtplib.SMTP_SSL( + host=self.config.email['smtp_server'], + port=self.config.email['smtp_port'] + ) as server: + server.login( + user=self.config.email['username'], + password=self.config.email['password'] + ) + server.send_message(msg) + + logger.info(f"邮件报警发送成功 -> {to_addrs}") + return True + except Exception as e: + logger.error(f"邮件发送失败: {str(e)}") + raise + + @retry(stop=stop_after_attempt(3), wait=wait_exponential(multiplier=1, min=4, max=10)) + def send_wecom(self, content: str, mentioned_list: List[str] = None) -> bool: + """ + 发送企业微信机器人通知 + 参数: + content: 消息内容(支持Markdown) + mentioned_list: 要@的成员手机号列表 + 返回: + 发送是否成功 + """ + if not self.config.wecom: + logger.warning("企业微信配置未启用") + return False + + try: + payload = { + "msgtype": "markdown", + "markdown": { + "content": content, + } + } + if mentioned_list: + payload["mentioned_mobile_list"] = mentioned_list + + resp = requests.post( + url=self.config.wecom['webhook_url'], + headers={'Content-Type': 'application/json'}, + data=json.dumps(payload), + timeout=5 + ) + resp.raise_for_status() + + logger.info("企业微信报警发送成功") + return True + except Exception as e: + logger.error(f"企业微信发送失败: {str(e)}") + raise + + @retry(stop=stop_after_attempt(3), wait=wait_exponential(multiplier=1, min=4, max=10)) + def send_feishu(self, title: str, content: str) -> bool: + """ + 发送飞书机器人通知 + 参数: + title: 消息标题 + content: 消息内容(支持Markdown) + 返回: + 发送是否成功 + """ + if not self.config.feishu: + logger.warning("飞书配置未启用") + return False + + try: + payload = { + "msg_type": "interactive", + "card": { + "header": { + "title": { + "tag": "plain_text", + "content": title + }, + "template": "red" # 红色标题表示报警 + }, + "elements": [{ + "tag": "markdown", + "content": content + }] + } + } + + resp = 
requests.post( + url=self.config.feishu['webhook_url'], + headers={'Content-Type': 'application/json'}, + data=json.dumps(payload), + timeout=5 + ) + resp.raise_for_status() + + logger.info("飞书报警发送成功") + return True + except Exception as e: + logger.error(f"飞书发送失败: {str(e)}") + raise + + def send_alert(self, level: str, title: str, content: str) -> bool: + """ + 分级发送报警通知 + 参数: + level: 报警级别(DEBUG/INFO/WARNING/ERROR/CRITICAL) + title: 报警标题 + content: 报警详情 + 返回: + 是否发送成功(至少一个通道成功) + """ + if self.levels[level] < self.levels[self.config.min_level]: + logger.debug(f"忽略低于阈值的报警: {level} < {self.config.min_level}") + return False + + results = [] + if self.config.email: + email_content = f""" +
报警级别: {level}
+{content}
+ """
+ results.append(
+ self.send_email(
+ subject=f"[{level}] {title}",
+ content=email_content,
+ to_addrs=self.config.email['receivers']
+ )
+ )
+
+ if self.config.wecom:
+ wecom_content = f"## {title}\n**级别**: {level}\n```\n{content}\n```"
+ results.append(self.send_wecom(wecom_content))
+
+ if self.config.feishu:
+ feishu_content = f"**级别**: {level}\n\n{content}"
+ results.append(self.send_feishu(title, feishu_content))
+
+ return any(results)
+
+# 配置示例
+if __name__ == "__main__":
+ # 初始化配置
+ config = AlertConfig(
+ email={
+ 'smtp_server': 'smtp.example.com',
+ 'smtp_port': 465,
+ 'username': 'alert@example.com',
+ 'password': 'your_password',
+ 'from_addr': 'alert@example.com',
+ 'receivers': ['admin@example.com']
+ },
+ wecom={
+ 'webhook_url': 'https://qyapi.weixin.qq.com/cgi-bin/webhook/send?key=your_key'
+ },
+ feishu={
+ 'webhook_url': 'https://open.feishu.cn/open-apis/bot/v2/hook/your_token'
+ },
+ min_level='ERROR'
+ )
+
+ # 使用示例
+ alert = AlertService(config)
+
+ # 发送测试报警
+ alert.send_alert(
+ level='ERROR',
+ title='数据库连接失败',
+ content='无法连接到MySQL服务器,已重试3次\n主机: 192.168.1.100:3306'
+ )
\ No newline at end of file
diff --git a/collectors/__init__.py b/collectors/__init__.py
new file mode 100644
index 0000000..87eedc7
--- /dev/null
+++ b/collectors/__init__.py
@@ -0,0 +1,5 @@
+"""
+数据采集包
+"""
+
+
diff --git a/collectors/base.py b/collectors/base.py
deleted file mode 100644
index e69de29..0000000
diff --git a/doc/readme.md b/doc/readme.md
new file mode 100644
index 0000000..da760ff
--- /dev/null
+++ b/doc/readme.md
@@ -0,0 +1,72 @@
+## 情报收集系统设计
+
+### 程序框架
+```text
+intelligence_system/
+├── config/ # 配置管理
+│ ├── __init__.py
+│ ├── settings.py # 全局参数
+│ └── logging.conf # 日志配置
+├── collectors/ # 数据采集
+│ ├── news_api.py
+│ └── complaint_spider.py
+├── processors/ # 数据处理
+│ ├── text_processor.py
+│ └── image_processor.py
+├── storage/ # 数据存储
+│ ├── database.py
+│ └── cache_manager.py
+├── applications/ # 应用层
+│ ├── reporter/
+│ │ ├── daily.py
+│ │ └── monthly.py
+│ └── alert.py
+├── utils/ # 工具类
+│ ├── logger.py
+│ └── network.py
+└── main.py # 调度入口
+```
+### 程序设计原则
+1. 所有程序尽可能在py文件中运行,尽量避免使用命令行执行
+2. 配置需要在配置类中定义
+3. 密钥等信息直接放在配置类中
+
+### 主程序设计
+主程序需要一次启动,一直运行,启动时运行一次(在代码中可取消),之后每天定时生成一次报告
+
+主程序包含爬虫/api调度器。该调度器通过查询mysql中的任务调度表(main_task)按需执行,表中应包含任务名称、
+任务路径、任务执行频率(支持按分钟、按天、按周)、上次执行时间、下次执行时间等信息
+
+主程序应包含数据处理调度器,根据数据类别分别处理,如文本数据处理调度器、图片数据处理调度器等,
+每天定时拉取db获取到的原始数据,分别进行处理,处理完成后将结果保存到mysql中
+
+主程序应包含日报、月报等报告的生成,根据时间定时生成报告
+
+### 日志设计
+日志系统应兼容多个平台,如Windows、macOS和Linux,日志需要保存为log文件,并且在日志文件大于20MB时自动压缩
+
+### 数据库连接设计
+数据存储放在数据库中,数据库类型为mysql,数据库名称为intelligence_system
+
+数据库表的命名规则与目录一致,数据采集类以collector_为开头,数据处理类以processor_为开头,数据存储类以storage_为开头,应用层类以application_为开头
+以此类推。
+
+数据库连接为通用配置,要求数据采集或处理类等可以直接调用封装好的数据库连接,不必每次都重新写,
+该连接包含表的增删改查功能,以及执行sql语句功能
+
+数据库结构:
+1. collector_news_api:新闻api数据表
+2. collector_complaint_spider:投诉数据表
+3. processor_text_processor:文本处理数据表
+4. processor_image_processor:图片处理数据表
+5. main_task:任务调度表
+6. application_reporter_daily:日报数据表
+7. application_reporter_monthly:月报数据表
+
+### 数据采集设计
+每一个数据采集均为独立python文件,其入口函数均命名为main,以方便调度
+每一个数据采集均会根据规则创建数据库表,数据采集类的表以collector_为开头
+
+### 数据处理
+从多个数据库表中获取数据,对数据进行处理,处理完成后将结果保存到数据库中,处理结果可能存储在多个表中
+数据处理数据库表以processor_为开头
diff --git a/main.py b/main.py
index 4d10b3a..f2da81c 100644
--- a/main.py
+++ b/main.py
@@ -1,7 +1,7 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
-情报收集系统主程序
+情报收集系统主程序(明文配置版)
功能:
1. 调度数据采集、处理、存储流程
2. 生成日报/月报
@@ -10,42 +10,32 @@
import sys
import logging
-from datetime import datetime, timedelta
-from typing import List, Dict, Any
+import time
+import threading
+from datetime import datetime
+from typing import Dict, List, Any
# 自定义模块
-from config.settings import API_KEYS, DATA_SOURCES
-from collectors.news_api import NewsAPICollector
-from collectors.complaint_spider import ComplaintSpider
from processors.data_processor import DataProcessor
from storage.database import IntelligenceDB
from applications.reporter import ReportGenerator
from applications.alert import AlertService
from utils.logger import setup_logging
-from utils.mail import send_email
class IntelligenceSystem:
def __init__(self):
# 初始化核心组件
setup_logging()
self.logger = logging.getLogger(__name__)
-
self.db = IntelligenceDB()
self.processor = DataProcessor()
self.alert = AlertService()
- # 数据采集器注册
- self.collectors = {
- "news": NewsAPICollector(API_KEYS['newsapi']),
- "complaint": ComplaintSpider(
- base_url=DATA_SOURCES['blackcat'],
- rate_limit=30 # 30秒爬取间隔
- )
- }
-
def run_daily_pipeline(self):
"""每日数据采集处理流程"""
try:
+ self.logger.info("开始执行每日数据采集流程")
+
# 阶段1:数据采集
raw_data = self._collect_data()
@@ -61,6 +51,7 @@ class IntelligenceSystem:
# 阶段5:异常检测
self._check_alerts()
+ self.logger.info("每日流程执行完成")
except Exception as e:
self.logger.error(f"主流程执行失败: {str(e)}", exc_info=True)
self.alert.send_critical(f"系统异常: {str(e)}")
@@ -128,58 +119,63 @@ class IntelligenceSystem:
"""生成报告并发送"""
try:
# 日报生成
+ report_date = datetime.now().date()
report_html = ReportGenerator(self.db).generate_daily()
- with open(f"reports/daily_{datetime.now().date()}.html", 'w') as f:
+
+ report_path = f"reports/daily_{report_date}.html"
+ with open(report_path, 'w', encoding='utf-8') as f:
f.write(report_html)
+ self.logger.info(f"日报已生成: {report_path}")
# 每月1号生成月报
if datetime.now().day == 1:
monthly_report = ReportGenerator(self.db).generate_monthly()
- send_email(
- to="team@example.com",
- subject=f"{datetime.now().strftime('%Y-%m')} 情报月报",
- content=monthly_report
- )
+ # 这里替换为实际的邮件发送逻辑
+ self.logger.info("月度报告已生成(邮件发送功能需配置)")
except Exception as e:
self.logger.error(f"报告生成失败: {str(e)}")
def _check_alerts(self):
"""检查预警信息"""
- # 负面舆情监测
negative_keywords = ['投诉', '造假', '违规']
alerts = self.alert.check_negative(negative_keywords)
if alerts:
- self.alert.send_urgent(
- "负面舆情警报",
- "\n".join([f"[{a['source']}] {a['content']}" for a in alerts])
- )
+ alert_msg = "\n".join([f"[{a['source']}] {a['content']}" for a in alerts])
+ self.logger.warning(f"发现负面舆情:\n{alert_msg}")
+ # 这里替换为实际的通知发送逻辑
+ self.alert.send_urgent("负面舆情警报", alert_msg)
def cleanup(self):
"""资源清理"""
self.db.close()
self.logger.info("系统资源已释放")
-if __name__ == "__main__":
+def run_scheduled():
+ """定时任务执行入口"""
system = IntelligenceSystem()
-
try:
- # 执行每日任务
- if len(sys.argv) > 1 and sys.argv[1] == "--manual":
+ while True:
+ now = datetime.now()
+ if now.hour == 9 and now.minute == 0: # 每天9点执行
+ system.run_daily_pipeline()
+ time.sleep(60) # 避免重复执行
+ time.sleep(30)
+ except KeyboardInterrupt:
+ system.cleanup()
+
+if __name__ == "__main__":
+ if len(sys.argv) > 1 and sys.argv[1] == "--manual":
+ # 手动执行模式
+ system = IntelligenceSystem()
+ try:
system.logger.info("手动执行模式启动")
system.run_daily_pipeline()
- else:
- # 定时任务模式(实际部署时改用crontab或APScheduler)
- system.logger.info("定时任务模式启动")
- while True:
- now = datetime.now()
- if now.hour == 9 and now.minute == 0: # 每天9点执行
- system.run_daily_pipeline()
- time.sleep(60) # 避免重复执行
- time.sleep(30)
-
- except KeyboardInterrupt:
- system.logger.info("用户中断执行")
- finally:
- system.cleanup()
\ No newline at end of file
+ finally:
+ system.cleanup()
+ else:
+ # 定时任务模式
+ print("情报收集系统已启动(定时模式)")
+ print("按 Ctrl+C 退出")
+ run_scheduled()
\ No newline at end of file
diff --git a/utils/logger.py b/utils/logger.py
index a89b623..7b5c73e 100644
--- a/utils/logger.py
+++ b/utils/logger.py
@@ -16,6 +16,7 @@ from logging.handlers import TimedRotatingFileHandler
from datetime import datetime
import platform
+
class CrossPlatformLogger:
def __init__(self, name="intelligence_system"):
"""
@@ -65,8 +66,8 @@ class CrossPlatformLogger:
# Windows终端特殊处理
if self.system == 'windows' and not sys.stdout.isatty():
+ import colorama
try:
- import colorama
colorama.init()
except ImportError:
pass
@@ -112,10 +113,12 @@ class CrossPlatformLogger:
"""获取配置好的日志实例"""
return CrossPlatformLogger(name).logger
+
def setup_logging(name: str = "intelligence_system"):
"""快速配置日志(兼容旧代码)"""
return CrossPlatformLogger(name).logger
+
# 测试代码
if __name__ == "__main__":
logger = CrossPlatformLogger().logger
@@ -123,4 +126,4 @@ if __name__ == "__main__":
try:
1 / 0
except Exception as e:
- logger.error("除零错误示例", exc_info=True)
\ No newline at end of file
+ logger.error("除零错误示例", exc_info=True)