Files
2025-10-30 17:24:28 +08:00

1074 lines
36 KiB
Plaintext
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
{
"cells": [
{
"cell_type": "markdown",
"id": "197b1b81f5528a50",
"metadata": {},
"source": [
"## 1. 初始化(所有操作前必须运行)"
]
},
{
"cell_type": "code",
"execution_count": 1,
"id": "initial_id",
"metadata": {
"ExecuteTime": {
"end_time": "2025-10-29T02:25:08.582541Z",
"start_time": "2025-10-29T02:25:08.473381Z"
},
"collapsed": true
},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"PROJECT_ROOT = d:\\Idea Project\\intelligence_system\n",
"\u001b[32m2025-10-30 13:57:07\u001b[0m | \u001b[1mINFO \u001b[0m | \u001b[36mtask_scheduler\u001b[0m - \u001b[1m任务调度器已初始化,最大工作线程数: 5\u001b[0m\n"
]
}
],
"source": [
"# 使 Notebook 可从项目根导入\n",
"import sys\n",
"from pathlib import Path\n",
"\n",
"def add_project_root(marker_dirs=(\"utils\", \"system_management\")):\n",
"    \"\"\"Locate the project root and make it importable.\n",
"\n",
"    Starting at the current working directory, inspect it and up to five of\n",
"    its ancestors. The first directory that contains every entry of\n",
"    ``marker_dirs`` is placed at the front of ``sys.path`` (unless it is\n",
"    already listed) and returned.\n",
"\n",
"    Raises:\n",
"        RuntimeError: if none of the inspected directories matches.\n",
"    \"\"\"\n",
"    start = Path.cwd()\n",
"    # cwd plus five ancestors: the same six levels the search always covered.\n",
"    for candidate in (start, *start.parents)[:6]:\n",
"        if not all((candidate / marker).exists() for marker in marker_dirs):\n",
"            continue\n",
"        root_str = str(candidate)\n",
"        if root_str not in sys.path:\n",
"            sys.path.insert(0, root_str)\n",
"        return candidate\n",
"    raise RuntimeError(\"未找到项目根目录,请从项目根启动 Notebook 或手动设置 sys.path\")\n",
"\n",
"PROJECT_ROOT = add_project_root()\n",
"print(f\"PROJECT_ROOT = {PROJECT_ROOT}\")\n",
"\n",
"# 依赖与日志\n",
"import pandas as pd\n",
"from IPython.display import display, HTML, Markdown\n",
"from utils.logger import CrossPlatformLog\n",
"log = CrossPlatformLog.get_logger(\"TaskNotebook\")\n",
"\n",
"# 配置与调度器\n",
"from config import Config # 若你使用 ConfigManager,请改为: from config.config import ConfigManager\n",
"from system_management.scheduler.task_scheduler import TaskScheduler\n",
"\n",
"# 初始化调度器(根据你的项目配置选一段)\n",
"scheduler = TaskScheduler(Config.MYSQL_CONFIG)\n",
"# 或使用 ConfigManager\n",
"# config = ConfigManager()\n",
"# scheduler = TaskScheduler(config.get(\"database\"))\n",
"\n",
"# 在 Notebook 中实现一个最小可用的 TaskManager\n",
"class TaskManager:\n",
"    \"\"\"Minimal notebook-side manager for rows of the ``main_task`` table.\n",
"\n",
"    Runs plain SQL through the scheduler's database agent (``scheduler.db``)\n",
"    and reuses scheduler internals to execute a task on demand.\n",
"    \"\"\"\n",
"\n",
"    def __init__(self, scheduler: TaskScheduler):\n",
"        self.scheduler = scheduler\n",
"        self.db = scheduler.db  # reuse the scheduler's MySQLAgent\n",
"\n",
"    def get_all_tasks(self, active_only: bool = False):\n",
"        \"\"\"Return all tasks as a list of row dicts, newest first.\n",
"\n",
"        Args:\n",
"            active_only: when True, only rows with ``is_active = 1`` are returned.\n",
"\n",
"        Returns:\n",
"            A list of dicts; an empty list when the query yields no rows.\n",
"        \"\"\"\n",
"        sql = \"\"\"\n",
"            SELECT *\n",
"            FROM main_task\n",
"            {where}\n",
"            ORDER BY created_at DESC, task_id DESC\n",
"        \"\"\"\n",
"        where = \"WHERE is_active = 1\" if active_only else \"\"\n",
"        df = self.db.query_to_df(sql.format(where=where))\n",
"        return [] if df is None or df.empty else df.to_dict(\"records\")\n",
"\n",
"    def get_task_by_id(self, task_id: int):\n",
"        \"\"\"Return the row with the given ``task_id`` as a dict, or None if absent.\"\"\"\n",
"        df = self.db.query_to_df(\n",
"            \"SELECT * FROM main_task WHERE task_id = %s\",\n",
"            params=(task_id,)\n",
"        )\n",
"        return None if df is None or df.empty else df.iloc[0].to_dict()\n",
"\n",
"    def update_task(self, task_id: int, updates: dict) -> bool:\n",
"        \"\"\"Update whitelisted columns of one task.\n",
"\n",
"        Args:\n",
"            task_id: id of the row to update.\n",
"            updates: mapping of column name to new value; keys outside the\n",
"                whitelist are dropped.\n",
"\n",
"        Returns:\n",
"            True when exactly one row was affected; False otherwise, including\n",
"            when ``updates`` holds nothing usable.\n",
"        \"\"\"\n",
"        if not updates:\n",
"            return False\n",
"        # Column names are interpolated into the SQL text below, so only these\n",
"        # whitelisted names may pass (kept in line with the scheduler).\n",
"        allowed = {\n",
"            \"task_name\", \"task_type\", \"module_path\",\n",
"            \"cron_expression\", \"time_zone\"\n",
"        }\n",
"        filtered = {k: v for k, v in updates.items() if k in allowed}\n",
"        if not filtered:\n",
"            return False\n",
"\n",
"        set_clause = \", \".join(f\"{k}=%s\" for k in filtered)\n",
"        params = list(filtered.values()) + [task_id]\n",
"        sql = f\"UPDATE main_task SET {set_clause}, updated_at=NOW() WHERE task_id=%s\"\n",
"        affected = self.db.execute_sql(sql, params=params)\n",
"        return affected == 1\n",
"\n",
"    def toggle_task_status(self, task_id: int, activate: bool) -> bool:\n",
"        \"\"\"Set ``is_active`` to 1 or 0; True when exactly one row was affected.\"\"\"\n",
"        sql = \"UPDATE main_task SET is_active=%s, updated_at=NOW() WHERE task_id=%s\"\n",
"        affected = self.db.execute_sql(sql, params=(1 if activate else 0, task_id))\n",
"        return affected == 1\n",
"\n",
"    def delete_task(self, task_id: int) -> bool:\n",
"        \"\"\"Hard-delete one task row; True when exactly one row was affected.\"\"\"\n",
"        # For a soft delete use instead:\n",
"        # UPDATE main_task SET is_active=0, updated_at=NOW() WHERE task_id=%s\n",
"        affected = self.db.execute_sql(\"DELETE FROM main_task WHERE task_id=%s\", params=(task_id,))\n",
"        return affected == 1\n",
"\n",
"    def run_task_manually(self, task_id: int) -> bool:\n",
"        \"\"\"Run one task through the scheduler's single-task path.\n",
"\n",
"        Returns:\n",
"            The truthiness of ``scheduler._process_single_task(task)``; False\n",
"            when the task does not exist or the call raises.\n",
"        \"\"\"\n",
"        task = self.get_task_by_id(task_id)\n",
"        if not task:\n",
"            return False\n",
"        try:\n",
"            # NOTE: relies on a private scheduler method, which is handed the task dict.\n",
"            return bool(self.scheduler._process_single_task(task))\n",
"        except Exception:\n",
"            log.exception(\"手动执行任务失败\")\n",
"            return False\n",
"\n",
"    def run_task_synchronously(self, task_id: int) -> dict:\n",
"        \"\"\"Run a task in the current thread and report what happened.\n",
"\n",
"        Text written to ``sys.stdout`` while the task runs is captured and\n",
"        returned. The task row is updated afterwards through the scheduler:\n",
"        on success with the next cron run time and an incremented run count,\n",
"        on failure with a retry time 15 minutes from now.\n",
"\n",
"        Returns:\n",
"            A dict with ``success``, ``task_name``, ``task_id``,\n",
"            ``execution_time`` (seconds), ``output`` and ``error``. When the\n",
"            task id is unknown only ``success``, ``error`` and ``output`` are set.\n",
"        \"\"\"\n",
"        import sys\n",
"        import time\n",
"        from datetime import datetime, timedelta\n",
"        from io import StringIO\n",
"\n",
"        task = self.get_task_by_id(task_id)\n",
"        if not task:\n",
"            return {\n",
"                'success': False,\n",
"                'error': f'未找到任务ID: {task_id}',\n",
"                'output': ''\n",
"            }\n",
"\n",
"        # Capture whatever the task writes to sys.stdout while it runs.\n",
"        old_stdout = sys.stdout\n",
"        sys.stdout = output_buffer = StringIO()\n",
"\n",
"        start_time = time.time()\n",
"        success = False\n",
"        error_msg = None\n",
"\n",
"        try:\n",
"            # Execute the task logic directly, in this thread.\n",
"            self.scheduler._execute_task_logic(task)\n",
"            success = True\n",
"\n",
"            # The key always exists in the row dict, so a .get() default would\n",
"            # never apply to a NULL column; fall back explicitly instead.\n",
"            next_run_time = self.scheduler._calculate_next_run_time(\n",
"                cron_expr=task['cron_expression'],\n",
"                time_zone=task.get('time_zone') or 'Asia/Shanghai'\n",
"            )\n",
"\n",
"            # A NULL run_count arrives as None/NaN; count from zero in that case.\n",
"            previous_runs = task.get('run_count')\n",
"            if previous_runs is None or pd.isna(previous_runs):\n",
"                previous_runs = 0\n",
"\n",
"            self.scheduler._update_task_status(task['task_id'], {\n",
"                'last_run_status': 'success',\n",
"                'is_running': 0,\n",
"                'run_count': previous_runs + 1,\n",
"                'next_run_time': next_run_time\n",
"            })\n",
"\n",
"        except Exception as e:\n",
"            success = False\n",
"            error_msg = str(e)\n",
"            log.exception(f\"任务执行失败: {task['task_name']}\")\n",
"\n",
"            # Record the failure and schedule a retry. `datetime` is imported\n",
"            # above; without it this branch raised NameError and the failed\n",
"            # status was silently never written.\n",
"            try:\n",
"                next_retry_time = datetime.now() + timedelta(minutes=15)\n",
"                self.scheduler._update_task_status(task['task_id'], {\n",
"                    'last_run_status': 'failed',\n",
"                    'is_running': 0,\n",
"                    'next_run_time': next_retry_time\n",
"                })\n",
"            except Exception:\n",
"                # Still best effort, but leave a trace instead of hiding it.\n",
"                log.exception(\"更新任务失败状态时出错\")\n",
"\n",
"        finally:\n",
"            # Always restore the real stdout, even if the task blew up.\n",
"            sys.stdout = old_stdout\n",
"            output_text = output_buffer.getvalue()\n",
"\n",
"        execution_time = time.time() - start_time\n",
"\n",
"        return {\n",
"            'success': success,\n",
"            'task_name': task['task_name'],\n",
"            'task_id': task['task_id'],\n",
"            'execution_time': execution_time,\n",
"            'output': output_text,\n",
"            'error': error_msg\n",
"        }\n",
"\n",
"# 在这里创建 manager(供后续单元使用)\n",
"manager = TaskManager(scheduler)\n",
"\n",
"# 常用辅助函数\n",
"def format_datetime(dt):\n",
"    \"\"\"Render a timestamp-like value as ``YYYY-MM-DD HH:MM:SS``.\n",
"\n",
"    ``None`` and pandas-missing values (``NaT``/``NaN``) become ``\"-\"``;\n",
"    anything that cannot be formatted with ``strftime`` falls back to\n",
"    ``str(dt)``.\n",
"    \"\"\"\n",
"    placeholder = \"-\"\n",
"    if dt is None:\n",
"        return placeholder\n",
"\n",
"    # Decide \"missing\" up front. pd.isna can yield an ambiguous array for\n",
"    # list-like input; such values are simply treated as not missing.\n",
"    try:\n",
"        is_missing = bool(pd.isna(dt))\n",
"    except Exception:\n",
"        is_missing = False\n",
"    if is_missing:\n",
"        return placeholder\n",
"\n",
"    try:\n",
"        return dt.strftime(\"%Y-%m-%d %H:%M:%S\")\n",
"    except Exception:\n",
"        return str(dt)"
]
},
{
"cell_type": "markdown",
"id": "8271189cef3b5f17",
"metadata": {},
"source": [
"## 2. 列出任务(对应命令行 list)"
]
},
{
"cell_type": "code",
"execution_count": 2,
"id": "7b020af55972643",
"metadata": {
"ExecuteTime": {
"end_time": "2025-10-17T05:43:18.499582Z",
"start_time": "2025-10-17T05:43:18.394863Z"
}
},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"\u001b[32m2025-10-29 09:54:09\u001b[0m | \u001b[1mINFO \u001b[0m | \u001b[36mmysql_agent\u001b[0m - \u001b[1m查询执行成功\u001b[0m\n"
]
},
{
"data": {
"text/markdown": [
"### 任务列表"
],
"text/plain": [
"<IPython.core.display.Markdown object>"
]
},
"metadata": {},
"output_type": "display_data"
},
{
"data": {
"text/html": [
"<table border=\"1\" class=\"dataframe\">\n",
" <thead>\n",
" <tr style=\"text-align: right;\">\n",
" <th>任务ID</th>\n",
" <th>任务名称</th>\n",
" <th>任务类型</th>\n",
" <th>模块路径</th>\n",
" <th>Cron表达式</th>\n",
" <th>时区</th>\n",
" <th>下次运行时间</th>\n",
" <th>最后运行时间</th>\n",
" <th>运行状态</th>\n",
" <th>运行次数</th>\n",
" <th>是否活跃</th>\n",
" <th>is_running</th>\n",
" <th>created_at</th>\n",
" <th>updated_at</th>\n",
" </tr>\n",
" </thead>\n",
" <tbody>\n",
" <tr>\n",
" <td>2</td>\n",
" <td>RSS基于规则数据处理</td>\n",
" <td>processor</td>\n",
" <td>processors.processor_rss_data</td>\n",
" <td>0 8,20 * * *</td>\n",
" <td>Asia/Shanghai</td>\n",
" <td>2025-10-28 20:00:00</td>\n",
" <td>2025-10-28 13:34:49</td>\n",
" <td>success</td>\n",
" <td>10</td>\n",
" <td>1</td>\n",
" <td>0</td>\n",
" <td>2025-10-22 16:06:42</td>\n",
" <td>2025-10-28 13:34:50</td>\n",
" </tr>\n",
" <tr>\n",
" <td>1</td>\n",
" <td>RSS新闻订阅</td>\n",
" <td>collector</td>\n",
" <td>collectors.rss_subscriptions.NewsAPIClient</td>\n",
" <td>*/5 * * * *</td>\n",
" <td>Asia/Shanghai</td>\n",
" <td>2025-10-28 13:40:00</td>\n",
" <td>2025-10-28 13:35:09</td>\n",
" <td>success</td>\n",
" <td>495</td>\n",
" <td>1</td>\n",
" <td>0</td>\n",
" <td>2025-10-16 15:47:34</td>\n",
" <td>2025-10-28 13:35:09</td>\n",
" </tr>\n",
" </tbody>\n",
"</table>"
],
"text/plain": [
"<IPython.core.display.HTML object>"
]
},
"metadata": {},
"output_type": "display_data"
},
{
"data": {
"text/html": [
"<div>\n",
"<style scoped>\n",
" .dataframe tbody tr th:only-of-type {\n",
" vertical-align: middle;\n",
" }\n",
"\n",
" .dataframe tbody tr th {\n",
" vertical-align: top;\n",
" }\n",
"\n",
" .dataframe thead th {\n",
" text-align: right;\n",
" }\n",
"</style>\n",
"<table border=\"1\" class=\"dataframe\">\n",
" <thead>\n",
" <tr style=\"text-align: right;\">\n",
" <th></th>\n",
" <th>任务ID</th>\n",
" <th>任务名称</th>\n",
" <th>任务类型</th>\n",
" <th>模块路径</th>\n",
" <th>Cron表达式</th>\n",
" <th>时区</th>\n",
" <th>下次运行时间</th>\n",
" <th>最后运行时间</th>\n",
" <th>运行状态</th>\n",
" <th>运行次数</th>\n",
" <th>是否活跃</th>\n",
" <th>is_running</th>\n",
" <th>created_at</th>\n",
" <th>updated_at</th>\n",
" </tr>\n",
" </thead>\n",
" <tbody>\n",
" <tr>\n",
" <th>0</th>\n",
" <td>2</td>\n",
" <td>RSS基于规则数据处理</td>\n",
" <td>processor</td>\n",
" <td>processors.processor_rss_data</td>\n",
" <td>0 8,20 * * *</td>\n",
" <td>Asia/Shanghai</td>\n",
" <td>2025-10-28 20:00:00</td>\n",
" <td>2025-10-28 13:34:49</td>\n",
" <td>success</td>\n",
" <td>10</td>\n",
" <td>1</td>\n",
" <td>0</td>\n",
" <td>2025-10-22 16:06:42</td>\n",
" <td>2025-10-28 13:34:50</td>\n",
" </tr>\n",
" <tr>\n",
" <th>1</th>\n",
" <td>1</td>\n",
" <td>RSS新闻订阅</td>\n",
" <td>collector</td>\n",
" <td>collectors.rss_subscriptions.NewsAPIClient</td>\n",
" <td>*/5 * * * *</td>\n",
" <td>Asia/Shanghai</td>\n",
" <td>2025-10-28 13:40:00</td>\n",
" <td>2025-10-28 13:35:09</td>\n",
" <td>success</td>\n",
" <td>495</td>\n",
" <td>1</td>\n",
" <td>0</td>\n",
" <td>2025-10-16 15:47:34</td>\n",
" <td>2025-10-28 13:35:09</td>\n",
" </tr>\n",
" </tbody>\n",
"</table>\n",
"</div>"
],
"text/plain": [
" 任务ID 任务名称 任务类型 模块路径 \\\n",
"0 2 RSS基于规则数据处理 processor processors.processor_rss_data \n",
"1 1 RSS新闻订阅 collector collectors.rss_subscriptions.NewsAPIClient \n",
"\n",
" Cron表达式 时区 下次运行时间 最后运行时间 \\\n",
"0 0 8,20 * * * Asia/Shanghai 2025-10-28 20:00:00 2025-10-28 13:34:49 \n",
"1 */5 * * * * Asia/Shanghai 2025-10-28 13:40:00 2025-10-28 13:35:09 \n",
"\n",
" 运行状态 运行次数 是否活跃 is_running created_at updated_at \n",
"0 success 10 1 0 2025-10-22 16:06:42 2025-10-28 13:34:50 \n",
"1 success 495 1 0 2025-10-16 15:47:34 2025-10-28 13:35:09 "
]
},
"execution_count": 2,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"# List tasks: only active ones by default; pass active_only=False to include disabled tasks\n",
"def list_tasks(active_only=True):\n",
"    \"\"\"Show the task table in the notebook and return it as a DataFrame.\n",
"\n",
"    Args:\n",
"        active_only: forwarded to ``manager.get_all_tasks``.\n",
"\n",
"    Returns:\n",
"        The DataFrame with display column names, or None when no task exists.\n",
"    \"\"\"\n",
"    rows = manager.get_all_tasks(active_only)\n",
"    if not rows:\n",
"        display(Markdown(\"### 没有找到任务\"))\n",
"        return None\n",
"\n",
"    # Database column -> label shown in the rendered table.\n",
"    column_labels = {\n",
"        'task_id': '任务ID',\n",
"        'task_name': '任务名称',\n",
"        'task_type': '任务类型',\n",
"        'module_path': '模块路径',\n",
"        'cron_expression': 'Cron表达式',\n",
"        'time_zone': '时区',\n",
"        'last_run_time': '最后运行时间',\n",
"        'next_run_time': '下次运行时间',\n",
"        'last_run_status': '运行状态',\n",
"        'is_active': '是否活跃',\n",
"        'run_count': '运行次数'\n",
"    }\n",
"\n",
"    table = pd.DataFrame(rows)\n",
"\n",
"    # Turn the two run-time columns into display strings when they are present.\n",
"    for time_col in ('last_run_time', 'next_run_time'):\n",
"        if time_col in table.columns:\n",
"            table[time_col] = table[time_col].apply(format_datetime)\n",
"\n",
"    table = table.rename(columns=column_labels)\n",
"\n",
"    display(Markdown(\"### 任务列表\"))\n",
"    display(HTML(table.to_html(index=False)))\n",
"    return table\n",
"\n",
"# Run: list every task, including disabled ones. The result is bound to a name\n",
"# so Jupyter does not echo the returned DataFrame under the table that\n",
"# list_tasks() has already displayed.\n",
"tasks_df = list_tasks(active_only=False)\n",
"\n",
"# Alternatively, list only the active tasks:\n",
"# tasks_df = list_tasks(active_only=True)"
]
},
{
"cell_type": "markdown",
"id": "7780dcef67a0534c",
"metadata": {},
"source": [
"## 3. 查看任务详情(对应命令行 show)"
]
},
{
"cell_type": "code",
"execution_count": 10,
"id": "eab90de72c35429e",
"metadata": {
"ExecuteTime": {
"end_time": "2025-10-29T02:26:12.873536Z",
"start_time": "2025-10-29T02:26:12.648420Z"
}
},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"\u001b[32m2025-10-29 10:26:12\u001b[0m | \u001b[1mINFO \u001b[0m | \u001b[36mmysql_agent\u001b[0m - \u001b[1m查询执行成功\u001b[0m\n"
]
},
{
"data": {
"text/markdown": [
"### 任务详情\n",
"**任务ID**: 1\n",
"**任务名称**: RSS新闻订阅\n",
"**任务类型**: collector\n",
"**模块路径**: processors.processor_rss_data.RSSDataProcessor\n",
"**Cron表达式**: */5 * * * *\n",
"**时区**: Asia/Shanghai\n",
"**最后运行时间**: 2025-10-28 13:35:09\n",
"**下次运行时间**: 2025-10-29 10:25:00\n",
"**运行状态**: success\n",
"**是否活跃**: 是\n",
"**运行次数**: 496\n",
"**创建时间**: 2025-10-16 15:47:34"
],
"text/plain": [
"<IPython.core.display.Markdown object>"
]
},
"metadata": {},
"output_type": "display_data"
},
{
"data": {
"text/plain": [
"{'task_id': 1,\n",
" 'task_name': 'RSS新闻订阅',\n",
" 'task_type': 'collector',\n",
" 'module_path': 'processors.processor_rss_data.RSSDataProcessor',\n",
" 'cron_expression': '*/5 * * * *',\n",
" 'time_zone': 'Asia/Shanghai',\n",
" 'next_run_time': Timestamp('2025-10-29 10:25:00'),\n",
" 'last_run_time': Timestamp('2025-10-28 13:35:09'),\n",
" 'last_run_status': 'success',\n",
" 'run_count': 496,\n",
" 'is_active': 1,\n",
" 'is_running': 0,\n",
" 'created_at': Timestamp('2025-10-16 15:47:34'),\n",
" 'updated_at': Timestamp('2025-10-29 10:24:49')}"
]
},
"execution_count": 10,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"# 查看指定任务的详情\n",
"def show_task_details(task_id):\n",
"    \"\"\"Display one task's attributes as Markdown and return the task dict.\n",
"\n",
"    Args:\n",
"        task_id: id looked up through ``manager.get_task_by_id``.\n",
"\n",
"    Returns:\n",
"        The task dict, or None (after displaying a notice) when the lookup\n",
"        returns nothing.\n",
"    \"\"\"\n",
"    task = manager.get_task_by_id(task_id)\n",
"    if not task:\n",
"        display(Markdown(f\"### 未找到任务ID为 {task_id} 的任务\"))\n",
"        return None\n",
"\n",
"    # The row dict always carries every column, so a .get() default never\n",
"    # applies to a NULL value; `or` supplies the fallback text instead.\n",
"    details = [\n",
"        \"### 任务详情\",\n",
"        f\"**任务ID**: {task.get('task_id')}\",\n",
"        f\"**任务名称**: {task.get('task_name')}\",\n",
"        f\"**任务类型**: {task.get('task_type')}\",\n",
"        f\"**模块路径**: {task.get('module_path')}\",\n",
"        f\"**Cron表达式**: {task.get('cron_expression')}\",\n",
"        f\"**时区**: {task.get('time_zone') or 'Asia/Shanghai'}\",\n",
"        f\"**最后运行时间**: {format_datetime(task.get('last_run_time'))}\",\n",
"        f\"**下次运行时间**: {format_datetime(task.get('next_run_time'))}\",\n",
"        f\"**运行状态**: {task.get('last_run_status') or '未运行'}\",\n",
"        f\"**是否活跃**: {'是' if task.get('is_active') else '否'}\",\n",
"        f\"**运行次数**: {task.get('run_count', 0)}\",\n",
"        f\"**创建时间**: {format_datetime(task.get('created_at'))}\",\n",
"    ]\n",
"\n",
"    # A bare newline is only a soft break in Markdown, which renders all\n",
"    # fields as one run-on paragraph; two trailing spaces force a line break.\n",
"    display(Markdown('  \\n'.join(details)))\n",
"    return task\n",
"\n",
"# 执行:查看任务ID为1的详情(替换为实际ID)\n",
"show_task_details(1)"
]
},
{
"cell_type": "markdown",
"id": "a313f1524f5a54bc",
"metadata": {},
"source": [
"## 4. 添加新任务(对应命令行 add)"
]
},
{
"cell_type": "code",
"execution_count": 4,
"id": "2b2d723bb8e2784f",
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"\u001b[32m2025-10-29 09:56:52\u001b[0m | \u001b[1mINFO \u001b[0m | \u001b[36mmysql_agent\u001b[0m - \u001b[1m查询执行成功\u001b[0m\n",
"\u001b[32m2025-10-29 09:56:52\u001b[0m | \u001b[1mINFO \u001b[0m | \u001b[36mtask_scheduler\u001b[0m - \u001b[1m新任务添加成功\u001b[0m\n"
]
},
{
"data": {
"text/markdown": [
"### 任务添加成功!"
],
"text/plain": [
"<IPython.core.display.Markdown object>"
]
},
"metadata": {},
"output_type": "display_data"
},
{
"data": {
"text/markdown": [
"新任务ID: 0,任务名称: AI处理RSS新闻"
],
"text/plain": [
"<IPython.core.display.Markdown object>"
]
},
"metadata": {},
"output_type": "display_data"
},
{
"data": {
"text/plain": [
"np.int64(0)"
]
},
"execution_count": 4,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"# 添加新任务\n",
"def add_new_task(name, task_type, module_path, cron_expression, timezone=\"Asia/Shanghai\"):\n",
"    \"\"\"Register a task through ``scheduler.add_task`` and report the outcome.\n",
"\n",
"    Returns:\n",
"        Whatever ``scheduler.add_task`` returns, or None when an exception is\n",
"        raised (its message is displayed instead).\n",
"    \"\"\"\n",
"    task_fields = dict(\n",
"        task_name=name,\n",
"        task_type=task_type,\n",
"        module_path=module_path,\n",
"        cron_expression=cron_expression,\n",
"        time_zone=timezone,\n",
"    )\n",
"    try:\n",
"        new_id = scheduler.add_task(**task_fields)\n",
"        display(Markdown(\"### 任务添加成功!\"))\n",
"        display(Markdown(f\"新任务ID: {new_id},任务名称: {name}\"))\n",
"        return new_id\n",
"    except Exception as exc:\n",
"        display(Markdown(f\"### 添加任务失败: {exc}\"))\n",
"        return None\n",
"\n",
"# Run: register the AI processing task for RSS news\n",
"add_new_task(\n",
" name=\"AI处理RSS新闻\",\n",
" task_type=\"processor\",\n",
" module_path=\"processors.ai_processors.ai_processor_rss_data.RSSDataAIProcessor\",\n",
" cron_expression=\"5 0 * * *\", # runs daily at 00:05; every 5 minutes would be */5 * * * *\n",
" timezone=\"Asia/Shanghai\"\n",
")"
]
},
{
"cell_type": "markdown",
"id": "12373bcbb4a0b434",
"metadata": {},
"source": [
"## 5. 更新任务属性(对应命令行 update)"
]
},
{
"cell_type": "code",
"execution_count": 21,
"id": "c892fd8ad2f0dd9d",
"metadata": {
"ExecuteTime": {
"end_time": "2025-10-29T02:29:56.088085Z",
"start_time": "2025-10-29T02:29:55.754298Z"
}
},
"outputs": [
{
"data": {
"text/markdown": [
"### 任务ID 2 更新成功"
],
"text/plain": [
"<IPython.core.display.Markdown object>"
]
},
"metadata": {},
"output_type": "display_data"
},
{
"name": "stdout",
"output_type": "stream",
"text": [
"\u001b[32m2025-10-29 10:29:56\u001b[0m | \u001b[1mINFO \u001b[0m | \u001b[36mmysql_agent\u001b[0m - \u001b[1m查询执行成功\u001b[0m\n"
]
},
{
"data": {
"text/markdown": [
"### 任务详情\n",
"**任务ID**: 2\n",
"**任务名称**: RSS基于规则数据处理\n",
"**任务类型**: processor\n",
"**模块路径**: processors.processor_rss_data\n",
"**Cron表达式**: 0 8,20 * * *\n",
"**时区**: Asia/Shanghai\n",
"**最后运行时间**: 2025-10-28 13:34:49\n",
"**下次运行时间**: 2025-10-28 20:00:00\n",
"**运行状态**: success\n",
"**是否活跃**: 是\n",
"**运行次数**: 10\n",
"**创建时间**: 2025-10-22 16:06:42"
],
"text/plain": [
"<IPython.core.display.Markdown object>"
]
},
"metadata": {},
"output_type": "display_data"
},
{
"data": {
"text/plain": [
"True"
]
},
"execution_count": 21,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"# 更新任务属性\n",
"def update_task(task_id, **kwargs):\n",
"    \"\"\"Update selected attributes of a task and show the refreshed details.\n",
"\n",
"    Keyword Args:\n",
"        name, type, module, cron, timezone: new values; keywords that are\n",
"            missing or falsy are ignored.\n",
"\n",
"    Returns:\n",
"        The result of ``manager.update_task``, or False when no usable value\n",
"        was supplied.\n",
"    \"\"\"\n",
"    # Short keyword -> key handed to manager.update_task.\n",
"    field_for_keyword = {\n",
"        'name': 'task_name',\n",
"        'type': 'task_type',\n",
"        'module': 'module_path',\n",
"        'cron': 'cron_expression',\n",
"        'timezone': 'time_zone',\n",
"    }\n",
"    updates = {\n",
"        field: kwargs[keyword]\n",
"        for keyword, field in field_for_keyword.items()\n",
"        if kwargs.get(keyword)\n",
"    }\n",
"\n",
"    if not updates:\n",
"        display(Markdown(\"### 没有提供任何更新内容\"))\n",
"        return False\n",
"\n",
"    success = manager.update_task(task_id, updates)\n",
"    if not success:\n",
"        display(Markdown(f\"### 任务ID {task_id} 更新失败\"))\n",
"        return success\n",
"\n",
"    display(Markdown(f\"### 任务ID {task_id} 更新成功\"))\n",
"    show_task_details(task_id)  # show the row as it looks after the update\n",
"    return success\n",
"\n",
"# Run: update a task (here: set task 2's module path to processors.processor_rss_data)\n",
"update_task(2, module = \"processors.processor_rss_data\")\n",
"\n",
"# Run: update several attributes at once (name and cron expression)\n",
"# update_task(1, name=\"每日早间新闻采集\", cron=\"0 8 * * *\")"
]
},
{
"cell_type": "markdown",
"id": "37564011cf5aa501",
"metadata": {},
"source": [
"## 6. 启用 / 禁用任务(对应命令行 toggle)"
]
},
{
"cell_type": "code",
"execution_count": 17,
"id": "65388d10c5c8d407",
"metadata": {},
"outputs": [
{
"data": {
"text/markdown": [
"### 任务ID 1 启用成功"
],
"text/plain": [
"<IPython.core.display.Markdown object>"
]
},
"metadata": {},
"output_type": "display_data"
},
{
"data": {
"text/plain": [
"True"
]
},
"execution_count": 17,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"# 启用或禁用任务\n",
"def toggle_task_status(task_id, activate=True):\n",
"    \"\"\"Enable or disable a task and display whether the change succeeded.\n",
"\n",
"    Returns:\n",
"        The result of ``manager.toggle_task_status``.\n",
"    \"\"\"\n",
"    success = manager.toggle_task_status(task_id, activate)\n",
"    action = \"启用\" if activate else \"禁用\"\n",
"    outcome = \"成功\" if success else \"失败\"\n",
"    display(Markdown(f\"### 任务ID {task_id} {action}{outcome}\"))\n",
"    return success\n",
"\n",
"# 执行:启用任务ID为1的任务\n",
"toggle_task_status(1, activate=True)\n",
"\n",
"# 执行:禁用任务ID为1的任务\n",
"# toggle_task_status(1, activate=False)"
]
},
{
"cell_type": "markdown",
"id": "c554c748169d5ac8",
"metadata": {},
"source": [
"## 7. 手动执行任务(对应命令行 run)\n",
"\n",
"模块路径只需写到 `main` 的上一级,调度器会自动识别其中的 `main`。"
]
},
{
"cell_type": "code",
"execution_count": 3,
"id": "94892f4134316f8e",
"metadata": {
"ExecuteTime": {
"start_time": "2025-10-29T02:30:10.298891Z"
},
"jupyter": {
"is_executing": true
}
},
"outputs": [
{
"data": {
"text/markdown": [
"### 开始执行任务ID 2"
],
"text/plain": [
"<IPython.core.display.Markdown object>"
]
},
"metadata": {},
"output_type": "display_data"
},
{
"data": {
"text/markdown": [
"---"
],
"text/plain": [
"<IPython.core.display.Markdown object>"
]
},
"metadata": {},
"output_type": "display_data"
},
{
"name": "stdout",
"output_type": "stream",
"text": [
"\u001b[32m2025-10-30 13:57:49\u001b[0m | \u001b[1mINFO \u001b[0m | \u001b[36mmysql_agent\u001b[0m - \u001b[1m查询执行成功\u001b[0m\n",
"\u001b[32m2025-10-30 13:57:49\u001b[0m | \u001b[1mINFO \u001b[0m | \u001b[36mprocessor_rss_data\u001b[0m - \u001b[1mRSS数据处理器初始化完成\u001b[0m\n",
"\u001b[32m2025-10-30 13:57:49\u001b[0m | \u001b[1mINFO \u001b[0m | \u001b[36mprocessor_rss_data\u001b[0m - \u001b[1m开始处理RSS数据...\u001b[0m\n",
"\u001b[32m2025-10-30 13:57:50\u001b[0m | \u001b[1mINFO \u001b[0m | \u001b[36mprocessor_rss_data\u001b[0m - \u001b[1m成功加载 6 条未处理的RSS数据\u001b[0m\n",
"\u001b[32m2025-10-30 13:57:50\u001b[0m | \u001b[1mINFO \u001b[0m | \u001b[36mprocessor_rss_data\u001b[0m - \u001b[1m成功加载停用词表,共 98 个词\u001b[0m\n",
"\u001b[32m2025-10-30 13:57:50\u001b[0m | \u001b[1mINFO \u001b[0m | \u001b[36mprocessor_rss_data\u001b[0m - \u001b[1m成功加载汽车后市场关键词,共 37 个\u001b[0m\n"
]
},
{
"name": "stderr",
"output_type": "stream",
"text": [
"Building prefix dict from the default dictionary ...\n",
"Loading model from cache C:\\Users\\zy187\\AppData\\Local\\Temp\\jieba.cache\n",
"Loading model cost 0.839 seconds.\n",
"Prefix dict has been built successfully.\n"
]
},
{
"name": "stdout",
"output_type": "stream",
"text": [
"\u001b[32m2025-10-30 13:57:50\u001b[0m | \u001b[1mINFO \u001b[0m | \u001b[36mprocessor_rss_data\u001b[0m - \u001b[1m数据处理完成,共处理 6 条记录\u001b[0m\n",
"\u001b[32m2025-10-30 13:57:50\u001b[0m | \u001b[1mINFO \u001b[0m | \u001b[36mprocessor_rss_data\u001b[0m - \u001b[1m过滤出 0 条汽车后市场相关新闻\u001b[0m\n",
"\u001b[32m2025-10-30 13:57:51\u001b[0m | \u001b[1mINFO \u001b[0m | \u001b[36mprocessor_rss_data\u001b[0m - \u001b[1m成功标记 6 条数据为已处理\u001b[0m\n",
"\u001b[32m2025-10-30 13:57:51\u001b[0m | \u001b[1mINFO \u001b[0m | \u001b[36mprocessor_rss_data\u001b[0m - \u001b[1mRSS数据处理完成\u001b[0m\n"
]
},
{
"data": {
"text/markdown": [
"**任务名称**: RSS基于规则数据处理"
],
"text/plain": [
"<IPython.core.display.Markdown object>"
]
},
"metadata": {},
"output_type": "display_data"
},
{
"data": {
"text/markdown": [
"**任务ID**: 2"
],
"text/plain": [
"<IPython.core.display.Markdown object>"
]
},
"metadata": {},
"output_type": "display_data"
},
{
"data": {
"text/markdown": [
"**执行时长**: 4.41 秒"
],
"text/plain": [
"<IPython.core.display.Markdown object>"
]
},
"metadata": {},
"output_type": "display_data"
},
{
"data": {
"text/markdown": [
"---"
],
"text/plain": [
"<IPython.core.display.Markdown object>"
]
},
"metadata": {},
"output_type": "display_data"
},
{
"data": {
"text/markdown": [
"### ✅ 任务执行成功"
],
"text/plain": [
"<IPython.core.display.Markdown object>"
]
},
"metadata": {},
"output_type": "display_data"
},
{
"data": {
"text/plain": [
"{'success': True,\n",
" 'task_name': 'RSS基于规则数据处理',\n",
" 'task_id': 2,\n",
" 'execution_time': 4.414557695388794,\n",
" 'output': '',\n",
" 'error': None}"
]
},
"execution_count": 3,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"# 手动执行任务(异步方式,快速返回)\n",
"def run_task_manually(task_id):\n",
"    \"\"\"Run a task via ``manager.run_task_manually`` and display the outcome.\n",
"\n",
"    Returns:\n",
"        The value returned by ``manager.run_task_manually``.\n",
"    \"\"\"\n",
"    display(Markdown(f\"### 正在手动执行任务ID {task_id}...\"))\n",
"    success = manager.run_task_manually(task_id)\n",
"    verdict = \"执行成功\" if success else \"执行失败\"\n",
"    display(Markdown(f\"### 任务ID {task_id} {verdict}\"))\n",
"    return success\n",
"\n",
"# 手动执行任务(同步方式,显示详细执行过程)\n",
"def run_task_with_details(task_id):\n",
"    \"\"\"Run a task via ``manager.run_task_synchronously`` and render its report.\n",
"\n",
"    Shows the task name, id and duration, then any captured output, then the\n",
"    final verdict (with the error text on failure).\n",
"\n",
"    Returns:\n",
"        The result dict produced by ``manager.run_task_synchronously``.\n",
"    \"\"\"\n",
"    def show(text):\n",
"        # Every piece of the report is rendered as a Markdown fragment.\n",
"        display(Markdown(text))\n",
"\n",
"    show(f\"### 开始执行任务ID {task_id}\")\n",
"    show(\"---\")\n",
"\n",
"    result = manager.run_task_synchronously(task_id)\n",
"\n",
"    # A failed result that carries an error but no task_id has nothing else\n",
"    # to report, so show the error and stop here.\n",
"    no_task_info = not result['success'] and result.get('error') and 'task_id' not in result\n",
"    if no_task_info:\n",
"        show(f\"### ❌ 错误: {result['error']}\")\n",
"        return result\n",
"\n",
"    # Basic facts about the run.\n",
"    show(f\"**任务名称**: {result['task_name']}\")\n",
"    show(f\"**任务ID**: {result['task_id']}\")\n",
"    show(f\"**执行时长**: {result['execution_time']:.2f} 秒\")\n",
"    show(\"---\")\n",
"\n",
"    # Captured output, printed verbatim when there is any.\n",
"    if result['output']:\n",
"        show(\"### 📋 执行输出:\")\n",
"        print(result['output'])\n",
"        show(\"---\")\n",
"\n",
"    # Final verdict.\n",
"    if result['success']:\n",
"        show(\"### ✅ 任务执行成功\")\n",
"        return result\n",
"\n",
"    show(\"### ❌ 任务执行失败\")\n",
"    if result['error']:\n",
"        show(f\"**错误信息**: {result['error']}\")\n",
"    return result\n",
"\n",
"# 执行:手动运行任务ID为2的任务(显示详细执行过程)\n",
"run_task_with_details(2)"
]
},
{
"cell_type": "markdown",
"id": "c3492a1af7dbf2b1",
"metadata": {},
"source": [
"## 8. 删除任务(对应命令行 delete)"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "6936dcc673933a8d",
"metadata": {},
"outputs": [],
"source": [
"# 删除任务\n",
"def delete_task(task_id, confirm=False):\n",
"    \"\"\"Delete a task, but only after an explicit ``confirm=True``.\n",
"\n",
"    Without confirmation a warning and the exact confirming call are shown\n",
"    and nothing is deleted.\n",
"\n",
"    Returns:\n",
"        False when unconfirmed; otherwise the result of ``manager.delete_task``.\n",
"    \"\"\"\n",
"    if not confirm:\n",
"        notices = (\n",
"            \"### 警告:删除任务是不可逆操作!\",\n",
"            f\"请运行 `delete_task({task_id}, confirm=True)` 确认删除\",\n",
"        )\n",
"        for notice in notices:\n",
"            display(Markdown(notice))\n",
"        return False\n",
"\n",
"    success = manager.delete_task(task_id)\n",
"    outcome = \"删除成功\" if success else \"删除失败\"\n",
"    display(Markdown(f\"### 任务ID {task_id} {outcome}\"))\n",
"    return success\n",
"\n",
"# 执行:第一步 - 确认删除(不会实际删除)\n",
"delete_task(1)\n",
"\n",
"# 执行:第二步 - 实际删除(谨慎操作!)\n",
"# delete_task(1, confirm=True)"
]
}
],
"metadata": {
"kernelspec": {
"display_name": "intelligence_system",
"language": "python",
"name": "python3"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 3
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.12.11"
}
},
"nbformat": 4,
"nbformat_minor": 5
}