Files

149 lines
8.8 KiB
Markdown
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
## 情报收集系统设计
### 参考文档
https://alidocs.dingtalk.com/i/nodes/NZQYprEoWoexdo1ohPdxXvDbJ1waOeDk?utm_scene=team_space
### 程序框架(当前实现)
```angular2html
intelligence_system/
├── collectors/ # 数据采集层
│ ├── complaint_spider.py # 投诉信息爬虫(结构化入库/附件走MinIO)
│ ├── rss_subscriptions.py # RSS 订阅抓取
│ └── internal/ # 内部数据收集(保留)
│ └── jian_dao_cloud.py # 简道云表单收集器(示例/占位)
├── processors/ # 数据处理层
│ ├── processor_rss_data.py # RSS数据清洗、分词、过滤与入库
│ ├── keywords.txt # 行业关键词(用于分词/过滤)
│ ├── stopwords.txt # 停用词
│ └── ai_engine/
│ └── ai_proessor_rss_data # 预留(AI分析扩展占位)
├── services/ # 应用服务层(保留)
│ ├── monitoring/ # 舆情监控
│ │ ├── opinion_monitor.py # 实时舆情追踪(占位)
│ │ └── brand_reputation.py # 品牌口碑分析(占位)
│ ├── analysis/ # 竞品分析
│ │ ├── competitor_tracker.py # 竞品动态监控(占位)
│ │ └── swot_generator.py # SWOT分析报告(占位)
│ ├── reporting/ # 报告服务
│ │ ├── daily_reporter.py # 自动化日报生成(占位)
│ │ └── weekly_digest.py # 周报汇编系统(占位)
│ └── alert/ # 预警服务
│ ├── alert_trigger.py # 动态阈值告警(占位)
│ └── notification_center.py # 邮件/短信通知(占位)
├── applications/ # 应用层
│ ├── alert.py # 告警触发/通知(占位/实现中)
│ └── reporter/
│ ├── daily.py # 日报生成
│ └── monthly.py # 月报生成
├── system_management/ # 系统管理层
│ ├── scheduler/
│ │ ├── task_scheduler.py # 任务调度器(Cron表达式 + 线程池)
│ │ └── task_management.py # 任务管理辅助
│ └── monitor/ # 系统监控(目录占位)
├── utils/ # 工具库
│ ├── file_handler.py # 通用文件操作
│ ├── logger.py # 跨平台日志系统(Loguru)
│ ├── mysql_agent.py # MySQL读写管理器
│ └── minio_agent.py # MinIO对象存储客户端
├── config.py # 配置加载与管理(含数据库/存储配置)
├── main.py # 系统入口(Cron轮询 + 调度执行)
└── requirements.txt # 依赖清单
```
### 程序设计原则
1. 所有程序尽可能在py文件中运行,尽量避免使用命令行执行
2. 配置需要在配置类中定义
3. 密钥等信息直接放在配置类中
4. 数据存储遵循"结构化存MySQL,非结构化存MinIO"原则,通过元数据关联
### 主程序与调度设计(已实现)
主程序以长运行进程方式启动,进入轻量轮询循环(每10秒)。调度器按Cron表达式在`main_task`表中拉取到期任务,使用线程池异步执行,并在每分钟输出运行状态、每小时汇总统计。
- 调度器能力:
- 基于`croniter`解析Cron表达式,支持时区(默认`Asia/Shanghai`
- 线程池并发执行,信号量限制最大并发(与`max_workers`一致)
- 任务入口动态解析:支持`package.module``package.module.ClassName.main``package.module.func` 等形式
- 成功/失败后自动计算`next_run_time`或设置15分钟后重试
- 关键字段自动更新:`is_running``last_run_time``last_run_status``run_count``next_run_time`
- 主循环:
- 每10秒检查一次待运行任务
- 每分钟打印当前周期统计;每小时写入累计统计日志
- 支持`SIGINT/SIGTERM`优雅关闭,等待正在运行的任务完成
### 日志设计(已实现)
跨平台日志系统(Loguru)输出至`logs/`目录:
- application.log:主日志,`rotation = 20MB`,达到阈值后压缩为`application.log.YYYYMMDD.zip``retention = 30天`
- errors.log:错误日志(ERROR及以上),`rotation = 10MB``retention = 90天`
- 结构化扩展字段:日志支持`extra`键值对,自动美化并对长字段(如`sql``params`)截断
建议记录的业务事件:
- MySQL读写操作要点(表名、影响行数、事务状态)
- MinIO对象操作(对象路径、大小、耗时、状态)
- 任务执行上下文(task_id、task_name、module_path、耗时、状态)
### 存储系统设计(MinIO+MySQL
#### 核心存储分工
| 存储类型 | 适用数据 | 核心作用 |
|----------|----------|----------|
| MySQL | 结构化数据、元数据、关系型数据 | 存储业务逻辑数据、非结构化数据的索引信息、任务调度信息等 |
| MinIO | 非结构化数据 | 存储图片、视频、PDF文档、原始爬取文件等二进制/大文件数据 |
#### 核心存储配置
1. **MySQL配置**
- 数据库名称:`intelligence_system`
- 连接管理:通过`utils/mysql_agent.py`封装线程安全的连接池,提供结构化数据的增删改查及SQL执行能力
- 适配特性:支持多平台(Windows/macOS/Linux)的超时配置和批处理优化
2. **MinIO配置**
- 存储桶命名规则:按数据类型划分,如`collector-images`(采集层图片)、`processor-videos`(处理层视频)
- 连接管理:通过`utils/minio_agent.py`封装客户端,提供对象上传、下载、删除、查询URL等能力
- 路径规则:`{数据层}/{来源}/{时间戳}_{唯一ID}.{后缀}`(例:`collector/weibo_spider/20240520_12345.jpg`
#### 表命名规则(扩展)
- 数据采集类:以`collector_`为前缀(存储采集到的结构化数据及MinIO对象元数据)
- 数据处理类:以`processor_`为前缀(存储处理结果的结构化数据及MinIO处理后对象的元数据)
- 数据存储类:以`storage_`为前缀(存储MinIO对象的索引信息,如哈希、大小、访问权限等)
- 应用层类:以`application_`为前缀(对应业务应用数据)
- 系统类:如任务调度表等采用功能命名(如`main_task`
#### 核心表结构(当前落地)
1. `main_task`:任务调度表(`task_name``task_type``module_path``cron_expression``time_zone``run_count``is_running``last_run_time``last_run_status``next_run_time``is_active` 等)
2. `collector_rss_subscriptions`RSS源采集数据(`文章标题``文章摘要``发布时间``来源URL``文章链接``是否已处理` 等)
3. `processed_rss_data`RSS处理结果(`分词结果``是否汽车相关``处理时间` 等)
4. `collector_complaint_spider`:投诉信息爬虫数据(含文本与附件MinIO路径`attachment_minio_path`等)
5. 可选:`storage_object_index`(建议用于统一索引MinIO对象元数据)
### 数据采集设计
1. 结构化数据(RSS、投诉文本):写入`collector_`前缀表
2. 非结构化数据(附件/图片等):
- 使用`utils/minio_agent.py`上传至对应存储桶
- 将对象路径与元数据写入业务表或`storage_object_index`
3. 采集模块需同时处理MySQL与MinIO交互,确保关联完整
### 数据处理设计(RSS流程已实现)
`processors/processor_rss_data.py`流程:
-`collector_rss_subscriptions`加载未处理数据(可配置`limit`
- 加载停用词与行业关键词(`stopwords.txt` / `keywords.txt`),并动态注入`jieba`词典
- 标注词性并过滤停用词,仅保留与汽车后市场相关的词汇
- 标记与过滤:出现任一行业关键词即视为相关,进入保存
- 将结果写入`processed_rss_data`,并回写源表`是否已处理 = 1`
- 输出处理统计(总量、命中量、命中率、时间)
### 依赖与运行
- 依赖:见`requirements.txt`pandas、SQLAlchemy、PyMySQL、croniter、pytz、loguru、jieba、feedparser、beautifulsoup4、minio 等)
- 配置:在`config.py`中设置`MYSQL_CONFIG`与MinIO参数
- 运行:
- 启动主程序:`python main.py`
- 添加任务:向`main_task`插入记录,`module_path`可指向如`processors.processor_rss_data.main`