diff --git a/doc/任务调度操作 b/doc/任务调度操作 new file mode 100644 index 0000000..ca75e36 --- /dev/null +++ b/doc/任务调度操作 @@ -0,0 +1,27 @@ +# 列出所有任务 +python system_management/scheduler/task_management.py list + +# 只显示活跃任务 +python system_management/scheduler/task_management.py list --active-only + +# 查看任务详情 +python system_management/scheduler/task_management.py show 1 + +# 更新任务Cron表达式 +python system_management/scheduler/task_management.py --cron "0 10 * * *" + +# 启用任务 +python system_management/scheduler/task_management.py toggle 1 --activate + +# 禁用任务 +python system_management/scheduler/task_management.py toggle 1 --deactivate + +# 手动执行任务 +python system_management/scheduler/task_management.py run 1 + +# 添加新任务 +python system_management/scheduler/task_management.py add \ + --name "hourly_data_check" \ + --type "processor" \ + --module "processors.data_checker" \ + --cron "0 * * * *" \ No newline at end of file diff --git a/system_management/scheduler/task_management.py b/system_management/scheduler/task_management.py new file mode 100644 index 0000000..ace58d7 --- /dev/null +++ b/system_management/scheduler/task_management.py @@ -0,0 +1,138 @@ +import argparse +from datetime import datetime +from system_management.scheduler.task_scheduler import TaskScheduler +from system_management.scheduler.task_manager import TaskManager +from config.config import ConfigManager + + +def main(): + # 初始化配置和组件 + config = ConfigManager() + scheduler = TaskScheduler(config.get("database")) + manager = TaskManager(scheduler) + + # 解析命令行参数 + parser = argparse.ArgumentParser(description="任务管理工具") + subparsers = parser.add_subparsers(dest="command", help="可用命令") + + # 列出任务 + list_parser = subparsers.add_parser("list", help="列出所有任务") + list_parser.add_argument("--active-only", action="store_true", help="只显示活跃任务") + + # 查看任务详情 + show_parser = subparsers.add_parser("show", help="显示任务详情") + show_parser.add_argument("task_id", type=int, help="任务ID") + + # 更新任务 + update_parser = subparsers.add_parser("update", help="更新任务属性") + update_parser.add_argument("task_id", type=int, help="任务ID") + update_parser.add_argument("--name", help="任务名称") + update_parser.add_argument("--type", help="任务类型") + update_parser.add_argument("--module", help="模块路径") + update_parser.add_argument("--cron", help="Cron表达式") + update_parser.add_argument("--timezone", help="时区") + + # 启用/禁用任务 + toggle_parser = subparsers.add_parser("toggle", help="启用/禁用任务") + toggle_parser.add_argument("task_id", type=int, help="任务ID") + toggle_parser.add_argument("--activate", action="store_true", help="启用任务") + toggle_parser.add_argument("--deactivate", action="store_true", help="禁用任务") + + # 删除任务 + delete_parser = subparsers.add_parser("delete", help="删除任务") + delete_parser.add_argument("task_id", type=int, help="任务ID") + + # 手动执行任务 + run_parser = subparsers.add_parser("run", help="手动执行任务") + run_parser.add_argument("task_id", type=int, help="任务ID") + + # 添加任务 + add_parser = subparsers.add_parser("add", help="添加新任务") + add_parser.add_argument("--name", required=True, help="任务名称") + add_parser.add_argument("--type", required=True, help="任务类型") + add_parser.add_argument("--module", required=True, help="模块路径") + add_parser.add_argument("--cron", required=True, help="Cron表达式") + add_parser.add_argument("--timezone", default="Asia/Shanghai", help="时区") + + args = parser.parse_args() + + # 执行相应命令 + if args.command == "list": + tasks = manager.get_all_tasks(args.active_only) + manager.print_task_table(tasks) + + elif args.command == "show": + task = manager.get_task_by_id(args.task_id) + if task: + print("\n===== 任务详情 =====") + for key, value in task.items(): + print(f"{key}: {value}") + print("====================") + + elif args.command == "update": + updates = {} + if args.name: + updates['task_name'] = args.name + if args.type: + updates['task_type'] = args.type + if args.module: + updates['module_path'] = args.module + if args.cron: + updates['cron_expression'] = args.cron + if args.timezone: + updates['time_zone'] = args.timezone + + if manager.update_task(args.task_id, updates): + print(f"任务ID {args.task_id} 更新成功") + else: + print(f"任务ID {args.task_id} 更新失败") + + elif args.command == "toggle": + if args.activate: + success = manager.toggle_task_status(args.task_id, True) + action = "启用" + elif args.deactivate: + success = manager.toggle_task_status(args.task_id, False) + action = "禁用" + else: + print("请指定 --activate 或 --deactivate") + return + + if success: + print(f"任务ID {args.task_id} {action}成功") + else: + print(f"任务ID {args.task_id} {action}失败") + + elif args.command == "delete": + confirm = input(f"确定要删除任务ID {args.task_id} 吗? (y/n) ") + if confirm.lower() == 'y': + if manager.delete_task(args.task_id): + print(f"任务ID {args.task_id} 删除成功") + else: + print(f"任务ID {args.task_id} 删除失败") + else: + print("操作已取消") + + elif args.command == "run": + print(f"正在手动执行任务ID {args.task_id}...") + if manager.run_task_manually(args.task_id): + print(f"任务ID {args.task_id} 执行成功") + else: + print(f"任务ID {args.task_id} 执行失败") + + elif args.command == "add": + try: + task_id = scheduler.add_task( + task_name=args.name, + task_type=args.type, + module_path=args.module, + cron_expression=args.cron, + time_zone=args.timezone + ) + print(f"新任务添加成功,ID: {task_id}") + except Exception as e: + print(f"添加任务失败: {str(e)}") + + +if __name__ == "__main__": + main() diff --git a/tools/task_manager.ipynb b/tools/task_manager.ipynb new file mode 100644 index 0000000..ea8ccea --- /dev/null +++ b/tools/task_manager.ipynb @@ -0,0 +1,338 @@ +{ + "cells": [ + { + "metadata": {}, + "cell_type": "markdown", + "source": "## 1. 初始化(所有操作前必须运行)", + "id": "197b1b81f5528a50" + }, + { + "cell_type": "code", + "execution_count": null, + "id": "initial_id", + "metadata": { + "collapsed": true + }, + "outputs": [], + "source": [ + "# 初始化任务管理所需组件\n", + "import pandas as pd\n", + "from IPython.display import display, HTML, Markdown\n", + "from datetime import datetime\n", + "\n", + "# 导入系统组件\n", + "from system_management.scheduler.task_scheduler import TaskScheduler\n", + "from system_management.scheduler.task_manager import TaskManager\n", + "from config.config import ConfigManager\n", + "\n", + "# 初始化配置和管理器\n", + "config = ConfigManager()\n", + "scheduler = TaskScheduler(config.get(\"database\"))\n", + "manager = TaskManager(scheduler)\n", + "\n", + "# 设置显示样式\n", + "pd.set_option('display.max_colwidth', 100)\n", + "pd.set_option('display.width', 1000)\n", + "\n", + "def format_datetime(dt):\n", + " \"\"\"格式化日期时间显示\"\"\"\n", + " if pd.isna(dt) or dt is None:\n", + " return \"从未执行\"\n", + " return pd.to_datetime(dt).strftime('%Y-%m-%d %H:%M:%S')" + ] + }, + { + "metadata": {}, + "cell_type": "markdown", + "source": "## 2. 列出任务(对应命令行 list)", + "id": "8271189cef3b5f17" + }, + { + "metadata": {}, + "cell_type": "code", + "outputs": [], + "execution_count": null, + "source": [ + "# 列出所有任务(包括已禁用的)\n", + "def list_tasks(active_only=True):\n", + " tasks = manager.get_all_tasks(active_only)\n", + " if not tasks:\n", + " display(Markdown(\"### 没有找到任务\"))\n", + " return None\n", + "\n", + " df = pd.DataFrame(tasks)\n", + "\n", + " # 格式化日期列\n", + " if 'last_run_time' in df.columns:\n", + " df['last_run_time'] = df['last_run_time'].apply(format_datetime)\n", + " if 'next_run_time' in df.columns:\n", + " df['next_run_time'] = df['next_run_time'].apply(format_datetime)\n", + "\n", + " # 重命名列名\n", + " df = df.rename(columns={\n", + " 'task_id': '任务ID',\n", + " 'task_name': '任务名称',\n", + " 'task_type': '任务类型',\n", + " 'module_path': '模块路径',\n", + " 'cron_expression': 'Cron表达式',\n", + " 'time_zone': '时区',\n", + " 'last_run_time': '最后运行时间',\n", + " 'next_run_time': '下次运行时间',\n", + " 'last_run_status': '运行状态',\n", + " 'is_active': '是否活跃',\n", + " 'run_count': '运行次数'\n", + " })\n", + "\n", + " display(Markdown(\"### 任务列表\"))\n", + " display(HTML(df.to_html(index=False)))\n", + " return df\n", + "\n", + "# 执行:列出所有任务(包括已禁用)\n", + "list_tasks(active_only=False)\n", + "\n", + "# 或者:只列出活跃任务\n", + "# list_tasks(active_only=True)" + ], + "id": "7b020af55972643" + }, + { + "metadata": {}, + "cell_type": "markdown", + "source": "## 3. 查看任务详情(对应命令行 show)", + "id": "7780dcef67a0534c" + }, + { + "metadata": {}, + "cell_type": "code", + "outputs": [], + "execution_count": null, + "source": [ + "# 查看指定任务的详情\n", + "def show_task_details(task_id):\n", + " task = manager.get_task_by_id(task_id)\n", + " if not task:\n", + " display(Markdown(f\"### 未找到任务ID为 {task_id} 的任务\"))\n", + " return None\n", + "\n", + " details = [\"### 任务详情\"]\n", + " details.append(f\"**任务ID**: {task.get('task_id')}\")\n", + " details.append(f\"**任务名称**: {task.get('task_name')}\")\n", + " details.append(f\"**任务类型**: {task.get('task_type')}\")\n", + " details.append(f\"**模块路径**: {task.get('module_path')}\")\n", + " details.append(f\"**Cron表达式**: {task.get('cron_expression')}\")\n", + " details.append(f\"**时区**: {task.get('time_zone', 'Asia/Shanghai')}\")\n", + " details.append(f\"**最后运行时间**: {format_datetime(task.get('last_run_time'))}\")\n", + " details.append(f\"**下次运行时间**: {format_datetime(task.get('next_run_time'))}\")\n", + " details.append(f\"**运行状态**: {task.get('last_run_status', '未运行')}\")\n", + " details.append(f\"**是否活跃**: {'是' if task.get('is_active') else '否'}\")\n", + " details.append(f\"**运行次数**: {task.get('run_count', 0)}\")\n", + " details.append(f\"**创建时间**: {format_datetime(task.get('created_at'))}\")\n", + "\n", + " display(Markdown('\\n'.join(details)))\n", + " return task\n", + "\n", + "# 执行:查看任务ID为1的详情(替换为实际ID)\n", + "show_task_details(1)" + ], + "id": "eab90de72c35429e" + }, + { + "metadata": {}, + "cell_type": "markdown", + "source": "## 4. 添加新任务(对应命令行 add)", + "id": "a313f1524f5a54bc" + }, + { + "metadata": {}, + "cell_type": "code", + "outputs": [], + "execution_count": null, + "source": [ + "# 添加新任务\n", + "def add_new_task(name, task_type, module_path, cron_expression, timezone=\"Asia/Shanghai\"):\n", + " try:\n", + " task_id = scheduler.add_task(\n", + " task_name=name,\n", + " task_type=task_type,\n", + " module_path=module_path,\n", + " cron_expression=cron_expression,\n", + " time_zone=timezone\n", + " )\n", + " display(Markdown(f\"### 任务添加成功!\"))\n", + " display(Markdown(f\"新任务ID: {task_id},任务名称: {name}\"))\n", + " return task_id\n", + " except Exception as e:\n", + " display(Markdown(f\"### 添加任务失败: {str(e)}\"))\n", + " return None\n", + "\n", + "# 执行:添加一个新闻采集任务\n", + "add_new_task(\n", + " name=\"每日新闻采集\",\n", + " task_type=\"collector\",\n", + " module_path=\"collectors.news_collector\",\n", + " cron_expression=\"0 9 * * *\", # 每天9点执行\n", + " timezone=\"Asia/Shanghai\"\n", + ")" + ], + "id": "2b2d723bb8e2784f" + }, + { + "metadata": {}, + "cell_type": "markdown", + "source": "## 5. 更新任务属性(对应命令行 update)", + "id": "12373bcbb4a0b434" + }, + { + "metadata": {}, + "cell_type": "code", + "outputs": [], + "execution_count": null, + "source": [ + "# 更新任务属性\n", + "def update_task(task_id, **kwargs):\n", + " updates = {}\n", + " if 'name' in kwargs and kwargs['name']:\n", + " updates['task_name'] = kwargs['name']\n", + " if 'type' in kwargs and kwargs['type']:\n", + " updates['task_type'] = kwargs['type']\n", + " if 'module' in kwargs and kwargs['module']:\n", + " updates['module_path'] = kwargs['module']\n", + " if 'cron' in kwargs and kwargs['cron']:\n", + " updates['cron_expression'] = kwargs['cron']\n", + " if 'timezone' in kwargs and kwargs['timezone']:\n", + " updates['time_zone'] = kwargs['timezone']\n", + "\n", + " if not updates:\n", + " display(Markdown(\"### 没有提供任何更新内容\"))\n", + " return False\n", + "\n", + " success = manager.update_task(task_id, updates)\n", + " if success:\n", + " display(Markdown(f\"### 任务ID {task_id} 更新成功\"))\n", + " show_task_details(task_id) # 显示更新后的详情\n", + " else:\n", + " display(Markdown(f\"### 任务ID {task_id} 更新失败\"))\n", + " return success\n", + "\n", + "# 执行:更新任务(示例:修改任务1的Cron表达式为每天10点)\n", + "update_task(1, cron=\"0 10 * * *\")\n", + "\n", + "# 执行:同时更新多个属性(名称和Cron表达式)\n", + "# update_task(1, name=\"每日早间新闻采集\", cron=\"0 8 * * *\")" + ], + "id": "c892fd8ad2f0dd9d" + }, + { + "metadata": {}, + "cell_type": "markdown", + "source": "## 6. 启用 / 禁用任务(对应命令行 toggle)", + "id": "37564011cf5aa501" + }, + { + "metadata": {}, + "cell_type": "code", + "outputs": [], + "execution_count": null, + "source": [ + "# 启用或禁用任务\n", + "def toggle_task_status(task_id, activate=True):\n", + " success = manager.toggle_task_status(task_id, activate)\n", + " action = \"启用\" if activate else \"禁用\"\n", + " if success:\n", + " display(Markdown(f\"### 任务ID {task_id} {action}成功\"))\n", + " else:\n", + " display(Markdown(f\"### 任务ID {task_id} {action}失败\"))\n", + " return success\n", + "\n", + "# 执行:启用任务ID为1的任务\n", + "toggle_task_status(1, activate=True)\n", + "\n", + "# 执行:禁用任务ID为1的任务\n", + "# toggle_task_status(1, activate=False)" + ], + "id": "65388d10c5c8d407" + }, + { + "metadata": {}, + "cell_type": "markdown", + "source": "## 7. 手动执行任务(对应命令行 run)", + "id": "c554c748169d5ac8" + }, + { + "metadata": {}, + "cell_type": "code", + "outputs": [], + "execution_count": null, + "source": [ + "# 手动执行任务\n", + "def run_task_manually(task_id):\n", + " display(Markdown(f\"### 正在手动执行任务ID {task_id}...\"))\n", + " success = manager.run_task_manually(task_id)\n", + " if success:\n", + " display(Markdown(f\"### 任务ID {task_id} 执行成功\"))\n", + " else:\n", + " display(Markdown(f\"### 任务ID {task_id} 执行失败\"))\n", + " return success\n", + "\n", + "# 执行:手动运行任务ID为1的任务\n", + "run_task_manually(1)" + ], + "id": "94892f4134316f8e" + }, + { + "metadata": {}, + "cell_type": "markdown", + "source": "## 8. 删除任务(对应命令行 delete)", + "id": "c3492a1af7dbf2b1" + }, + { + "metadata": {}, + "cell_type": "code", + "outputs": [], + "execution_count": null, + "source": [ + "# 删除任务\n", + "def delete_task(task_id, confirm=False):\n", + " if not confirm:\n", + " display(Markdown(f\"### 警告:删除任务是不可逆操作!\"))\n", + " display(Markdown(f\"请运行 `delete_task({task_id}, confirm=True)` 确认删除\"))\n", + " return False\n", + "\n", + " success = manager.delete_task(task_id)\n", + " if success:\n", + " display(Markdown(f\"### 任务ID {task_id} 删除成功\"))\n", + " else:\n", + " display(Markdown(f\"### 任务ID {task_id} 删除失败\"))\n", + " return success\n", + "\n", + "# 执行:第一步 - 确认删除(不会实际删除)\n", + "delete_task(1)\n", + "\n", + "# 执行:第二步 - 实际删除(谨慎操作!)\n", + "# delete_task(1, confirm=True)" + ], + "id": "6936dcc673933a8d" + } + ], + "metadata": { + "kernelspec": { + "display_name": "Python 3", + "language": "python", + "name": "python3" + }, + "language_info": { + "codemirror_mode": { + "name": "ipython", + "version": 2 + }, + "file_extension": ".py", + "mimetype": "text/x-python", + "name": "python", + "nbconvert_exporter": "python", + "pygments_lexer": "ipython2", + "version": "2.7.6" + } + }, + "nbformat": 4, + "nbformat_minor": 5 +}