diff --git a/doc/readme.md b/doc/readme.md index 6844b0e..d096495 100644 --- a/doc/readme.md +++ b/doc/readme.md @@ -58,9 +58,8 @@ intelligence_system/ │ └── notification_center.py # 邮件/短信通知 ├── system_management/ # 系统管理层 -│ ├── scheduler/ # 任务调度 -│ │ ├── task_scheduler.py # 分布式任务调度器 -│ │ └── cron_manager.py # 定时规则配置 +│ ├── scheduler/ # 任务调度 +│ │ └── task_scheduler.py # 任务调度器 │ │ │ └── monitor/ # 系统监控 │ ├── health_monitor.py # 服务健康检测 diff --git a/doc/数据库操作.md b/doc/数据库操作.md index 2e1da29..712ac9f 100644 --- a/doc/数据库操作.md +++ b/doc/数据库操作.md @@ -1,63 +1,51 @@ -目录 -1. -类概述 -2. -初始化配置 -3. -基础CRUD操作 -4. -表结构管理 -5. -事务管理 -6. -高级功能 -7. -注意事项 -8. -示例代码 -9. -性能优化 -10. -错误处理 -类概述 -MySQLAgent 是一个全平台兼容的MySQL数据库操作类,支持Windows/macOS/Linux系统,提供连接池管理、数据操作和事务处理等功能。 +# MySQLAgent 使用文档 -​核心特性:​​ +**最后更新于:2023-08-06** +**代码版本:1.2.0** -• -线程安全的连接池管理 -• -自动适配各平台配置 -• -支持DataFrame直接交互 -• -完善的事务处理机制 -• -详细的日志记录 -初始化配置 -基本配置参数 -python -下载 -复制 -运行 +> **环境要求:** +> - Python ≥ 3.8 +> - PyMySQL ≥ 1.0.2 +> - pandas ≥ 1.3.0 + +--- + +## 1. 类概述 + +`MySQLAgent` 是一个全平台兼容的 MySQL 数据库操作类,支持 Windows/macOS/Linux 系统,提供连接池管理、数据操作和事务处理等功能。 + +### 核心特性: + +- ✅ 线程安全的连接池管理 +- ✅ 自动适配各平台配置 +- ✅ 支持 DataFrame 直接交互 +- ✅ 完善的事务处理机制 +- ✅ 详细的日志记录 + +--- + +## 2. 
初始化配置 + +### 基本配置参数 +```python { -'host': 'localhost', # 数据库主机 -'port': 3306, # 端口 -'user': 'root', # 用户名 -'password': '123123', # 密码 -'database': 'test_db', # 数据库名 -'charset': 'utf8mb4', # 字符集(默认utf8mb4) -'max_connections': 5, # 最大连接数(默认5) -'connect_timeout': 10, # 连接超时(秒) -'read_timeout': 30, # 读取超时(秒) -'write_timeout': 30, # 写入超时(秒) -'ssl': None # SSL配置 + 'host': 'localhost', # 数据库主机 + 'port': 3306, # 端口 + 'user': 'root', # 用户名 + 'password': '123123', # 密码 + 'database': 'test_db', # 数据库名 + 'charset': 'utf8mb4', # 字符集(默认 utf8mb4) + 'max_connections': 5, # 最大连接数(默认 5) + 'connect_timeout': 10, # 连接超时(秒) + 'read_timeout': 30, # 读取超时(秒) + 'write_timeout': 30, # 写入超时(秒) + 'ssl': None # SSL 配置 } -获取平台默认配置 -python -下载 -复制 -运行 +``` + + +### 获取平台默认配置 +```python from mysql_agent import get_default_config # 自动根据当前操作系统返回优化配置 @@ -65,334 +53,189 @@ config = get_default_config() # 可覆盖默认值 config.update({ -'host': '192.168.1.100', -'database': 'production_db' + 'host': '192.168.1.100', + 'database': 'production_db' }) db = MySQLAgent(config) -各平台特殊配置 -平台 默认超时 SSL配置 批处理优化 -Windows 10/30/30秒 禁用 小批次(100-500) -macOS 15/60/60秒 自动检测证书 中批次(500-1000) -Linux 15/60/60秒 禁用 大批次(1000+) -基础CRUD操作 -查询数据 -python -下载 -复制 -运行 -# 返回DataFrame +``` + +### 各平台特殊配置 +| 平台 | 默认超时(连接/读/写) | SSL 配置 | 批处理优化 | +|---------|------------------------|----------------|------------------| +| Windows | 10/30/30 秒 | 禁用 | 小批次 (100-500) | +| macOS | 15/60/60 秒 | 自动检测证书 | 中批次 (500-1000)| +| Linux | 15/60/60 秒 | 禁用 | 大批次 (1000+) | + +## 3. 
基础CRUD操作 +### 查询数据 +```python +# 返回 DataFrame df = db.query_to_df( -"SELECT * FROM users WHERE age > %s", -params=(18,), # 参数可以是元组或字典 -parse_dates=['create_time'] # 自动解析日期字段 + "SELECT * FROM users WHERE age > %s", + params=(18,), + parse_dates=['create_time'] # 自动解析日期字段 ) -# 直接执行SQL返回原始结果 +# 直接执行 SQL 返回原始结果 result = db.execute_sql( -"SELECT name, email FROM users WHERE status = %s", -params={'status': 1}, # 使用字典参数 -fetch=True # 设为True返回查询结果 + "SELECT name, email FROM users WHERE status = %s", + params={'status': 1}, + fetch=True # 设为 True 返回查询结果 ) -插入数据 -python -下载 -复制 -运行 +``` + +### 插入数据 +```python # 单条插入 data = {'name': '张三', 'age': 25} db.execute_sql( -"INSERT INTO users (name, age) VALUES (%(name)s, %(age)s)", -params=data + "INSERT INTO users (name, age) VALUES (%(name)s, %(age)s)", + params=data ) -# 批量插入DataFrame +# 批量插入 DataFrame import pandas as pd new_users = pd.DataFrame({ -'name': ['李四', '王五'], -'age': [28, 32] + 'name': ['李四', '王五'], + 'age': [28, 32] }) inserted_rows = db.insert_from_df( -'users', -new_users, -chunk_size=500 # 分批插入大小 + 'users', + new_users, + chunk_size=500 # 分批插入大小 ) -更新数据 -python -下载 -复制 -运行 +``` + +### 更新数据 +```python # 条件更新 db.execute_sql( -"UPDATE users SET status = %s WHERE last_login < %s", -params=(0, '2023-01-01') + "UPDATE users SET status = %s WHERE last_login < %s", + params=(0, '2023-01-01') ) -# 使用DataFrame更新 +# 使用 DataFrame 更新 update_df = pd.DataFrame({ -'id': [1, 2], -'status': [1, 0] + 'id': [1, 2], + 'status': [1, 0] }) affected_rows = db.update_from_df( -'users', -update_df, -key_columns='id' # 用于匹配记录的关键列 + 'users', + update_df, + key_columns='id' # 用于匹配记录的关键列 ) -删除数据 -python -下载 -复制 -运行 +``` + +### 删除数据 +```python # 条件删除 db.execute_sql( -"DELETE FROM logs WHERE created_at < %s", -params=('2022-01-01',) + "DELETE FROM logs WHERE created_at < %s", + params=('2022-01-01',) ) -表结构管理 -创建表 -python -下载 -复制 -运行 -# 根据DataFrame自动创建表 +``` + +## 4. 
表结构管理 +### 创建表 +```python +# 根据 DataFrame 自动创建表 sample_data = pd.DataFrame({ -'id': pd.Series(dtype='int'), -'name': pd.Series(dtype='str'), -'created_at': pd.Series(dtype='datetime64[ns]') + 'id': pd.Series(dtype='int'), + 'name': pd.Series(dtype='str'), + 'created_at': pd.Series(dtype='datetime64[ns]') }) db.create_table_from_df( -'new_table', -sample_data, -primary_key='id' # 指定主键 + 'new_table', + sample_data, + primary_key='id' # 指定主键 ) # 手动创建表 db.execute_sql(""" CREATE TABLE IF NOT EXISTS products ( -id INT AUTO_INCREMENT PRIMARY KEY, -name VARCHAR(100) NOT NULL, -price DECIMAL(10,2), -stock INT DEFAULT 0 + id INT AUTO_INCREMENT PRIMARY KEY, + name VARCHAR(100) NOT NULL, + price DECIMAL(10,2), + stock INT DEFAULT 0 ) """) -表操作 -python -下载 -复制 -运行 +``` + +### 表操作 +```python # 检查表是否存在 if db.table_exists('users'): -print("用户表已存在") + print("用户表已存在") # 删除表 db.drop_table('temp_table') # 获取表结构 schema = db._get_table_info('products') -事务管理 -基本事务 -python -下载 -复制 -运行 +``` +### 字段修改 +```python +# 字段b修改为c并转换数据类型为datetime +try: + db.execute_sql("ALTER TABLE a CHANGE COLUMN b c DATETIME") +except pymysql.err.InternalError as e: + print(f"修改失败: {str(e)}") +``` + +## 5. 事务管理 +### 基本事务 +```python conn = db.begin_transaction() try: -cursor = conn.cursor() -cursor.execute("UPDATE accounts SET balance = balance - 100 WHERE id = 1") -cursor.execute("UPDATE accounts SET balance = balance + 100 WHERE id = 2") -db.commit_transaction(conn) + cursor = conn.cursor() + cursor.execute("UPDATE accounts SET balance = balance - 100 WHERE id = 1") + cursor.execute("UPDATE accounts SET balance = balance + 100 WHERE id = 2") + db.commit_transaction(conn) except Exception as e: -db.rollback_transaction(conn) + db.rollback_transaction(conn) raise -上下文管理器 -python -下载 -复制 -运行 +``` + +### 上下文管理器 +```python with db.begin_transaction() as conn: -conn.cursor().execute("INSERT INTO logs (message) VALUES ('Transaction start')") -# 其他操作... 
-# 无需显式commit/rollback -高级功能 -大数据量处理 -python -下载 -复制 -运行 + conn.cursor().execute("INSERT INTO logs (message) VALUES ('Transaction start')") + # 其他操作... + # 无需显式 commit/rollback +``` + +## 6. 高级功能 +### 大数据量处理 +```python # 分块读取大数据 chunk_size = 10000 for chunk in pd.read_sql_query( -"SELECT * FROM large_table", -con=db.get_connection(), -chunksize=chunk_size + "SELECT * FROM large_table", + con=db.get_connection(), + chunksize=chunk_size ): -process_chunk(chunk) + process_chunk(chunk) # 批量插入优化 -large_df = generate_large_data() # 假设返回10万行数据 +large_df = generate_large_data() # 假设返回 10 万行数据 db.insert_from_df( -'target_table', -large_df, -chunk_size=2000 # 根据平台自动调整 + 'target_table', + large_df, + chunk_size=2000 # 根据平台自动调整 ) -并发查询 -python -下载 -复制 -运行 +``` + +### 并发查询 +```python from concurrent.futures import ThreadPoolExecutor def fetch_user(user_id): -return db.query_to_df( -"SELECT * FROM users WHERE id = %s", -params=(user_id,) -) + return db.query_to_df( + "SELECT * FROM users WHERE id = %s", + params=(user_id,) + ) with ThreadPoolExecutor(max_workers=10) as executor: -results = list(executor.map(fetch_user, range(1, 1001))) -注意事项 -1. -​连接管理​ - -• -获取连接后必须确保关闭 -• -推荐使用with语句或try/finally -2. -​事务隔离​ - -• -长时间事务会占用连接池资源 -• -复杂事务考虑使用存储过程 -3. -​性能要点​ - -• -Windows平台减少批次大小 -• -macOS注意SSL证书路径 -• -Linux可增大连接池大小 -4. -​类型映射​ - -Pandas类型 MySQL类型 -int64 BIGINT -float64 DOUBLE -datetime64 DATETIME -object TEXT -示例代码 -完整业务场景 -python -下载 -复制 -运行 -class OrderSystem: -def __init__(self): -self.db = MySQLAgent(get_default_config()) - - def create_order(self, user_id, items): - """创建订单(完整事务示例)""" - conn = self.db.begin_transaction() - try: - # 1. 插入订单主表 - cursor = conn.cursor() - cursor.execute( - "INSERT INTO orders (user_id, total) VALUES (%s, %s)", - (user_id, sum(item['price']*item['quantity'] for item in items)) - ) - order_id = cursor.lastrowid - - # 2. 
插入订单明细 - order_items = pd.DataFrame([{ - 'order_id': order_id, - 'product_id': item['product_id'], - 'quantity': item['quantity'], - 'price': item['price'] - } for item in items]) - - self.db.insert_from_df('order_items', order_items, conn=conn) - - # 3. 更新库存 - for item in items: - cursor.execute( - "UPDATE products SET stock = stock - %s WHERE id = %s", - (item['quantity'], item['product_id']) - ) - - self.db.commit_transaction(conn) - return order_id - except Exception as e: - self.db.rollback_transaction(conn) - raise -性能优化 -1. -​连接池调优​ - -python -下载 -复制 -运行 -# 生产环境推荐配置 -config = { -**get_default_config(), -'max_connections': 20, # 根据服务器配置调整 -'maxcached': 15, # 最大空闲连接 -'ping': 2 # 连接检查级别 -} -2. -​查询优化​ - -• -使用EXPLAIN分析慢查询 -• -添加适当索引 -• -避免SELECT * -3. -​批处理建议​ - -操作类型 Windows macOS/Linux -插入批次 100-500 1000-5000 -更新批次 50-200 500-2000 -错误处理 -常见错误码处理 -python -下载 -复制 -运行 -try: -db.execute_sql("INSERT INTO...") -except pymysql.err.IntegrityError as e: -# 唯一键冲突 -if e.args[0] == 1062: -handle_duplicate_entry() -except pymysql.err.OperationalError as e: -# 连接超时 -if "timed out" in str(e): -retry_connection() -except Exception as e: -logger.error(f"Database error: {str(e)}") -raise -连接重试机制 -python -下载 -复制 -运行 -def safe_query(sql, params=None, max_retries=3): -for attempt in range(max_retries): -try: -return db.query_to_df(sql, params) -except (pymysql.err.OperationalError, pymysql.err.InterfaceError) as e: -if attempt == max_retries - 1: -raise -time.sleep(2 ** attempt) # 指数退避 -​提示​:本文档对应代码版本1.2.0,最后更新于2023-08-06。使用前请确保您的环境满足: - -• -Python ≥ 3.8 -• -PyMySQL ≥ 1.0.2 -• -pandas ≥ 1.3.0 + results = list(executor.map(fetch_user, range(1, 1001))) +``` \ No newline at end of file diff --git a/main.py b/main.py index c78525e..afa9770 100644 --- a/main.py +++ b/main.py @@ -1,181 +1,111 @@ -#!/usr/bin/env python3 -# -*- coding: utf-8 -*- -""" -情报收集系统主程序(明文配置版) -功能: -1. 调度数据采集、处理、存储流程 -2. 生成日报/月报 -3. 
class IntelligenceSystem:
    """Long-running service loop that drives the task scheduler.

    One cycle runs all pending scheduler tasks and, when the wall-clock
    minute is 0, logs a status summary. Cycles are spaced
    ``CYCLE_SECONDS`` apart, measured from the start of each cycle.

    Depends on these module-level names: ``signal``, ``time``,
    ``datetime``, ``TaskScheduler`` and ``log``.
    """

    # Target length of one scheduling cycle, in seconds.
    CYCLE_SECONDS = 60
    # Longest single sleep slice. time.sleep() resumes after a signal handler
    # returns, so one long sleep would delay shutdown by up to a whole cycle;
    # sleeping in slices lets the loop notice `_running` turning False.
    SLEEP_SLICE_SECONDS = 1.0

    def __init__(self, db_config=None):
        """Create the scheduler.

        Args:
            db_config: Optional database configuration, passed unchanged to
                ``TaskScheduler``.
        """
        self.scheduler = TaskScheduler(db_config)
        self._running = False
        log.info("IntelligenceSystem initialized")

    def run(self):
        """Run the main loop until a shutdown signal clears ``_running``.

        Any exception escaping a cycle is logged and re-raised;
        ``shutdown()`` is always called on the way out.
        """
        self._running = True
        self._register_signal_handlers()

        log.info("Starting main loop")

        try:
            while self._running:
                cycle_started = time.monotonic()
                self._run_cycle()

                # Keep the cycle period fixed by subtracting the time the
                # cycle itself took.
                elapsed = time.monotonic() - cycle_started
                self._wait_for_next_cycle(max(0, self.CYCLE_SECONDS - elapsed))

        except KeyboardInterrupt:
            log.info("Received keyboard interrupt")
        except Exception:
            log.critical(
                "System crashed",
                exc_info=True
            )
            raise
        finally:
            self.shutdown()

    def _wait_for_next_cycle(self, seconds):
        """Sleep up to ``seconds``, returning early once ``_running`` is False."""
        deadline = time.monotonic() + seconds
        while self._running:
            remaining = deadline - time.monotonic()
            if remaining <= 0:
                break
            time.sleep(min(remaining, self.SLEEP_SLICE_SECONDS))

    def _run_cycle(self):
        """Execute one cycle: run pending tasks, then the hourly status log."""
        try:
            # 1. Run every task that is due.
            self.scheduler.run_pending_tasks()

            # 2. Log system status when the current minute is 0.
            if datetime.now().minute == 0:
                self._log_system_status()

        except Exception:
            log.error(
                "Cycle execution failed",
                exc_info=True
            )
            raise

    def _log_system_status(self):
        """Log counts of pending/active tasks; failures here are only logged."""
        try:
            status_df = self.scheduler.get_task_status()
            pending = len(status_df[status_df['next_run_time'] <= datetime.now()])

            log.info(
                "System status",
                pending_tasks=pending,
                active_tasks=len(status_df),
                last_success=status_df['last_run_time'].max()
            )
        except Exception:
            log.error(
                "Failed to log system status",
                exc_info=True
            )

    def _register_signal_handlers(self):
        """Route SIGINT and SIGTERM to ``_handle_shutdown``."""
        signal.signal(signal.SIGINT, self._handle_shutdown)
        signal.signal(signal.SIGTERM, self._handle_shutdown)
        log.debug("Signal handlers registered")

    def _handle_shutdown(self, signum, frame):
        """Signal handler: request loop exit by clearing ``_running``."""
        log.info(
            f"Processing shutdown signal {signum}",
            signal=signum
        )
        self._running = False

    def shutdown(self):
        """Log the shutdown; extension point for additional cleanup."""
        log.info("Performing system shutdown")
        # Additional cleanup logic can be added here.
        log.success("System shutdown completed")


if __name__ == "__main__":
    try:
        system = IntelligenceSystem()
        system.run()
    except Exception:
        log.critical(
            "System startup failed",
            exc_info=True
        )
        raise
class TaskScheduler:
    """Database-backed task scheduler operating on the ``main_task`` table.

    Each row names an importable module (``module_path``) whose ``main()``
    is called when the row's ``next_run_time`` is due.

    Depends on these module-level names: ``importlib``, ``time``,
    ``datetime``, ``timedelta``, ``Dict``, ``Optional``, ``MySQLAgent``
    and ``log``.
    """

    # Minutes to wait before a failed task is attempted again.
    RETRY_DELAY_MINUTES = 15

    def __init__(self, db_config: Optional[Dict] = None):
        """Create the scheduler and make sure its table exists.

        Args:
            db_config: Optional database configuration; an empty dict is
                passed to ``MySQLAgent`` when omitted.
        """
        self.db = MySQLAgent(db_config or {})
        self._init_task_table()
        log.info("TaskScheduler initialized")

    def _init_task_table(self):
        """Create the ``main_task`` table when it does not exist yet."""
        if not self.db.table_exists("main_task"):
            log.info("Creating main_task table")
            create_sql = """
            CREATE TABLE main_task (
                task_id INT AUTO_INCREMENT PRIMARY KEY,
                task_name VARCHAR(100) NOT NULL,
                module_path VARCHAR(255) NOT NULL COMMENT '例如data_collection.spiders.weibo_spider',
                frequency_type ENUM('minute','hourly','daily','weekly','monthly') NOT NULL,
                frequency_value INT DEFAULT NULL COMMENT '间隔数值',
                last_run_time DATETIME DEFAULT NULL,
                next_run_time DATETIME DEFAULT NULL,
                last_run_status VARCHAR(20) DEFAULT NULL,
                is_active TINYINT(1) DEFAULT 1,
                is_running TINYINT(1) DEFAULT 0,
                run_count INT DEFAULT 0,
                created_at DATETIME DEFAULT CURRENT_TIMESTAMP,
                updated_at DATETIME DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
                INDEX idx_next_run (next_run_time),
                INDEX idx_active (is_active)
            ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4
            """
            self.db.execute_sql(create_sql)
            log.success("main_task table created")

    @staticmethod
    def _coerce_int(value, default: int) -> int:
        """Return ``value`` as a plain ``int``, or ``default`` if it has none.

        Values read from a DataFrame row may be ``None``, NaN or a NumPy
        scalar; ``int()`` rejects the first two (and non-numeric input),
        in which case ``default`` is returned.
        """
        if value is None:
            return default
        try:
            return int(value)
        except (TypeError, ValueError):
            return default

    def run_pending_tasks(self) -> Dict[str, int]:
        """Run every active task whose ``next_run_time`` has passed.

        A failing task is recorded as ``failed`` and rescheduled with the
        retry delay; it does not stop the remaining tasks.

        Returns:
            Dict with keys ``total``, ``success`` and ``failed``.

        Raises:
            Exception: Any error outside a task's own execution (querying or
                updating the table) is logged and re-raised.
        """
        result = {'total': 0, 'success': 0, 'failed': 0}

        try:
            tasks_df = self.db.query_to_df(
                "SELECT * FROM main_task "
                "WHERE is_active = 1 AND next_run_time <= %s "
                "ORDER BY next_run_time",
                params=(datetime.now(),)
            )

            result['total'] = len(tasks_df)

            if tasks_df.empty:
                log.debug("No pending tasks found")
                return result

            for _, task in tasks_df.iterrows():
                # Row values may be NumPy scalars; hand plain ints to the
                # database driver.
                task_id = int(task['task_id'])
                log.bind(task_id=task_id).info(
                    f"Starting task {task['task_name']}"
                )

                # Mark the task as running before it starts.
                self._update_task_status(
                    task_id,
                    {'is_running': 1, 'last_run_time': datetime.now()}
                )

                try:
                    self._execute_single_task(task)
                    self._update_task_status(
                        task_id,
                        {
                            'last_run_status': 'success',
                            'is_running': 0,
                            'run_count': self._coerce_int(task['run_count'], 0) + 1,
                            'next_run_time': self._calculate_next_run(
                                task['frequency_type'],
                                task['frequency_value']
                            )
                        }
                    )
                    result['success'] += 1
                    log.bind(task_id=task_id).success("Task completed")
                except Exception as e:
                    log.bind(task_id=task_id).error(
                        f"Task failed: {str(e)}",
                        exc_info=True
                    )
                    self._update_task_status(
                        task_id,
                        {
                            'last_run_status': 'failed',
                            'is_running': 0,
                            'next_run_time': self._calculate_next_run(
                                task['frequency_type'],
                                task['frequency_value'],
                                retry=True
                            )
                        }
                    )
                    result['failed'] += 1

            log.info(
                "Scheduler cycle completed",
                total_tasks=result['total'],
                success=result['success'],
                failed=result['failed']
            )
            return result

        except Exception:
            log.critical(
                "Scheduler main loop failed",
                exc_info=True
            )
            raise

    def _execute_single_task(self, task: Dict) -> None:
        """Import ``task['module_path']`` and call its ``main()``.

        Args:
            task: Mapping with at least ``task_id`` and ``module_path``.

        Raises:
            ImportError: The module cannot be imported or has no ``main``.
            Exception: Whatever the task's ``main()`` raises (logged first).
        """
        start_time = time.time()
        task_log = log.bind(
            task_id=task['task_id'],
            module=task['module_path']
        )

        try:
            module = importlib.import_module(task['module_path'])

            entry_point = getattr(module, 'main', None)
            if entry_point is None:
                raise ImportError("Module has no main() function")

            task_log.debug("Task execution started")
            entry_point()

            elapsed = time.time() - start_time
            task_log.info(
                f"Task completed in {elapsed:.2f}s",
                duration=elapsed
            )

        except Exception:
            task_log.error(
                "Task execution failed",
                exc_info=True
            )
            raise

    def _update_task_status(self, task_id: int, updates: Dict) -> None:
        """Write the given column values to one ``main_task`` row.

        Args:
            task_id: Primary key of the row.
            updates: Column name -> new value. Column names are interpolated
                into the SQL text, so they must come from trusted code;
                values are passed as query parameters.
        """
        if not updates:
            # An empty SET clause would be invalid SQL; nothing to write.
            return

        set_clause = ", ".join(f"{column}=%s" for column in updates)
        sql = f"UPDATE main_task SET {set_clause} WHERE task_id=%s"
        params = [*updates.values(), task_id]

        try:
            affected = self.db.execute_sql(sql, params=params)
            if affected != 1:
                log.warning(
                    "Unexpected row count in update",
                    task_id=task_id,
                    expected=1,
                    affected=affected
                )
        except Exception:
            log.error(
                "Failed to update task status",
                task_id=task_id,
                exc_info=True
            )
            raise

    def _calculate_next_run(self, freq_type: str, freq_value: Optional[int] = None,
                            retry: bool = False, *,
                            base_time: Optional[datetime] = None) -> datetime:
        """Compute the next execution time of a task.

        Args:
            freq_type: One of ``minute``, ``hourly``, ``daily``, ``weekly``,
                ``monthly``.
            freq_value: Interval count; ``None``, NaN, 0 or a negative value
                is treated as 1.
            retry: When true, ignore the frequency and return the retry time
                (``RETRY_DELAY_MINUTES`` after ``base_time``).
            base_time: Reference instant; defaults to ``datetime.now()``.

        Returns:
            The next run time.

        Raises:
            ValueError: ``freq_type`` is not a known frequency (only checked
                when ``retry`` is false).
        """
        # Local import: the module's import block does not provide calendar.
        import calendar

        if base_time is None:
            base_time = datetime.now()

        if retry:
            log.debug("Calculating retry time")
            return base_time + timedelta(minutes=self.RETRY_DELAY_MINUTES)

        interval = max(1, self._coerce_int(freq_value, 1))

        if freq_type == 'monthly':
            # Count months from year 0 so that December rolls the year over.
            month_index = base_time.year * 12 + (base_time.month - 1) + interval
            year, month_zero_based = divmod(month_index, 12)
            month = month_zero_based + 1
            # Clamp to the last day of the *target* month (e.g. Jan 31 -> Feb 28).
            last_day = calendar.monthrange(year, month)[1]
            return base_time.replace(
                year=year, month=month, day=min(base_time.day, last_day)
            )

        if freq_type == 'minute':
            delta = timedelta(minutes=interval)
        elif freq_type == 'hourly':
            delta = timedelta(hours=interval)
        elif freq_type == 'daily':
            delta = timedelta(days=interval)
        elif freq_type == 'weekly':
            delta = timedelta(weeks=interval)
        else:
            raise ValueError(f"Unknown frequency type: {freq_type}")

        return base_time + delta

    def add_task(self, task_name: str, module_path: str, frequency_type: str,
                 frequency_value: Optional[int] = None) -> int:
        """Insert a new task row and return its ``task_id``.

        Args:
            task_name: Display name of the task.
            module_path: Importable module path whose ``main()`` is the task.
            frequency_type: See ``_calculate_next_run``.
            frequency_value: Optional interval count.

        Returns:
            The auto-increment id of the inserted row.

        Raises:
            ValueError: Unknown ``frequency_type``.
            Exception: Database errors are logged and re-raised.
        """
        sql = """
        INSERT INTO main_task
        (task_name, module_path, frequency_type, frequency_value, next_run_time)
        VALUES (%s, %s, %s, %s, %s)
        """

        next_run = self._calculate_next_run(frequency_type, frequency_value)
        params = (task_name, module_path, frequency_type, frequency_value, next_run)

        try:
            # The generated id is tracked per connection, so it must be read
            # from the connection that performed the INSERT; a separate
            # pooled query for LAST_INSERT_ID() may land on another one.
            conn = self.db.begin_transaction()
            try:
                cursor = conn.cursor()
                cursor.execute(sql, params)
                task_id = int(cursor.lastrowid)
                self.db.commit_transaction(conn)
            except Exception:
                self.db.rollback_transaction(conn)
                raise

            log.info(
                "New task added",
                task_id=task_id,
                task_name=task_name,
                next_run=next_run
            )
            return task_id
        except Exception:
            log.error(
                "Failed to add new task",
                task_name=task_name,
                exc_info=True
            )
            raise

    def get_task_status(self, active_only: bool = True) -> "pd.DataFrame":
        """Return the status columns of tasks, ordered by ``next_run_time``.

        The return annotation is a string because this module does not
        import pandas itself.

        Args:
            active_only: Restrict the result to rows with ``is_active = 1``.

        Returns:
            Whatever ``self.db.query_to_df`` returns for the query.
        """
        where = "WHERE is_active = 1" if active_only else ""
        log.debug("Fetching task status", active_only=active_only)
        return self.db.query_to_df(
            f"""
            SELECT
                task_id, task_name, module_path,
                frequency_type, frequency_value,
                last_run_time, next_run_time,
                last_run_status, run_count,
                is_active, is_running
            FROM main_task
            {where}
            ORDER BY next_run_time
            """
        )