# intelligence_system/system_management/scheduler/task_scheduler.py
# (repository file-view metadata: author "T", 2025-10-30 17:24:28 +08:00,
#  483 lines, 20 KiB, Python)

import importlib
import inspect
import threading
from concurrent.futures import ThreadPoolExecutor, as_completed
from datetime import datetime, timedelta
from typing import Any, Callable, Dict, List, Optional

import croniter
import pandas as pd
import pytz
from sqlalchemy.exc import SQLAlchemyError

from utils.logger import CrossPlatformLog
from utils.mysql_agent import MySQLAgent
# Module-level scheduler logger; TaskScheduler methods log through it and
# derive per-task loggers from it via log.bind(...).
log = CrossPlatformLog.get_logger("TaskScheduler")
class TaskScheduler:
    """Cron-expression based task scheduler backed by the ``main_task`` table.

    Each call to :meth:`check_and_run_tasks` selects the active, non-running
    tasks whose ``next_run_time`` has passed, marks each one as running and
    runs its entry point (resolved from ``module_path``) on a thread pool.
    When a task body finishes, ``last_run_status``/``run_count``/
    ``next_run_time`` are written back; failed tasks become due again after
    ``RETRY_DELAY``.

    All timestamps are stored as naive datetimes. Polling compares against
    "now" in ``DEFAULT_TIME_ZONE``, while per-task timestamps
    (``last_run_time``, ``next_run_time``, retry time) use the task's own
    ``time_zone`` column.
    NOTE(review): for tasks whose ``time_zone`` differs from
    ``DEFAULT_TIME_ZONE`` these two clocks disagree, so such tasks become due
    early or late by the offset difference -- confirm this is intended.
    """

    # Time zone used for polling and for tasks whose time_zone is missing/blank.
    DEFAULT_TIME_ZONE = 'Asia/Shanghai'
    # Delay before a failed task becomes due again.
    RETRY_DELAY = timedelta(minutes=15)
    # Columns _update_task_status() may write; the whitelist keeps the
    # dynamically built SET clause free of caller-controlled identifiers.
    _UPDATABLE_FIELDS = frozenset({
        'is_running', 'last_run_time', 'last_run_status',
        'run_count', 'next_run_time', 'updated_at',
    })
    # WHERE clause shared by every "due task" query; %s is the current time.
    _DUE_TASKS_WHERE = "is_active = 1 AND next_run_time <= %s AND is_running = 0"

    def __init__(self, db_config: Optional[Dict] = None, max_workers: int = 5):
        """Create the scheduler.

        Args:
            db_config: Connection settings passed to ``MySQLAgent``
                (an empty dict when None).
            max_workers: Size of the worker pool and the maximum number of
                task bodies allowed to run at the same time.
        """
        self.db = MySQLAgent(db_config or {})
        self.executor = ThreadPoolExecutor(max_workers=max_workers)
        # One permit per worker thread: dispatch waits here for a free slot
        # instead of queueing more task bodies than the pool can run at once.
        self._running_semaphore = threading.Semaphore(max_workers)
        # Counters accumulated per polling cycle; read and reset by
        # get_and_reset_hourly_stats().
        self.hourly_stats = {'成功': 0, '失败': 0, '总数': 0}
        self.hourly_stats_lock = threading.Lock()
        log.info(f"任务调度器已初始化,最大工作线程数: {max_workers}")

    # ------------------------------------------------------------------
    # Small helpers
    # ------------------------------------------------------------------
    @classmethod
    def _task_time_zone(cls, task: Dict[str, Any]) -> str:
        """Return the task's ``time_zone``, or the default when it is missing, NULL/NaN or blank."""
        time_zone = task.get('time_zone')
        if isinstance(time_zone, str) and time_zone.strip():
            return time_zone
        return cls.DEFAULT_TIME_ZONE

    @staticmethod
    def _now_naive(time_zone: str) -> datetime:
        """Return the current wall-clock time in ``time_zone`` with tzinfo stripped (DB format).

        Raises:
            pytz.UnknownTimeZoneError: ``time_zone`` is not a known zone name.
        """
        return datetime.now(pytz.timezone(time_zone)).replace(tzinfo=None)

    def _retry_time(self, time_zone: str) -> datetime:
        """Return "now + RETRY_DELAY" in the task's zone, falling back to the default zone if invalid."""
        try:
            now = self._now_naive(time_zone)
        except pytz.UnknownTimeZoneError:
            now = self._now_naive(self.DEFAULT_TIME_ZONE)
        return now + self.RETRY_DELAY

    @staticmethod
    def _as_int(value: Any, default: int = 0) -> int:
        """Convert a DB scalar (possibly None/NaN/numpy integer) to a plain ``int``."""
        if pd.isna(value):
            return default
        return int(value)

    # ------------------------------------------------------------------
    # Entry-point resolution
    # ------------------------------------------------------------------
    @staticmethod
    def _import_target(module_path: str) -> Any:
        """Import the longest importable prefix of ``module_path`` and walk the rest as attributes.

        A shorter prefix is tried only when the import failed because that
        module (or one of its parent packages) does not exist. Any other
        import failure -- e.g. an error raised while executing an existing
        module, or a missing third-party dependency -- is reported at once
        instead of being masked by the later, less informative attempts.

        Raises:
            ImportError: invalid path, nothing importable, module import
                failure, or a missing attribute along the chain.
        """
        if not module_path or not isinstance(module_path, str):
            raise ImportError("无效的模块路径")
        parts = module_path.split('.')
        last_error: Optional[Exception] = None
        for i in range(len(parts), 0, -1):
            module_name = '.'.join(parts[:i])
            try:
                module = importlib.import_module(module_name)
            except ModuleNotFoundError as e:
                missing = e.name or ''
                if missing and (module_name == missing or module_name.startswith(missing + '.')):
                    # This prefix simply is not a module; try a shorter one.
                    last_error = e
                    continue
                raise ImportError(f"模块 {module_path} 导入/解析失败: {str(e)}") from e
            except Exception as e:
                raise ImportError(f"模块 {module_path} 导入/解析失败: {str(e)}") from e
            target = module
            for attr in parts[i:]:
                try:
                    target = getattr(target, attr)
                except AttributeError:
                    raise ImportError(
                        f"模块 {module_path} 导入/解析失败: {target} 中未找到属性: {attr}"
                    ) from None
            return target
        raise ImportError(f"模块 {module_path} 导入/解析失败: {str(last_error)}") from last_error

    @staticmethod
    def _entry_point(target: Any, module_path: str, instantiate: bool) -> Callable[..., Any]:
        """Pick the zero-argument callable to run for an already-resolved ``target``.

        * Class whose ``main`` is a staticmethod/classmethod (looked up with
          ``inspect.getattr_static`` so the descriptor is not unwrapped):
          ``Class.main``.
        * Other classes: with ``instantiate`` true, a fresh instance's bound
          ``main`` (so the constructor's initialisation/logging runs first).
          With ``instantiate`` false (path validation), the class attribute
          ``main`` is returned if callable and the class is only instantiated
          when ``main`` is not available on the class itself.
        * Non-classes: the target itself if callable, otherwise its ``main``.

        Raises:
            AttributeError: no callable entry point was found.
            Exception: whatever the class constructor raises.
        """
        if isinstance(target, type):
            raw_main = inspect.getattr_static(target, 'main', None)
            if isinstance(raw_main, (staticmethod, classmethod)):
                return getattr(target, 'main')
            if not instantiate:
                class_main = getattr(target, 'main', None)
                if callable(class_main):
                    return class_main
            instance_main = getattr(target(), 'main', None)
            if callable(instance_main):
                return instance_main
            # Never treat the class itself as the entry point: calling it would
            # only construct an instance without running main().
            raise AttributeError(f"{target.__name__} 缺少可调用的 main() 作为任务入口")
        if callable(target):
            return target
        target_main = getattr(target, 'main', None)
        if callable(target_main):
            return target_main
        raise AttributeError(f"路径 {module_path} 未解析到可调用入口(缺少 main 或不可调用)")

    def _resolve_callable(self, module_path: str) -> Callable[..., Any]:
        """Resolve ``module_path`` to its entry point without running it (used to validate paths).

        Supported forms:
            - ``package.module``                     -> the module's ``main()``
            - ``package.module.ClassName``           -> ``ClassName.main``
            - ``package.module.func_name``           -> the function itself
            - ``package.module.ClassName.method``    -> that method

        Raises:
            ImportError: the path cannot be imported or has no callable entry.
        """
        target = self._import_target(module_path)
        try:
            return self._entry_point(target, module_path, instantiate=False)
        except Exception as e:
            raise ImportError(f"模块 {module_path} 导入/解析失败: {str(e)}") from e

    # ------------------------------------------------------------------
    # Scheduling cycle
    # ------------------------------------------------------------------
    def check_and_run_tasks(self, print_empty_status: bool = False) -> Dict[str, int]:
        """Dispatch every due task once; task and database errors are logged, not raised.

        Tasks are dispatched one after another from the calling (polling)
        thread. When all ``max_workers`` slots are busy, dispatch waits for a
        running task to finish, so this call can block while task bodies run.

        Args:
            print_empty_status: Print a line when nothing is due (default
                False, to avoid console output on every poll).

        Returns:
            ``{'总任务数': due tasks, '成功': dispatched, '失败': not dispatched}``.
            '成功' counts a successful hand-off to the worker pool, not task
            completion. On a database or unexpected error the counts gathered
            so far are returned and the next poll retries.
        """
        result = {'总任务数': 0, '成功': 0, '失败': 0}
        try:
            now = self._now_naive(self.DEFAULT_TIME_ZONE)
            log.debug(f"当前检查时间: {now.strftime('%Y-%m-%d %H:%M:%S')}")
            tasks_df = self.db.query_to_df(
                f"SELECT * FROM main_task WHERE {self._DUE_TASKS_WHERE} ORDER BY next_run_time",
                params=(now,),
                is_print=False,
            )
            result['总任务数'] = len(tasks_df)
            if tasks_df.empty:
                if print_empty_status:
                    print(f"当前没有到期的任务,等待新任务加入...{now.strftime('%Y-%m-%d %H:%M:%S')}")
                return result

            # Dispatch from this thread, not from pool workers: dispatch blocks
            # on the concurrency semaphore, and a worker blocked there holds a
            # thread that the queued task bodies need in order to release the
            # semaphore -- with more due tasks than workers the pool deadlocks.
            for task in tasks_df.to_dict('records'):
                try:
                    dispatched = self._process_single_task(task)
                except Exception as e:
                    log.error(f"任务线程执行失败: {str(e)}", exc_info=True)
                    dispatched = False
                result['成功' if dispatched else '失败'] += 1

            with self.hourly_stats_lock:
                self.hourly_stats['成功'] += result['成功']
                self.hourly_stats['失败'] += result['失败']
                self.hourly_stats['总数'] += result['总任务数']
            log.info(
                "任务调度周期完成",
                总任务数=result['总任务数'],
                成功=result['成功'],
                失败=result['失败']
            )
            return result
        except SQLAlchemyError as e:
            log.error(f"数据库操作失败,将在下次轮询重试: {str(e)}", exc_info=True)
            return result  # keep the polling loop alive
        except Exception:
            log.error("调度器周期执行异常,将在下次轮询重试", exc_info=True)
            return result  # keep the polling loop alive

    def _process_single_task(self, task: Dict[str, Any]) -> bool:
        """Hand one due task to the worker pool.

        Waits for a free concurrency slot, marks the task ``is_running=1`` with
        ``last_run_time`` set, and submits :meth:`_run_task_async`, which
        releases the slot when the body finishes. Must not be called from one
        of ``self.executor``'s worker threads (the blocking wait could starve
        the workers that release slots).

        Args:
            task: One ``main_task`` row as a dict (needs at least ``task_id``
                and ``task_name``).

        Returns:
            True once the body has been submitted; False if dispatch failed,
            in which case the task is marked failed and rescheduled after
            ``RETRY_DELAY``.
        """
        task_id = task['task_id']
        task_name = task['task_name']
        task_log = log.bind(task_id=task_id, task_name=task_name)
        task_log.info(f"开始执行任务: {task_name}")
        time_zone = self._task_time_zone(task)

        self._running_semaphore.acquire()
        try:
            self._update_task_status(task_id, {
                'is_running': 1,
                'last_run_time': self._now_naive(time_zone),
            })
            self.executor.submit(self._run_task_async, dict(task))
        except Exception as e:
            # The body never reached the pool, so nothing else will free the slot.
            self._running_semaphore.release()
            task_log.error(f"任务执行失败: {str(e)}", exc_info=True)
            try:
                self._update_task_status(task_id, {
                    'last_run_status': 'failed',
                    'is_running': 0,
                    'next_run_time': self._retry_time(time_zone),
                })
            except Exception as update_err:
                task_log.error(f"任务失败后状态更新失败: {str(update_err)}", exc_info=True)
            return False
        task_log.debug("任务已提交至后台执行队列")
        return True

    def _run_task_async(self, task: Dict[str, Any]) -> None:
        """Run a task body on a worker thread and record its outcome.

        On success: ``last_run_status='success'``, ``run_count`` incremented
        (NULL counts as 0) and ``next_run_time`` taken from the cron
        expression. On any exception -- including failure to compute the next
        run time or to write the status -- ``last_run_status='failed'`` and
        ``next_run_time`` is now + ``RETRY_DELAY``. ``is_running`` is reset in
        both cases, and the slot taken by :meth:`_process_single_task` is
        always released.
        """
        task_id = task['task_id']
        task_name = task['task_name']
        task_log = log.bind(task_id=task_id, task_name=task_name)
        time_zone = self._task_time_zone(task)
        try:
            self._execute_task_logic(task)
            next_run_time = self._calculate_next_run_time(
                cron_expr=task['cron_expression'],
                time_zone=time_zone,
            )
            self._update_task_status(task_id, {
                'last_run_status': 'success',
                'is_running': 0,
                'run_count': self._as_int(task.get('run_count')) + 1,
                'next_run_time': next_run_time,
            })
            task_log.info(f"任务执行成功: {task_name}")
        except Exception:
            task_log.error("任务后台执行失败", exc_info=True)
            try:
                self._update_task_status(task_id, {
                    'last_run_status': 'failed',
                    'is_running': 0,
                    'next_run_time': self._retry_time(time_zone),
                })
            except Exception:
                task_log.error("任务失败后状态更新失败(后台)", exc_info=True)
        finally:
            self._running_semaphore.release()

    def _execute_task_logic(self, task: Dict[str, Any]) -> None:
        """Resolve the task's ``module_path`` and run its entry point in the current thread.

        Uses the same path forms as :meth:`_resolve_callable` (which validates
        paths in :meth:`add_task`), so every path accepted there also runs here.
        Classes with an instance ``main`` are instantiated first; static and
        class methods are called on the class.

        Raises:
            ValueError: ``module_path`` is missing or blank.
            ImportError / AttributeError: the entry point cannot be resolved.
            RuntimeError: the entry point called ``sys.exit()`` with a non-zero code.
            Exception: anything the task itself raises (logged, then re-raised).
        """
        module_path = task.get('module_path')
        if not isinstance(module_path, str) or not module_path.strip():
            raise ValueError("任务缺少 module_path 配置")
        target = self._import_target(module_path)
        callable_entry = self._entry_point(target, module_path, instantiate=True)
        try:
            callable_entry()
        except SystemExit as exc:
            # Script-style main() functions often end with sys.exit(); an escaping
            # SystemExit bypasses _run_task_async's `except Exception`, which would
            # leave the task stuck with is_running=1.
            if exc.code not in (None, 0):
                raise RuntimeError(f"任务入口以非零退出码结束: {exc.code}") from exc
        except Exception as e:
            log.error(f"任务逻辑执行失败: {str(e)}")
            raise

    def _calculate_next_run_time(self, cron_expr: str, time_zone: str = 'Asia/Shanghai') -> datetime:
        """Return the next fire time of ``cron_expr`` after "now" in ``time_zone``, as a naive datetime.

        Raises:
            ValueError: ``time_zone`` is unknown, or ``cron_expr`` is invalid.
        """
        try:
            tz = pytz.timezone(time_zone)
        except Exception as e:
            log.error(f"时区无效: {time_zone}")
            raise ValueError(f"无效的时区: {time_zone}") from e
        try:
            cron = croniter.croniter(cron_expr, datetime.now(tz))
            next_run = cron.get_next(datetime)
        except Exception as e:
            log.error(f"Cron表达式解析失败: {cron_expr}, 错误: {str(e)}")
            raise ValueError(f"无效的Cron表达式: {cron_expr}") from e
        return next_run.replace(tzinfo=None)  # naive, matching DB storage

    def _update_task_status(self, task_id: int, updates: Dict[str, Any]) -> None:
        """Write whitelisted status columns for one task and bump ``updated_at``.

        Keys outside ``_UPDATABLE_FIELDS`` are ignored. ``updated_at`` is set
        to ``NOW()`` unless the caller supplies it explicitly.

        Raises:
            SQLAlchemyError / Exception: re-raised after logging when the
                UPDATE fails.
        """
        if not updates:
            log.warning(f"任务ID {task_id} 未提供任何更新字段")
            return
        filtered_updates = {k: v for k, v in updates.items() if k in self._UPDATABLE_FIELDS}
        if not filtered_updates:
            log.warning(f"任务ID {task_id} 没有有效的更新字段")
            return

        assignments = [f"{column}=%s" for column in filtered_updates]
        if 'updated_at' not in filtered_updates:
            # Avoid assigning updated_at twice when the caller provides it.
            assignments.append("updated_at=NOW()")
        sql = f"UPDATE main_task SET {', '.join(assignments)} WHERE task_id=%s"
        params = [*filtered_updates.values(), task_id]
        try:
            affected_rows = self.db.execute_sql(sql, params=params)
        except SQLAlchemyError:
            log.error(f"任务状态更新失败(数据库错误),task_id: {task_id}", exc_info=True)
            raise
        except Exception:
            log.error(f"任务状态更新失败,task_id: {task_id}", exc_info=True)
            raise
        if affected_rows != 1:
            log.warning(
                "任务状态更新异常",
                task_id=task_id,
                预期影响行数=1,
                实际影响行数=affected_rows
            )

    # ------------------------------------------------------------------
    # Public API
    # ------------------------------------------------------------------
    def add_task(self,
                 task_name: str,
                 task_type: str,
                 module_path: str,
                 cron_expression: str,
                 time_zone: str = 'Asia/Shanghai') -> int:
        """Insert a new active cron task and return its ``task_id``.

        ``module_path`` is resolved up front so that unrunnable tasks are
        rejected before they reach the table.

        Raises:
            ValueError: empty cron expression, unresolvable ``module_path``,
                invalid cron expression/time zone, or the new id cannot be read.
            SQLAlchemyError: the INSERT/SELECT failed (re-raised after logging).
        """
        if not cron_expression:
            raise ValueError("Cron表达式不能为空")
        try:
            self._resolve_callable(module_path)
        except Exception as e:
            raise ValueError(f"模块路径不可用: {module_path},错误: {str(e)}") from e

        first_run_time = self._calculate_next_run_time(cron_expression, time_zone)
        insert_sql = """
            INSERT INTO main_task
                (task_name, task_type, module_path, cron_expression, time_zone,
                 next_run_time, is_active, created_at, updated_at)
            VALUES (%s, %s, %s, %s, %s, %s, 1, NOW(), NOW())
        """
        params = (task_name, task_type, module_path, cron_expression, time_zone, first_run_time)
        try:
            self.db.execute_sql(insert_sql, params=params)
            # LAST_INSERT_ID() is tracked per connection; if MySQLAgent runs the
            # follow-up SELECT on a different (pooled) connection it returns 0 or
            # another row's id, so look the row up by the values just inserted.
            result_df = self.db.query_to_df(
                """
                SELECT MAX(task_id) AS id
                FROM main_task
                WHERE task_name = %s
                  AND module_path = %s
                  AND cron_expression = %s
                  AND time_zone = %s
                """,
                params=(task_name, module_path, cron_expression, time_zone),
            )
            if (result_df.empty or 'id' not in result_df.columns
                    or pd.isna(result_df.iloc[0]['id'])):
                raise ValueError("无法获取新添加任务的ID")
            task_id = int(result_df.iloc[0]['id'])
            log.info(
                "新任务添加成功",
                task_id=task_id,
                task_name=task_name,
                cron表达式=cron_expression,
                首次运行时间=first_run_time.strftime('%Y-%m-%d %H:%M:%S')
            )
            return task_id
        except SQLAlchemyError:
            log.error(f"添加任务失败(数据库错误): {task_name}", exc_info=True)
            raise
        except Exception:
            log.error(f"添加任务失败: {task_name}", exc_info=True)
            raise

    def get_pending_tasks_count(self) -> int:
        """Return how many tasks are currently due (used for graceful shutdown); 0 on error."""
        try:
            now = self._now_naive(self.DEFAULT_TIME_ZONE)
            df = self.db.query_to_df(
                f"SELECT COUNT(*) AS cnt FROM main_task WHERE {self._DUE_TASKS_WHERE}",
                params=(now,),
            )
            return int(df['cnt'].iloc[0]) if not df.empty else 0
        except Exception as e:
            log.error(f"查询待执行任务数量失败: {str(e)}", exc_info=True)
            return 0  # never block the shutdown path on a query failure

    def get_pending_tasks(self) -> List[Dict[str, Any]]:
        """Return all due tasks as row dicts ordered by ``next_run_time``; ``[]`` when none or on error."""
        try:
            now = self._now_naive(self.DEFAULT_TIME_ZONE)
            tasks_df = self.db.query_to_df(
                f"SELECT * FROM main_task WHERE {self._DUE_TASKS_WHERE} ORDER BY next_run_time",
                params=(now,),
            )
            if tasks_df.empty:
                log.info("当前任务列表为空,等待新任务加入...")
                return []
            log.info(f"查询到{len(tasks_df)}个待执行任务")
            return tasks_df.to_dict('records')
        except Exception as e:
            log.error(f"查询待执行任务失败,将重试: {str(e)}", exc_info=True)
            return []

    def get_and_reset_hourly_stats(self) -> Dict[str, int]:
        """Return the accumulated counters and reset them, both under the stats lock."""
        with self.hourly_stats_lock:
            stats = self.hourly_stats.copy()
            self.hourly_stats = {'成功': 0, '失败': 0, '总数': 0}
        return stats