import importlib
import inspect
import threading
import time
from datetime import datetime, timedelta
from typing import Callable, Dict, List, Optional, Any
import croniter
import pytz
from concurrent.futures import ThreadPoolExecutor, as_completed
import pandas as pd
from sqlalchemy.exc import SQLAlchemyError

from utils.mysql_agent import MySQLAgent
from utils.logger import CrossPlatformLog

# Module-level logger used by the scheduler.
log = CrossPlatformLog.get_logger("TaskScheduler")


class TaskScheduler:
    """Cron-expression based task scheduler backed by the ``main_task`` table.

    A polling caller invokes :meth:`check_and_run_tasks`; every due task is
    flagged as running in the database and its body is executed on a thread
    pool. When the body finishes, the background job writes the outcome and
    the next run time back to the database.

    Note: the success/failure counters returned by
    :meth:`check_and_run_tasks` (and accumulated in ``hourly_stats``) describe
    whether a task was *dispatched* to the pool, not whether its body
    eventually succeeded.
    """

    # Zone used for the "is this task due?" comparison and as fallback.
    DEFAULT_TIME_ZONE = 'Asia/Shanghai'
    # Delay before a failed task becomes due again.
    RETRY_DELAY = timedelta(minutes=15)

    def __init__(self, db_config: Optional[Dict] = None, max_workers: int = 5):
        """Create the scheduler.

        Args:
            db_config: Configuration passed to ``MySQLAgent``; ``None`` is
                replaced by an empty dict.
            max_workers: Size of the worker pool and of the concurrency
                semaphore.
        """
        self.db = MySQLAgent(db_config or {})
        self.executor = ThreadPoolExecutor(max_workers=max_workers)
        # Caps the number of task bodies that are queued or running at once.
        self._running_semaphore = threading.Semaphore(max_workers)
        # Counters accumulated between calls to get_and_reset_hourly_stats().
        self.hourly_stats = {'成功': 0, '失败': 0, '总数': 0}
        self.hourly_stats_lock = threading.Lock()
        log.info(f"任务调度器已初始化,最大工作线程数: {max_workers}")

    # ------------------------------------------------------------------
    # Time helpers
    # ------------------------------------------------------------------
    def _naive_now(self, time_zone: Any = None) -> datetime:
        """Return the current wall-clock time in ``time_zone`` without tzinfo.

        Values that are not a non-empty string, and unknown zone names, fall
        back to ``DEFAULT_TIME_ZONE`` so that this helper is safe to use
        inside error handlers.
        """
        zone_name = time_zone if isinstance(time_zone, str) and time_zone else self.DEFAULT_TIME_ZONE
        try:
            tz = pytz.timezone(zone_name)
        except pytz.UnknownTimeZoneError:
            log.warning(f"未知时区: {zone_name},使用默认时区 {self.DEFAULT_TIME_ZONE}")
            tz = pytz.timezone(self.DEFAULT_TIME_ZONE)
        return datetime.now(tz).replace(tzinfo=None)

    def _retry_time(self, task: Dict[str, Any]) -> datetime:
        """Return the naive time at which a failed ``task`` should be retried.

        Computed in the task's own zone, the same convention
        ``_calculate_next_run_time`` uses, instead of the server's local
        clock, so the retry is not scheduled in the past (or hours ahead)
        when the server runs in a different zone.
        """
        return self._naive_now(task.get('time_zone')) + self.RETRY_DELAY

    # ------------------------------------------------------------------
    # Entry-point resolution
    # ------------------------------------------------------------------
    @staticmethod
    def _is_plain_missing_module(error: Exception, module_name: str) -> bool:
        """Tell whether ``error`` only says that ``module_name`` is not a module.

        Such errors are expected while probing dotted prefixes and carry no
        diagnostic value compared with errors raised by an importable module.
        """
        if not isinstance(error, ModuleNotFoundError) or not error.name:
            return False
        return module_name == error.name or module_name.startswith(error.name + '.')

    @staticmethod
    def _requires_instance(func: Any) -> bool:
        """Tell whether ``func`` is a plain function needing a first argument.

        For a ``main`` found in a class namespace this identifies the ordinary
        ``def main(self)`` case, which cannot be called through the class
        without an instance.
        """
        if not inspect.isfunction(func):
            return False
        params = list(inspect.signature(func).parameters.values())
        if not params:
            return False
        first = params[0]
        positional = (inspect.Parameter.POSITIONAL_ONLY, inspect.Parameter.POSITIONAL_OR_KEYWORD)
        return first.kind in positional and first.default is inspect.Parameter.empty

    def _entry_point_for(self, target: Any, module_path: str) -> Callable[[], Any]:
        """Turn a resolved object into a zero-argument task entry point.

        Raises:
            AttributeError: If ``target`` offers no usable entry point.
        """
        if isinstance(target, type):
            missing_main = f"类 {target.__name__} 缺少可调用的 main() 作为任务入口"
            entry = getattr(target, 'main', None)
            if not callable(entry):
                # ``main`` may only exist on instances (e.g. set in __init__).
                try:
                    instance_main = getattr(target(), 'main', None)
                except Exception as exc:
                    raise AttributeError(missing_main) from exc
                if callable(instance_main):
                    return instance_main
                # The class itself is never used as the entry point: calling
                # it would only construct an instance without running main.
                raise AttributeError(missing_main)

            if self._requires_instance(inspect.getattr_static(target, 'main', None)):
                # Ordinary instance method: build the instance when the task
                # actually runs, so resolving a path has no side effects.
                def run_on_fresh_instance():
                    return target().main()

                return run_on_fresh_instance

            # classmethod / staticmethod / other callables work via the class.
            return entry

        if callable(target):
            return target

        entry = getattr(target, 'main', None)
        if callable(entry):
            return entry

        raise AttributeError(f"路径 {module_path} 未解析到可调用入口(缺少 main 或不可调用)")

    def _resolve_callable(self, module_path: str):
        """Resolve a dotted path to a callable that takes no arguments.

        Supported forms:
            - ``package.module``: the module must expose ``main()``.
            - ``package.module.ClassName``: ``ClassName.main`` is used; for an
              ordinary instance method the class is instantiated when the
              returned callable is invoked.
            - ``package.module.func_name``: the function itself.
            - ``package.module.ClassName.method_name``: that attribute.

        The longest importable prefix is tried first, then shorter ones.

        Raises:
            ImportError: If the path is empty, not a string, or cannot be
                resolved to a callable entry point.
        """
        if not module_path or not isinstance(module_path, str):
            raise ImportError("无效的模块路径")

        parts = module_path.split('.')
        significant_error = None
        last_error = None

        for i in range(len(parts), 0, -1):
            module_name = '.'.join(parts[:i])
            try:
                target = importlib.import_module(module_name)
                for attr in parts[i:]:
                    if not hasattr(target, attr):
                        raise AttributeError(f"在 {target} 中未找到属性: {attr}")
                    target = getattr(target, attr)
                return self._entry_point_for(target, module_path)
            except Exception as e:
                # Importing runs arbitrary module code, so anything can be
                # raised here; remember it and try a shorter prefix.
                last_error = e
                if significant_error is None and not self._is_plain_missing_module(e, module_name):
                    significant_error = e

        # Report the most informative failure rather than the last one seen.
        cause = significant_error or last_error
        raise ImportError(f"模块 {module_path} 导入/解析失败: {str(cause)}") from cause

    # ------------------------------------------------------------------
    # Scheduling cycle
    # ------------------------------------------------------------------
    def check_and_run_tasks(self, print_empty_status: bool = False) -> Dict[str, int]:
        """Dispatch every task that is currently due.

        Tasks are dispatched from the calling thread. Dispatching waits for a
        free concurrency slot, so this call blocks while all slots are busy.
        Waiting here rather than inside a pool worker keeps the workers free
        to finish running tasks and release slots.

        Args:
            print_empty_status: Print a message when no task is due
                (off by default to avoid noisy output).

        Returns:
            Dict with keys ``总任务数`` (due tasks found), ``成功`` (dispatched)
            and ``失败`` (could not be dispatched). Errors are logged and the
            counters gathered so far are returned; nothing is raised.
        """
        result = {'总任务数': 0, '成功': 0, '失败': 0}

        try:
            # Naive wall-clock time, matching how times are stored in the DB.
            now = self._naive_now()
            log.debug(f"当前检查时间: {now.strftime('%Y-%m-%d %H:%M:%S')}")

            tasks_df = self.db.query_to_df("""
                SELECT * FROM main_task
                WHERE is_active = 1
                  AND next_run_time <= %s
                  AND is_running = 0
                ORDER BY next_run_time
            """, params=(now,), is_print=False)

            result['总任务数'] = len(tasks_df)

            if tasks_df.empty:
                if print_empty_status:
                    print(f"当前没有到期的任务,等待新任务加入...{now.strftime('%Y-%m-%d %H:%M:%S')}")
                return result

            for _, task in tasks_df.iterrows():
                try:
                    # Each task gets its own dict so jobs never share state.
                    dispatched = self._process_single_task(task.to_dict())
                except Exception as e:
                    log.error(f"任务线程执行失败: {str(e)}", exc_info=True)
                    dispatched = False
                result['成功' if dispatched else '失败'] += 1

            with self.hourly_stats_lock:
                self.hourly_stats['成功'] += result['成功']
                self.hourly_stats['失败'] += result['失败']
                self.hourly_stats['总数'] += result['总任务数']

            log.info(
                "任务调度周期完成",
                总任务数=result['总任务数'],
                成功=result['成功'],
                失败=result['失败']
            )
            return result

        except SQLAlchemyError as e:
            log.error(f"数据库操作失败,将在下次轮询重试: {str(e)}", exc_info=True)
            return result  # keep the polling loop alive
        except Exception:
            log.error("调度器周期执行异常,将在下次轮询重试", exc_info=True)
            return result  # keep the polling loop alive

    def _process_single_task(self, task: Dict[str, Any]) -> bool:
        """Reserve a slot, flag the task as running and queue its body.

        Args:
            task: Row of ``main_task`` as a dict; ``task_id`` and
                ``task_name`` are required.

        Returns:
            ``True`` when the task body was handed to the worker pool,
            ``False`` when dispatching failed (the task is then marked failed
            and rescheduled after ``RETRY_DELAY``).
        """
        task_id = task['task_id']
        task_name = task['task_name']
        task_log = log.bind(task_id=task_id, task_name=task_name)
        task_log.info(f"开始执行任务: {task_name}")

        slot_held = False
        try:
            # Wait for a free slot so that at most max_workers bodies are
            # queued or running.
            self._running_semaphore.acquire()
            slot_held = True

            tz = pytz.timezone(task.get('time_zone', 'Asia/Shanghai'))
            current_time = datetime.now(tz).replace(tzinfo=None)
            self._update_task_status(task_id, {
                'is_running': 1,
                'last_run_time': current_time
            })

            # From here on the background job owns the slot and releases it.
            self.executor.submit(self._run_task_async, task.copy())
        except Exception as e:
            task_log.error(f"任务执行失败: {str(e)}", exc_info=True)
            try:
                self._update_task_status(task_id, {
                    'last_run_status': 'failed',
                    'is_running': 0,
                    'next_run_time': self._retry_time(task)
                })
            except Exception as update_err:
                task_log.error(f"任务失败后状态更新失败: {str(update_err)}", exc_info=True)
            # Only give back a slot this call actually obtained.
            if slot_held:
                self._running_semaphore.release()
            return False

        task_log.debug("任务已提交至后台执行队列")
        return True

    @staticmethod
    def _next_run_count(task: Dict[str, Any]) -> int:
        """Return the incremented run counter, treating NULL/NaN as zero."""
        previous = task.get('run_count')
        if previous is None or pd.isna(previous):
            return 1
        return int(previous) + 1

    def _run_task_async(self, task: Dict[str, Any]) -> None:
        """Run the task body on a worker thread and persist the outcome.

        On success the next run time is derived from the cron expression; on
        any failure the task is rescheduled after ``RETRY_DELAY``. The
        concurrency slot reserved by the dispatcher is always released.
        """
        task_id = task['task_id']
        task_name = task['task_name']
        task_log = log.bind(task_id=task_id, task_name=task_name)
        try:
            self._execute_task_logic(task)

            next_run_time = self._calculate_next_run_time(
                cron_expr=task['cron_expression'],
                time_zone=task.get('time_zone', 'Asia/Shanghai')
            )
            self._update_task_status(task_id, {
                'last_run_status': 'success',
                'is_running': 0,
                'run_count': self._next_run_count(task),
                'next_run_time': next_run_time
            })
            task_log.info(f"任务执行成功: {task_name}")
        except Exception:
            task_log.error("任务后台执行失败", exc_info=True)
            try:
                self._update_task_status(task_id, {
                    'last_run_status': 'failed',
                    'is_running': 0,
                    'next_run_time': self._retry_time(task)
                })
            except Exception:
                task_log.error("任务失败后状态更新失败(后台)", exc_info=True)
        finally:
            self._running_semaphore.release()

    def _execute_task_logic(self, task: Dict[str, Any]) -> None:
        """Resolve the task's ``module_path`` and call its entry point.

        Raises:
            Exception: Whatever resolution or the task body raises, after
                logging it.
        """
        started = time.monotonic()
        task_id = task['task_id']
        module_path = task['module_path']
        task_log = log.bind(task_id=task_id, module=module_path)

        try:
            callable_entry = self._resolve_callable(module_path)
            task_log.debug("开始执行任务入口函数")
            callable_entry()
            task_log.info(f"任务执行完成,耗时: {time.monotonic() - started:.2f}秒")
        except Exception:
            task_log.error("任务逻辑执行失败", exc_info=True)
            raise

    def _calculate_next_run_time(self, cron_expr: str, time_zone: str = 'Asia/Shanghai') -> datetime:
        """Return the next fire time of ``cron_expr`` as a naive datetime.

        The expression is evaluated against the current time in
        ``time_zone``; tzinfo is stripped from the result for storage.

        Raises:
            ValueError: If the expression or the time zone cannot be used.
        """
        try:
            tz = pytz.timezone(time_zone)
            now = datetime.now(tz)
            cron = croniter.croniter(cron_expr, now)
            next_run = cron.get_next(datetime)
            return next_run.replace(tzinfo=None)
        except Exception as e:
            log.error(f"Cron表达式解析失败: {cron_expr}, 错误: {str(e)}")
            raise ValueError(f"无效的Cron表达式: {cron_expr}") from e

    def _update_task_status(self, task_id: int, updates: Dict[str, Any]) -> None:
        """Write the given column values for one task row.

        Only whitelisted column names are used, because names are
        interpolated into the SQL text; values are passed as parameters.
        ``updated_at`` is set to ``NOW()`` unless the caller supplies it.

        Raises:
            Exception: Re-raises whatever ``execute_sql`` raises after
                logging it.
        """
        if not updates:
            log.warning(f"任务ID {task_id} 未提供任何更新字段")
            return

        valid_fields = {'is_running', 'last_run_time', 'last_run_status',
                        'run_count', 'next_run_time', 'updated_at'}
        filtered_updates = {k: v for k, v in updates.items() if k in valid_fields}

        if not filtered_updates:
            log.warning(f"任务ID {task_id} 没有有效的更新字段")
            return

        assignments = [f"{column}=%s" for column in filtered_updates]
        if 'updated_at' not in filtered_updates:
            # Avoid assigning the same column twice in one statement.
            assignments.append("updated_at=NOW()")
        sql = f"UPDATE main_task SET {', '.join(assignments)} WHERE task_id=%s"
        params = list(filtered_updates.values()) + [task_id]

        try:
            affected_rows = self.db.execute_sql(sql, params=params)
            if affected_rows != 1:
                log.warning(
                    "任务状态更新异常",
                    task_id=task_id,
                    预期影响行数=1,
                    实际影响行数=affected_rows
                )
        except SQLAlchemyError:
            log.error(f"任务状态更新失败(数据库错误),task_id: {task_id}", exc_info=True)
            raise
        except Exception:
            log.error(f"任务状态更新失败,task_id: {task_id}", exc_info=True)
            raise

    # ------------------------------------------------------------------
    # Public management API
    # ------------------------------------------------------------------
    def add_task(self, task_name: str, task_type: str, module_path: str,
                 cron_expression: str, time_zone: str = 'Asia/Shanghai') -> int:
        """Insert a new active cron task and return its id.

        The module path is resolved up front so that unusable tasks are
        rejected before they reach the table.

        Raises:
            ValueError: If the cron expression is empty or invalid, the
                module path cannot be resolved, or the new id cannot be read.
            Exception: Database errors are logged and re-raised.
        """
        if not cron_expression:
            raise ValueError("Cron表达式不能为空")

        try:
            self._resolve_callable(module_path)
        except Exception as e:
            raise ValueError(f"模块路径不可用: {module_path},错误: {str(e)}") from e

        first_run_time = self._calculate_next_run_time(cron_expression, time_zone)

        sql = """
              INSERT INTO main_task
              (task_name, task_type, module_path, cron_expression, time_zone,
               next_run_time, is_active, created_at, updated_at)
              VALUES (%s, %s, %s, %s, %s, %s, 1, NOW(), NOW())
              """
        params = (task_name, task_type, module_path, cron_expression, time_zone, first_run_time)

        try:
            self.db.execute_sql(sql, params=params)
            # NOTE(review): LAST_INSERT_ID() is per-connection. This is only
            # correct if MySQLAgent runs both statements on the same
            # connection - confirm against MySQLAgent.
            result_df = self.db.query_to_df("SELECT LAST_INSERT_ID() AS id")
            if result_df.empty or 'id' not in result_df.columns:
                raise ValueError("无法获取新添加任务的ID")

            # Convert the DataFrame scalar to a plain int as annotated.
            task_id = int(result_df.iloc[0]['id'])
            log.info(
                "新任务添加成功",
                task_id=task_id,
                task_name=task_name,
                cron表达式=cron_expression,
                首次运行时间=first_run_time.strftime('%Y-%m-%d %H:%M:%S')
            )
            return task_id
        except SQLAlchemyError:
            log.error(f"添加任务失败(数据库错误): {task_name}", exc_info=True)
            raise
        except Exception:
            log.error(f"添加任务失败: {task_name}", exc_info=True)
            raise

    def get_pending_tasks_count(self) -> int:
        """Return how many active tasks are due and not running.

        Returns 0 when the query fails, so a caller using this during
        shutdown is not interrupted.
        """
        try:
            now = self._naive_now()
            sql = """
                  SELECT COUNT(*) as cnt
                  FROM main_task
                  WHERE is_active = 1
                    AND next_run_time <= %s
                    AND is_running = 0
                  """
            df = self.db.query_to_df(sql, params=(now,))
            return int(df['cnt'].iloc[0]) if not df.empty else 0
        except Exception as e:
            log.error(f"查询待执行任务数量失败: {str(e)}", exc_info=True)
            return 0

    def get_pending_tasks(self) -> List[Dict[str, Any]]:
        """Return all due, non-running active tasks as a list of row dicts.

        Returns an empty list when nothing is due or the query fails.
        """
        try:
            now = self._naive_now()
            sql = """
                  SELECT *
                  FROM main_task
                  WHERE is_active = 1
                    AND next_run_time <= %s
                    AND is_running = 0
                  ORDER BY next_run_time
                  """
            tasks_df = self.db.query_to_df(sql, params=(now,))

            if tasks_df.empty:
                log.info("当前任务列表为空,等待新任务加入...")
                return []

            log.info(f"查询到{len(tasks_df)}个待执行任务")
            return tasks_df.to_dict('records')
        except Exception as e:
            log.error(f"查询待执行任务失败,将重试: {str(e)}", exc_info=True)
            return []

    def get_and_reset_hourly_stats(self) -> Dict[str, int]:
        """Return a snapshot of the accumulated counters and zero them."""
        with self.hourly_stats_lock:
            stats = self.hourly_stats.copy()
            self.hourly_stats = {'成功': 0, '失败': 0, '总数': 0}
        return stats