From cf78104a5bf6781340643d5e19e75ab8507b8262 Mon Sep 17 00:00:00 2001 From: z66 <1415243231@qq.com> Date: Tue, 9 Sep 2025 16:29:20 +0800 Subject: [PATCH] =?UTF-8?q?=E8=B0=83=E5=BA=A6=E7=B3=BB=E7=BB=9F=E5=BC=80?= =?UTF-8?q?=E5=8F=91?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .idea/sqldialects.xml | 2 +- doc/{前端展示.md => 前端展示-废弃.md} | 0 doc/数据库操作.md | 2 +- main.py | 107 ++---- readme.md | 10 +- system_management/scheduler/task_scheduler.py | 361 +++++++----------- test/数据库链接测试.py | 3 +- {storage => utils}/mysql_agent.py | 0 8 files changed, 188 insertions(+), 297 deletions(-) rename doc/{前端展示.md => 前端展示-废弃.md} (100%) rename {storage => utils}/mysql_agent.py (100%) diff --git a/.idea/sqldialects.xml b/.idea/sqldialects.xml index 250a1dc..d49b9e3 100644 --- a/.idea/sqldialects.xml +++ b/.idea/sqldialects.xml @@ -1,7 +1,7 @@ - + \ No newline at end of file diff --git a/doc/前端展示.md b/doc/前端展示-废弃.md similarity index 100% rename from doc/前端展示.md rename to doc/前端展示-废弃.md diff --git a/doc/数据库操作.md b/doc/数据库操作.md index 712ac9f..b26e7f5 100644 --- a/doc/数据库操作.md +++ b/doc/数据库操作.md @@ -28,7 +28,7 @@ ### 基本配置参数 ```python -{ +Config = { 'host': 'localhost', # 数据库主机 'port': 3306, # 端口 'user': 'root', # 用户名 diff --git a/main.py b/main.py index afa9770..0518e42 100644 --- a/main.py +++ b/main.py @@ -1,4 +1,3 @@ -# main.py import signal import time from datetime import datetime @@ -11,101 +10,63 @@ log = CrossPlatformLog.get_logger("Main") class IntelligenceSystem: def __init__(self, db_config=None): - self.scheduler = TaskScheduler(db_config) + """初始化系统(仅作为容器,不包含业务逻辑)""" + self.scheduler = TaskScheduler(db_config, max_workers=5) self._running = False - log.info("IntelligenceSystem initialized") + log.info("情报系统已初始化(Cron模式)") - def run(self): - """启动系统主循环""" + def start(self): + """启动系统主入口""" self._running = True - self._register_signal_handlers() - - log.info("Starting main loop") + self._setup_signal_handlers() + log.info("系统启动 - 运行在Cron调度模式") try: + # 主循环 - 仅负责定期检查任务 while self._running: - start_time = time.time() - self._run_cycle() + # 检查并执行到期任务 + self.scheduler.check_and_run_tasks() - # 精确控制循环间隔(扣除执行时间) - elapsed = time.time() - start_time - sleep_time = max(0, 60 - elapsed) - time.sleep(sleep_time) + # 短间隔轮询(每10秒检查一次,保证Cron时间精度) + time.sleep(10) - except KeyboardInterrupt: - log.info("Received keyboard interrupt") except Exception as e: - log.critical( - "System crashed", - exc_info=True - ) - raise + log.critical("系统主循环崩溃", exc_info=True) finally: self.shutdown() - def _run_cycle(self): - """单个运行周期""" - try: - # 1. 执行任务调度 - result = self.scheduler.run_pending_tasks() - - # 2. 每小时记录系统状态 - if datetime.now().minute == 0: - self._log_system_status() - - except Exception as e: - log.error( - "Cycle execution failed", - exc_info=True - ) - raise - - def _log_system_status(self): - """记录系统状态""" - try: - status_df = self.scheduler.get_task_status() - pending = len(status_df[status_df['next_run_time'] <= datetime.now()]) - - log.info( - "System status", - pending_tasks=pending, - active_tasks=len(status_df), - last_success=status_df['last_run_time'].max() - ) - except Exception as e: - log.error( - "Failed to log system status", - exc_info=True - ) - - def _register_signal_handlers(self): - """注册信号处理""" + def _setup_signal_handlers(self): + """设置系统信号处理器""" signal.signal(signal.SIGINT, self._handle_shutdown) signal.signal(signal.SIGTERM, self._handle_shutdown) - log.debug("Signal handlers registered") + log.debug("信号处理器已注册") def _handle_shutdown(self, signum, frame): - """处理关闭信号""" - log.info( - f"Processing shutdown signal {signum}", - signal=signum - ) + """处理系统关闭信号""" + log.info(f"收到关闭信号 {signum},开始关闭系统") self._running = False def shutdown(self): - """关闭系统""" - log.info("Performing system shutdown") - # 此处可添加其他清理逻辑 - log.success("System shutdown completed") + """优雅关闭系统""" + log.info("开始优雅关闭系统") + + # 等待所有正在执行的任务完成 + self.scheduler.executor.shutdown(wait=True, cancel_futures=False) + + # 记录最终状态 + pending_count = self.scheduler.get_pending_tasks_count() + log.info( + "系统关闭完成", + pending_tasks=pending_count, + shutdown_time=datetime.now() + ) if __name__ == "__main__": try: + # 启动系统 - 仅作为入口,不包含调度逻辑 system = IntelligenceSystem() - system.run() + system.start() except Exception as e: - log.critical( - "System startup failed", - exc_info=True - ) + log.critical("情报系统启动失败", exc_info=True) raise diff --git a/readme.md b/readme.md index d096495..358cc39 100644 --- a/readme.md +++ b/readme.md @@ -8,8 +8,8 @@ https://alidocs.dingtalk.com/i/nodes/NZQYprEoWoexdo1ohPdxXvDbJ1waOeDk?utm_scene= intelligence_system/ ├── config/ # 系统配置中心 │ ├── __init__.py # 配置包初始化 -│ ├── settings.py # 主配置文件(数据库连接、API密钥等) -│ └── scheduler_rules.yaml # 任务调度规则 +│ ├── config.py # 配置加载与管理 +│ └── constants.py # 系统常量定义 ├── data_collection/ # 数据采集层 │ ├── spiders/ # 网络爬虫子系统 @@ -36,10 +36,6 @@ intelligence_system/ │ ├── sentiment_analyzer.py # 情感分析模型 │ └── topic_modeler.py # LDA主题建模工具 -├── storage/ # 数据存储层 -│ ├── mysql_agent.py # MySQL读写管理器 -│ └── query_builder.py # SQL动态构建器 - ├── services/ # 应用服务层 │ ├── monitoring/ # 舆情监控 │ │ ├── opinion_monitor.py # 实时舆情追踪 @@ -68,10 +64,12 @@ intelligence_system/ ├── utils/ # 工具库 │ ├── file_handler.py # 通用文件操作 │ ├── logger.py # 日志系统 +│ ├── mysql_agent.py # MySQL读写管理器 │ └── datetime_parser.py # 时间格式处理 └── main.py # 系统入口(启动所有服务) ``` + ### 程序设计原则 1. 所有程序尽可能在py文件中运行,尽量避免使用命令行执行 2. 配置需要在配置类中定义 diff --git a/system_management/scheduler/task_scheduler.py b/system_management/scheduler/task_scheduler.py index 201fc42..f04c1be 100644 --- a/system_management/scheduler/task_scheduler.py +++ b/system_management/scheduler/task_scheduler.py @@ -1,277 +1,210 @@ -# system_management/scheduler/task_scheduler.py import importlib import time -from datetime import datetime, timedelta -from typing import Dict, List, Optional +from datetime import datetime +from typing import Dict, List, Optional, Any +import croniter +import pytz +from concurrent.futures import ThreadPoolExecutor, as_completed +import pandas as pd from storage.mysql_agent import MySQLAgent -from pathlib import Path - -# 使用您的日志系统 from utils.logger import CrossPlatformLog + +# 初始化调度器日志 log = CrossPlatformLog.get_logger("TaskScheduler") class TaskScheduler: - def __init__(self, db_config: Optional[Dict] = None): - """ - 初始化任务调度器 + def __init__(self, db_config: Optional[Dict] = None, max_workers: int = 5): + """初始化任务调度器(基于Cron表达式)""" + self.db = MySQLAgent(db_config or {}) + self.executor = ThreadPoolExecutor(max_workers=max_workers) + log.info(f"任务调度器已初始化,最大工作线程数: {max_workers}") - Args: - db_config (Optional[Dict]): 可选的数据库配置,默认使用MySQLAgent默认配置 - """ - self.db = MySQLAgent(db_config or {}) # 使用您提供的MySQLAgent - self._init_task_table() - log.info("TaskScheduler initialized") - - def _init_task_table(self): - """确保任务表存在并包含必要字段""" - if not self.db.table_exists("main_task"): - log.info("Creating main_task table") - create_sql = """ - CREATE TABLE main_task ( - task_id INT AUTO_INCREMENT PRIMARY KEY, - task_name VARCHAR(100) NOT NULL, - module_path VARCHAR(255) NOT NULL COMMENT '例如data_collection.spiders.weibo_spider', - frequency_type ENUM('minute','hourly','daily','weekly','monthly') NOT NULL, - frequency_value INT DEFAULT NULL COMMENT '间隔数值', - last_run_time DATETIME DEFAULT NULL, - next_run_time DATETIME DEFAULT NULL, - last_run_status VARCHAR(20) DEFAULT NULL, - is_active TINYINT(1) DEFAULT 1, - is_running TINYINT(1) DEFAULT 0, - run_count INT DEFAULT 0, - created_at DATETIME DEFAULT CURRENT_TIMESTAMP, - updated_at DATETIME DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP, - INDEX idx_next_run (next_run_time), - INDEX idx_active (is_active) - ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 - """ - self.db.execute_sql(create_sql) - log.success("main_task table created") - - def run_pending_tasks(self) -> Dict[str, int]: - """ - 执行所有到期的活跃任务 - - Returns: - Dict[str, int]: 包含执行结果的字典 { - 'total': 总任务数, - 'success': 成功数, - 'failed': 失败数 - } - """ - result = {'total': 0, 'success': 0, 'failed': 0} + def check_and_run_tasks(self) -> Dict[str, int]: + """检查并执行所有到期的任务""" + result = {'总任务数': 0, '成功': 0, '失败': 0} try: - # 使用您提供的query_to_df方法获取任务 - tasks_df = self.db.query_to_df( - "SELECT * FROM main_task " - "WHERE is_active = 1 AND next_run_time <= %s " - "ORDER BY next_run_time", - params=(datetime.now(),) - ) + # 获取当前时间(带时区) + now = datetime.now(pytz.timezone('Asia/Shanghai')).replace(tzinfo=None) - result['total'] = len(tasks_df) + # 查询所有到期的活跃任务 + tasks_df = self.db.query_to_df(""" + SELECT * FROM main_task + WHERE is_active = 1 + AND next_run_time <= %s + AND is_running = 0 + ORDER BY next_run_time + """, params=(now,)) + result['总任务数'] = len(tasks_df) if tasks_df.empty: - log.debug("No pending tasks found") + log.debug("没有到期的任务需要执行") return result + # 并发执行任务 + futures = [] for _, task in tasks_df.iterrows(): - task_id = task['task_id'] - log.bind(task_id=task_id).info( - f"Starting task {task['task_name']}" - ) - - # 标记任务为执行中 - self._update_task_status( - task_id, - {'is_running': 1, 'last_run_time': datetime.now()} - ) + futures.append(self.executor.submit(self._process_single_task, task)) + # 收集执行结果 + for future in as_completed(futures): try: - self._execute_single_task(task) - self._update_task_status( - task_id, - { - 'last_run_status': 'success', - 'is_running': 0, - 'run_count': task['run_count'] + 1, - 'next_run_time': self._calculate_next_run( - task['frequency_type'], - task['frequency_value'] - ) - } - ) - result['success'] += 1 - log.bind(task_id=task_id).success("Task completed") + if future.result(): + result['成功'] += 1 + else: + result['失败'] += 1 except Exception as e: - log.bind(task_id=task_id).error( - f"Task failed: {str(e)}", - exc_info=True - ) - self._update_task_status( - task_id, - { - 'last_run_status': 'failed', - 'is_running': 0, - 'next_run_time': self._calculate_next_run( - task['frequency_type'], - task['frequency_value'], - retry=True - ) - } - ) - result['failed'] += 1 + log.error(f"任务线程执行失败: {str(e)}") + result['失败'] += 1 log.info( - "Scheduler cycle completed", - total_tasks=result['total'], - success=result['success'], - failed=result['failed'] + "任务调度周期完成", + 总任务数=result['总任务数'], + 成功=result['成功'], + 失败=result['失败'] ) return result except Exception as e: - log.critical( - "Scheduler main loop failed", - exc_info=True - ) + log.critical("调度器主循环执行失败", exc_info=True) raise - def _execute_single_task(self, task: Dict) -> None: - """执行单个任务模块""" - start_time = time.time() - task_log = log.bind( - task_id=task['task_id'], - module=task['module_path'] - ) + def _process_single_task(self, task: Dict[str, Any]) -> bool: + """处理单个任务(线程安全)""" + task_id = task['task_id'] + task_log = log.bind(task_id=task_id, task_name=task['task_name']) + task_log.info(f"开始执行任务: {task['task_name']}") try: - module = importlib.import_module(task['module_path']) + # 标记任务为运行中 + self._update_task_status(task_id, { + 'is_running': 1, + 'last_run_time': datetime.now() + }) - if not hasattr(module, 'main'): - raise ImportError(f"Module has no main() function") + # 执行任务逻辑 + self._execute_task_logic(task) - # 执行任务 - task_log.debug("Task execution started") - module.main() - - elapsed = time.time() - start_time - task_log.info( - f"Task completed in {elapsed:.2f}s", - duration=elapsed + # 计算下次运行时间(基于Cron表达式) + next_run_time = self._calculate_next_run_time( + cron_expr=task['cron_expression'], + time_zone=task.get('time_zone', 'Asia/Shanghai') ) + # 更新任务状态为成功 + self._update_task_status(task_id, { + 'last_run_status': 'success', + 'is_running': 0, + 'run_count': task['run_count'] + 1, + 'next_run_time': next_run_time + }) + task_log.info(f"任务执行成功: {task['task_name']}") + return True + except Exception as e: - task_log.error( - "Task execution failed", - exc_info=True - ) + task_log.error(f"任务执行失败: {str(e)}", exc_info=True) + + # 失败时计算下次重试时间(15分钟后) + next_retry_time = datetime.now() + pd.Timedelta(minutes=15) + + self._update_task_status(task_id, { + 'last_run_status': 'failed', + 'is_running': 0, + 'next_run_time': next_retry_time + }) + return False + + def _execute_task_logic(self, task: Dict[str, Any]) -> None: + """执行任务的具体逻辑(动态导入模块)""" + start_time = time.time() + task_log = log.bind(task_id=task['task_id'], module=task['module_path']) + + try: + # 动态导入任务模块 + module = importlib.import_module(task['module_path']) + if not hasattr(module, 'main'): + raise ImportError(f"模块 {task['module_path']} 中未找到 main() 函数") + + task_log.debug("开始执行模块中的 main() 函数") + module.main() # 调用任务主函数 + task_log.info(f"任务执行完成,耗时: {time.time() - start_time:.2f}秒") + + except Exception as e: + task_log.error("任务逻辑执行失败", exc_info=True) raise - def _update_task_status(self, task_id: int, updates: Dict) -> None: - """更新任务状态""" - set_clause = ", ".join([f"{k}=%s" for k in updates.keys()]) - sql = f"UPDATE main_task SET {set_clause} WHERE task_id=%s" + def _calculate_next_run_time(self, cron_expr: str, time_zone: str = 'Asia/Shanghai') -> datetime: + """基于Cron表达式计算下次运行时间""" + try: + tz = pytz.timezone(time_zone) + now = datetime.now(tz) + cron = croniter.croniter(cron_expr, now) + next_run = cron.get_next(datetime) + return next_run.replace(tzinfo=None) # 移除时区信息,适应数据库存储 + except Exception as e: + log.error(f"Cron表达式解析失败: {cron_expr}, 错误: {str(e)}") + raise ValueError(f"无效的Cron表达式: {cron_expr}") + def _update_task_status(self, task_id: int, updates: Dict[str, Any]) -> None: + """更新任务状态到数据库""" + set_clause = ", ".join([f"{k}=%s" for k in updates.keys()]) + sql = f"UPDATE main_task SET {set_clause}, updated_at=NOW() WHERE task_id=%s" params = list(updates.values()) + [task_id] try: - affected = self.db.execute_sql(sql, params=params) - if affected != 1: + affected_rows = self.db.execute_sql(sql, params=params) + if affected_rows != 1: log.warning( - "Unexpected row count in update", + "任务状态更新异常", task_id=task_id, - expected=1, - affected=affected + 预期影响行数=1, + 实际影响行数=affected_rows ) except Exception as e: - log.error( - "Failed to update task status", - task_id=task_id, - exc_info=True - ) + log.error(f"任务状态更新失败,task_id: {task_id}", exc_info=True) raise - def _calculate_next_run(self, freq_type: str, freq_value: Optional[int] = None, - retry: bool = False) -> datetime: - """ - 计算下次执行时间(带重试逻辑) - """ - base_time = datetime.now() + def add_task(self, + task_name: str, + task_type: str, + module_path: str, + cron_expression: str, + time_zone: str = 'Asia/Shanghai') -> int: + """添加新的Cron任务""" + if not cron_expression: + raise ValueError("Cron表达式不能为空") - if retry: - # 失败后15分钟重试 - log.debug("Calculating retry time") - return base_time + timedelta(minutes=15) + # 计算首次运行时间 + first_run_time = self._calculate_next_run_time(cron_expression, time_zone) - if freq_type == 'minute': - delta = timedelta(minutes=freq_value or 1) - elif freq_type == 'hourly': - delta = timedelta(hours=freq_value or 1) - elif freq_type == 'daily': - delta = timedelta(days=freq_value or 1) - elif freq_type == 'weekly': - delta = timedelta(weeks=freq_value or 1) - elif freq_type == 'monthly': - # 处理月末日期特殊情况 - next_month = (base_time.replace(day=1) + timedelta(days=32)).replace(day=1) - last_day = (next_month - timedelta(days=1)).day - day = min(base_time.day, last_day) - return base_time.replace(day=1, month=next_month.month, day=day) - else: - raise ValueError(f"Unknown frequency type: {freq_type}") - - return base_time + delta - - def add_task(self, task_name: str, module_path: str, frequency_type: str, - frequency_value: Optional[int] = None) -> int: - """ - 添加新任务到调度系统 - """ + # 插入数据库 sql = """ INSERT INTO main_task - (task_name, module_path, frequency_type, frequency_value, next_run_time) - VALUES (%s, %s, %s, %s, %s) + (task_name, task_type, module_path, cron_expression, time_zone, + next_run_time, is_active) + VALUES (%s, %s, %s, %s, %s, %s, 1) """ - - next_run = self._calculate_next_run(frequency_type, frequency_value) - params = (task_name, module_path, frequency_type, frequency_value, next_run) + params = (task_name, task_type, module_path, cron_expression, time_zone, first_run_time) try: self.db.execute_sql(sql, params=params) task_id = self.db.query_to_df("SELECT LAST_INSERT_ID() AS id").iloc[0]['id'] log.info( - "New task added", + f"新任务添加成功", task_id=task_id, task_name=task_name, - next_run=next_run + cron表达式=cron_expression, + 首次运行时间=first_run_time ) return task_id except Exception as e: - log.error( - "Failed to add new task", - task_name=task_name, - exc_info=True - ) + log.error(f"添加任务失败: {task_name}", exc_info=True) raise - def get_task_status(self, active_only: bool = True) -> pd.DataFrame: - """ - 获取任务状态 - """ - where = "WHERE is_active = 1" if active_only else "" - log.debug("Fetching task status", active_only=active_only) - return self.db.query_to_df( - f""" - SELECT - task_id, task_name, module_path, - frequency_type, frequency_value, - last_run_time, next_run_time, - last_run_status, run_count, - is_active, is_running - FROM main_task - {where} - ORDER BY next_run_time - """ - ) \ No newline at end of file + def get_pending_tasks_count(self) -> int: + """获取当前等待执行的任务数量""" + result = self.db.query_to_df(""" + SELECT COUNT(*) AS count FROM main_task + WHERE is_active = 1 AND next_run_time <= %s + """, params=(datetime.now(),)) + return result.iloc[0]['count'] if not result.empty else 0 diff --git a/test/数据库链接测试.py b/test/数据库链接测试.py index a4f8991..27b3556 100644 --- a/test/数据库链接测试.py +++ b/test/数据库链接测试.py @@ -1,10 +1,9 @@ import unittest import pandas as pd from datetime import datetime -import tempfile import time import pymysql -from storage.mysql_agent import MySQLAgent +from utils.mysql_agent import MySQLAgent import platform class TestMySQLAgent(unittest.TestCase): diff --git a/storage/mysql_agent.py b/utils/mysql_agent.py similarity index 100% rename from storage/mysql_agent.py rename to utils/mysql_agent.py