minio对象存储数据库链接

This commit is contained in:
z66
2025-09-16 14:41:24 +08:00
parent 2074d5b9ed
commit 8e92acf5d5
5 changed files with 455 additions and 81 deletions
+18 -1
View File
@@ -1 +1,18 @@
DROP DATABASE test
-- Main task-scheduling table: one row per scheduled task, holding its cron
-- schedule, time zone, run bookkeeping and active/running flags.
-- NOTE: IF NOT EXISTS leaves an already-existing main_task untouched, so
-- columns introduced here are not added to an older copy of the table.
CREATE TABLE IF NOT EXISTS main_task (
    -- Identity and what to run
    task_id INT AUTO_INCREMENT PRIMARY KEY COMMENT '任务唯一ID',
    task_name VARCHAR(100) NOT NULL COMMENT '任务名称',
    task_type VARCHAR(50) NOT NULL COMMENT '任务类型(如processor、collector等)',
    module_path VARCHAR(255) NOT NULL COMMENT '任务模块路径(如processors.data_checker)',
    -- Schedule definition
    cron_expression VARCHAR(100) NOT NULL COMMENT 'Cron表达式(调度频率)',
    time_zone VARCHAR(50) DEFAULT 'Asia/Shanghai' COMMENT '时区', -- field added in this revision
    -- Run bookkeeping; next_run_time has no default, so every INSERT must supply it
    next_run_time DATETIME NOT NULL COMMENT '下次运行时间',
    last_run_time DATETIME NULL COMMENT '上次运行时间',
    last_run_status ENUM('success', 'failed', 'pending') DEFAULT 'pending' COMMENT '上次运行状态',
    run_count INT DEFAULT 0 COMMENT '运行次数统计',
    -- Flags stored as 0/1
    is_active TINYINT(1) DEFAULT 1 COMMENT '是否活跃(1=启用,0=禁用)',
    is_running TINYINT(1) DEFAULT 0 COMMENT '是否正在运行(1=运行中,0=未运行)',
    -- Audit timestamps maintained by the server
    created_at DATETIME DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间',
    updated_at DATETIME DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '更新时间',
    INDEX idx_next_run (next_run_time) COMMENT '优化下次运行时间查询', -- keep: speeds up lookups by next run time
    INDEX idx_active (is_active) COMMENT '优化活跃任务查询' -- keep: speeds up filtering on active tasks
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='任务调度主表';
+325 -28
View File
@@ -8,38 +8,335 @@
},
{
"cell_type": "code",
"execution_count": null,
"id": "initial_id",
"metadata": {
"collapsed": true
"collapsed": true,
"ExecuteTime": {
"end_time": "2025-09-12T06:19:31.379133Z",
"start_time": "2025-09-12T06:19:30.001155Z"
}
},
"outputs": [],
"source": [
"# 初始化任务管理所需组件\n",
"import pandas as pd\n",
"from IPython.display import display, HTML, Markdown\n",
"from datetime import datetime\n",
"\n",
"# 导入系统组件\n",
"from system_management.scheduler.task_scheduler import TaskScheduler\n",
"from system_management.scheduler.task_manager import TaskManager\n",
"from config.config import ConfigManager\n",
"\n",
"# 初始化配置和管理器\n",
"config = ConfigManager()\n",
"scheduler = TaskScheduler(config.get(\"database\"))\n",
"manager = TaskManager(scheduler)\n",
"\n",
"# 设置显示样式\n",
"pd.set_option('display.max_colwidth', 100)\n",
"pd.set_option('display.width', 1000)\n",
"\n",
"def format_datetime(dt):\n",
" \"\"\"格式化日期时间显示\"\"\"\n",
" if pd.isna(dt) or dt is None:\n",
" return \"从未执行\"\n",
" return pd.to_datetime(dt).strftime('%Y-%m-%d %H:%M:%S')"
]
"{\n",
" \"cells\": [\n",
" {\n",
" \"cell_type\": \"markdown\",\n",
" \"metadata\": {},\n",
" \"source\": [\n",
" \"# TaskManager 任务管理类\\n\",\n",
" \"任务管理核心组件,负责任务的CRUD、状态切换、手动执行等操作\"\n",
" ]\n",
" },\n",
" {\n",
" \"cell_type\": \"code\",\n",
" \"execution_count\": null,\n",
" \"metadata\": {},\n",
" \"outputs\": [],\n",
" \"source\": [\n",
" \"import pandas as pd\\n\",\n",
" \"from datetime import datetime\\n\",\n",
" \"from typing import Dict, List, Optional, Any\\n\",\n",
" \"import pytz\\n\",\n",
" \"import croniter\\n\",\n",
" \"\\n\",\n",
" \"from utils.mysql_agent import MySQLAgent\\n\",\n",
" \"from utils.logger import CrossPlatformLog\\n\",\n",
" \"from system_management.scheduler.task_scheduler import TaskScheduler\\n\"\n",
" ]\n",
" },\n",
" {\n",
" \"cell_type\": \"code\",\n",
" \"execution_count\": null,\n",
" \"metadata\": {},\n",
" \"outputs\": [],\n",
" \"source\": [\n",
" \"class TaskManager:\\n\",\n",
" \" def __init__(self, scheduler: TaskScheduler):\\n\",\n",
" \" \\\"\\\"\\\"初始化任务管理器\\n\",\n",
" \" Args:\\n\",\n",
" \" scheduler: 任务调度器实例\\n\",\n",
" \" \\\"\\\"\\\"\\n\",\n",
" \" self.scheduler = scheduler\\n\",\n",
" \" self.db = scheduler.db # 复用调度器的数据库连接\\n\",\n",
" \" self.log = CrossPlatformLog.get_logger(\\\"TaskManager\\\")\\n\",\n",
" \"\\n\",\n",
" \" def get_all_tasks(self, active_only: bool = False) -> List[Dict[str, Any]]:\\n\",\n",
" \" \\\"\\\"\\\"获取所有任务列表\\n\",\n",
" \" Args:\\n\",\n",
" \" active_only: 是否只返回活跃任务\\n\",\n",
" \" Returns:\\n\",\n",
" \" 任务字典列表\\n\",\n",
" \" \\\"\\\"\\\"\\n\",\n",
" \" try:\\n\",\n",
" \" query = \\\"SELECT * FROM main_task\\\"\\n\",\n",
" \" params = []\\n\",\n",
" \" if active_only:\\n\",\n",
" \" query += \\\" WHERE is_active = 1\\\"\\n\",\n",
" \" query += \\\" ORDER BY task_id\\\"\\n\",\n",
" \"\\n\",\n",
" \" tasks_df = self.db.query_to_df(query, params=params)\\n\",\n",
" \" return tasks_df.to_dict('records')\\n\",\n",
" \" except Exception as e:\\n\",\n",
" \" self.log.error(\\\"获取任务列表失败\\\", exc_info=True)\\n\",\n",
" \" return []\\n\",\n",
" \"\\n\",\n",
" \" def get_task_by_id(self, task_id: int) -> Optional[Dict[str, Any]]:\\n\",\n",
" \" \\\"\\\"\\\"通过ID获取任务详情\\n\",\n",
" \" Args:\\n\",\n",
" \" task_id: 任务ID\\n\",\n",
" \" Returns:\\n\",\n",
" \" 任务字典,不存在则返回None\\n\",\n",
" \" \\\"\\\"\\\"\\n\",\n",
" \" try:\\n\",\n",
" \" task_df = self.db.query_to_df(\\n\",\n",
" \" \\\"SELECT * FROM main_task WHERE task_id = %s\\\",\\n\",\n",
" \" params=(task_id,)\\n\",\n",
" \" )\\n\",\n",
" \" if task_df.empty:\\n\",\n",
" \" return None\\n\",\n",
" \" return task_df.iloc[0].to_dict()\\n\",\n",
" \" except Exception as e:\\n\",\n",
" \" self.log.error(f\\\"获取任务详情失败 (ID: {task_id})\\\")\\n\",\n",
" \" return None\\n\",\n",
" \"\\n\",\n",
" \" def update_task(self, task_id: int, updates: Dict[str, Any]) -> bool:\\n\",\n",
" \" \\\"\\\"\\\"更新任务属性\\n\",\n",
" \" Args:\\n\",\n",
" \" task_id: 任务ID\\n\",\n",
" \" updates: 需要更新的字段字典\\n\",\n",
" \" Returns:\\n\",\n",
" \" 更新是否成功\\n\",\n",
" \" \\\"\\\"\\\"\\n\",\n",
" \" if not updates:\\n\",\n",
" \" self.log.warning(\\\"未提供更新字段\\\")\\n\",\n",
" \" return False\\n\",\n",
" \"\\n\",\n",
" \" try:\\n\",\n",
" \" # 处理Cron表达式更新(需重新计算下次运行时间)\\n\",\n",
" \" if 'cron_expression' in updates or 'time_zone' in updates:\\n\",\n",
" \" task = self.get_task_by_id(task_id)\\n\",\n",
" \" if not task:\\n\",\n",
" \" return False\\n\",\n",
" \"\\n\",\n",
" \" cron_expr = updates.get('cron_expression', task['cron_expression'])\\n\",\n",
" \" time_zone = updates.get('time_zone', task['time_zone'])\\n\",\n",
" \" updates['next_run_time'] = self.scheduler._calculate_next_run_time(cron_expr, time_zone)\\n\",\n",
" \"\\n\",\n",
" \" # 执行更新\\n\",\n",
" \" self.scheduler._update_task_status(task_id, updates)\\n\",\n",
" \" self.log.info(f\\\"任务更新成功 (ID: {task_id})\\\")\\n\",\n",
" \" return True\\n\",\n",
" \" except Exception as e:\\n\",\n",
" \" self.log.error(f\\\"任务更新失败 (ID: {task_id})\\\")\\n\",\n",
" \" return False\\n\",\n",
" \"\\n\",\n",
" \" def toggle_task_status(self, task_id: int, activate: bool) -> bool:\\n\",\n",
" \" \\\"\\\"\\\"切换任务激活状态\\n\",\n",
" \" Args:\\n\",\n",
" \" task_id: 任务ID\\n\",\n",
" \" activate: True=激活, False=禁用\\n\",\n",
" \" Returns:\\n\",\n",
" \" 操作是否成功\\n\",\n",
" \" \\\"\\\"\\\"\\n\",\n",
" \" try:\\n\",\n",
" \" status = 1 if activate else 0\\n\",\n",
" \" # 激活时重新计算下次运行时间\\n\",\n",
" \" updates = {'is_active': status}\\n\",\n",
" \" if activate:\\n\",\n",
" \" task = self.get_task_by_id(task_id)\\n\",\n",
" \" if task:\\n\",\n",
" \" updates['next_run_time'] = self.scheduler._calculate_next_run_time(\\n\",\n",
" \" task['cron_expression'], task['time_zone']\\n\",\n",
" \" )\\n\",\n",
" \"\\n\",\n",
" \" self.scheduler._update_task_status(task_id, updates)\\n\",\n",
" \" self.log.info(f\\\"任务{'激活' if activate else '禁用'}成功 (ID: {task_id})\\\")\\n\",\n",
" \" return True\\n\",\n",
" \" except Exception as e:\\n\",\n",
" \" self.log.error(f\\\"任务状态切换失败 (ID: {task_id})\\\")\\n\",\n",
" \" return False\\n\",\n",
" \"\\n\",\n",
" \" def delete_task(self, task_id: int) -> bool:\\n\",\n",
" \" \\\"\\\"\\\"删除任务\\n\",\n",
" \" Args:\\n\",\n",
" \" task_id: 任务ID\\n\",\n",
" \" Returns:\\n\",\n",
" \" 删除是否成功\\n\",\n",
" \" \\\"\\\"\\\"\\n\",\n",
" \" try:\\n\",\n",
" \" # 检查任务是否存在\\n\",\n",
" \" if not self.get_task_by_id(task_id):\\n\",\n",
" \" self.log.warning(f\\\"任务不存在 (ID: {task_id})\\\")\\n\",\n",
" \" return False\\n\",\n",
" \"\\n\",\n",
" \" # 执行删除\\n\",\n",
" \" self.db.execute_sql(\\n\",\n",
" \" \\\"DELETE FROM main_task WHERE task_id = %s\\\",\\n\",\n",
" \" params=(task_id,)\\n\",\n",
" \" )\\n\",\n",
" \" self.log.info(f\\\"任务删除成功 (ID: {task_id})\\\")\\n\",\n",
" \" return True\\n\",\n",
" \" except Exception as e:\\n\",\n",
" \" self.log.error(f\\\"任务删除失败 (ID: {task_id})\\\")\\n\",\n",
" \" return False\\n\",\n",
" \"\\n\",\n",
" \" def run_task_manually(self, task_id: int) -> bool:\\n\",\n",
" \" \\\"\\\"\\\"手动执行任务\\n\",\n",
" \" Args:\\n\",\n",
" \" task_id: 任务ID\\n\",\n",
" \" Returns:\\n\",\n",
" \" 执行是否成功\\n\",\n",
" \" \\\"\\\"\\\"\\n\",\n",
" \" try:\\n\",\n",
" \" task = self.get_task_by_id(task_id)\\n\",\n",
" \" if not task:\\n\",\n",
" \" self.log.warning(f\\\"任务不存在 (ID: {task_id})\\\")\\n\",\n",
" \" return False\\n\",\n",
" \"\\n\",\n",
" \" # 标记任务为运行中\\n\",\n",
" \" self.scheduler._update_task_status(task_id, {\\n\",\n",
" \" 'is_running': 1,\\n\",\n",
" \" 'last_run_time': datetime.now()\\n\",\n",
" \" })\\n\",\n",
" \"\\n\",\n",
" \" # 执行任务逻辑\\n\",\n",
" \" self.scheduler._execute_task_logic(task)\\n\",\n",
" \"\\n\",\n",
" \" # 更新任务状态\\n\",\n",
" \" next_run_time = self.scheduler._calculate_next_run_time(\\n\",\n",
" \" task['cron_expression'], task['time_zone']\\n\",\n",
" \" )\\n\",\n",
" \" self.scheduler._update_task_status(task_id, {\\n\",\n",
" \" 'is_running': 0,\\n\",\n",
" \" 'last_run_status': 'success',\\n\",\n",
" \" 'run_count': task['run_count'] + 1,\\n\",\n",
" \" 'next_run_time': next_run_time\\n\",\n",
" \" })\\n\",\n",
" \" return True\\n\",\n",
" \" except Exception as e:\\n\",\n",
" \" self.scheduler._update_task_status(task_id, {\\n\",\n",
" \" 'is_running': 0,\\n\",\n",
" \" 'last_run_status': 'failed'\\n\",\n",
" \" })\\n\",\n",
" \" self.log.error(f\\\"任务手动执行失败 (ID: {task_id})\\\")\\n\",\n",
" \" return False\\n\",\n",
" \"\\n\",\n",
" \" def print_task_table(self, tasks: List[Dict[str, Any]]) -> None:\\n\",\n",
" \" \\\"\\\"\\\"格式化打印任务列表\\n\",\n",
" \" Args:\\n\",\n",
" \" tasks: 任务列表\\n\",\n",
" \" \\\"\\\"\\\"\\n\",\n",
" \" if not tasks:\\n\",\n",
" \" print(\\\"没有任务数据\\\")\\n\",\n",
" \" return\\n\",\n",
" \"\\n\",\n",
" \" # 转换为DataFrame并筛选显示字段\\n\",\n",
" \" df = pd.DataFrame(tasks)\\n\",\n",
" \" display_cols = [\\n\",\n",
" \" 'task_id', 'task_name', 'task_type', 'cron_expression',\\n\",\n",
" \" 'is_active', 'last_run_status', 'next_run_time'\\n\",\n",
" \" ]\\n\",\n",
" \" # 处理状态显示\\n\",\n",
" \" df['is_active'] = df['is_active'].map({1: '活跃', 0: '禁用'})\\n\",\n",
" \" print(f\\\"共 {len(df)} 个任务\\\\n\\\")\\n\",\n",
" \" print(df[display_cols].to_string(index=False))\"\n",
" ]\n",
" },\n",
" {\n",
" \"cell_type\": \"markdown\",\n",
" \"metadata\": {},\n",
" \"source\": [\n",
" \"## 使用示例\"\n",
" ]\n",
" },\n",
" {\n",
" \"cell_type\": \"code\",\n",
" \"execution_count\": null,\n",
" \"metadata\": {},\n",
" \"outputs\": [],\n",
" \"source\": [\n",
" \"# 初始化示例\\n\",\n",
" \"from config.config import ConfigManager\\n\",\n",
" \"\\n\",\n",
" \"# 加载配置\\n\",\n",
" \"config = ConfigManager()\\n\",\n",
" \"scheduler = TaskScheduler(config.get(\\\"database\\\"))\\n\",\n",
" \"manager = TaskManager(scheduler)\\n\",\n",
" \"\\n\",\n",
" \"# 1. 列出所有任务\\n\",\n",
" \"all_tasks = manager.get_all_tasks()\\n\",\n",
" \"manager.print_task_table(all_tasks)\\n\",\n",
" \"\\n\",\n",
" \"# 2. 获取单个任务详情\\n\",\n",
" \"task = manager.get_task_by_id(1)\\n\",\n",
" \"if task:\\n\",\n",
" \" print(\\\"\\\\n任务详情:\\\")\\n\",\n",
" \" for k, v in task.items():\\n\",\n",
" \" print(f\\\"{k}: {v}\\\")\\n\",\n",
" \"\\n\",\n",
" \"# 3. 更新任务Cron表达式\\n\",\n",
" \"manager.update_task(1, {'cron_expression': '0 */2 * * *'})\\n\",\n",
" \"\\n\",\n",
" \"# 4. 激活任务\\n\",\n",
" \"manager.toggle_task_status(1, activate=True)\\n\",\n",
" \"\\n\",\n",
" \"# 5. 手动执行任务\\n\",\n",
" \"manager.run_task_manually(1)\\n\",\n",
" \"\\n\",\n",
" \"# 6. 删除任务(谨慎操作)\\n\",\n",
" \"# manager.delete_task(1)\"\n",
" ]\n",
" },\n",
" {\n",
" \"cell_type\": \"markdown\",\n",
" \"metadata\": {},\n",
" \"source\": [\n",
" \"## 核心功能说明\\n\",\n",
" \"- 依赖 `TaskScheduler` 实现底层调度逻辑与数据库交互\\n\",\n",
" \"- 支持任务全生命周期管理(查询/更新/激活/执行/删除)\\n\",\n",
" \"- 内置日志记录与错误处理\\n\",\n",
" \"- 兼容Windows/macOS/Linux多平台\"\n",
" ]\n",
" }\n",
" ],\n",
" \"metadata\": {\n",
" \"kernelspec\": {\n",
" \"display_name\": \"Python 3\",\n",
" \"language\": \"python\",\n",
" \"name\": \"python3\"\n",
" },\n",
" \"language_info\": {\n",
" \"codemirror_mode\": {\n",
" \"name\": \"ipython\",\n",
" \"version\": 3\n",
" },\n",
" \"file_extension\": \".py\",\n",
" \"mimetype\": \"text/x-python\",\n",
" \"name\": \"python\",\n",
" \"nbconvert_exporter\": \"python\",\n",
" \"pygments_lexer\": \"ipython3\",\n",
" \"version\": \"3.8.10\"\n",
" }\n",
" },\n",
" \"nbformat\": 4,\n",
" \"nbformat_minor\": 4\n",
"}"
],
"outputs": [
{
"ename": "ModuleNotFoundError",
"evalue": "No module named 'system_management.scheduler.task_manager'",
"output_type": "error",
"traceback": [
"\u001B[31m---------------------------------------------------------------------------\u001B[39m",
"\u001B[31mModuleNotFoundError\u001B[39m Traceback (most recent call last)",
"\u001B[36mCell\u001B[39m\u001B[36m \u001B[39m\u001B[32mIn[1]\u001B[39m\u001B[32m, line 8\u001B[39m\n\u001B[32m 6\u001B[39m \u001B[38;5;66;03m# 导入系统组件\u001B[39;00m\n\u001B[32m 7\u001B[39m \u001B[38;5;28;01mfrom\u001B[39;00m\u001B[38;5;250m \u001B[39m\u001B[34;01msystem_management\u001B[39;00m\u001B[34;01m.\u001B[39;00m\u001B[34;01mscheduler\u001B[39;00m\u001B[34;01m.\u001B[39;00m\u001B[34;01mtask_scheduler\u001B[39;00m\u001B[38;5;250m \u001B[39m\u001B[38;5;28;01mimport\u001B[39;00m TaskScheduler\n\u001B[32m----> \u001B[39m\u001B[32m8\u001B[39m \u001B[38;5;28;01mfrom\u001B[39;00m\u001B[38;5;250m \u001B[39m\u001B[34;01msystem_management\u001B[39;00m\u001B[34;01m.\u001B[39;00m\u001B[34;01mscheduler\u001B[39;00m\u001B[34;01m.\u001B[39;00m\u001B[34;01mtask_management\u001B[39;00m\u001B[38;5;250m \u001B[39m\u001B[38;5;28;01mimport\u001B[39;00m TaskManager\n\u001B[32m 9\u001B[39m \u001B[38;5;28;01mfrom\u001B[39;00m\u001B[38;5;250m \u001B[39m\u001B[34;01mconfig\u001B[39;00m\u001B[38;5;250m \u001B[39m\u001B[38;5;28;01mimport\u001B[39;00m Config\n\u001B[32m 11\u001B[39m \u001B[38;5;66;03m# 初始化配置和管理器\u001B[39;00m\n",
"\u001B[36mFile \u001B[39m\u001B[32mD:\\Idea Project\\intelligence_system\\system_management\\scheduler\\task_management.py:4\u001B[39m\n\u001B[32m 2\u001B[39m \u001B[38;5;28;01mfrom\u001B[39;00m\u001B[38;5;250m \u001B[39m\u001B[34;01mdatetime\u001B[39;00m\u001B[38;5;250m \u001B[39m\u001B[38;5;28;01mimport\u001B[39;00m datetime\n\u001B[32m 3\u001B[39m \u001B[38;5;28;01mfrom\u001B[39;00m\u001B[38;5;250m \u001B[39m\u001B[34;01msystem_management\u001B[39;00m\u001B[34;01m.\u001B[39;00m\u001B[34;01mscheduler\u001B[39;00m\u001B[34;01m.\u001B[39;00m\u001B[34;01mtask_scheduler\u001B[39;00m\u001B[38;5;250m \u001B[39m\u001B[38;5;28;01mimport\u001B[39;00m TaskScheduler\n\u001B[32m----> \u001B[39m\u001B[32m4\u001B[39m \u001B[38;5;28;01mfrom\u001B[39;00m\u001B[38;5;250m \u001B[39m\u001B[34;01msystem_management\u001B[39;00m\u001B[34;01m.\u001B[39;00m\u001B[34;01mscheduler\u001B[39;00m\u001B[34;01m.\u001B[39;00m\u001B[34;01mtask_manager\u001B[39;00m\u001B[38;5;250m \u001B[39m\u001B[38;5;28;01mimport\u001B[39;00m TaskManager\n\u001B[32m 5\u001B[39m \u001B[38;5;28;01mfrom\u001B[39;00m\u001B[38;5;250m \u001B[39m\u001B[34;01mconfig\u001B[39;00m\u001B[34;01m.\u001B[39;00m\u001B[34;01mconfig\u001B[39;00m\u001B[38;5;250m \u001B[39m\u001B[38;5;28;01mimport\u001B[39;00m ConfigManager\n\u001B[32m 8\u001B[39m \u001B[38;5;28;01mdef\u001B[39;00m\u001B[38;5;250m \u001B[39m\u001B[34mmain\u001B[39m():\n\u001B[32m 9\u001B[39m \u001B[38;5;66;03m# 初始化配置和组件\u001B[39;00m\n",
"\u001B[31mModuleNotFoundError\u001B[39m: No module named 'system_management.scheduler.task_manager'"
]
}
],
"execution_count": 1
},
{
"metadata": {},