Files
intelligence_system/main.py
T
2025-08-05 17:13:19 +08:00

181 lines
6.3 KiB
Python

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
情报收集系统主程序(明文配置版)
功能:
1. 调度数据采集、处理、存储流程
2. 生成日报/月报
3. 异常监控和报警
"""
import logging
import os
import sys
import threading
import time
from datetime import datetime
from typing import Dict, List, Any

# 自定义模块 (project-local modules)
from processors.data_processor import DataProcessor
from storage.database import IntelligenceDB
from applications.reporter import ReportGenerator
from applications.alert import AlertService
from utils.logger import setup_logging
class IntelligenceSystem:
    """Coordinate the daily pipeline: collect -> process -> store -> report -> alert.

    Attributes:
        logger: Logger for this module, obtained after ``setup_logging()`` runs.
        db: ``IntelligenceDB`` used for inserts, report generation and ``close()``.
        processor: ``DataProcessor`` used for text (``process_text``) and image
            (``image_to_text``) items.
        alert: ``AlertService`` used for negative-keyword checks and notifications.
        collectors: Mapping of source name -> object with a ``fetch_data(params)``
            method. Empty unless supplied to ``__init__`` or assigned afterwards.
    """

    # Query sent to every collector. A copy is passed on each call so a collector
    # that mutates its params cannot affect the next one.
    COLLECT_PARAMS: Dict[str, Any] = {
        'keywords': '汽车后市场',
        'max_results': 100,
    }
    # Data types whose items are dicts carrying a 'content' text field.
    TEXT_TYPES = ('news', 'complaint')
    # Directory where daily HTML reports are written (created on demand).
    REPORT_DIR = 'reports'

    def __init__(self, collectors=None):
        """Configure logging and create the core components.

        Args:
            collectors: Optional mapping of source name -> collector object that
                exposes ``fetch_data(params)``. Defaults to an empty registry.
        """
        setup_logging()
        self.logger = logging.getLogger(__name__)
        self.db = IntelligenceDB()
        self.processor = DataProcessor()
        self.alert = AlertService()
        # _collect_data iterates this, so it must exist even when no collectors
        # are configured. Copy so later registration does not mutate the caller's dict.
        self.collectors: Dict[str, Any] = dict(collectors) if collectors else {}

    def run_daily_pipeline(self):
        """Run every pipeline stage once.

        Any exception raised by a stage is logged with its traceback and reported
        through ``alert.send_critical``. It is not re-raised, so a scheduler loop
        keeps running after a failed day.
        """
        try:
            self.logger.info("开始执行每日数据采集流程")
            raw_data = self._collect_data()                 # stage 1: collect
            processed_data = self._process_data(raw_data)   # stage 2: process
            self._store_data(processed_data)                # stage 3: store
            self._generate_reports()                        # stage 4: reports
            self._check_alerts()                            # stage 5: alerts
            self.logger.info("每日流程执行完成")
        except Exception as e:
            self.logger.error(f"主流程执行失败: {str(e)}", exc_info=True)
            try:
                self.alert.send_critical(f"系统异常: {str(e)}")
            except Exception:
                # A broken alert channel must not escape and kill the caller's loop.
                self.logger.exception("严重告警发送失败")

    def _collect_data(self) -> Dict[str, List[Dict]]:
        """Run every registered collector and gather the results.

        Returns:
            Mapping of collector name -> list of fetched items. A collector that
            raises is logged and omitted. One that returns ``None`` yields ``[]``.
        """
        collected = {}
        for name, collector in self.collectors.items():
            try:
                self.logger.info(f"开始采集 {name} 数据...")
                data = collector.fetch_data(dict(self.COLLECT_PARAMS))
                # Normalize before storing so a bad return value cannot leave a
                # non-list entry behind for _process_data to crash on.
                data = [] if data is None else list(data)
            except Exception as e:
                self.logger.error(f"{name} 采集器异常: {str(e)}")
                continue
            collected[name] = data
            self.logger.info(f"{name} 采集完成,共 {len(data)} 条数据")
        return collected

    def _process_data(self, raw_data: Dict) -> Dict:
        """Enrich raw items according to their data type.

        - ``news`` / ``complaint``: ``item['content']`` goes through
          ``processor.process_text``. Its ``'keywords'`` and ``'category'`` are
          merged into a copy of the item.
        - ``images``: each item is replaced by ``processor.image_to_text(item)``.
        - Any other type keeps an empty list, and a warning is logged.

        Items that fail to process are logged and skipped.

        Returns:
            Mapping with the same keys as ``raw_data`` -> list of processed items.
        """
        processed = {}
        for data_type, items in raw_data.items():
            results = []
            processed[data_type] = results
            is_text = data_type in self.TEXT_TYPES
            if not is_text and data_type != 'images':
                self.logger.warning(f"未支持的数据类型,已跳过: {data_type}")
                continue
            for item in items:
                try:
                    if is_text:
                        result = self.processor.process_text(item['content'])
                        results.append({
                            **item,
                            'keywords': result['keywords'],
                            'category': result['category'],
                        })
                    else:
                        # Image processing (reserved interface).
                        results.append(self.processor.image_to_text(item))
                except Exception as e:
                    # Image items need not be dicts. Calling .get on them here would
                    # raise inside the handler and abort the whole pipeline.
                    item_id = item.get('id', '') if isinstance(item, dict) else ''
                    self.logger.warning(f"数据处理失败: {item_id} - {str(e)}")
        return processed

    def _store_data(self, processed_data: Dict):
        """Insert every processed item via ``db.insert_data(data_type, item)``.

        An item counts as stored when ``insert_data`` returns a truthy value.
        Per-item exceptions are logged and do not stop the loop.
        """
        for data_type, items in processed_data.items():
            success_count = 0
            for item in items:
                try:
                    if self.db.insert_data(data_type, item):
                        success_count += 1
                except Exception as e:
                    self.logger.error(f"{data_type} 数据存储失败: {str(e)}")
            self.logger.info(
                f"{data_type} 数据存储完成,成功 {success_count}/{len(items)}"
            )

    def _generate_reports(self):
        """Write today's daily HTML report, and on the 1st also build the monthly one.

        The daily report is written to ``<REPORT_DIR>/daily_<YYYY-MM-DD>.html``.
        The monthly report is generated but not yet delivered (e-mail sending is
        not configured). Errors are logged with a traceback and not re-raised.
        """
        try:
            # Read the clock once so the file date and the 1st-of-month check agree
            # even when this runs across midnight.
            now = datetime.now()
            report_date = now.date()
            report_html = ReportGenerator(self.db).generate_daily()
            # The directory may not exist on a fresh deployment; open() would fail.
            os.makedirs(self.REPORT_DIR, exist_ok=True)
            report_path = f"{self.REPORT_DIR}/daily_{report_date}.html"
            with open(report_path, 'w', encoding='utf-8') as f:
                f.write(report_html)
            self.logger.info(f"日报已生成: {report_path}")
            if now.day == 1:
                # TODO: deliver the monthly report (e.g. by e-mail). It is
                # generated but currently discarded.
                ReportGenerator(self.db).generate_monthly()
                self.logger.info("月度报告已生成(邮件发送功能需配置)")
        except Exception as e:
            self.logger.error(f"报告生成失败: {str(e)}", exc_info=True)

    def _check_alerts(self):
        """Ask the alert service for negative-sentiment hits and escalate any found.

        Each hit returned by ``check_negative`` is read as a mapping with
        ``'source'`` and ``'content'`` keys.
        """
        negative_keywords = ['投诉', '造假', '违规']
        alerts = self.alert.check_negative(negative_keywords)
        if not alerts:
            return
        alert_msg = "\n".join(f"[{a['source']}] {a['content']}" for a in alerts)
        self.logger.warning(f"发现负面舆情:\n{alert_msg}")
        # TODO: route through the real notification channel once configured.
        self.alert.send_urgent("负面舆情警报", alert_msg)

    def cleanup(self):
        """Close the database connection and log the shutdown."""
        self.db.close()
        self.logger.info("系统资源已释放")
def run_scheduled(hour: int = 9, minute: int = 0, poll_seconds: float = 30):
    """Run the daily pipeline once per day at ``hour:minute`` local time until Ctrl+C.

    Args:
        hour: Hour of day (0-23) at which to run. Defaults to 9 (the original schedule).
        minute: Minute (0-59) at which to run. Defaults to 0.
        poll_seconds: Seconds between clock checks. Keep this below 60 so the
            target minute cannot be skipped.

    ``IntelligenceSystem.cleanup()`` runs on Ctrl+C and also when any other
    exception escapes the loop. Such exceptions are re-raised after cleanup.
    """
    system = IntelligenceSystem()
    # Date of the most recent run. It stops a second run within the target
    # minute without relying on a fixed post-run sleep.
    last_run_date = None
    try:
        while True:
            now = datetime.now()
            if (now.hour, now.minute) == (hour, minute) and now.date() != last_run_date:
                last_run_date = now.date()
                system.run_daily_pipeline()
            time.sleep(poll_seconds)
    except KeyboardInterrupt:
        pass  # Ctrl+C is the normal shutdown path.
    finally:
        system.cleanup()
if __name__ == "__main__":
    # Only an exact "--manual" first argument selects one-shot mode.
    manual_mode = sys.argv[1:2] == ["--manual"]
    if not manual_mode:
        # Default: scheduled mode, blocking until Ctrl+C.
        print("情报收集系统已启动(定时模式)", "按 Ctrl+C 退出", sep="\n")
        run_scheduled()
    else:
        # One-shot run; resources are released even if the pipeline raises.
        system = IntelligenceSystem()
        try:
            system.logger.info("手动执行模式启动")
            system.run_daily_pipeline()
        finally:
            system.cleanup()