Files
intelligence_system/tools/task_manager.ipynb
T
2025-09-09 17:38:27 +08:00

339 lines
11 KiB
Plaintext
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
{
"cells": [
{
"metadata": {},
"cell_type": "markdown",
"source": "## 1. 初始化(所有操作前必须运行)",
"id": "197b1b81f5528a50"
},
{
"cell_type": "code",
"execution_count": null,
"id": "initial_id",
"metadata": {
"collapsed": true
},
"outputs": [],
"source": [
"# 初始化任务管理所需组件\n",
"import pandas as pd\n",
"from IPython.display import display, HTML, Markdown\n",
"from datetime import datetime\n",
"\n",
"# 导入系统组件\n",
"from system_management.scheduler.task_scheduler import TaskScheduler\n",
"from system_management.scheduler.task_manager import TaskManager\n",
"from config.config import ConfigManager\n",
"\n",
"# 初始化配置和管理器\n",
"config = ConfigManager()\n",
"scheduler = TaskScheduler(config.get(\"database\"))\n",
"manager = TaskManager(scheduler)\n",
"\n",
"# 设置显示样式\n",
"pd.set_option('display.max_colwidth', 100)\n",
"pd.set_option('display.width', 1000)\n",
"\n",
"def format_datetime(dt):\n",
" \"\"\"格式化日期时间显示\"\"\"\n",
" if pd.isna(dt) or dt is None:\n",
" return \"从未执行\"\n",
" return pd.to_datetime(dt).strftime('%Y-%m-%d %H:%M:%S')"
]
},
{
"metadata": {},
"cell_type": "markdown",
"source": "## 2. 列出任务(对应命令行 list",
"id": "8271189cef3b5f17"
},
{
"metadata": {},
"cell_type": "code",
"outputs": [],
"execution_count": null,
"source": [
"# 列出所有任务(包括已禁用的)\n",
"def list_tasks(active_only=True):\n",
" tasks = manager.get_all_tasks(active_only)\n",
" if not tasks:\n",
" display(Markdown(\"### 没有找到任务\"))\n",
" return None\n",
"\n",
" df = pd.DataFrame(tasks)\n",
"\n",
" # 格式化日期列\n",
" if 'last_run_time' in df.columns:\n",
" df['last_run_time'] = df['last_run_time'].apply(format_datetime)\n",
" if 'next_run_time' in df.columns:\n",
" df['next_run_time'] = df['next_run_time'].apply(format_datetime)\n",
"\n",
" # 重命名列名\n",
" df = df.rename(columns={\n",
" 'task_id': '任务ID',\n",
" 'task_name': '任务名称',\n",
" 'task_type': '任务类型',\n",
" 'module_path': '模块路径',\n",
" 'cron_expression': 'Cron表达式',\n",
" 'time_zone': '时区',\n",
" 'last_run_time': '最后运行时间',\n",
" 'next_run_time': '下次运行时间',\n",
" 'last_run_status': '运行状态',\n",
" 'is_active': '是否活跃',\n",
" 'run_count': '运行次数'\n",
" })\n",
"\n",
" display(Markdown(\"### 任务列表\"))\n",
" display(HTML(df.to_html(index=False)))\n",
" return df\n",
"\n",
"# 执行:列出所有任务(包括已禁用)\n",
"list_tasks(active_only=False)\n",
"\n",
"# 或者:只列出活跃任务\n",
"# list_tasks(active_only=True)"
],
"id": "7b020af55972643"
},
{
"metadata": {},
"cell_type": "markdown",
"source": "## 3. 查看任务详情(对应命令行 show)",
"id": "7780dcef67a0534c"
},
{
"metadata": {},
"cell_type": "code",
"outputs": [],
"execution_count": null,
"source": [
"# 查看指定任务的详情\n",
"def show_task_details(task_id):\n",
" task = manager.get_task_by_id(task_id)\n",
" if not task:\n",
" display(Markdown(f\"### 未找到任务ID为 {task_id} 的任务\"))\n",
" return None\n",
"\n",
" details = [\"### 任务详情\"]\n",
" details.append(f\"**任务ID**: {task.get('task_id')}\")\n",
" details.append(f\"**任务名称**: {task.get('task_name')}\")\n",
" details.append(f\"**任务类型**: {task.get('task_type')}\")\n",
" details.append(f\"**模块路径**: {task.get('module_path')}\")\n",
" details.append(f\"**Cron表达式**: {task.get('cron_expression')}\")\n",
" details.append(f\"**时区**: {task.get('time_zone', 'Asia/Shanghai')}\")\n",
" details.append(f\"**最后运行时间**: {format_datetime(task.get('last_run_time'))}\")\n",
" details.append(f\"**下次运行时间**: {format_datetime(task.get('next_run_time'))}\")\n",
" details.append(f\"**运行状态**: {task.get('last_run_status', '未运行')}\")\n",
" details.append(f\"**是否活跃**: {'是' if task.get('is_active') else '否'}\")\n",
" details.append(f\"**运行次数**: {task.get('run_count', 0)}\")\n",
" details.append(f\"**创建时间**: {format_datetime(task.get('created_at'))}\")\n",
"\n",
" display(Markdown('\\n'.join(details)))\n",
" return task\n",
"\n",
"# 执行:查看任务ID为1的详情(替换为实际ID)\n",
"show_task_details(1)"
],
"id": "eab90de72c35429e"
},
{
"metadata": {},
"cell_type": "markdown",
"source": "## 4. 添加新任务(对应命令行 add",
"id": "a313f1524f5a54bc"
},
{
"metadata": {},
"cell_type": "code",
"outputs": [],
"execution_count": null,
"source": [
"# 添加新任务\n",
"def add_new_task(name, task_type, module_path, cron_expression, timezone=\"Asia/Shanghai\"):\n",
" try:\n",
" task_id = scheduler.add_task(\n",
" task_name=name,\n",
" task_type=task_type,\n",
" module_path=module_path,\n",
" cron_expression=cron_expression,\n",
" time_zone=timezone\n",
" )\n",
" display(Markdown(f\"### 任务添加成功!\"))\n",
" display(Markdown(f\"新任务ID: {task_id},任务名称: {name}\"))\n",
" return task_id\n",
" except Exception as e:\n",
" display(Markdown(f\"### 添加任务失败: {str(e)}\"))\n",
" return None\n",
"\n",
"# 执行:添加一个新闻采集任务\n",
"add_new_task(\n",
" name=\"每日新闻采集\",\n",
" task_type=\"collector\",\n",
" module_path=\"collectors.news_collector\",\n",
" cron_expression=\"0 9 * * *\", # 每天9点执行\n",
" timezone=\"Asia/Shanghai\"\n",
")"
],
"id": "2b2d723bb8e2784f"
},
{
"metadata": {},
"cell_type": "markdown",
"source": "## 5. 更新任务属性(对应命令行 update)",
"id": "12373bcbb4a0b434"
},
{
"metadata": {},
"cell_type": "code",
"outputs": [],
"execution_count": null,
"source": [
"# 更新任务属性\n",
"def update_task(task_id, **kwargs):\n",
" updates = {}\n",
" if 'name' in kwargs and kwargs['name']:\n",
" updates['task_name'] = kwargs['name']\n",
" if 'type' in kwargs and kwargs['type']:\n",
" updates['task_type'] = kwargs['type']\n",
" if 'module' in kwargs and kwargs['module']:\n",
" updates['module_path'] = kwargs['module']\n",
" if 'cron' in kwargs and kwargs['cron']:\n",
" updates['cron_expression'] = kwargs['cron']\n",
" if 'timezone' in kwargs and kwargs['timezone']:\n",
" updates['time_zone'] = kwargs['timezone']\n",
"\n",
" if not updates:\n",
" display(Markdown(\"### 没有提供任何更新内容\"))\n",
" return False\n",
"\n",
" success = manager.update_task(task_id, updates)\n",
" if success:\n",
" display(Markdown(f\"### 任务ID {task_id} 更新成功\"))\n",
" show_task_details(task_id) # 显示更新后的详情\n",
" else:\n",
" display(Markdown(f\"### 任务ID {task_id} 更新失败\"))\n",
" return success\n",
"\n",
"# 执行:更新任务(示例:修改任务1的Cron表达式为每天10点)\n",
"update_task(1, cron=\"0 10 * * *\")\n",
"\n",
"# 执行:同时更新多个属性(名称和Cron表达式)\n",
"# update_task(1, name=\"每日早间新闻采集\", cron=\"0 8 * * *\")"
],
"id": "c892fd8ad2f0dd9d"
},
{
"metadata": {},
"cell_type": "markdown",
"source": "## 6. 启用 / 禁用任务(对应命令行 toggle",
"id": "37564011cf5aa501"
},
{
"metadata": {},
"cell_type": "code",
"outputs": [],
"execution_count": null,
"source": [
"# 启用或禁用任务\n",
"def toggle_task_status(task_id, activate=True):\n",
" success = manager.toggle_task_status(task_id, activate)\n",
" action = \"启用\" if activate else \"禁用\"\n",
" if success:\n",
" display(Markdown(f\"### 任务ID {task_id} {action}成功\"))\n",
" else:\n",
" display(Markdown(f\"### 任务ID {task_id} {action}失败\"))\n",
" return success\n",
"\n",
"# 执行:启用任务ID为1的任务\n",
"toggle_task_status(1, activate=True)\n",
"\n",
"# 执行:禁用任务ID为1的任务\n",
"# toggle_task_status(1, activate=False)"
],
"id": "65388d10c5c8d407"
},
{
"metadata": {},
"cell_type": "markdown",
"source": "## 7. 手动执行任务(对应命令行 run)",
"id": "c554c748169d5ac8"
},
{
"metadata": {},
"cell_type": "code",
"outputs": [],
"execution_count": null,
"source": [
"# 手动执行任务\n",
"def run_task_manually(task_id):\n",
" display(Markdown(f\"### 正在手动执行任务ID {task_id}...\"))\n",
" success = manager.run_task_manually(task_id)\n",
" if success:\n",
" display(Markdown(f\"### 任务ID {task_id} 执行成功\"))\n",
" else:\n",
" display(Markdown(f\"### 任务ID {task_id} 执行失败\"))\n",
" return success\n",
"\n",
"# 执行:手动运行任务ID为1的任务\n",
"run_task_manually(1)"
],
"id": "94892f4134316f8e"
},
{
"metadata": {},
"cell_type": "markdown",
"source": "## 8. 删除任务(对应命令行 delete",
"id": "c3492a1af7dbf2b1"
},
{
"metadata": {},
"cell_type": "code",
"outputs": [],
"execution_count": null,
"source": [
"# 删除任务\n",
"def delete_task(task_id, confirm=False):\n",
" if not confirm:\n",
" display(Markdown(f\"### 警告:删除任务是不可逆操作!\"))\n",
" display(Markdown(f\"请运行 `delete_task({task_id}, confirm=True)` 确认删除\"))\n",
" return False\n",
"\n",
" success = manager.delete_task(task_id)\n",
" if success:\n",
" display(Markdown(f\"### 任务ID {task_id} 删除成功\"))\n",
" else:\n",
" display(Markdown(f\"### 任务ID {task_id} 删除失败\"))\n",
" return success\n",
"\n",
"# 执行:第一步 - 确认删除(不会实际删除)\n",
"delete_task(1)\n",
"\n",
"# 执行:第二步 - 实际删除(谨慎操作!)\n",
"# delete_task(1, confirm=True)"
],
"id": "6936dcc673933a8d"
}
],
"metadata": {
"kernelspec": {
"display_name": "Python 3",
"language": "python",
"name": "python3"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 2
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython2",
"version": "2.7.6"
}
},
"nbformat": 4,
"nbformat_minor": 5
}