# utils.py from datetime import timedelta from task_executor import execute_task import queue from log_config import configure_task_logger, configure_error_task_logger from pathlib import Path import os import csv from queue import Queue from datetime import datetime # 获取常规日志记录器 logger = configure_task_logger() # 获取错误任务日志记录器 error_task_logger = configure_error_task_logger() # 全局任务字典,使用 unique_id 作为键 all_tasks = {} class PersistentQueue: def __init__(self, db_relative_path='db/task_queue.csv'): # 获取当前脚本所在的目录,并构建跨平台的 CSV 文件路径 current_dir = os.path.dirname(os.path.abspath(__file__)) self.db_file = os.path.join(current_dir, db_relative_path) # 确保 db 目录存在 os.makedirs(os.path.dirname(self.db_file), exist_ok=True) self.queue = Queue() self.load_from_disk() def load_from_disk(self): """从 CSV 文件加载任务队列""" try: with open(self.db_file, 'r', newline='', encoding='utf-8') as f: reader = csv.DictReader(f) for row in reader: # 将字符串字段转换为适当的数据类型 task = { 'unique_id': row['unique_id'], 'exec_time': row['exec_time'], # 保持为字符串 'is_switch_on': row['is_switch_on'].lower() == 'true', # 转换为布尔值 'status': row['status'], } # 动态添加 should_execute 字段 exec_time = datetime.strptime(task['exec_time'], '%H:%M').time() task['should_execute'] = lambda now, et=exec_time: now.time() >= et self.queue.put(task) logger.info("任务队列已从磁盘加载。") except FileNotFoundError: logger.warning("CSV 文件未找到,初始化为空任务队列。") except Exception as e: error_task_logger.error(f"加载任务队列时发生异常: {e}") def save_to_disk(self): """将任务队列保存到 CSV 文件""" try: with open(self.db_file, 'w', newline='', encoding='utf-8') as f: writer = csv.DictWriter(f, fieldnames=['unique_id', 'exec_time', 'is_switch_on', 'status']) writer.writeheader() # 写入表头 temp_queue = Queue() while not self.queue.empty(): task = self.queue.get() # 只保存可序列化的字段 task_to_save = { 'unique_id': task['unique_id'], 'exec_time': task['exec_time'], 'is_switch_on': task['is_switch_on'], 'status': task['status'], } writer.writerow(task_to_save) temp_queue.put(task) # 重新加入临时队列以保留任务 self.queue.task_done() # 标记任务完成以释放锁 # 将所有任务从临时队列放回原队列 while not temp_queue.empty(): self.queue.put(temp_queue.get()) logger.info("任务队列已保存到磁盘。") except Exception as e: error_task_logger.error(f"保存任务队列时发生异常: {e}") def put(self, task): """将任务加入队列并保存到磁盘""" self.queue.put(task) self.save_to_disk() logger.info(f"任务 {task['unique_id']} 已加入队列。") def get(self): """从队列中获取任务并保存到磁盘""" if not self.queue.empty(): task = self.queue.get() self.save_to_disk() logger.info(f"任务 {task['unique_id']} 已从队列中取出。") return task else: logger.info("任务队列为空。") return None def task_done(self): """标记任务完成并保存到磁盘""" self.queue.task_done() self.save_to_disk() logger.info("任务已完成标记。") def is_empty(self): """检查队列是否为空""" return self.queue.empty() def empty(self): """返回队列是否为空的状态""" return self.is_empty() task_queue = PersistentQueue() def load_tasks_and_execute(): """ 从 tasks.csv 文件中加载任务到持久化队列中,仅包含待执行任务。 避免重复添加任务。 """ try: load_tasks_from_csv() # 从 CSV 文件加载所有任务 now = datetime.now() # 获取队列中已有任务ID(避免重复添加) existing_task_ids = set() # 创建一个临时队列来遍历任务 temp_queue = Queue() while not task_queue.empty(): task = task_queue.get() existing_task_ids.add(task['unique_id']) temp_queue.put(task) # 将任务放回原队列 while not temp_queue.empty(): task_queue.put(temp_queue.get()) for task in all_tasks.values(): # 检查任务是否过期(超过5分钟) exec_time_today = datetime.combine(now.date(), datetime.strptime(task['exec_time'], '%H:%M').time()) if (now - exec_time_today) > timedelta(minutes=5): if task['status'] == '待执行': error_task_logger.error(f"任务 {task['unique_id']} 超过执行窗口5分钟以上,标记为过期。") update_task_status(task['unique_id'], '过期') continue # 不处理过期任务 # 如果任务应该执行、状态是待执行且不在队列中 if (task['should_execute'](now) and task['status'] == '待执行' and task['unique_id'] not in existing_task_ids): task_queue.put(task) existing_task_ids.add(task['unique_id']) logger.info(f"任务 {task['unique_id']} 加入队列。") except Exception as e: error_task_logger.error(f"加载任务时出错: {e}") # 执行队列中的任务(每次只执行一条) execute_single_task() def execute_single_task(): """从队列中取出并执行单个任务""" if not task_queue.empty(): try: task = task_queue.get() if task: # 标记任务为正在执行 update_task_status(task['unique_id'], '正在执行') try: success = execute_task(task['unique_id']) if success: update_task_status(task['unique_id'], '已完成') logger.info(f"任务 {task['unique_id']} 执行成功。") else: update_task_status(task['unique_id'], '失败') error_task_logger.error(f"任务 {task['unique_id']} 执行失败。") except Exception as e: error_task_logger.error(f"任务 {task['unique_id']} 执行时发生异常: {e}") update_task_status(task['unique_id'], '失败') task_queue.task_done() except queue.Empty: logger.info("队列为空,无任务可执行。") def load_tasks_from_csv(): """从 tasks.csv 文件中加载任务到全局任务字典""" try: csv_file = Path('tasks.csv') if csv_file.is_file(): with open(csv_file, 'r', encoding='utf-8') as f: reader = csv.DictReader(f) for row in reader: # 将字符串字段转换为适当的数据类型 task = { 'unique_id': row['unique_id'], 'exec_time': row['exec_time'], 'is_switch_on': row['is_switch_on'].lower() == 'true', 'status': row['status'], } # 动态添加 should_execute 字段 exec_time = datetime.strptime(task['exec_time'], '%H:%M').time() task['should_execute'] = lambda now, et=exec_time: now.time() >= et all_tasks[task['unique_id']] = task logger.info("任务已从磁盘加载到全局任务字典。") else: logger.warning("tasks.csv 文件未找到,初始化为空任务字典。") except Exception as e: error_task_logger.error(f"加载任务时发生异常: {e}") def save_tasks_to_csv(): """将所有任务的状态保存到 tasks.csv 文件""" try: with open('tasks.csv', 'w', newline='', encoding='utf-8') as f: writer = csv.DictWriter(f, fieldnames=['unique_id', 'exec_time', 'is_switch_on', 'status']) writer.writeheader() for task in all_tasks.values(): task_to_save = { 'unique_id': task['unique_id'], 'exec_time': task['exec_time'], 'is_switch_on': task['is_switch_on'], 'status': task['status'], } writer.writerow(task_to_save) logger.info("所有任务状态已保存到磁盘。") except Exception as e: error_task_logger.error(f"保存任务状态时出错: {e}") def update_task_status(task_id, new_status): """更新指定任务的状态,并保存到磁盘""" if task_id in all_tasks: all_tasks[task_id]['status'] = new_status save_tasks_to_csv() logger.info(f"任务 {task_id} 状态已更新为 {new_status}。") else: error_task_logger.error(f"尝试更新不存在的任务 {task_id} 的状态。") def load_tasks_on_start(): """程序启动时加载任务""" logger.info("启动时加载并执行任务...") load_tasks_and_execute() logger.info("启动任务加载完成。")