Compare commits
4 Commits
e1db06dd79
...
master
| Author | SHA1 | Date | |
|---|---|---|---|
| c894e344aa | |||
| 5d1155bd20 | |||
| fc18fa74c3 | |||
| c5f6e8288d |
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
@@ -1,11 +1,23 @@
|
||||
import os
|
||||
|
||||
|
||||
class Config:
|
||||
|
||||
# Primary MySQL connection settings.
# NOTE(review): credentials are committed to the repository. The password can
# now be overridden through the environment; the literal default is kept only
# so existing deployments keep working and should be rotated and removed.
MYSQL_CONFIG = {
    'host': '123.60.167.249',
    'port': 3306,
    'user': 'intelligence',
    'password': os.getenv('INTELLIGENCE_MYSQL_PASSWORD', '123123'),
    'database': "intelligence_system",
    'max_connections': 10
}

# Local MySQL instance settings (same schema name as the primary config).
# The 'database' key is listed once; a repeated key in a dict literal is
# silently overwritten by the last occurrence.
OFFLINE_MYSQL_CONFIG = {
    'host': 'localhost',
    'port': 3306,
    'user': 'root',
    'password': os.getenv('INTELLIGENCE_OFFLINE_MYSQL_PASSWORD', '123123'),
    'database': "intelligence_system",
    'max_connections': 10
}
|
||||
|
||||
@@ -14,4 +26,19 @@ class Config:
|
||||
'access_key': 'admin',
|
||||
'secret_key': 'abc88888888',
|
||||
'secure': False # 社区版默认不启用SSL
|
||||
}
|
||||
|
||||
# Baidu AI API settings (Qianfan platform).
# The API key is read from the BAIDU_API_KEY environment variable only.
# No secret is kept in source: an empty value means "not configured", which
# the AI processor reports instead of calling the API.
BAIDU_AI_CONFIG = {
    'api_key': os.getenv('BAIDU_API_KEY', ''),  # Baidu Qianfan API key
    'model': 'ernie-x1-turbo-32k',  # model used for the analysis
}

# AI processor settings
AI_PROCESSOR_CONFIG = {
    'batch_size': 10,  # default number of records per batch
    'delay': 1.5,  # pause between records in seconds, to avoid API rate limits
    'source_table': 'processed_rss_data',  # table the records are read from
    'result_table': 'ai_processor_rss_analysis',  # table the AI results are written to
}
|
||||
Binary file not shown.
Binary file not shown.
+53441
File diff suppressed because it is too large
Load Diff
+2090
File diff suppressed because it is too large
Load Diff
@@ -11,17 +11,32 @@ log = CrossPlatformLog.get_logger("Main")
|
||||
|
||||
|
||||
class IntelligenceSystem:
|
||||
def __init__(self, db_config=None, run_all_on_startup=False):
    """Initialise the system container (holds no business logic itself).

    Args:
        db_config: database configuration handed to the task scheduler.
            Falls back to ``Config.MYSQL_CONFIG`` when omitted; previously
            the argument was accepted but never used.
        run_all_on_startup: when True, ``start()`` runs all due tasks once
            before entering the scheduling loop (default False).
    """
    self.scheduler = TaskScheduler(db_config or Config.MYSQL_CONFIG, max_workers=5)
    self._running = False
    self.run_all_on_startup = run_all_on_startup
    log.info(f"情报系统已初始化(Cron模式),启动时执行任务: {run_all_on_startup}")
|
||||
|
||||
def start(self):
|
||||
"""启动系统主入口"""
|
||||
self._running = True
|
||||
self._setup_signal_handlers()
|
||||
log.info("系统启动 - 运行在Cron调度模式")
|
||||
|
||||
# 启动时执行所有到期任务(如果开关开启)
|
||||
if self.run_all_on_startup:
|
||||
print(f"\n{'='*60}")
|
||||
print("🚀 启动时执行所有到期任务...")
|
||||
print(f"{'='*60}\n")
|
||||
log.info("启动时执行所有到期任务")
|
||||
result = self.scheduler.check_and_run_tasks(print_empty_status=True)
|
||||
print(f"\n启动任务执行完成: 总数={result['总任务数']}, 成功={result['成功']}, 失败={result['失败']}\n")
|
||||
|
||||
# 时间追踪变量
|
||||
last_status_print_time = time.time() # 上次打印状态的时间
|
||||
@@ -110,7 +125,9 @@ class IntelligenceSystem:
|
||||
if __name__ == "__main__":
    try:
        # Entry point only; scheduling logic lives in the scheduler.
        # run_all_on_startup=True:  run every due task immediately at startup
        # run_all_on_startup=False: run nothing now, wait for the next cycle
        system = IntelligenceSystem(run_all_on_startup=False)
        system.start()
    except Exception:
        log.critical("情报系统启动失败", exc_info=True)
|
||||
|
||||
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
@@ -0,0 +1,453 @@
|
||||
# RSS数据AI处理模块
|
||||
import os
|
||||
import sys
|
||||
import json
|
||||
import time
|
||||
import pandas as pd
|
||||
from typing import List, Dict, Any, Optional
|
||||
from datetime import datetime
|
||||
from openai import OpenAI
|
||||
|
||||
# 添加项目根目录到路径
|
||||
current_dir = os.path.dirname(os.path.abspath(__file__))
|
||||
parent_dir = os.path.dirname(os.path.dirname(current_dir))
|
||||
if parent_dir not in sys.path:
|
||||
sys.path.insert(0, parent_dir)
|
||||
|
||||
from utils.mysql_agent import MySQLAgent
|
||||
from utils.logger import log
|
||||
from config import Config
|
||||
|
||||
|
||||
class RSSDataAIProcessor:
    """AI post-processing of RSS articles.

    Responsibilities:
    - load source-table rows that have not been AI-processed yet
    - ask the chat model whether each article relates to the automotive
      aftermarket
    - store the analysis in the result table
    - flag the source rows as processed
    """

    def __init__(self):
        """Create the DB agent and, when an API key is configured, the AI client."""
        self.log = log.bind(module="RSSDataAIProcessor")
        self.db_agent = MySQLAgent(Config.MYSQL_CONFIG)

        # Table names and batch defaults come from Config.AI_PROCESSOR_CONFIG.
        self.source_table = Config.AI_PROCESSOR_CONFIG['source_table']
        self.ai_table = Config.AI_PROCESSOR_CONFIG['result_table']
        self.default_batch_size = Config.AI_PROCESSOR_CONFIG['batch_size']
        self.default_delay = Config.AI_PROCESSOR_CONFIG['delay']

        # Set unconditionally so the attribute exists even without a client.
        self.model = Config.BAIDU_AI_CONFIG.get('model', 'ernie-x1-turbo-32k')

        # The Qianfan endpoint is called through the OpenAI client class.
        self.api_key = Config.BAIDU_AI_CONFIG.get('api_key')
        if self.api_key:
            self.ai_client = OpenAI(
                base_url='https://qianfan.baidubce.com/v2',
                api_key=self.api_key
            )
            self.log.info("RSS数据AI处理器初始化完成")
        else:
            self.ai_client = None
            self.log.warning("百度AI未配置,AI处理功能将不可用")
            self.log.warning("请在config.py中配置 BAIDU_AI_CONFIG['api_key']")

    def is_configured(self) -> bool:
        """Return True when an AI client was created."""
        return self.ai_client is not None

    def main(self, batch_size: Optional[int] = 200, delay: Optional[float] = None) -> Dict[str, Any]:
        """Run one batch: load, analyse, save, then mark rows as processed.

        Args:
            batch_size: number of rows to load; a falsy value selects the
                configured default.
            delay: pause between records in seconds; ``None`` selects the
                configured default, ``0`` disables the pause.

        Returns:
            dict with ``success``, ``message``, ``processed_count`` and
            ``failed_count``; a completed batch additionally carries
            ``total_count``, ``saved_count``, ``relevant_count`` and
            ``processing_time``.
        """
        batch_size = batch_size or self.default_batch_size
        # `delay or default` would turn an explicit 0 into the default.
        if delay is None:
            delay = self.default_delay

        try:
            # 1. Configuration check
            if not self.is_configured():
                error_msg = "百度AI未配置,请在config.py中配置 BAIDU_AI_CONFIG['api_key']"
                self.log.error(error_msg)
                return {
                    'success': False,
                    'message': error_msg,
                    'processed_count': 0,
                    'failed_count': 0
                }

            self.log.info(f"开始批量处理数据,批次大小: {batch_size}, 延迟: {delay}秒")

            # 2. Make sure the flag column and the result table exist
            self.ensure_ai_processed_column()
            if not self.db_agent.table_exists(self.ai_table):
                self.create_ai_result_table()

            # 3. Load unprocessed rows
            df = self.load_unprocessed_data(batch_size)
            if df.empty:
                self.log.info("没有需要处理的数据")
                return {
                    'success': True,
                    'message': '没有需要处理的数据',
                    'processed_count': 0,
                    'failed_count': 0
                }

            # 4. Analyse every row. The position is counted explicitly because
            #    the DataFrame index label is not guaranteed to be 0..n-1.
            results = []
            processed_ids = []
            failed_count = 0
            total = len(df)

            for position, (_, record) in enumerate(df.iterrows(), start=1):
                try:
                    self.log.debug(f"处理记录 {record['id']} ({position}/{total})")

                    result = self.process_single_record(record.to_dict())
                    if result:
                        results.append(result)
                        processed_ids.append(record['id'])
                    else:
                        failed_count += 1

                    # Pause between records (not after the last one).
                    if delay > 0 and position < total:
                        time.sleep(delay)

                except Exception as e:
                    self.log.error(f"处理记录 {record['id']} 异常: {str(e)}", exc_info=True)
                    failed_count += 1

            # 5. Save the results; remember whether the insert itself failed.
            saved_count = 0
            save_failed = False
            if results:
                try:
                    saved_count = self._insert_results(results)
                except Exception as e:
                    save_failed = True
                    self.log.error(f"保存AI处理结果失败: {str(e)}", exc_info=True)

            # 6. Flag source rows only when their results were stored, so a
            #    failed insert does not silently drop the analysis.
            if processed_ids and not save_failed:
                self.mark_as_processed(processed_ids)
            elif processed_ids:
                self.log.warning("AI结果保存失败,本批次记录不标记为已处理,将在下次运行时重试")

            # 7. Statistics
            stats = {
                'success': True,
                'message': 'AI处理完成',
                'total_count': total,
                'processed_count': len(processed_ids),
                'saved_count': saved_count,
                'failed_count': failed_count,
                'relevant_count': sum(1 for r in results if r.get('是否相关')),
                'processing_time': datetime.now().strftime('%Y-%m-%d %H:%M:%S')
            }

            self.log.info("批量处理完成", **stats)
            return stats

        except Exception as e:
            error_msg = f"批量处理失败: {str(e)}"
            self.log.error(error_msg, exc_info=True)
            return {
                'success': False,
                'message': error_msg,
                'processed_count': 0,
                'failed_count': 0
            }

    def ensure_ai_processed_column(self):
        """Add the ``是否ai处理`` flag column to the source table if it is missing.

        Raises:
            Exception: any error from the DB agent is logged and re-raised.
        """
        try:
            check_sql = """
                SELECT COUNT(*) as count
                FROM information_schema.COLUMNS
                WHERE TABLE_SCHEMA = %s
                AND TABLE_NAME = %s
                AND COLUMN_NAME = '是否ai处理'
            """

            result = self.db_agent.execute_sql(
                check_sql,
                params=(Config.MYSQL_CONFIG['database'], self.source_table),
                fetch=True
            )

            # NOTE(review): assumes execute_sql(fetch=True) returns rows that
            # can be indexed by position - confirm against MySQLAgent.
            if result[0][0] == 0:
                alter_sql = f"""
                    ALTER TABLE {self.source_table}
                    ADD COLUMN 是否ai处理 TINYINT(1) DEFAULT 0 COMMENT 'AI处理标记:0-未处理,1-已处理'
                """
                self.db_agent.execute_sql(alter_sql)
                self.log.info(f"成功为表 {self.source_table} 添加 '是否ai处理' 字段")
            else:
                self.log.debug(f"表 {self.source_table} 已存在 '是否ai处理' 字段")

        except Exception as e:
            self.log.error(f"检查/添加字段失败: {str(e)}", exc_info=True)
            raise

    def create_ai_result_table(self):
        """Create the AI result table if it does not exist.

        Raises:
            Exception: any error from the DB agent is logged and re-raised.
        """
        create_sql = f"""
            CREATE TABLE IF NOT EXISTS {self.ai_table} (
            id INT AUTO_INCREMENT PRIMARY KEY COMMENT '主键ID',
            source_id INT NOT NULL COMMENT '来源数据ID(processed_rss_data.id)',
            文章标题 TEXT COMMENT '文章标题',
            文章摘要 TEXT COMMENT '文章摘要',
            发布时间 DATETIME COMMENT '发布时间',
            来源URL VARCHAR(1024) COMMENT '来源URL',
            文章链接 VARCHAR(1024) COMMENT '文章链接',
            是否相关 BOOLEAN COMMENT 'AI判断是否与汽车后市场相关',
            相关度评分 INT COMMENT '相关度评分(0-100)',
            标签 TEXT COMMENT 'AI生成的标签(JSON数组)',
            分类 VARCHAR(100) COMMENT 'AI判断的主要分类',
            分析说明 TEXT COMMENT 'AI分析说明',
            处理时间 DATETIME COMMENT 'AI处理时间',
            创建时间 TIMESTAMP DEFAULT CURRENT_TIMESTAMP COMMENT '记录创建时间',
            更新时间 TIMESTAMP DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '记录更新时间',
            INDEX idx_source_id (source_id),
            INDEX idx_是否相关 (是否相关),
            INDEX idx_分类 (分类),
            INDEX idx_处理时间 (处理时间)
            ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_unicode_ci COMMENT='RSS数据AI分析结果表'
        """

        try:
            self.db_agent.execute_sql(create_sql)
            self.log.info(f"成功创建AI结果表: {self.ai_table}")
        except Exception as e:
            self.log.error(f"创建AI结果表失败: {str(e)}", exc_info=True)
            raise

    def load_unprocessed_data(self, limit: int = 100) -> pd.DataFrame:
        """Load rows whose AI flag is 0 or NULL, newest first.

        Args:
            limit: maximum number of rows to load.

        Returns:
            DataFrame of unprocessed rows; an empty DataFrame when the query fails.
        """
        try:
            sql = f"""
                SELECT id, 文章标题, 文章摘要, 发布时间, 来源URL, 文章链接
                FROM {self.source_table}
                WHERE 是否ai处理 = 0 OR 是否ai处理 IS NULL
                ORDER BY 创建时间 DESC
                LIMIT %s
            """

            df = self.db_agent.query_to_df(sql, params=(limit,), is_print=False)
            self.log.info(f"成功加载 {len(df)} 条未处理的数据")
            return df

        except Exception as e:
            self.log.error(f"加载未处理数据失败: {str(e)}", exc_info=True)
            return pd.DataFrame()

    @staticmethod
    def _extract_json_text(raw: str) -> str:
        """Return the JSON portion of a model reply.

        Handles a fenced ```json block, a bare ``` fence, and a JSON object
        surrounded by extra prose (outermost ``{`` ... ``}``).
        """
        if '```json' in raw:
            return raw.split('```json')[1].split('```')[0].strip()
        if '```' in raw:
            return raw.split('```')[1].split('```')[0].strip()
        text = raw.strip()
        start, end = text.find('{'), text.rfind('}')
        if start != -1 and end > start:
            return text[start:end + 1]
        return text

    @staticmethod
    def _normalize_analysis(parsed: Dict[str, Any]) -> Dict[str, Any]:
        """Coerce the model's answer to the types the result table expects."""
        relevant = parsed.get('is_relevant', False)
        if isinstance(relevant, str):
            # bool("false") is True, so strings are interpreted explicitly.
            relevant = relevant.strip().lower() in ('true', '1', 'yes')

        try:
            score = int(float(parsed.get('relevance_score', 0)))
        except (TypeError, ValueError, OverflowError):
            score = 0
        score = max(0, min(100, score))  # documented range is 0-100

        tags = parsed.get('tags', [])
        if isinstance(tags, str):
            tags = [tags] if tags else []
        elif not isinstance(tags, list):
            tags = []

        return {
            'is_relevant': bool(relevant),
            'relevance_score': score,
            'tags': tags,
            'category': parsed.get('category') or '其他',
            'analysis': parsed.get('analysis') or ''
        }

    def analyze_news(self, title: str, summary: str) -> Dict[str, Any]:
        """Ask the model whether an article relates to the automotive aftermarket.

        Args:
            title: article title.
            summary: article summary.

        Returns:
            dict with ``is_relevant``, ``relevance_score``, ``tags``,
            ``category`` and ``analysis``. When the reply cannot be parsed or
            the API call fails, a "not relevant" result is returned whose
            ``analysis`` text describes the failure.
        """
        # Built line by line so the prompt text does not depend on how this
        # method is indented.
        prompt = "\n".join([
            "分析以下新闻是否与汽车后市场相关,返回JSON格式:",
            "",
            f"标题:{title}",
            f"摘要:{summary}",
            "",
            "返回格式:",
            "{",
            '"is_relevant": true/false,',
            '"relevance_score": 0-100,',
            '"tags": ["标签1", "标签2"],',
            '"category": "分类(配件/维修/保养/改装/美容/装饰/二手车/金融/保险/其他)",',
            '"analysis": "简要说明"',
            "}",
            "",
            "注意:只返回JSON格式的结果,不要包含其他说明文字。",
        ])

        raw_content = ''
        try:
            response = self.ai_client.chat.completions.create(
                model=self.model,
                messages=[{
                    "role": "user",
                    "content": prompt
                }]
            )

            # The content may be None; treat that as an empty reply.
            raw_content = response.choices[0].message.content or ''

            parsed = json.loads(self._extract_json_text(raw_content))
            if not isinstance(parsed, dict):
                raise json.JSONDecodeError("expected a JSON object", raw_content, 0)

            return self._normalize_analysis(parsed)

        except json.JSONDecodeError as e:
            self.log.warning(f"JSON解析失败: {str(e)}, 原始响应: {raw_content[:200]}")
            return {
                'is_relevant': False,
                'relevance_score': 0,
                'tags': [],
                'category': '其他',
                'analysis': f"解析失败: {raw_content[:100]}"
            }
        except Exception as e:
            # NOTE(review): an API failure is reported as a "not relevant"
            # result, so the caller stores it and flags the row as processed.
            # Confirm whether transient failures should be retried instead.
            self.log.error(f"AI调用异常: {str(e)}", exc_info=True)
            return {
                'is_relevant': False,
                'relevance_score': 0,
                'tags': [],
                'category': '其他',
                'analysis': f"处理异常: {str(e)}"
            }

    @staticmethod
    def _clean_text(value: Any) -> str:
        """Return ``value`` as stripped text; None/NaN/NA become ''."""
        if value is None:
            return ''
        try:
            if bool(pd.isna(value)):
                return ''
        except (TypeError, ValueError):
            # Non-scalar values: fall through to plain str().
            pass
        return str(value).strip()

    def process_single_record(self, record: Dict[str, Any]) -> Optional[Dict[str, Any]]:
        """Analyse one source row and build the row for the result table.

        Args:
            record: source row as a dict (``id``, ``文章标题``, ``文章摘要``, ...).

        Returns:
            Result dict keyed by result-table column, or ``None`` when the
            client is not configured, title and summary are both empty, or
            an error occurs.
        """
        if not self.is_configured():
            self.log.error("AI客户端未配置,无法处理数据")
            return None

        try:
            # NULL / NaN must count as empty rather than the text 'None'/'nan'.
            title = self._clean_text(record.get('文章标题', ''))
            summary = self._clean_text(record.get('文章摘要', ''))

            if not title and not summary:
                self.log.warning(f"记录 {record.get('id')} 标题和摘要均为空,跳过处理")
                return None

            analysis_result = self.analyze_news(title, summary)

            return {
                'source_id': record['id'],
                '文章标题': title,
                '文章摘要': summary,
                '发布时间': record.get('发布时间'),
                '来源URL': record.get('来源URL'),
                '文章链接': record.get('文章链接'),
                '是否相关': analysis_result.get('is_relevant', False),
                '相关度评分': analysis_result.get('relevance_score', 0),
                '标签': json.dumps(analysis_result.get('tags', []), ensure_ascii=False),
                '分类': analysis_result.get('category', '其他'),
                '分析说明': analysis_result.get('analysis', ''),
                '处理时间': datetime.now()
            }

        except Exception as e:
            self.log.error(f"处理记录 {record.get('id')} 失败: {str(e)}", exc_info=True)
            return None

    def _insert_results(self, results: List[Dict[str, Any]]) -> int:
        """Insert result rows into the AI table; DB errors propagate to the caller."""
        frame = pd.DataFrame(results)
        inserted = self.db_agent.insert_from_df(
            table_name=self.ai_table,
            df=frame,
            ignore_duplicates=True
        )
        self.log.info(f"成功保存 {inserted} 条AI处理结果")
        return inserted

    def save_ai_results(self, results: List[Dict[str, Any]]) -> int:
        """Save AI results.

        Args:
            results: result rows as produced by ``process_single_record``.

        Returns:
            The value reported by the DB agent for the insert; 0 for an empty
            list or when the insert fails.
        """
        if not results:
            return 0

        try:
            return self._insert_results(results)
        except Exception as e:
            self.log.error(f"保存AI处理结果失败: {str(e)}", exc_info=True)
            return 0

    def mark_as_processed(self, ids: List[int]) -> bool:
        """Set the AI flag to 1 for the given source-row ids.

        Args:
            ids: source-row ids.

        Returns:
            True on success or when ``ids`` is empty, False when the update fails.
        """
        if not ids:
            return True

        try:
            # One placeholder per id keeps the statement parameterised.
            id_placeholders = ','.join(['%s'] * len(ids))
            sql = f"""
                UPDATE {self.source_table}
                SET 是否ai处理 = 1
                WHERE id IN ({id_placeholders})
            """

            self.db_agent.execute_sql(sql, params=ids)
            self.log.info(f"成功标记 {len(ids)} 条记录为已处理")
            return True

        except Exception as e:
            self.log.error(f"标记记录为已处理失败: {str(e)}", exc_info=True)
            return False
|
||||
|
||||
|
||||
if __name__ == "__main__":
    # Command-line entry: run one batch with the default arguments and print
    # a summary of the outcome.
    outcome = RSSDataAIProcessor().main()
    rule = "=" * 60

    if outcome['success']:
        print("\n" + rule)
        print("✓ AI处理完成")
        print(rule)
        # (label, result key, value shown when the key is absent)
        summary_rows = (
            ("总记录数", 'total_count', 0),
            ("成功处理", 'processed_count', 0),
            ("保存记录", 'saved_count', 0),
            ("失败记录", 'failed_count', 0),
            ("相关记录", 'relevant_count', 0),
            ("处理时间", 'processing_time', ''),
        )
        for label, key, fallback in summary_rows:
            print(f"{label}: {outcome.get(key, fallback)}")
        print(rule + "\n")
    else:
        print("\n" + rule)
        print("✗ 处理失败")
        print(rule)
        print(f"错误信息: {outcome['message']}")
        print("\n提示: 请设置环境变量")
        print(" Windows: $env:BAIDU_API_KEY = 'your_key'")
        print(" Linux/Mac: export BAIDU_API_KEY='your_key'")
        print(rule + "\n")
|
||||
@@ -0,0 +1,37 @@
|
||||
汽车配件
|
||||
汽车维修
|
||||
汽车保养
|
||||
汽车改装
|
||||
汽车美容
|
||||
汽车装饰
|
||||
轮胎
|
||||
机油
|
||||
刹车片
|
||||
火花塞
|
||||
滤清器
|
||||
蓄电池
|
||||
车灯
|
||||
保险杠
|
||||
车门
|
||||
座椅
|
||||
方向盘
|
||||
仪表盘
|
||||
音响
|
||||
导航
|
||||
汽车用品
|
||||
车载设备
|
||||
汽车电子
|
||||
汽车安全
|
||||
汽车保险
|
||||
二手车
|
||||
汽车交易
|
||||
汽车金融
|
||||
汽车租赁
|
||||
汽车服务
|
||||
4S店
|
||||
汽修店
|
||||
汽车后市场
|
||||
汽车产业链
|
||||
汽车供应链
|
||||
汽车
|
||||
车
|
||||
@@ -0,0 +1,409 @@
|
||||
# RSS数据处理模块 - 汽车后市场新闻分词和过滤
|
||||
import pandas as pd
|
||||
import jieba
|
||||
import jieba.posseg as pseg
|
||||
import os
|
||||
import sys
|
||||
from typing import List, Dict, Any, Optional
|
||||
from datetime import datetime
|
||||
|
||||
# 添加项目根目录到路径
|
||||
current_dir = os.path.dirname(os.path.abspath(__file__))
|
||||
parent_dir = os.path.dirname(current_dir)
|
||||
if parent_dir not in sys.path:
|
||||
sys.path.insert(0, parent_dir)
|
||||
|
||||
from utils.mysql_agent import MySQLAgent
|
||||
from utils.logger import log
|
||||
from config import Config
|
||||
|
||||
class RSSDataProcessor:
    """RSS data processor: segments articles and keeps automotive-aftermarket news."""

    # jieba POS flags that are kept: nouns (n), verbal nouns (vn), place names
    # (ns), person names (nr) and organisation names (nt). Plain verbs ('v')
    # are not included. 'np' is kept from the original set.
    _ALLOWED_POS_FLAGS = frozenset({'n', 'vn', 'np', 'ns', 'nr', 'nt'})

    def __init__(self):
        """Set up the DB agent, table names, word-file paths and built-in word lists."""
        self.log = log.bind(module="RSSDataProcessor")
        self.db_agent = MySQLAgent(Config.MYSQL_CONFIG)
        self.table_name = "collector_rss_subscriptions"
        self.processed_table_name = "processed_rss_data"

        # Project root is the parent of this file's directory.
        current_dir = os.path.dirname(os.path.abspath(__file__))
        self.project_root = os.path.dirname(current_dir)

        # Word files live under <project_root>/processors/.
        self.keywords_file = os.path.join(self.project_root, "processors", "keywords.txt")
        self.stopwords_file = os.path.join(self.project_root, "processors", "stopwords.txt")

        # Built-in aftermarket keywords, used when the keyword file is unusable.
        self.auto_aftermarket_keywords = {
            '汽车配件', '汽车维修', '汽车保养', '汽车改装', '汽车美容', '汽车装饰',
            '轮胎', '机油', '刹车片', '火花塞', '滤清器', '蓄电池', '车灯',
            '保险杠', '车门', '座椅', '方向盘', '仪表盘', '音响', '导航',
            '汽车用品', '车载设备', '汽车电子', '汽车安全', '汽车保险',
            '二手车', '汽车交易', '汽车金融', '汽车租赁', '汽车服务',
            '4S店', '汽修店', '汽车后市场', '汽车产业链', '汽车供应链', '汽车', '车'
        }

        # Built-in stopwords, used when the stopword file is unusable.
        self.stopwords = {
            '的', '了', '在', '是', '我', '有', '和', '就', '不', '人', '都', '一', '一个',
            '上', '也', '很', '到', '说', '要', '去', '你', '会', '着', '没有', '看', '好',
            '自己', '这', '那', '它', '他', '她', '我们', '你们', '他们', '什么', '怎么',
            '为什么', '因为', '所以', '但是', '然后', '如果', '虽然', '而且', '或者',
            '可以', '应该', '必须', '需要', '想要', '希望', '觉得', '认为', '知道',
            '了解', '明白', '清楚', '简单', '容易', '困难', '重要', '主要', '基本',
            '一般', '特别', '非常', '十分', '相当', '比较', '更加', '最', '更',
            '已经', '正在', '将要', '可能', '也许', '大概', '大约', '左右', '上下',
            '今天', '明天', '昨天', '现在', '以前', '以后', '时候', '时间', '地方',
            '这里', '那里', '这样', '那样', '如此', '如何', '怎样'
        }

        # Cache for the keywords loaded from the default keyword file.
        self._cached_keywords = None

        self.log.info("RSS数据处理器初始化完成")

    def load_keywords(self, keywords_file: Optional[str] = None) -> set:
        """Load aftermarket keywords from a file, one keyword per line.

        The result for the default file is cached. An explicitly given,
        different file is always read and is not cached, so it can neither be
        shadowed by nor replace the cached default list.

        Args:
            keywords_file: path of the keyword file; defaults to ``self.keywords_file``.

        Returns:
            Set of keywords. The built-in list is returned when the file is
            missing, empty or unreadable.
        """
        uses_default_file = keywords_file is None or keywords_file == self.keywords_file
        if uses_default_file and self._cached_keywords is not None:
            return self._cached_keywords

        if keywords_file is None:
            keywords_file = self.keywords_file

        try:
            if os.path.exists(keywords_file):
                with open(keywords_file, 'r', encoding='utf-8') as f:
                    keywords = set(line.strip() for line in f if line.strip())
                if keywords:
                    self.log.info(f"成功加载汽车后市场关键词,共 {len(keywords)} 个")
                else:
                    # An empty list would make every article look unrelated.
                    self.log.warning(f"关键词文件为空: {keywords_file},使用默认关键词")
                    keywords = self.auto_aftermarket_keywords
            else:
                self.log.warning(f"关键词文件不存在: {keywords_file}")
                keywords = self.auto_aftermarket_keywords
        except Exception as e:
            self.log.error(f"加载关键词失败: {str(e)}")
            keywords = self.auto_aftermarket_keywords

        if uses_default_file:
            self._cached_keywords = keywords
        return keywords

    def load_rss_data(self, limit: int = 1000) -> pd.DataFrame:
        """Load unprocessed RSS rows from the database, newest first.

        Args:
            limit: maximum number of rows to load.

        Returns:
            DataFrame of rows; an empty DataFrame when the query fails.
        """
        try:
            sql = f"""
                SELECT id, 文章标题, 文章摘要, 发布时间, 来源URL, 文章链接
                FROM {self.table_name}
                WHERE 是否已处理 = 0
                ORDER BY 发布时间 DESC
                LIMIT %s
            """

            df = self.db_agent.query_to_df(sql, params=(limit,), is_print=False)
            self.log.info(f"成功加载 {len(df)} 条未处理的RSS数据")
            return df

        except Exception as e:
            self.log.error(f"加载RSS数据失败: {str(e)}", exc_info=True)
            return pd.DataFrame()

    def mark_as_processed(self, ids: List[int]) -> bool:
        """Flag the given source rows as processed.

        Returns:
            True on success or when ``ids`` is empty, False when the update fails.
        """
        if not ids:
            return True

        try:
            # One placeholder per id keeps the statement parameterised.
            id_placeholders = ','.join(['%s'] * len(ids))
            sql = f"""
                UPDATE {self.table_name}
                SET 是否已处理 = 1
                WHERE id IN ({id_placeholders})
            """

            self.db_agent.execute_sql(sql, params=ids)
            self.log.info(f"成功标记 {len(ids)} 条数据为已处理")
            return True

        except Exception as e:
            self.log.error(f"标记数据为已处理失败: {str(e)}", exc_info=True)
            return False

    def load_stopwords(self, stopwords_file: Optional[str] = None) -> set:
        """Load the stopword list from a file, one word per line.

        Args:
            stopwords_file: path of the stopword file; defaults to ``self.stopwords_file``.

        Returns:
            Set of stopwords; the built-in list when the file is missing or unreadable.
        """
        if stopwords_file is None:
            stopwords_file = self.stopwords_file

        try:
            if os.path.exists(stopwords_file):
                with open(stopwords_file, 'r', encoding='utf-8') as f:
                    stopwords = set(line.strip() for line in f if line.strip())
                self.log.info(f"成功加载停用词表,共 {len(stopwords)} 个词")
                return stopwords
            else:
                self.log.warning(f"停用词文件不存在: {stopwords_file},使用默认停用词")
                return self.stopwords
        except Exception as e:
            self.log.error(f"加载停用词表失败: {str(e)}")
            return self.stopwords

    def add_custom_dict(self, custom_dict_file: Optional[str] = None):
        """Register an optional user dictionary and all keywords with jieba.

        Args:
            custom_dict_file: optional jieba user-dictionary file; ignored when
                missing. A load failure is logged and does not abort.
        """
        if custom_dict_file and os.path.exists(custom_dict_file):
            try:
                jieba.load_userdict(custom_dict_file)
                self.log.info("成功加载自定义词典")
            except Exception as e:
                self.log.warning(f"加载自定义词典失败: {str(e)}")

        # Add every aftermarket keyword as a noun so the segmenter keeps it whole.
        for keyword in self.load_keywords():
            jieba.add_word(keyword, freq=1000, tag='n')

    def segment_and_pos(self, text: str, stopwords: set) -> List[str]:
        """Segment ``text`` with POS tagging and keep the relevant words.

        A word is kept when it is non-empty after stripping, is not a
        stopword, is not purely digits, and its POS flag is in
        ``_ALLOWED_POS_FLAGS``.

        Returns:
            List of kept words in text order; empty for empty/NaN input.
        """
        if not text or pd.isna(text):
            return []

        result = []
        for word, flag in pseg.cut(str(text)):
            word = word.strip()
            if (len(word) >= 1 and
                    word not in stopwords and
                    flag in self._ALLOWED_POS_FLAGS and
                    not word.isdigit()):
                result.append(word)

        return result

    def is_auto_aftermarket_related(self, text: str) -> bool:
        """Return True when ``text`` contains any aftermarket keyword.

        Matching is a case-insensitive substring test. Text and keywords are
        both lower-cased; previously only the text was, so keywords with
        upper-case letters such as '4S店' could never match this way. The
        former segmentation pass is dropped: any segmented word equal to a
        keyword is already a substring of the text.
        """
        if not text:
            return False

        haystack = str(text).lower()
        return any(
            keyword and keyword.lower() in haystack
            for keyword in self.load_keywords()
        )

    def process_dataframe(self, df: pd.DataFrame, stopwords: set) -> pd.DataFrame:
        """Segment and classify every row of ``df``.

        Adds ``combined_text``, ``segmented_words``, ``is_auto_related``,
        ``is_filtered`` and ``processed_time`` to ``df`` in place and returns it.
        """
        if df.empty:
            self.log.warning("输入的DataFrame为空")
            return df

        # Make sure both text columns are strings without NaN.
        df['文章标题'] = df['文章标题'].fillna('').astype(str)
        df['文章摘要'] = df['文章摘要'].fillna('').astype(str)

        # Title and summary are analysed together.
        df['combined_text'] = df['文章标题'] + ' ' + df['文章摘要']

        df['segmented_words'] = df['combined_text'].apply(lambda x: self.segment_and_pos(x, stopwords))

        # An article is kept as soon as any keyword occurs in it.
        df['is_auto_related'] = df['combined_text'].apply(self.is_auto_aftermarket_related)
        df['is_filtered'] = df['is_auto_related']

        df['processed_time'] = datetime.now().strftime('%Y-%m-%d %H:%M:%S')

        self.log.info(f"数据处理完成,共处理 {len(df)} 条记录")
        return df

    def filter_auto_aftermarket_news(self, df: pd.DataFrame) -> pd.DataFrame:
        """Return a copy of the rows whose ``is_filtered`` flag is True."""
        if df.empty:
            return df

        filtered_df = df[df['is_filtered'] == True].copy()

        self.log.info(f"过滤出 {len(filtered_df)} 条汽车后市场相关新闻")
        return filtered_df

    def save_to_database(self, df: pd.DataFrame) -> bool:
        """Save processed rows to the processed-data table.

        Returns:
            True when the insert call completed, False for an empty frame or on error.
        """
        if df.empty:
            self.log.warning("没有数据需要保存")
            return False

        try:
            save_df = df[['文章标题', '文章摘要', '发布时间', '来源URL', '文章链接',
                          'segmented_words', 'is_auto_related', 'processed_time']].copy()

            # Store the word list as a space-separated string.
            save_df['分词结果'] = save_df['segmented_words'].apply(lambda x: ' '.join(x))

            # Column names in the table are Chinese.
            save_df = save_df.rename(columns={
                'is_auto_related': '是否汽车相关',
                'processed_time': '处理时间'
            })

            save_df = save_df.drop('segmented_words', axis=1)

            # Create the target table on first use.
            if not self.db_agent.table_exists(self.processed_table_name):
                self.create_processed_table()

            inserted_rows = self.db_agent.insert_from_df(
                table_name=self.processed_table_name,
                df=save_df,
                ignore_duplicates=True
            )

            self.log.info(f"成功保存 {inserted_rows} 条处理结果到数据库")
            return True

        except Exception as e:
            self.log.error(f"保存到数据库失败: {str(e)}", exc_info=True)
            return False

    def create_processed_table(self):
        """Create the processed-data table if it does not exist.

        Raises:
            Exception: any error from the DB agent is logged and re-raised.
        """
        create_sql = f"""
            CREATE TABLE IF NOT EXISTS {self.processed_table_name} (
            id INT AUTO_INCREMENT PRIMARY KEY,
            文章标题 TEXT,
            文章摘要 TEXT,
            发布时间 DATETIME,
            来源URL VARCHAR(1024),
            文章链接 VARCHAR(1024),
            分词结果 TEXT,
            是否汽车相关 BOOLEAN,
            处理时间 DATETIME,
            创建时间 TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
            更新时间 TIMESTAMP DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP
            ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_unicode_ci
        """

        try:
            self.db_agent.execute_sql(create_sql)
            self.log.info(f"成功创建处理结果表: {self.processed_table_name}")
        except Exception as e:
            self.log.error(f"创建表失败: {str(e)}", exc_info=True)
            raise

    def get_processing_statistics(self, df: pd.DataFrame) -> Dict[str, Any]:
        """Summarise a processed frame.

        Returns:
            ``{}`` for an empty frame, otherwise ``total_articles``,
            ``filtered_articles``, ``filter_rate`` and ``processing_time``.
        """
        if df.empty:
            return {}

        total_count = len(df)
        filtered_count = len(df[df['is_filtered'] == True])

        return {
            'total_articles': total_count,
            'filtered_articles': filtered_count,
            'filter_rate': filtered_count / total_count if total_count > 0 else 0,
            'processing_time': datetime.now().strftime('%Y-%m-%d %H:%M:%S')
        }

    def process_rss_data(self, limit: int = 1000, save_to_db: bool = True) -> Dict[str, Any]:
        """Run the whole pipeline: load, segment, filter, save, mark as processed.

        Args:
            limit: maximum number of source rows to load.
            save_to_db: whether the filtered rows are written to the database.

        Returns:
            dict with ``success`` and ``message``; on success also
            ``statistics`` and ``filtered_data`` (the filtered DataFrame).
        """
        try:
            self.log.info("开始处理RSS数据...")

            # 1. Load source rows
            df = self.load_rss_data(limit)
            if df.empty:
                self.log.warning("没有加载到RSS数据")
                return {'success': False, 'message': '没有数据可处理'}

            # 2. Stopwords
            stopwords = self.load_stopwords()

            # 3. Register keywords with the segmenter
            self.add_custom_dict()

            # 4. Segment and classify
            processed_df = self.process_dataframe(df, stopwords)

            # 5. Keep aftermarket-related articles
            filtered_df = self.filter_auto_aftermarket_news(processed_df)

            # 6. Statistics
            stats = self.get_processing_statistics(processed_df)

            # 7. Save
            save_success = True
            if save_to_db and not filtered_df.empty:
                save_success = self.save_to_database(filtered_df)
                stats['save_success'] = save_success

            # 8. Flag source rows as processed only when nothing was lost.
            #    After a failed save the rows stay unprocessed and are picked
            #    up again on the next run.
            if not df.empty and 'id' in df.columns:
                if save_success:
                    processed_ids = df['id'].tolist()
                    mark_success = self.mark_as_processed(processed_ids)
                    stats['mark_success'] = mark_success
                    if not mark_success:
                        self.log.warning("部分数据标记为已处理失败")
                else:
                    stats['mark_success'] = False
                    self.log.warning("保存失败,本批次数据不标记为已处理,将在下次运行时重试")

            # 9. Report
            self.log.info("RSS数据处理完成", **stats)

            return {
                'success': True,
                'message': 'RSS数据处理完成',
                'statistics': stats,
                'filtered_data': filtered_df
            }

        except Exception as e:
            self.log.error(f"RSS数据处理失败: {str(e)}", exc_info=True)
            return {'success': False, 'message': f'处理失败: {str(e)}'}
|
||||
|
||||
|
||||
def main():
    """Command-line entry point: process up to 5000 rows and print the outcome."""
    try:
        # Load at most 5000 unprocessed rows and store the filtered result.
        outcome = RSSDataProcessor().process_rss_data(limit=5000, save_to_db=True)

        if not outcome['success']:
            print(f"处理失败: {outcome['message']}")
        else:
            print("RSS数据处理完成!")
            print(f"处理统计: {outcome['statistics']}")

    except Exception as e:
        print(f"程序运行出错: {str(e)}")


if __name__ == "__main__":
    main()
|
||||
@@ -0,0 +1,100 @@
|
||||
的
|
||||
了
|
||||
在
|
||||
是
|
||||
我
|
||||
有
|
||||
和
|
||||
就
|
||||
不
|
||||
人
|
||||
都
|
||||
一
|
||||
一个
|
||||
上
|
||||
也
|
||||
很
|
||||
到
|
||||
说
|
||||
要
|
||||
去
|
||||
你
|
||||
会
|
||||
着
|
||||
没有
|
||||
看
|
||||
好
|
||||
自己
|
||||
这
|
||||
那
|
||||
它
|
||||
他
|
||||
她
|
||||
我们
|
||||
你们
|
||||
他们
|
||||
什么
|
||||
怎么
|
||||
为什么
|
||||
因为
|
||||
所以
|
||||
但是
|
||||
然后
|
||||
如果
|
||||
虽然
|
||||
而且
|
||||
或者
|
||||
可以
|
||||
应该
|
||||
必须
|
||||
需要
|
||||
想要
|
||||
希望
|
||||
觉得
|
||||
认为
|
||||
知道
|
||||
了解
|
||||
明白
|
||||
清楚
|
||||
简单
|
||||
容易
|
||||
困难
|
||||
重要
|
||||
主要
|
||||
基本
|
||||
一般
|
||||
特别
|
||||
非常
|
||||
十分
|
||||
相当
|
||||
比较
|
||||
更加
|
||||
最
|
||||
更
|
||||
已经
|
||||
正在
|
||||
将要
|
||||
可能
|
||||
也许
|
||||
大概
|
||||
大约
|
||||
左右
|
||||
上下
|
||||
今天
|
||||
明天
|
||||
昨天
|
||||
现在
|
||||
以前
|
||||
以后
|
||||
时候
|
||||
时间
|
||||
地方
|
||||
这里
|
||||
那里
|
||||
这样
|
||||
那样
|
||||
如此
|
||||
这样
|
||||
那样
|
||||
如何
|
||||
怎样
|
||||
Binary file not shown.
Binary file not shown.
Binary file not shown.
@@ -246,50 +246,90 @@ class TaskScheduler:
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
def _execute_task_logic(self, task: Dict[str, Any]) -> None:
|
||||
"""执行任务的具体逻辑(动态导入模块)"""
|
||||
start_time = time.time()
|
||||
task_id = task['task_id']
|
||||
module_path = task['module_path']
|
||||
task_log = log.bind(task_id=task_id, module=module_path)
|
||||
def _execute_task_logic(self, task):
|
||||
"""
|
||||
执行任务逻辑的核心方法
|
||||
支持类方法、静态方法和实例方法的调用
|
||||
"""
|
||||
module_path = task.get('module_path')
|
||||
if not module_path:
|
||||
raise ValueError("任务缺少 module_path 配置")
|
||||
|
||||
# 解析模块路径和类名
|
||||
try:
|
||||
path_parts = module_path.split('.')
|
||||
if len(path_parts) < 2:
|
||||
raise ValueError(f"无效的模块路径: {module_path}")
|
||||
|
||||
module_name = '.'.join(path_parts[:-1])
|
||||
class_name = path_parts[-1]
|
||||
method_name = 'main' # 默认方法名
|
||||
except Exception as e:
|
||||
raise ValueError(f"解析模块路径失败: {str(e)}")
|
||||
|
||||
# 动态导入模块
|
||||
try:
|
||||
import importlib
|
||||
module = importlib.import_module(module_name)
|
||||
except ImportError as e:
|
||||
raise ImportError(f"无法导入模块 {module_name}: {str(e)}")
|
||||
|
||||
# 获取类和方法
|
||||
if not hasattr(module, class_name):
|
||||
raise AttributeError(f"模块 {module_name} 中未找到类 {class_name}")
|
||||
|
||||
cls = getattr(module, class_name)
|
||||
|
||||
# 检查是否存在指定方法
|
||||
if not hasattr(cls, method_name):
|
||||
raise AttributeError(f"类 {class_name} 中未找到方法 {method_name}")
|
||||
|
||||
method = getattr(cls, method_name)
|
||||
|
||||
# 根据方法类型决定如何调用
|
||||
import inspect
|
||||
callable_entry = None
|
||||
|
||||
# 判断是否为静态方法或类方法
|
||||
if isinstance(method, staticmethod):
|
||||
# 静态方法可以直接调用
|
||||
callable_entry = method
|
||||
elif isinstance(method, classmethod):
|
||||
# 类方法需要传入类作为第一个参数
|
||||
callable_entry = method
|
||||
else:
|
||||
# 实例方法或普通函数
|
||||
try:
|
||||
# 尝试检查方法签名
|
||||
sig = inspect.signature(method)
|
||||
params = list(sig.parameters.values())
|
||||
|
||||
# 如果第一个参数是self且没有默认值,则认为是实例方法
|
||||
if params and params[0].name == 'self' and params[0].default == inspect.Parameter.empty:
|
||||
# 创建实例并获取绑定方法
|
||||
instance = cls()
|
||||
callable_entry = getattr(instance, method_name)
|
||||
else:
|
||||
# 可能是普通函数或者是带有默认self参数的方法
|
||||
callable_entry = method
|
||||
except Exception:
|
||||
# 如果检查签名失败,默认尝试创建实例
|
||||
try:
|
||||
instance = cls()
|
||||
callable_entry = getattr(instance, method_name)
|
||||
except Exception:
|
||||
# 如果创建实例也失败,则直接调用方法(适用于不需要self的特殊情况)
|
||||
callable_entry = method
|
||||
|
||||
# 执行任务
|
||||
if not callable(callable_entry):
|
||||
raise TypeError(f"{module_path}.{method_name} 不是可调用对象")
|
||||
|
||||
try:
|
||||
# 解析可调用入口(支持模块/类/函数路径)
|
||||
# 若路径最终为类,先实例化再调 main;否则直接调用
|
||||
target_obj = None
|
||||
parts = module_path.split('.') if isinstance(module_path, str) else []
|
||||
resolved = None
|
||||
try:
|
||||
# 尝试导入尽可能深的模块
|
||||
for i in range(len(parts), 0, -1):
|
||||
mod = importlib.import_module('.'.join(parts[:i]))
|
||||
attr_chain = parts[i:]
|
||||
obj = mod
|
||||
for attr in attr_chain:
|
||||
obj = getattr(obj, attr)
|
||||
resolved = obj
|
||||
break
|
||||
except Exception:
|
||||
resolved = None
|
||||
|
||||
if isinstance(resolved, type):
|
||||
try:
|
||||
target_obj = resolved() # 触发 __init__ 日志
|
||||
if hasattr(target_obj, 'main') and callable(getattr(target_obj, 'main')):
|
||||
task_log.debug("开始执行实例的 main()")
|
||||
getattr(target_obj, 'main')()
|
||||
else:
|
||||
raise AttributeError(f"类 {resolved.__name__} 未提供可调用的 main()")
|
||||
except Exception as e:
|
||||
raise
|
||||
else:
|
||||
callable_entry = self._resolve_callable(module_path)
|
||||
task_log.debug("开始执行任务入口函数")
|
||||
callable_entry()
|
||||
task_log.info(f"任务执行完成,耗时: {time.time() - start_time:.2f}秒")
|
||||
|
||||
# 执行任务逻辑
|
||||
callable_entry()
|
||||
except Exception as e:
|
||||
task_log.error("任务逻辑执行失败", exc_info=True)
|
||||
self.logger.error(f"任务逻辑执行失败: {str(e)}")
|
||||
raise
|
||||
|
||||
def _calculate_next_run_time(self, cron_expr: str, time_zone: str = 'Asia/Shanghai') -> datetime:
|
||||
|
||||
@@ -0,0 +1 @@
|
||||
print("Hello, World!")
|
||||
Binary file not shown.
+223
-343
@@ -10,25 +10,14 @@
|
||||
},
|
||||
{
|
||||
"cell_type": "code",
|
||||
"execution_count": 1,
|
||||
"id": "initial_id",
|
||||
"metadata": {
|
||||
"collapsed": true,
|
||||
"ExecuteTime": {
|
||||
"end_time": "2025-10-17T05:43:18.381936Z",
|
||||
"start_time": "2025-10-17T05:43:15.265036Z"
|
||||
},
|
||||
"collapsed": true
|
||||
},
|
||||
"outputs": [
|
||||
{
|
||||
"name": "stdout",
|
||||
"output_type": "stream",
|
||||
"text": [
|
||||
"PROJECT_ROOT = d:\\Idea Project\\intelligence_system\n",
|
||||
"\u001b[32m2025-10-23 16:56:55\u001b[0m | \u001b[1mINFO \u001b[0m | \u001b[36mtask_scheduler\u001b[0m - \u001b[1m任务调度器已初始化,最大工作线程数: 5\u001b[0m\n"
|
||||
]
|
||||
"end_time": "2025-10-29T02:25:08.582541Z",
|
||||
"start_time": "2025-10-29T02:25:08.473381Z"
|
||||
}
|
||||
],
|
||||
},
|
||||
"source": [
|
||||
"# 使 Notebook 可从项目根导入\n",
|
||||
"import sys\n",
|
||||
@@ -217,7 +206,18 @@
|
||||
" except Exception:\n",
|
||||
" pass\n",
|
||||
" return str(dt)"
|
||||
]
|
||||
],
|
||||
"outputs": [
|
||||
{
|
||||
"name": "stdout",
|
||||
"output_type": "stream",
|
||||
"text": [
|
||||
"PROJECT_ROOT = D:\\Idea Project\\intelligence_system\n",
|
||||
"\u001B[32m2025-10-29 10:25:08\u001B[0m | \u001B[1mINFO \u001B[0m | \u001B[36mtask_scheduler\u001B[0m - \u001B[1m任务调度器已初始化,最大工作线程数: 5\u001B[0m\n"
|
||||
]
|
||||
}
|
||||
],
|
||||
"execution_count": 8
|
||||
},
|
||||
{
|
||||
"cell_type": "markdown",
|
||||
@@ -242,7 +242,7 @@
|
||||
"name": "stdout",
|
||||
"output_type": "stream",
|
||||
"text": [
|
||||
"\u001b[32m2025-10-17 13:43:18\u001b[0m | \u001b[1mINFO \u001b[0m | \u001b[36mmysql_agent\u001b[0m - \u001b[1m查询执行成功\u001b[0m\n"
|
||||
"\u001B[32m2025-10-29 09:54:09\u001B[0m | \u001B[1mINFO \u001B[0m | \u001B[36mmysql_agent\u001B[0m - \u001B[1m查询执行成功\u001B[0m\n"
|
||||
]
|
||||
},
|
||||
{
|
||||
@@ -281,20 +281,36 @@
|
||||
" </thead>\n",
|
||||
" <tbody>\n",
|
||||
" <tr>\n",
|
||||
" <td>2</td>\n",
|
||||
" <td>RSS基于规则数据处理</td>\n",
|
||||
" <td>processor</td>\n",
|
||||
" <td>processors.processor_rss_data</td>\n",
|
||||
" <td>0 8,20 * * *</td>\n",
|
||||
" <td>Asia/Shanghai</td>\n",
|
||||
" <td>2025-10-28 20:00:00</td>\n",
|
||||
" <td>2025-10-28 13:34:49</td>\n",
|
||||
" <td>success</td>\n",
|
||||
" <td>10</td>\n",
|
||||
" <td>1</td>\n",
|
||||
" <td>0</td>\n",
|
||||
" <td>2025-10-22 16:06:42</td>\n",
|
||||
" <td>2025-10-28 13:34:50</td>\n",
|
||||
" </tr>\n",
|
||||
" <tr>\n",
|
||||
" <td>1</td>\n",
|
||||
" <td>RSS新闻订阅</td>\n",
|
||||
" <td>collector</td>\n",
|
||||
" <td>collectors.rss_subscriptions.NewsAPIClient</td>\n",
|
||||
" <td>5 0 * * *</td>\n",
|
||||
" <td>*/5 * * * *</td>\n",
|
||||
" <td>Asia/Shanghai</td>\n",
|
||||
" <td>2025-10-18 00:05:00</td>\n",
|
||||
" <td>2025-10-17 00:05:07</td>\n",
|
||||
" <td>2025-10-28 13:40:00</td>\n",
|
||||
" <td>2025-10-28 13:35:09</td>\n",
|
||||
" <td>success</td>\n",
|
||||
" <td>4</td>\n",
|
||||
" <td>495</td>\n",
|
||||
" <td>1</td>\n",
|
||||
" <td>0</td>\n",
|
||||
" <td>2025-10-16 15:47:34</td>\n",
|
||||
" <td>2025-10-17 00:05:08</td>\n",
|
||||
" <td>2025-10-28 13:35:09</td>\n",
|
||||
" </tr>\n",
|
||||
" </tbody>\n",
|
||||
"</table>"
|
||||
@@ -346,34 +362,54 @@
|
||||
" <tbody>\n",
|
||||
" <tr>\n",
|
||||
" <th>0</th>\n",
|
||||
" <td>2</td>\n",
|
||||
" <td>RSS基于规则数据处理</td>\n",
|
||||
" <td>processor</td>\n",
|
||||
" <td>processors.processor_rss_data</td>\n",
|
||||
" <td>0 8,20 * * *</td>\n",
|
||||
" <td>Asia/Shanghai</td>\n",
|
||||
" <td>2025-10-28 20:00:00</td>\n",
|
||||
" <td>2025-10-28 13:34:49</td>\n",
|
||||
" <td>success</td>\n",
|
||||
" <td>10</td>\n",
|
||||
" <td>1</td>\n",
|
||||
" <td>0</td>\n",
|
||||
" <td>2025-10-22 16:06:42</td>\n",
|
||||
" <td>2025-10-28 13:34:50</td>\n",
|
||||
" </tr>\n",
|
||||
" <tr>\n",
|
||||
" <th>1</th>\n",
|
||||
" <td>1</td>\n",
|
||||
" <td>RSS新闻订阅</td>\n",
|
||||
" <td>collector</td>\n",
|
||||
" <td>collectors.rss_subscriptions.NewsAPIClient</td>\n",
|
||||
" <td>5 0 * * *</td>\n",
|
||||
" <td>*/5 * * * *</td>\n",
|
||||
" <td>Asia/Shanghai</td>\n",
|
||||
" <td>2025-10-18 00:05:00</td>\n",
|
||||
" <td>2025-10-17 00:05:07</td>\n",
|
||||
" <td>2025-10-28 13:40:00</td>\n",
|
||||
" <td>2025-10-28 13:35:09</td>\n",
|
||||
" <td>success</td>\n",
|
||||
" <td>4</td>\n",
|
||||
" <td>495</td>\n",
|
||||
" <td>1</td>\n",
|
||||
" <td>0</td>\n",
|
||||
" <td>2025-10-16 15:47:34</td>\n",
|
||||
" <td>2025-10-17 00:05:08</td>\n",
|
||||
" <td>2025-10-28 13:35:09</td>\n",
|
||||
" </tr>\n",
|
||||
" </tbody>\n",
|
||||
"</table>\n",
|
||||
"</div>"
|
||||
],
|
||||
"text/plain": [
|
||||
" 任务ID 任务名称 任务类型 模块路径 \\\n",
|
||||
"0 1 RSS新闻订阅 collector collectors.rss_subscriptions.NewsAPIClient \n",
|
||||
" 任务ID 任务名称 任务类型 模块路径 \\\n",
|
||||
"0 2 RSS基于规则数据处理 processor processors.processor_rss_data \n",
|
||||
"1 1 RSS新闻订阅 collector collectors.rss_subscriptions.NewsAPIClient \n",
|
||||
"\n",
|
||||
" Cron表达式 时区 下次运行时间 最后运行时间 \\\n",
|
||||
"0 5 0 * * * Asia/Shanghai 2025-10-18 00:05:00 2025-10-17 00:05:07 \n",
|
||||
" Cron表达式 时区 下次运行时间 最后运行时间 \\\n",
|
||||
"0 0 8,20 * * * Asia/Shanghai 2025-10-28 20:00:00 2025-10-28 13:34:49 \n",
|
||||
"1 */5 * * * * Asia/Shanghai 2025-10-28 13:40:00 2025-10-28 13:35:09 \n",
|
||||
"\n",
|
||||
" 运行状态 运行次数 是否活跃 is_running created_at updated_at \n",
|
||||
"0 success 4 1 0 2025-10-16 15:47:34 2025-10-17 00:05:08 "
|
||||
"0 success 10 1 0 2025-10-22 16:06:42 2025-10-28 13:34:50 \n",
|
||||
"1 success 495 1 0 2025-10-16 15:47:34 2025-10-28 13:35:09 "
|
||||
]
|
||||
},
|
||||
"execution_count": 2,
|
||||
@@ -433,70 +469,13 @@
|
||||
},
|
||||
{
|
||||
"cell_type": "code",
|
||||
"execution_count": 3,
|
||||
"id": "eab90de72c35429e",
|
||||
"metadata": {
|
||||
"ExecuteTime": {
|
||||
"end_time": "2025-10-17T05:43:26.113877Z",
|
||||
"start_time": "2025-10-17T05:43:26.071398Z"
|
||||
"end_time": "2025-10-29T02:26:12.873536Z",
|
||||
"start_time": "2025-10-29T02:26:12.648420Z"
|
||||
}
|
||||
},
|
||||
"outputs": [
|
||||
{
|
||||
"name": "stdout",
|
||||
"output_type": "stream",
|
||||
"text": [
|
||||
"\u001b[32m2025-10-17 13:43:26\u001b[0m | \u001b[1mINFO \u001b[0m | \u001b[36mmysql_agent\u001b[0m - \u001b[1m查询执行成功\u001b[0m\n"
|
||||
]
|
||||
},
|
||||
{
|
||||
"data": {
|
||||
"text/markdown": [
|
||||
"### 任务详情\n",
|
||||
"**任务ID**: 1\n",
|
||||
"**任务名称**: RSS新闻订阅\n",
|
||||
"**任务类型**: collector\n",
|
||||
"**模块路径**: collectors.rss_subscriptions.NewsAPIClient\n",
|
||||
"**Cron表达式**: 5 0 * * *\n",
|
||||
"**时区**: Asia/Shanghai\n",
|
||||
"**最后运行时间**: 2025-10-17 00:05:07\n",
|
||||
"**下次运行时间**: 2025-10-18 00:05:00\n",
|
||||
"**运行状态**: success\n",
|
||||
"**是否活跃**: 是\n",
|
||||
"**运行次数**: 4\n",
|
||||
"**创建时间**: 2025-10-16 15:47:34"
|
||||
],
|
||||
"text/plain": [
|
||||
"<IPython.core.display.Markdown object>"
|
||||
]
|
||||
},
|
||||
"metadata": {},
|
||||
"output_type": "display_data"
|
||||
},
|
||||
{
|
||||
"data": {
|
||||
"text/plain": [
|
||||
"{'task_id': 1,\n",
|
||||
" 'task_name': 'RSS新闻订阅',\n",
|
||||
" 'task_type': 'collector',\n",
|
||||
" 'module_path': 'collectors.rss_subscriptions.NewsAPIClient',\n",
|
||||
" 'cron_expression': '5 0 * * *',\n",
|
||||
" 'time_zone': 'Asia/Shanghai',\n",
|
||||
" 'next_run_time': Timestamp('2025-10-18 00:05:00'),\n",
|
||||
" 'last_run_time': Timestamp('2025-10-17 00:05:07'),\n",
|
||||
" 'last_run_status': 'success',\n",
|
||||
" 'run_count': 4,\n",
|
||||
" 'is_active': 1,\n",
|
||||
" 'is_running': 0,\n",
|
||||
" 'created_at': Timestamp('2025-10-16 15:47:34'),\n",
|
||||
" 'updated_at': Timestamp('2025-10-17 00:05:08')}"
|
||||
]
|
||||
},
|
||||
"execution_count": 3,
|
||||
"metadata": {},
|
||||
"output_type": "execute_result"
|
||||
}
|
||||
],
|
||||
"source": [
|
||||
"# 查看指定任务的详情\n",
|
||||
"def show_task_details(task_id):\n",
|
||||
@@ -524,7 +503,53 @@
|
||||
"\n",
|
||||
"# 执行:查看任务ID为1的详情(替换为实际ID)\n",
|
||||
"show_task_details(1)"
|
||||
]
|
||||
],
|
||||
"outputs": [
|
||||
{
|
||||
"name": "stdout",
|
||||
"output_type": "stream",
|
||||
"text": [
|
||||
"\u001B[32m2025-10-29 10:26:12\u001B[0m | \u001B[1mINFO \u001B[0m | \u001B[36mmysql_agent\u001B[0m - \u001B[1m查询执行成功\u001B[0m\n"
|
||||
]
|
||||
},
|
||||
{
|
||||
"data": {
|
||||
"text/plain": [
|
||||
"<IPython.core.display.Markdown object>"
|
||||
],
|
||||
"text/markdown": "### 任务详情\n**任务ID**: 1\n**任务名称**: RSS新闻订阅\n**任务类型**: collector\n**模块路径**: processors.processor_rss_data.RSSDataProcessor\n**Cron表达式**: */5 * * * *\n**时区**: Asia/Shanghai\n**最后运行时间**: 2025-10-28 13:35:09\n**下次运行时间**: 2025-10-29 10:25:00\n**运行状态**: success\n**是否活跃**: 是\n**运行次数**: 496\n**创建时间**: 2025-10-16 15:47:34"
|
||||
},
|
||||
"metadata": {},
|
||||
"output_type": "display_data",
|
||||
"jetTransient": {
|
||||
"display_id": null
|
||||
}
|
||||
},
|
||||
{
|
||||
"data": {
|
||||
"text/plain": [
|
||||
"{'task_id': 1,\n",
|
||||
" 'task_name': 'RSS新闻订阅',\n",
|
||||
" 'task_type': 'collector',\n",
|
||||
" 'module_path': 'processors.processor_rss_data.RSSDataProcessor',\n",
|
||||
" 'cron_expression': '*/5 * * * *',\n",
|
||||
" 'time_zone': 'Asia/Shanghai',\n",
|
||||
" 'next_run_time': Timestamp('2025-10-29 10:25:00'),\n",
|
||||
" 'last_run_time': Timestamp('2025-10-28 13:35:09'),\n",
|
||||
" 'last_run_status': 'success',\n",
|
||||
" 'run_count': 496,\n",
|
||||
" 'is_active': 1,\n",
|
||||
" 'is_running': 0,\n",
|
||||
" 'created_at': Timestamp('2025-10-16 15:47:34'),\n",
|
||||
" 'updated_at': Timestamp('2025-10-29 10:24:49')}"
|
||||
]
|
||||
},
|
||||
"execution_count": 10,
|
||||
"metadata": {},
|
||||
"output_type": "execute_result"
|
||||
}
|
||||
],
|
||||
"execution_count": 10
|
||||
},
|
||||
{
|
||||
"cell_type": "markdown",
|
||||
@@ -536,24 +561,16 @@
|
||||
},
|
||||
{
|
||||
"cell_type": "code",
|
||||
"execution_count": 14,
|
||||
"execution_count": 4,
|
||||
"id": "2b2d723bb8e2784f",
|
||||
"metadata": {},
|
||||
"outputs": [
|
||||
{
|
||||
"name": "stderr",
|
||||
"output_type": "stream",
|
||||
"text": [
|
||||
"d:\\ProgramTools\\anaconda3\\envs\\intelligence_system\\Lib\\site-packages\\requests\\__init__.py:86: RequestsDependencyWarning: Unable to find acceptable character detection dependency (chardet or charset_normalizer).\n",
|
||||
" warnings.warn(\n"
|
||||
]
|
||||
},
|
||||
{
|
||||
"name": "stdout",
|
||||
"output_type": "stream",
|
||||
"text": [
|
||||
"\u001b[32m2025-10-16 15:47:34\u001b[0m | \u001b[1mINFO \u001b[0m | \u001b[36mmysql_agent\u001b[0m - \u001b[1m查询执行成功\u001b[0m\n",
|
||||
"\u001b[32m2025-10-16 15:47:34\u001b[0m | \u001b[1mINFO \u001b[0m | \u001b[36mtask_scheduler\u001b[0m - \u001b[1m新任务添加成功\u001b[0m\n"
|
||||
"\u001B[32m2025-10-29 09:56:52\u001B[0m | \u001B[1mINFO \u001B[0m | \u001B[36mmysql_agent\u001B[0m - \u001B[1m查询执行成功\u001B[0m\n",
|
||||
"\u001B[32m2025-10-29 09:56:52\u001B[0m | \u001B[1mINFO \u001B[0m | \u001B[36mtask_scheduler\u001B[0m - \u001B[1m新任务添加成功\u001B[0m\n"
|
||||
]
|
||||
},
|
||||
{
|
||||
@@ -571,7 +588,7 @@
|
||||
{
|
||||
"data": {
|
||||
"text/markdown": [
|
||||
"新任务ID: 0,任务名称: RSS新闻订阅"
|
||||
"新任务ID: 0,任务名称: AI处理RSS新闻"
|
||||
],
|
||||
"text/plain": [
|
||||
"<IPython.core.display.Markdown object>"
|
||||
@@ -586,7 +603,7 @@
|
||||
"np.int64(0)"
|
||||
]
|
||||
},
|
||||
"execution_count": 14,
|
||||
"execution_count": 4,
|
||||
"metadata": {},
|
||||
"output_type": "execute_result"
|
||||
}
|
||||
@@ -611,9 +628,9 @@
|
||||
"\n",
|
||||
"# 执行:添加一个新闻采集任务\n",
|
||||
"add_new_task(\n",
|
||||
" name=\"RSS新闻订阅\",\n",
|
||||
" task_type=\"collector\",\n",
|
||||
" module_path=\"collectors.rss_subscriptions\",\n",
|
||||
" name=\"AI处理RSS新闻\",\n",
|
||||
" task_type=\"processor\",\n",
|
||||
" module_path=\"processors.ai_processors.ai_processor_rss_data.RSSDataAIProcessor\",\n",
|
||||
" cron_expression=\"5 0 * * *\", # 每5分钟执行1次\n",
|
||||
" timezone=\"Asia/Shanghai\"\n",
|
||||
")"
|
||||
@@ -629,69 +646,13 @@
|
||||
},
|
||||
{
|
||||
"cell_type": "code",
|
||||
"execution_count": 4,
|
||||
"id": "c892fd8ad2f0dd9d",
|
||||
"metadata": {
|
||||
"ExecuteTime": {
|
||||
"end_time": "2025-10-17T05:44:19.046308Z",
|
||||
"start_time": "2025-10-17T05:44:18.980345Z"
|
||||
"end_time": "2025-10-29T02:29:56.088085Z",
|
||||
"start_time": "2025-10-29T02:29:55.754298Z"
|
||||
}
|
||||
},
|
||||
"outputs": [
|
||||
{
|
||||
"data": {
|
||||
"text/markdown": [
|
||||
"### 任务ID 1 更新成功"
|
||||
],
|
||||
"text/plain": [
|
||||
"<IPython.core.display.Markdown object>"
|
||||
]
|
||||
},
|
||||
"metadata": {},
|
||||
"output_type": "display_data"
|
||||
},
|
||||
{
|
||||
"name": "stdout",
|
||||
"output_type": "stream",
|
||||
"text": [
|
||||
"\u001b[32m2025-10-17 13:44:19\u001b[0m | \u001b[1mINFO \u001b[0m | \u001b[36mmysql_agent\u001b[0m - \u001b[1m查询执行成功\u001b[0m\n"
|
||||
]
|
||||
},
|
||||
{
|
||||
"data": {
|
||||
"text/markdown": [
|
||||
"### 任务详情\n",
|
||||
"**任务ID**: 1\n",
|
||||
"**任务名称**: RSS新闻订阅\n",
|
||||
"**任务类型**: collector\n",
|
||||
"**模块路径**: collectors.rss_subscriptions.NewsAPIClient\n",
|
||||
"**Cron表达式**: 5 * * * *\n",
|
||||
"**时区**: Asia/Shanghai\n",
|
||||
"**最后运行时间**: 2025-10-17 00:05:07\n",
|
||||
"**下次运行时间**: 2025-10-18 00:05:00\n",
|
||||
"**运行状态**: success\n",
|
||||
"**是否活跃**: 是\n",
|
||||
"**运行次数**: 4\n",
|
||||
"**创建时间**: 2025-10-16 15:47:34"
|
||||
],
|
||||
"text/plain": [
|
||||
"<IPython.core.display.Markdown object>"
|
||||
]
|
||||
},
|
||||
"metadata": {},
|
||||
"output_type": "display_data"
|
||||
},
|
||||
{
|
||||
"data": {
|
||||
"text/plain": [
|
||||
"True"
|
||||
]
|
||||
},
|
||||
"execution_count": 4,
|
||||
"metadata": {},
|
||||
"output_type": "execute_result"
|
||||
}
|
||||
],
|
||||
"source": [
|
||||
"# 更新任务属性\n",
|
||||
"def update_task(task_id, **kwargs):\n",
|
||||
@@ -720,11 +681,57 @@
|
||||
" return success\n",
|
||||
"\n",
|
||||
"# 执行:更新任务(示例:修改任务1的Cron表达式为每天10点)\n",
|
||||
"update_task(1, cron = \"5 * * * *\")\n",
|
||||
"update_task(2, module = \"processors.processor_rss_data\")\n",
|
||||
"\n",
|
||||
"# 执行:同时更新多个属性(名称和Cron表达式)\n",
|
||||
"# update_task(1, name=\"每日早间新闻采集\", cron=\"0 8 * * *\")"
|
||||
]
|
||||
],
|
||||
"outputs": [
|
||||
{
|
||||
"data": {
|
||||
"text/plain": [
|
||||
"<IPython.core.display.Markdown object>"
|
||||
],
|
||||
"text/markdown": "### 任务ID 2 更新成功"
|
||||
},
|
||||
"metadata": {},
|
||||
"output_type": "display_data",
|
||||
"jetTransient": {
|
||||
"display_id": null
|
||||
}
|
||||
},
|
||||
{
|
||||
"name": "stdout",
|
||||
"output_type": "stream",
|
||||
"text": [
|
||||
"\u001B[32m2025-10-29 10:29:56\u001B[0m | \u001B[1mINFO \u001B[0m | \u001B[36mmysql_agent\u001B[0m - \u001B[1m查询执行成功\u001B[0m\n"
|
||||
]
|
||||
},
|
||||
{
|
||||
"data": {
|
||||
"text/plain": [
|
||||
"<IPython.core.display.Markdown object>"
|
||||
],
|
||||
"text/markdown": "### 任务详情\n**任务ID**: 2\n**任务名称**: RSS基于规则数据处理\n**任务类型**: processor\n**模块路径**: processors.processor_rss_data\n**Cron表达式**: 0 8,20 * * *\n**时区**: Asia/Shanghai\n**最后运行时间**: 2025-10-28 13:34:49\n**下次运行时间**: 2025-10-28 20:00:00\n**运行状态**: success\n**是否活跃**: 是\n**运行次数**: 10\n**创建时间**: 2025-10-22 16:06:42"
|
||||
},
|
||||
"metadata": {},
|
||||
"output_type": "display_data",
|
||||
"jetTransient": {
|
||||
"display_id": null
|
||||
}
|
||||
},
|
||||
{
|
||||
"data": {
|
||||
"text/plain": [
|
||||
"True"
|
||||
]
|
||||
},
|
||||
"execution_count": 21,
|
||||
"metadata": {},
|
||||
"output_type": "execute_result"
|
||||
}
|
||||
],
|
||||
"execution_count": 21
|
||||
},
|
||||
{
|
||||
"cell_type": "markdown",
|
||||
@@ -786,188 +793,22 @@
|
||||
"id": "c554c748169d5ac8",
|
||||
"metadata": {},
|
||||
"source": [
|
||||
"## 7. 手动执行任务(对应命令行 run)"
|
||||
"## 7. 手动执行任务(对应命令行 run)\n",
|
||||
"\n",
|
||||
"自动识别main,即main的上一级"
|
||||
]
|
||||
},
|
||||
{
|
||||
"cell_type": "code",
|
||||
"execution_count": 2,
|
||||
"id": "94892f4134316f8e",
|
||||
"metadata": {
|
||||
"jupyter": {
|
||||
"is_executing": true
|
||||
},
|
||||
"ExecuteTime": {
|
||||
"end_time": "2025-10-17T05:44:37.714559Z",
|
||||
"start_time": "2025-10-17T05:44:35.084369Z"
|
||||
"start_time": "2025-10-29T02:30:10.298891Z"
|
||||
}
|
||||
},
|
||||
"outputs": [
|
||||
{
|
||||
"data": {
|
||||
"text/markdown": [
|
||||
"### 开始执行任务ID 2"
|
||||
],
|
||||
"text/plain": [
|
||||
"<IPython.core.display.Markdown object>"
|
||||
]
|
||||
},
|
||||
"metadata": {},
|
||||
"output_type": "display_data"
|
||||
},
|
||||
{
|
||||
"data": {
|
||||
"text/markdown": [
|
||||
"---"
|
||||
],
|
||||
"text/plain": [
|
||||
"<IPython.core.display.Markdown object>"
|
||||
]
|
||||
},
|
||||
"metadata": {},
|
||||
"output_type": "display_data"
|
||||
},
|
||||
{
|
||||
"name": "stdout",
|
||||
"output_type": "stream",
|
||||
"text": [
|
||||
"\u001b[32m2025-10-23 16:57:20\u001b[0m | \u001b[1mINFO \u001b[0m | \u001b[36mmysql_agent\u001b[0m - \u001b[1m查询执行成功\u001b[0m\n",
|
||||
"\u001b[32m2025-10-23 16:57:20\u001b[0m | \u001b[1mINFO \u001b[0m | \u001b[36mprocessor_rss_data\u001b[0m - \u001b[1mRSS数据处理器初始化完成\u001b[0m\n",
|
||||
"\u001b[32m2025-10-23 16:57:20\u001b[0m | \u001b[1mINFO \u001b[0m | \u001b[36mprocessor_rss_data\u001b[0m - \u001b[1m开始处理RSS数据...\u001b[0m\n",
|
||||
"\u001b[32m2025-10-23 16:57:21\u001b[0m | \u001b[1mINFO \u001b[0m | \u001b[36mprocessor_rss_data\u001b[0m - \u001b[1m成功加载 8 条未处理的RSS数据\u001b[0m\n",
|
||||
"\u001b[32m2025-10-23 16:57:21\u001b[0m | \u001b[33m\u001b[1mWARNING \u001b[0m | \u001b[36mprocessor_rss_data\u001b[0m - \u001b[33m\u001b[1m停用词文件不存在: processors/stopwords.txt,使用默认停用词\u001b[0m\n",
|
||||
"\u001b[32m2025-10-23 16:57:21\u001b[0m | \u001b[33m\u001b[1mWARNING \u001b[0m | \u001b[36mprocessor_rss_data\u001b[0m - \u001b[33m\u001b[1m关键词文件不存在: processors/keywords.txt\u001b[0m\n"
|
||||
]
|
||||
},
|
||||
{
|
||||
"name": "stderr",
|
||||
"output_type": "stream",
|
||||
"text": [
|
||||
"Building prefix dict from the default dictionary ...\n",
|
||||
"Loading model from cache C:\\Users\\zy187\\AppData\\Local\\Temp\\jieba.cache\n",
|
||||
"Loading model cost 0.609 seconds.\n",
|
||||
"Prefix dict has been built successfully.\n"
|
||||
]
|
||||
},
|
||||
{
|
||||
"name": "stdout",
|
||||
"output_type": "stream",
|
||||
"text": [
|
||||
"\u001b[32m2025-10-23 16:57:21\u001b[0m | \u001b[1mINFO \u001b[0m | \u001b[36mprocessor_rss_data\u001b[0m - \u001b[1m数据处理完成,共处理 8 条记录\u001b[0m\n",
|
||||
"\u001b[32m2025-10-23 16:57:21\u001b[0m | \u001b[1mINFO \u001b[0m | \u001b[36mprocessor_rss_data\u001b[0m - \u001b[1m过滤出 1 条汽车后市场相关新闻\u001b[0m\n",
|
||||
"\u001b[32m2025-10-23 16:57:21\u001b[0m | \u001b[1mINFO \u001b[0m | \u001b[36mmysql_agent\u001b[0m - \u001b[1m表 processed_rss_data 插入结果汇总\u001b[0m\n",
|
||||
"\u001b[32m2025-10-23 16:57:21\u001b[0m | \u001b[1mINFO \u001b[0m | \u001b[36mprocessor_rss_data\u001b[0m - \u001b[1m成功保存 1 条处理结果到数据库\u001b[0m\n",
|
||||
"\u001b[32m2025-10-23 16:57:21\u001b[0m | \u001b[1mINFO \u001b[0m | \u001b[36mprocessor_rss_data\u001b[0m - \u001b[1m成功标记 8 条数据为已处理\u001b[0m\n",
|
||||
"\u001b[32m2025-10-23 16:57:21\u001b[0m | \u001b[1mINFO \u001b[0m | \u001b[36mprocessor_rss_data\u001b[0m - \u001b[1mRSS数据处理完成\u001b[0m\n",
|
||||
"\u001b[32m2025-10-23 16:57:21\u001b[0m | \u001b[1mINFO \u001b[0m | \u001b[36mtask_scheduler\u001b[0m - \u001b[1m任务执行完成,耗时: 1.19秒\u001b[0m\n"
|
||||
]
|
||||
},
|
||||
{
|
||||
"data": {
|
||||
"text/markdown": [
|
||||
"**任务名称**: RSS基于规则数据处理"
|
||||
],
|
||||
"text/plain": [
|
||||
"<IPython.core.display.Markdown object>"
|
||||
]
|
||||
},
|
||||
"metadata": {},
|
||||
"output_type": "display_data"
|
||||
},
|
||||
{
|
||||
"data": {
|
||||
"text/markdown": [
|
||||
"**任务ID**: 2"
|
||||
],
|
||||
"text/plain": [
|
||||
"<IPython.core.display.Markdown object>"
|
||||
]
|
||||
},
|
||||
"metadata": {},
|
||||
"output_type": "display_data"
|
||||
},
|
||||
{
|
||||
"data": {
|
||||
"text/markdown": [
|
||||
"**执行时长**: 1.26 秒"
|
||||
],
|
||||
"text/plain": [
|
||||
"<IPython.core.display.Markdown object>"
|
||||
]
|
||||
},
|
||||
"metadata": {},
|
||||
"output_type": "display_data"
|
||||
},
|
||||
{
|
||||
"data": {
|
||||
"text/markdown": [
|
||||
"---"
|
||||
],
|
||||
"text/plain": [
|
||||
"<IPython.core.display.Markdown object>"
|
||||
]
|
||||
},
|
||||
"metadata": {},
|
||||
"output_type": "display_data"
|
||||
},
|
||||
{
|
||||
"data": {
|
||||
"text/markdown": [
|
||||
"### 📋 执行输出:"
|
||||
],
|
||||
"text/plain": [
|
||||
"<IPython.core.display.Markdown object>"
|
||||
]
|
||||
},
|
||||
"metadata": {},
|
||||
"output_type": "display_data"
|
||||
},
|
||||
{
|
||||
"name": "stdout",
|
||||
"output_type": "stream",
|
||||
"text": [
|
||||
"RSS数据处理完成!\n",
|
||||
"处理统计: {'total_articles': 8, 'filtered_articles': 1, 'filter_rate': 0.125, 'processing_time': '2025-10-23 16:57:21', 'save_success': True, 'mark_success': True}\n",
|
||||
"\n"
|
||||
]
|
||||
},
|
||||
{
|
||||
"data": {
|
||||
"text/markdown": [
|
||||
"---"
|
||||
],
|
||||
"text/plain": [
|
||||
"<IPython.core.display.Markdown object>"
|
||||
]
|
||||
},
|
||||
"metadata": {},
|
||||
"output_type": "display_data"
|
||||
},
|
||||
{
|
||||
"data": {
|
||||
"text/markdown": [
|
||||
"### ✅ 任务执行成功"
|
||||
],
|
||||
"text/plain": [
|
||||
"<IPython.core.display.Markdown object>"
|
||||
]
|
||||
},
|
||||
"metadata": {},
|
||||
"output_type": "display_data"
|
||||
},
|
||||
{
|
||||
"data": {
|
||||
"text/plain": [
|
||||
"{'success': True,\n",
|
||||
" 'task_name': 'RSS基于规则数据处理',\n",
|
||||
" 'task_id': 2,\n",
|
||||
" 'execution_time': 1.2610254287719727,\n",
|
||||
" 'output': \"RSS数据处理完成!\\n处理统计: {'total_articles': 8, 'filtered_articles': 1, 'filter_rate': 0.125, 'processing_time': '2025-10-23 16:57:21', 'save_success': True, 'mark_success': True}\\n\",\n",
|
||||
" 'error': None}"
|
||||
]
|
||||
},
|
||||
"execution_count": 2,
|
||||
"metadata": {},
|
||||
"output_type": "execute_result"
|
||||
}
|
||||
],
|
||||
"source": [
|
||||
"# 手动执行任务(异步方式,快速返回)\n",
|
||||
"def run_task_manually(task_id):\n",
|
||||
@@ -1013,8 +854,47 @@
|
||||
" return result\n",
|
||||
"\n",
|
||||
"# 执行:手动运行任务ID为2的任务(显示详细执行过程)\n",
|
||||
"run_task_with_details(2)"
|
||||
]
|
||||
"run_task_with_details(3)"
|
||||
],
|
||||
"outputs": [
|
||||
{
|
||||
"data": {
|
||||
"text/plain": [
|
||||
"<IPython.core.display.Markdown object>"
|
||||
],
|
||||
"text/markdown": "### 开始执行任务ID 3"
|
||||
},
|
||||
"metadata": {},
|
||||
"output_type": "display_data",
|
||||
"jetTransient": {
|
||||
"display_id": null
|
||||
}
|
||||
},
|
||||
{
|
||||
"data": {
|
||||
"text/plain": [
|
||||
"<IPython.core.display.Markdown object>"
|
||||
],
|
||||
"text/markdown": "---"
|
||||
},
|
||||
"metadata": {},
|
||||
"output_type": "display_data",
|
||||
"jetTransient": {
|
||||
"display_id": null
|
||||
}
|
||||
},
|
||||
{
|
||||
"name": "stdout",
|
||||
"output_type": "stream",
|
||||
"text": [
|
||||
"\u001B[32m2025-10-29 10:30:10\u001B[0m | \u001B[1mINFO \u001B[0m | \u001B[36mmysql_agent\u001B[0m - \u001B[1m查询执行成功\u001B[0m\n",
|
||||
"\u001B[32m2025-10-29 10:30:11\u001B[0m | \u001B[1mINFO \u001B[0m | \u001B[36mai_processor_rss_data\u001B[0m - \u001B[1mRSS数据AI处理器初始化完成\u001B[0m\n",
|
||||
"\u001B[32m2025-10-29 10:30:11\u001B[0m | \u001B[1mINFO \u001B[0m | \u001B[36mai_processor_rss_data\u001B[0m - \u001B[1m开始批量处理数据,批次大小: 200, 延迟: 1.5秒\u001B[0m\n",
|
||||
"\u001B[32m2025-10-29 10:30:11\u001B[0m | \u001B[1mINFO \u001B[0m | \u001B[36mai_processor_rss_data\u001B[0m - \u001B[1m成功加载 3 条未处理的数据\u001B[0m\n"
|
||||
]
|
||||
}
|
||||
],
|
||||
"execution_count": null
|
||||
},
|
||||
{
|
||||
"cell_type": "markdown",
|
||||
|
||||
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Reference in New Issue
Block a user