135 lines
5.2 KiB
Python
135 lines
5.2 KiB
Python
import signal
|
|
import time
|
|
from datetime import datetime
|
|
from system_management.scheduler.task_scheduler import TaskScheduler
|
|
from utils.logger import CrossPlatformLog
|
|
from config import Config
|
|
|
|
|
|
# 初始化日志
|
|
log = CrossPlatformLog.get_logger("Main")
|
|
|
|
|
|
class IntelligenceSystem:
|
|
def __init__(self, db_config=None, run_all_on_startup=False):
|
|
"""初始化系统(仅作为容器,不包含业务逻辑)
|
|
|
|
Args:
|
|
db_config: 数据库配置
|
|
run_all_on_startup: 启动时是否立即执行所有到期任务(默认False)
|
|
"""
|
|
self.scheduler = TaskScheduler(Config.MYSQL_CONFIG, max_workers=5)
|
|
self._running = False
|
|
self.run_all_on_startup = run_all_on_startup
|
|
log.info(f"情报系统已初始化(Cron模式),启动时执行任务: {run_all_on_startup}")
|
|
|
|
def start(self):
|
|
"""启动系统主入口"""
|
|
self._running = True
|
|
self._setup_signal_handlers()
|
|
log.info("系统启动 - 运行在Cron调度模式")
|
|
|
|
# 启动时执行所有到期任务(如果开关开启)
|
|
if self.run_all_on_startup:
|
|
print(f"\n{'='*60}")
|
|
print("🚀 启动时执行所有到期任务...")
|
|
print(f"{'='*60}\n")
|
|
log.info("启动时执行所有到期任务")
|
|
result = self.scheduler.check_and_run_tasks(print_empty_status=True)
|
|
print(f"\n启动任务执行完成: 总数={result['总任务数']}, 成功={result['成功']}, 失败={result['失败']}\n")
|
|
|
|
# 时间追踪变量
|
|
last_status_print_time = time.time() # 上次打印状态的时间
|
|
last_hourly_report_time = time.time() # 上次小时统计的时间
|
|
status_print_interval = 60 # 每分钟打印一次状态(60秒)
|
|
hourly_report_interval = 3600 # 每小时统计一次(3600秒)
|
|
|
|
try:
|
|
# 主循环 - 仅负责定期检查任务
|
|
while self._running:
|
|
current_time = time.time()
|
|
|
|
# 判断是否需要打印状态(每分钟一次)
|
|
should_print_status = (current_time - last_status_print_time) >= status_print_interval
|
|
|
|
# 检查并执行到期任务
|
|
self.scheduler.check_and_run_tasks(print_empty_status=should_print_status)
|
|
|
|
# 更新最后打印时间
|
|
if should_print_status:
|
|
last_status_print_time = current_time
|
|
|
|
# 检查是否需要进行小时统计(每小时一次)
|
|
if (current_time - last_hourly_report_time) >= hourly_report_interval:
|
|
self._print_hourly_stats()
|
|
last_hourly_report_time = current_time
|
|
|
|
# 短间隔轮询(每10秒检查一次,保证Cron时间精度)
|
|
time.sleep(10)
|
|
|
|
except Exception as e:
|
|
log.critical("系统主循环崩溃", exc_info=True)
|
|
finally:
|
|
self.shutdown()
|
|
|
|
def _setup_signal_handlers(self):
|
|
"""设置系统信号处理器"""
|
|
signal.signal(signal.SIGINT, self._handle_shutdown)
|
|
signal.signal(signal.SIGTERM, self._handle_shutdown)
|
|
log.debug("信号处理器已注册")
|
|
|
|
def _handle_shutdown(self, signum, frame):
|
|
"""处理系统关闭信号"""
|
|
log.info(f"收到关闭信号 {signum},开始关闭系统")
|
|
self._running = False
|
|
|
|
def _print_hourly_stats(self):
|
|
"""打印并重置小时统计信息"""
|
|
stats = self.scheduler.get_and_reset_hourly_stats()
|
|
now = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
|
|
|
|
print(f"\n{'='*60}")
|
|
print(f"📊 小时任务统计报告 - {now}")
|
|
print(f"{'='*60}")
|
|
print(f" 总任务数: {stats['总数']}")
|
|
print(f" 成功: {stats['成功']}")
|
|
print(f" 失败: {stats['失败']}")
|
|
if stats['总数'] > 0:
|
|
success_rate = (stats['成功'] / stats['总数']) * 100
|
|
print(f" 成功率: {success_rate:.1f}%")
|
|
print(f"{'='*60}\n")
|
|
|
|
log.info(
|
|
"小时任务统计",
|
|
总任务数=stats['总数'],
|
|
成功=stats['成功'],
|
|
失败=stats['失败']
|
|
)
|
|
|
|
def shutdown(self):
|
|
"""优雅关闭系统"""
|
|
log.info("开始优雅关闭系统")
|
|
|
|
# 等待所有正在执行的任务完成
|
|
self.scheduler.executor.shutdown(wait=True, cancel_futures=False)
|
|
|
|
# 记录最终状态
|
|
pending_count = self.scheduler.get_pending_tasks_count()
|
|
log.info(
|
|
"系统关闭完成",
|
|
pending_tasks=pending_count,
|
|
shutdown_time=datetime.now()
|
|
)
|
|
|
|
|
|
if __name__ == "__main__":
|
|
try:
|
|
# 启动系统 - 仅作为入口,不包含调度逻辑
|
|
# run_all_on_startup=True: 启动时立即执行所有到期任务
|
|
# run_all_on_startup=False: 启动时不执行任务,等待下次调度周期
|
|
system = IntelligenceSystem(run_all_on_startup=False)
|
|
system.start()
|
|
except Exception as e:
|
|
log.critical("情报系统启动失败", exc_info=True)
|
|
raise
|