# system_management/scheduler/task_scheduler.py
"""Database-backed periodic task scheduler.

Task definitions are stored in the MySQL table ``main_task``; each row names an
importable module that exposes a zero-argument ``main()`` function.
"""
import calendar
import importlib
import time
from datetime import datetime, timedelta
from typing import Dict, List, Optional

import pandas as pd

from storage.mysql_agent import MySQLAgent
from pathlib import Path

# Project logging system.
# NOTE(review): the logger exposes bind()/success(), which suggests loguru. If so,
# `exc_info=True` is only stored as extra data and does not attach a traceback
# (loguru uses opt(exception=...)). Confirm against CrossPlatformLog.
from utils.logger import CrossPlatformLog

log = CrossPlatformLog.get_logger("TaskScheduler")


class TaskScheduler:
    """Run task modules listed in ``main_task`` when their ``next_run_time`` is due.

    Each task row names a module (``module_path``) that must expose a callable
    ``main()``, plus a frequency (``frequency_type`` / ``frequency_value``) used
    to compute the next run time after every execution.
    """

    # Delay before a failed task is attempted again.
    RETRY_DELAY = timedelta(minutes=15)

    # Maps interval-style frequency types to the matching timedelta keyword.
    _TIMEDELTA_UNITS = {
        'minute': 'minutes',
        'hourly': 'hours',
        'daily': 'days',
        'weekly': 'weeks',
    }

    def __init__(self, db_config: Optional[Dict] = None):
        """Create the scheduler and make sure the ``main_task`` table exists.

        Args:
            db_config: Optional database configuration passed to ``MySQLAgent``.
                An empty dict is passed when omitted (presumably MySQLAgent then
                falls back to its own defaults).
        """
        self.db = MySQLAgent(db_config or {})
        self._init_task_table()
        log.info("TaskScheduler initialized")

    def _init_task_table(self):
        """Create the ``main_task`` table if it does not exist yet."""
        if not self.db.table_exists("main_task"):
            log.info("Creating main_task table")
            create_sql = """
            CREATE TABLE main_task (
                task_id INT AUTO_INCREMENT PRIMARY KEY,
                task_name VARCHAR(100) NOT NULL,
                module_path VARCHAR(255) NOT NULL COMMENT '例如data_collection.spiders.weibo_spider',
                frequency_type ENUM('minute','hourly','daily','weekly','monthly') NOT NULL,
                frequency_value INT DEFAULT NULL COMMENT '间隔数值',
                last_run_time DATETIME DEFAULT NULL,
                next_run_time DATETIME DEFAULT NULL,
                last_run_status VARCHAR(20) DEFAULT NULL,
                is_active TINYINT(1) DEFAULT 1,
                is_running TINYINT(1) DEFAULT 0,
                run_count INT DEFAULT 0,
                created_at DATETIME DEFAULT CURRENT_TIMESTAMP,
                updated_at DATETIME DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
                INDEX idx_next_run (next_run_time),
                INDEX idx_active (is_active)
            ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4
            """
            self.db.execute_sql(create_sql)
            log.success("main_task table created")

    def run_pending_tasks(self) -> Dict[str, int]:
        """Execute every active task whose ``next_run_time`` has passed.

        Tasks run sequentially in ``next_run_time`` order. A task that raises is
        recorded as ``failed`` and rescheduled after ``RETRY_DELAY``. Its
        failure does not stop the remaining tasks.

        Returns:
            Dict[str, int]: ``{'total': due tasks, 'success': succeeded,
            'failed': failed}``.

        Raises:
            Exception: Re-raised (after a critical log) when fetching tasks or
                writing task status to the database fails.
        """
        result = {'total': 0, 'success': 0, 'failed': 0}
        try:
            # NOTE(review): rows with is_running = 1 are not excluded, so a second
            # scheduler process could start a task that is still running.
            tasks_df = self.db.query_to_df(
                "SELECT * FROM main_task "
                "WHERE is_active = 1 AND next_run_time <= %s "
                "ORDER BY next_run_time",
                params=(datetime.now(),)
            )
            result['total'] = len(tasks_df)
            if tasks_df.empty:
                log.debug("No pending tasks found")
                return result

            for _, task in tasks_df.iterrows():
                if self._run_task_row(task):
                    result['success'] += 1
                else:
                    result['failed'] += 1

            log.info(
                "Scheduler cycle completed",
                total_tasks=result['total'],
                success=result['success'],
                failed=result['failed']
            )
            return result
        except Exception:
            log.critical(
                "Scheduler main loop failed",
                exc_info=True
            )
            raise

    def _run_task_row(self, task: pd.Series) -> bool:
        """Run one due task row and persist the outcome.

        Returns:
            True if the task succeeded, False if it raised. Database errors while
            writing the status propagate to the caller.
        """
        # Cast numpy scalars coming from the DataFrame to plain ints so every DB
        # driver can bind them.
        task_id = int(task['task_id'])
        task_log = log.bind(task_id=task_id)
        task_log.info(
            f"Starting task {task['task_name']}"
        )

        # Mark the task as running before executing it.
        self._update_task_status(
            task_id, {'is_running': 1, 'last_run_time': datetime.now()}
        )
        try:
            self._execute_single_task(task)
            self._update_task_status(
                task_id,
                {
                    'last_run_status': 'success',
                    'is_running': 0,
                    # run_count is nullable; treat NULL/NaN as 0.
                    'run_count': self._as_int(task['run_count'], 0) + 1,
                    'next_run_time': self._calculate_next_run(
                        task['frequency_type'],
                        task['frequency_value']
                    )
                }
            )
        except Exception as e:
            # Reset the DB state first, so an error while logging the
            # (interpolated) exception text cannot leave is_running stuck at 1.
            self._update_task_status(
                task_id,
                {
                    'last_run_status': 'failed',
                    'is_running': 0,
                    'next_run_time': self._calculate_next_run(
                        task['frequency_type'],
                        task['frequency_value'],
                        retry=True
                    )
                }
            )
            task_log.error(
                f"Task failed: {str(e)}",
                exc_info=True
            )
            return False

        task_log.success("Task completed")
        return True

    def _execute_single_task(self, task: Dict) -> None:
        """Import ``task['module_path']`` and call its ``main()`` function.

        Args:
            task: Mapping with at least ``task_id`` and ``module_path`` (in
                practice a row of the ``main_task`` DataFrame).

        Raises:
            ImportError: If the module cannot be imported or has no callable
                ``main``.
            Exception: Anything raised by ``main()`` is logged and re-raised.
        """
        start_time = time.perf_counter()
        task_log = log.bind(
            task_id=task['task_id'],
            module=task['module_path']
        )
        try:
            module = importlib.import_module(task['module_path'])
            entry_point = getattr(module, 'main', None)
            if not callable(entry_point):
                raise ImportError("Module has no main() function")

            task_log.debug("Task execution started")
            entry_point()

            elapsed = time.perf_counter() - start_time
            task_log.info(
                f"Task completed in {elapsed:.2f}s",
                duration=elapsed
            )
        except Exception:
            task_log.error(
                "Task execution failed",
                exc_info=True
            )
            raise

    def _update_task_status(self, task_id: int, updates: Dict) -> None:
        """Write the given column values to the ``main_task`` row ``task_id``.

        Column names are interpolated into the SQL text, so the keys of
        ``updates`` must come from trusted code, never from user input. Values
        are bound as query parameters.

        Args:
            task_id: Primary key of the row to update.
            updates: Mapping of column name to new value. An empty mapping is a
                no-op.

        Raises:
            Exception: Any database error, re-raised after logging.
        """
        if not updates:
            return
        set_clause = ", ".join(f"{column}=%s" for column in updates)
        sql = f"UPDATE main_task SET {set_clause} WHERE task_id=%s"
        params = list(updates.values()) + [task_id]

        try:
            # NOTE(review): assumes execute_sql returns the affected-row count;
            # confirm in MySQLAgent.
            affected = self.db.execute_sql(sql, params=params)
            if affected != 1:
                log.warning(
                    "Unexpected row count in update",
                    task_id=task_id,
                    expected=1,
                    affected=affected
                )
        except Exception:
            log.error(
                "Failed to update task status",
                task_id=task_id,
                exc_info=True
            )
            raise

    @staticmethod
    def _as_int(value, default: int) -> int:
        """Convert a DataFrame scalar to ``int``, mapping None/NaN to ``default``."""
        return default if pd.isna(value) else int(value)

    @staticmethod
    def _add_months(moment: datetime, months: int) -> datetime:
        """Return ``moment`` shifted forward by ``months`` calendar months.

        The day of month is clamped to the length of the target month (e.g.
        Jan 31 + 1 month -> Feb 28/29), and the year rolls over as needed.
        """
        month_index = moment.month - 1 + months
        year = moment.year + month_index // 12
        month = month_index % 12 + 1
        last_day = calendar.monthrange(year, month)[1]
        return moment.replace(year=year, month=month, day=min(moment.day, last_day))

    def _calculate_next_run(self,
                            freq_type: str,
                            freq_value: Optional[int] = None,
                            retry: bool = False) -> datetime:
        """Compute the next run time, counted from now.

        Args:
            freq_type: One of 'minute', 'hourly', 'daily', 'weekly', 'monthly'.
            freq_value: Number of units between runs. None/NaN (a NULL column
                read through pandas), zero, or negative values are treated as 1.
            retry: When True, ignore the frequency and return now +
                ``RETRY_DELAY``.

        Raises:
            ValueError: If ``freq_type`` is not supported.
        """
        base_time = datetime.now()

        if retry:
            # Failed tasks are retried after a fixed delay.
            log.debug("Calculating retry time")
            return base_time + self.RETRY_DELAY

        step = max(self._as_int(freq_value, 1), 1)

        if freq_type == 'monthly':
            return self._add_months(base_time, step)

        unit = self._TIMEDELTA_UNITS.get(freq_type)
        if unit is None:
            raise ValueError(f"Unknown frequency type: {freq_type}")
        return base_time + timedelta(**{unit: step})

    def add_task(self,
                 task_name: str,
                 module_path: str,
                 frequency_type: str,
                 frequency_value: Optional[int] = None) -> int:
        """Insert a new task and return its ``task_id``.

        The first run is scheduled one interval from now.

        Raises:
            ValueError: If ``frequency_type`` is unsupported (raised before any
                insert).
            Exception: Any database error, re-raised after logging.
        """
        sql = """
        INSERT INTO main_task
        (task_name, module_path, frequency_type, frequency_value, next_run_time)
        VALUES (%s, %s, %s, %s, %s)
        """
        next_run = self._calculate_next_run(frequency_type, frequency_value)
        params = (task_name, module_path, frequency_type, frequency_value, next_run)

        try:
            self.db.execute_sql(sql, params=params)
            # NOTE(review): LAST_INSERT_ID() is per-connection. This is only
            # correct if MySQLAgent runs both statements on the same connection;
            # confirm.
            task_id = int(
                self.db.query_to_df("SELECT LAST_INSERT_ID() AS id").iloc[0]['id']
            )
            log.info(
                "New task added",
                task_id=task_id,
                task_name=task_name,
                next_run=next_run
            )
            return task_id
        except Exception:
            log.error(
                "Failed to add new task",
                task_name=task_name,
                exc_info=True
            )
            raise

    def get_task_status(self, active_only: bool = True) -> pd.DataFrame:
        """Return task metadata ordered by ``next_run_time``.

        Args:
            active_only: When True, only rows with ``is_active = 1`` are
                returned.
        """
        where = "WHERE is_active = 1" if active_only else ""
        log.debug("Fetching task status", active_only=active_only)
        return self.db.query_to_df(
            f"""
            SELECT task_id, task_name, module_path,
                   frequency_type, frequency_value,
                   last_run_time, next_run_time,
                   last_run_status, run_count,
                   is_active, is_running
            FROM main_task
            {where}
            ORDER BY next_run_time
            """
        )