277 lines
9.7 KiB
Python
277 lines
9.7 KiB
Python
# system_management/scheduler/task_scheduler.py
|
|
import importlib
|
|
import time
|
|
from datetime import datetime, timedelta
|
|
from typing import Dict, List, Optional
|
|
from storage.mysql_agent import MySQLAgent
|
|
from pathlib import Path
|
|
|
|
# Use the project's logging system
|
|
from utils.logger import CrossPlatformLog
|
|
log = CrossPlatformLog.get_logger("TaskScheduler")
|
|
|
|
class TaskScheduler:
    """Database-backed scheduler that runs due tasks from the ``main_task`` table.

    Each task row names an importable module (``module_path``) that must expose
    a ``main()`` callable, together with a frequency type/value pair used to
    compute the next run time after every execution.
    """

    # Maps fixed-length frequency types to the matching ``timedelta`` keyword.
    # 'monthly' is handled separately because months have no fixed length.
    _FIXED_UNITS = {
        'minute': 'minutes',
        'hourly': 'hours',
        'daily': 'days',
        'weekly': 'weeks',
    }

    def __init__(self, db_config: Optional[Dict] = None):
        """Create the scheduler and make sure the task table exists.

        Args:
            db_config: Optional database configuration handed to ``MySQLAgent``.
                An empty dict is passed when it is omitted.
        """
        self.db = MySQLAgent(db_config or {})
        self._init_task_table()
        log.info("TaskScheduler initialized")

    def _init_task_table(self):
        """Create the ``main_task`` table when it does not exist yet."""
        if self.db.table_exists("main_task"):
            return

        log.info("Creating main_task table")
        create_sql = """
            CREATE TABLE main_task (
                task_id INT AUTO_INCREMENT PRIMARY KEY,
                task_name VARCHAR(100) NOT NULL,
                module_path VARCHAR(255) NOT NULL COMMENT '例如data_collection.spiders.weibo_spider',
                frequency_type ENUM('minute','hourly','daily','weekly','monthly') NOT NULL,
                frequency_value INT DEFAULT NULL COMMENT '间隔数值',
                last_run_time DATETIME DEFAULT NULL,
                next_run_time DATETIME DEFAULT NULL,
                last_run_status VARCHAR(20) DEFAULT NULL,
                is_active TINYINT(1) DEFAULT 1,
                is_running TINYINT(1) DEFAULT 0,
                run_count INT DEFAULT 0,
                created_at DATETIME DEFAULT CURRENT_TIMESTAMP,
                updated_at DATETIME DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
                INDEX idx_next_run (next_run_time),
                INDEX idx_active (is_active)
            ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4
        """
        self.db.execute_sql(create_sql)
        log.success("main_task table created")

    def run_pending_tasks(self) -> Dict[str, int]:
        """Run every active task whose ``next_run_time`` has passed.

        Tasks are executed sequentially, oldest ``next_run_time`` first. A
        failing task is recorded as failed and does not stop the remaining
        tasks.

        Returns:
            A dict with the keys ``'total'`` (tasks that were due),
            ``'success'`` and ``'failed'``.

        Raises:
            Exception: Any error outside a task's own execution (for example
                the initial query, or recording a failure) is logged and
                re-raised.
        """
        result = {'total': 0, 'success': 0, 'failed': 0}

        try:
            tasks_df = self.db.query_to_df(
                "SELECT * FROM main_task "
                "WHERE is_active = 1 AND next_run_time <= %s "
                "ORDER BY next_run_time",
                params=(datetime.now(),)
            )
            result['total'] = len(tasks_df)

            if tasks_df.empty:
                log.debug("No pending tasks found")
                return result

            for _, task in tasks_df.iterrows():
                outcome = 'success' if self._run_and_record(task) else 'failed'
                result[outcome] += 1

            log.info(
                "Scheduler cycle completed",
                total_tasks=result['total'],
                success=result['success'],
                failed=result['failed']
            )
            return result

        except Exception:
            log.critical(
                "Scheduler main loop failed",
                exc_info=True
            )
            raise

    def _run_and_record(self, task) -> bool:
        """Run one due task and persist its outcome; return True on success.

        Args:
            task: One row of the pending-task query, indexable by column name.
        """
        # Rows come from DataFrame.iterrows(), so scalars are presumably numpy
        # types; convert to plain ints before using them as SQL parameters.
        task_id = self._coerce_int(task['task_id'], 0)
        task_log = log.bind(task_id=task_id)
        task_log.info(f"Starting task {task['task_name']}")

        # Mark the task as running before handing control to its module.
        self._update_task_status(
            task_id,
            {'is_running': 1, 'last_run_time': datetime.now()}
        )

        try:
            self._execute_single_task(task)
            self._update_task_status(
                task_id,
                {
                    'last_run_status': 'success',
                    'is_running': 0,
                    'run_count': self._coerce_int(task['run_count'], 0) + 1,
                    'next_run_time': self._calculate_next_run(
                        task['frequency_type'],
                        task['frequency_value']
                    )
                }
            )
            task_log.success("Task completed")
            return True
        except Exception as e:
            task_log.error(
                f"Task failed: {str(e)}",
                exc_info=True
            )
            # Failed tasks are rescheduled on the retry delay rather than on
            # their normal frequency.
            self._update_task_status(
                task_id,
                {
                    'last_run_status': 'failed',
                    'is_running': 0,
                    'next_run_time': self._calculate_next_run(
                        task['frequency_type'],
                        task['frequency_value'],
                        retry=True
                    )
                }
            )
            return False

    def _execute_single_task(self, task: Dict) -> None:
        """Import the task's module and call its ``main()`` function.

        Args:
            task: Mapping-like task row providing ``task_id`` and
                ``module_path``.

        Raises:
            ImportError: If the module cannot be imported or has no ``main``.
            Exception: Whatever ``main()`` raises is logged and re-raised.
        """
        start_time = time.time()
        task_log = log.bind(
            task_id=task['task_id'],
            module=task['module_path']
        )

        try:
            module = importlib.import_module(task['module_path'])

            if not hasattr(module, 'main'):
                raise ImportError("Module has no main() function")

            task_log.debug("Task execution started")
            module.main()

            elapsed = time.time() - start_time
            task_log.info(
                f"Task completed in {elapsed:.2f}s",
                duration=elapsed
            )

        except Exception:
            task_log.error(
                "Task execution failed",
                exc_info=True
            )
            raise

    def _update_task_status(self, task_id: int, updates: Dict) -> None:
        """Write the given column values to one ``main_task`` row.

        Args:
            task_id: Primary key of the row to update.
            updates: Mapping of column name to new value. An empty mapping is
                a no-op.

        Raises:
            ValueError: If a key is not a plain identifier. Column names are
                interpolated into the SQL text, so they must not carry
                arbitrary SQL.
            Exception: Database errors are logged and re-raised.
        """
        if not updates:
            # "UPDATE ... SET  WHERE ..." would be invalid SQL.
            return

        columns = list(updates.keys())
        for column in columns:
            if not (isinstance(column, str) and column.isidentifier()):
                raise ValueError(f"Invalid column name: {column!r}")

        set_clause = ", ".join(f"{column}=%s" for column in columns)
        sql = f"UPDATE main_task SET {set_clause} WHERE task_id=%s"
        params = list(updates.values()) + [task_id]

        try:
            affected = self.db.execute_sql(sql, params=params)
            if affected != 1:
                log.warning(
                    "Unexpected row count in update",
                    task_id=task_id,
                    expected=1,
                    affected=affected
                )
        except Exception:
            log.error(
                "Failed to update task status",
                task_id=task_id,
                exc_info=True
            )
            raise

    @staticmethod
    def _coerce_int(value, default: int) -> int:
        """Return ``int(value)``, or ``default`` for None/NaN/unconvertible input."""
        try:
            return int(value)
        except (TypeError, ValueError):
            # int(None) raises TypeError; int(float('nan')) raises ValueError.
            return default

    @staticmethod
    def _add_months(base_time: datetime, months: int) -> datetime:
        """Return ``base_time`` shifted by ``months`` calendar months.

        The day of month is clamped to the last day of the target month
        (e.g. Jan 31 + 1 month -> Feb 28/29) and the year rolls over as needed.
        """
        import calendar

        # Work on a zero-based absolute month index so divmod handles the year.
        month_index = base_time.year * 12 + (base_time.month - 1) + months
        year, month_zero_based = divmod(month_index, 12)
        month = month_zero_based + 1
        last_day = calendar.monthrange(year, month)[1]
        return base_time.replace(
            year=year, month=month, day=min(base_time.day, last_day)
        )

    def _calculate_next_run(self, freq_type: str, freq_value: Optional[int] = None,
                            retry: bool = False) -> datetime:
        """Compute the next run time relative to the current time.

        Args:
            freq_type: One of 'minute', 'hourly', 'daily', 'weekly', 'monthly'.
            freq_value: Interval count in units of ``freq_type``. None, NaN and
                values below 1 are treated as 1.
            retry: When True, ignore the frequency and return now + 15 minutes.

        Returns:
            The naive local datetime of the next run.

        Raises:
            ValueError: If ``freq_type`` is not a supported frequency type.
        """
        base_time = datetime.now()

        if retry:
            # Retry 15 minutes after a failure.
            log.debug("Calculating retry time")
            return base_time + timedelta(minutes=15)

        # A NULL frequency_value may arrive as NaN, which is truthy, so a plain
        # ``freq_value or 1`` is not enough to fall back to the default.
        interval = self._coerce_int(freq_value, 1)
        if interval < 1:
            interval = 1

        if freq_type == 'monthly':
            return self._add_months(base_time, interval)

        unit = self._FIXED_UNITS.get(freq_type)
        if unit is None:
            raise ValueError(f"Unknown frequency type: {freq_type}")

        return base_time + timedelta(**{unit: interval})

    def add_task(self, task_name: str, module_path: str, frequency_type: str,
                 frequency_value: Optional[int] = None) -> int:
        """Register a new task; its first run is one interval from now.

        Args:
            task_name: Human-readable task name.
            module_path: Dotted import path of the module exposing ``main()``.
            frequency_type: One of 'minute', 'hourly', 'daily', 'weekly',
                'monthly'.
            frequency_value: Interval count; see ``_calculate_next_run``.

        Returns:
            The id reported by ``LAST_INSERT_ID()`` after the insert.

        Raises:
            ValueError: If ``frequency_type`` is not supported (raised before
                anything is written).
            Exception: Database errors are logged and re-raised.
        """
        sql = """
            INSERT INTO main_task
            (task_name, module_path, frequency_type, frequency_value, next_run_time)
            VALUES (%s, %s, %s, %s, %s)
        """

        next_run = self._calculate_next_run(frequency_type, frequency_value)
        params = (task_name, module_path, frequency_type, frequency_value, next_run)

        try:
            self.db.execute_sql(sql, params=params)
            # NOTE(review): LAST_INSERT_ID() is per-connection; this assumes
            # execute_sql and query_to_df share one connection - confirm.
            new_id = self.db.query_to_df("SELECT LAST_INSERT_ID() AS id").iloc[0]['id']
            task_id = int(new_id)
            log.info(
                "New task added",
                task_id=task_id,
                task_name=task_name,
                next_run=next_run
            )
            return task_id
        except Exception:
            log.error(
                "Failed to add new task",
                task_name=task_name,
                exc_info=True
            )
            raise

    def get_task_status(self, active_only: bool = True) -> "pd.DataFrame":
        """Return the scheduling state of tasks, ordered by ``next_run_time``.

        Args:
            active_only: When True (default) only rows with ``is_active = 1``
                are returned; otherwise all rows.

        Returns:
            Whatever ``query_to_df`` returns for the status query.
        """
        # The return annotation is a string because pandas is not imported in
        # this module; an unquoted ``pd.DataFrame`` would raise NameError.
        where = "WHERE is_active = 1" if active_only else ""
        log.debug("Fetching task status", active_only=active_only)
        return self.db.query_to_df(
            f"""
            SELECT
                task_id, task_name, module_path,
                frequency_type, frequency_value,
                last_run_time, next_run_time,
                last_run_status, run_count,
                is_active, is_running
            FROM main_task
            {where}
            ORDER BY next_run_time
            """
        )