调度系统开发

This commit is contained in:
z66
2025-09-09 16:29:20 +08:00
parent 65cbb712f3
commit cf78104a5b
8 changed files with 188 additions and 297 deletions
+147 -214
View File
@@ -1,277 +1,210 @@
# system_management/scheduler/task_scheduler.py
import importlib
import time
from datetime import datetime, timedelta
from typing import Dict, List, Optional
from datetime import datetime
from typing import Dict, List, Optional, Any
import croniter
import pytz
from concurrent.futures import ThreadPoolExecutor, as_completed
import pandas as pd
from storage.mysql_agent import MySQLAgent
from pathlib import Path
# 使用您的日志系统
from utils.logger import CrossPlatformLog
# 初始化调度器日志
log = CrossPlatformLog.get_logger("TaskScheduler")
class TaskScheduler:
def __init__(self, db_config: Optional[Dict] = None):
"""
初始化任务调度器
def __init__(self, db_config: Optional[Dict] = None, max_workers: int = 5):
"""初始化任务调度器(基于Cron表达式)"""
self.db = MySQLAgent(db_config or {})
self.executor = ThreadPoolExecutor(max_workers=max_workers)
log.info(f"任务调度器已初始化,最大工作线程数: {max_workers}")
Args:
db_config (Optional[Dict]): 可选的数据库配置,默认使用MySQLAgent默认配置
"""
self.db = MySQLAgent(db_config or {}) # 使用您提供的MySQLAgent
self._init_task_table()
log.info("TaskScheduler initialized")
def _init_task_table(self):
"""确保任务表存在并包含必要字段"""
if not self.db.table_exists("main_task"):
log.info("Creating main_task table")
create_sql = """
CREATE TABLE main_task (
task_id INT AUTO_INCREMENT PRIMARY KEY,
task_name VARCHAR(100) NOT NULL,
module_path VARCHAR(255) NOT NULL COMMENT '例如data_collection.spiders.weibo_spider',
frequency_type ENUM('minute','hourly','daily','weekly','monthly') NOT NULL,
frequency_value INT DEFAULT NULL COMMENT '间隔数值',
last_run_time DATETIME DEFAULT NULL,
next_run_time DATETIME DEFAULT NULL,
last_run_status VARCHAR(20) DEFAULT NULL,
is_active TINYINT(1) DEFAULT 1,
is_running TINYINT(1) DEFAULT 0,
run_count INT DEFAULT 0,
created_at DATETIME DEFAULT CURRENT_TIMESTAMP,
updated_at DATETIME DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
INDEX idx_next_run (next_run_time),
INDEX idx_active (is_active)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4
"""
self.db.execute_sql(create_sql)
log.success("main_task table created")
def run_pending_tasks(self) -> Dict[str, int]:
"""
执行所有到期的活跃任务
Returns:
Dict[str, int]: 包含执行结果的字典 {
'total': 总任务数,
'success': 成功数,
'failed': 失败数
}
"""
result = {'total': 0, 'success': 0, 'failed': 0}
def check_and_run_tasks(self) -> Dict[str, int]:
"""检查并执行所有到期的任务"""
result = {'总任务数': 0, '成功': 0, '失败': 0}
try:
# 使用您提供的query_to_df方法获取任务
tasks_df = self.db.query_to_df(
"SELECT * FROM main_task "
"WHERE is_active = 1 AND next_run_time <= %s "
"ORDER BY next_run_time",
params=(datetime.now(),)
)
# 获取当前时间(带时区)
now = datetime.now(pytz.timezone('Asia/Shanghai')).replace(tzinfo=None)
result['total'] = len(tasks_df)
# 查询所有到期的活跃任务
tasks_df = self.db.query_to_df("""
SELECT * FROM main_task
WHERE is_active = 1
AND next_run_time <= %s
AND is_running = 0
ORDER BY next_run_time
""", params=(now,))
result['总任务数'] = len(tasks_df)
if tasks_df.empty:
log.debug("No pending tasks found")
log.debug("没有到期的任务需要执行")
return result
# 并发执行任务
futures = []
for _, task in tasks_df.iterrows():
task_id = task['task_id']
log.bind(task_id=task_id).info(
f"Starting task {task['task_name']}"
)
# 标记任务为执行中
self._update_task_status(
task_id,
{'is_running': 1, 'last_run_time': datetime.now()}
)
futures.append(self.executor.submit(self._process_single_task, task))
# 收集执行结果
for future in as_completed(futures):
try:
self._execute_single_task(task)
self._update_task_status(
task_id,
{
'last_run_status': 'success',
'is_running': 0,
'run_count': task['run_count'] + 1,
'next_run_time': self._calculate_next_run(
task['frequency_type'],
task['frequency_value']
)
}
)
result['success'] += 1
log.bind(task_id=task_id).success("Task completed")
if future.result():
result['成功'] += 1
else:
result['失败'] += 1
except Exception as e:
log.bind(task_id=task_id).error(
f"Task failed: {str(e)}",
exc_info=True
)
self._update_task_status(
task_id,
{
'last_run_status': 'failed',
'is_running': 0,
'next_run_time': self._calculate_next_run(
task['frequency_type'],
task['frequency_value'],
retry=True
)
}
)
result['failed'] += 1
log.error(f"任务线程执行失败: {str(e)}")
result['失败'] += 1
log.info(
"Scheduler cycle completed",
total_tasks=result['total'],
success=result['success'],
failed=result['failed']
"任务调度周期完成",
总任务数=result['总任务数'],
成功=result['成功'],
失败=result['失败']
)
return result
except Exception as e:
log.critical(
"Scheduler main loop failed",
exc_info=True
)
log.critical("调度器主循环执行失败", exc_info=True)
raise
def _execute_single_task(self, task: Dict) -> None:
"""执行单个任务模块"""
start_time = time.time()
task_log = log.bind(
task_id=task['task_id'],
module=task['module_path']
)
def _process_single_task(self, task: Dict[str, Any]) -> bool:
"""处理单个任务(线程安全)"""
task_id = task['task_id']
task_log = log.bind(task_id=task_id, task_name=task['task_name'])
task_log.info(f"开始执行任务: {task['task_name']}")
try:
module = importlib.import_module(task['module_path'])
# 标记任务为运行中
self._update_task_status(task_id, {
'is_running': 1,
'last_run_time': datetime.now()
})
if not hasattr(module, 'main'):
raise ImportError(f"Module has no main() function")
# 执行任务逻辑
self._execute_task_logic(task)
# 执行任务
task_log.debug("Task execution started")
module.main()
elapsed = time.time() - start_time
task_log.info(
f"Task completed in {elapsed:.2f}s",
duration=elapsed
# 计算下次运行时间(基于Cron表达式)
next_run_time = self._calculate_next_run_time(
cron_expr=task['cron_expression'],
time_zone=task.get('time_zone', 'Asia/Shanghai')
)
# 更新任务状态为成功
self._update_task_status(task_id, {
'last_run_status': 'success',
'is_running': 0,
'run_count': task['run_count'] + 1,
'next_run_time': next_run_time
})
task_log.info(f"任务执行成功: {task['task_name']}")
return True
except Exception as e:
task_log.error(
"Task execution failed",
exc_info=True
)
task_log.error(f"任务执行失败: {str(e)}", exc_info=True)
# 失败时计算下次重试时间(15分钟后)
next_retry_time = datetime.now() + pd.Timedelta(minutes=15)
self._update_task_status(task_id, {
'last_run_status': 'failed',
'is_running': 0,
'next_run_time': next_retry_time
})
return False
def _execute_task_logic(self, task: Dict[str, Any]) -> None:
"""执行任务的具体逻辑(动态导入模块)"""
start_time = time.time()
task_log = log.bind(task_id=task['task_id'], module=task['module_path'])
try:
# 动态导入任务模块
module = importlib.import_module(task['module_path'])
if not hasattr(module, 'main'):
raise ImportError(f"模块 {task['module_path']} 中未找到 main() 函数")
task_log.debug("开始执行模块中的 main() 函数")
module.main() # 调用任务主函数
task_log.info(f"任务执行完成,耗时: {time.time() - start_time:.2f}")
except Exception as e:
task_log.error("任务逻辑执行失败", exc_info=True)
raise
def _update_task_status(self, task_id: int, updates: Dict) -> None:
"""更新任务状态"""
set_clause = ", ".join([f"{k}=%s" for k in updates.keys()])
sql = f"UPDATE main_task SET {set_clause} WHERE task_id=%s"
def _calculate_next_run_time(self, cron_expr: str, time_zone: str = 'Asia/Shanghai') -> datetime:
"""基于Cron表达式计算下次运行时间"""
try:
tz = pytz.timezone(time_zone)
now = datetime.now(tz)
cron = croniter.croniter(cron_expr, now)
next_run = cron.get_next(datetime)
return next_run.replace(tzinfo=None) # 移除时区信息,适应数据库存储
except Exception as e:
log.error(f"Cron表达式解析失败: {cron_expr}, 错误: {str(e)}")
raise ValueError(f"无效的Cron表达式: {cron_expr}")
def _update_task_status(self, task_id: int, updates: Dict[str, Any]) -> None:
"""更新任务状态到数据库"""
set_clause = ", ".join([f"{k}=%s" for k in updates.keys()])
sql = f"UPDATE main_task SET {set_clause}, updated_at=NOW() WHERE task_id=%s"
params = list(updates.values()) + [task_id]
try:
affected = self.db.execute_sql(sql, params=params)
if affected != 1:
affected_rows = self.db.execute_sql(sql, params=params)
if affected_rows != 1:
log.warning(
"Unexpected row count in update",
"任务状态更新异常",
task_id=task_id,
expected=1,
affected=affected
预期影响行数=1,
实际影响行数=affected_rows
)
except Exception as e:
log.error(
"Failed to update task status",
task_id=task_id,
exc_info=True
)
log.error(f"任务状态更新失败,task_id: {task_id}", exc_info=True)
raise
def _calculate_next_run(self, freq_type: str, freq_value: Optional[int] = None,
retry: bool = False) -> datetime:
"""
计算下次执行时间(带重试逻辑)
"""
base_time = datetime.now()
def add_task(self,
task_name: str,
task_type: str,
module_path: str,
cron_expression: str,
time_zone: str = 'Asia/Shanghai') -> int:
"""添加新的Cron任务"""
if not cron_expression:
raise ValueError("Cron表达式不能为空")
if retry:
# 失败后15分钟重试
log.debug("Calculating retry time")
return base_time + timedelta(minutes=15)
# 计算首次运行时间
first_run_time = self._calculate_next_run_time(cron_expression, time_zone)
if freq_type == 'minute':
delta = timedelta(minutes=freq_value or 1)
elif freq_type == 'hourly':
delta = timedelta(hours=freq_value or 1)
elif freq_type == 'daily':
delta = timedelta(days=freq_value or 1)
elif freq_type == 'weekly':
delta = timedelta(weeks=freq_value or 1)
elif freq_type == 'monthly':
# 处理月末日期特殊情况
next_month = (base_time.replace(day=1) + timedelta(days=32)).replace(day=1)
last_day = (next_month - timedelta(days=1)).day
day = min(base_time.day, last_day)
return base_time.replace(day=1, month=next_month.month, day=day)
else:
raise ValueError(f"Unknown frequency type: {freq_type}")
return base_time + delta
def add_task(self, task_name: str, module_path: str, frequency_type: str,
frequency_value: Optional[int] = None) -> int:
"""
添加新任务到调度系统
"""
# 插入数据库
sql = """
INSERT INTO main_task
(task_name, module_path, frequency_type, frequency_value, next_run_time)
VALUES (%s, %s, %s, %s, %s)
(task_name, task_type, module_path, cron_expression, time_zone,
next_run_time, is_active)
VALUES (%s, %s, %s, %s, %s, %s, 1)
"""
next_run = self._calculate_next_run(frequency_type, frequency_value)
params = (task_name, module_path, frequency_type, frequency_value, next_run)
params = (task_name, task_type, module_path, cron_expression, time_zone, first_run_time)
try:
self.db.execute_sql(sql, params=params)
task_id = self.db.query_to_df("SELECT LAST_INSERT_ID() AS id").iloc[0]['id']
log.info(
"New task added",
f"新任务添加成功",
task_id=task_id,
task_name=task_name,
next_run=next_run
cron表达式=cron_expression,
首次运行时间=first_run_time
)
return task_id
except Exception as e:
log.error(
"Failed to add new task",
task_name=task_name,
exc_info=True
)
log.error(f"添加任务失败: {task_name}", exc_info=True)
raise
def get_task_status(self, active_only: bool = True) -> pd.DataFrame:
"""
获取任务状态
"""
where = "WHERE is_active = 1" if active_only else ""
log.debug("Fetching task status", active_only=active_only)
return self.db.query_to_df(
f"""
SELECT
task_id, task_name, module_path,
frequency_type, frequency_value,
last_run_time, next_run_time,
last_run_status, run_count,
is_active, is_running
FROM main_task
{where}
ORDER BY next_run_time
"""
)
def get_pending_tasks_count(self) -> int:
"""获取当前等待执行的任务数量"""
result = self.db.query_to_df("""
SELECT COUNT(*) AS count FROM main_task
WHERE is_active = 1 AND next_run_time <= %s
""", params=(datetime.now(),))
return result.iloc[0]['count'] if not result.empty else 0