Files
panda 2528a2778c 校验唯一任务添加时间
优化续约代办请求次数
2026-01-04 13:44:53 +08:00

265 lines
11 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
# utils.py
from datetime import timedelta
from task_executor import execute_task
import queue
from log_config import configure_task_logger, configure_error_task_logger
from pathlib import Path
import os
import csv
from queue import Queue
from datetime import datetime
# 获取常规日志记录器
logger = configure_task_logger()
# 获取错误任务日志记录器
error_task_logger = configure_error_task_logger()
# 全局任务字典,使用 (unique_id, exec_time) 作为组合键,支持一个任务多个执行时间
all_tasks = {}
class PersistentQueue:
def __init__(self, db_relative_path='db/task_queue.csv'):
# 获取当前脚本所在的目录,并构建跨平台的 CSV 文件路径
current_dir = os.path.dirname(os.path.abspath(__file__))
self.db_file = os.path.join(current_dir, db_relative_path)
# 确保 db 目录存在
os.makedirs(os.path.dirname(self.db_file), exist_ok=True)
self.queue = Queue()
self.load_from_disk()
def load_from_disk(self):
"""从 CSV 文件加载任务队列"""
try:
with open(self.db_file, 'r', newline='', encoding='utf-8') as f:
reader = csv.DictReader(f)
for row in reader:
# 将字符串字段转换为适当的数据类型
task = {
'unique_id': row['unique_id'],
'exec_time': row['exec_time'], # 保持为字符串
'is_switch_on': row['is_switch_on'].lower() == 'true', # 转换为布尔值
'status': row['status'],
}
# 动态添加 should_execute 字段
exec_time = datetime.strptime(task['exec_time'], '%H:%M').time()
task['should_execute'] = lambda now, et=exec_time: now.time() >= et
self.queue.put(task)
logger.info("任务队列已从磁盘加载。")
except FileNotFoundError:
logger.warning("CSV 文件未找到,初始化为空任务队列。")
except Exception as e:
error_task_logger.error(f"加载任务队列时发生异常: {e}")
def save_to_disk(self):
"""将任务队列保存到 CSV 文件"""
try:
with open(self.db_file, 'w', newline='', encoding='utf-8') as f:
writer = csv.DictWriter(f, fieldnames=['unique_id', 'exec_time', 'is_switch_on', 'status'])
writer.writeheader() # 写入表头
temp_queue = Queue()
while not self.queue.empty():
task = self.queue.get()
# 只保存可序列化的字段
task_to_save = {
'unique_id': task['unique_id'],
'exec_time': task['exec_time'],
'is_switch_on': task['is_switch_on'],
'status': task['status'],
}
writer.writerow(task_to_save)
temp_queue.put(task) # 重新加入临时队列以保留任务
self.queue.task_done() # 标记任务完成以释放锁
# 将所有任务从临时队列放回原队列
while not temp_queue.empty():
self.queue.put(temp_queue.get())
logger.info("任务队列已保存到磁盘。")
except Exception as e:
error_task_logger.error(f"保存任务队列时发生异常: {e}")
def put(self, task):
"""将任务加入队列并保存到磁盘"""
self.queue.put(task)
self.save_to_disk()
logger.info(f"任务 {task['unique_id']} ({task['exec_time']}) 已加入队列。")
def get(self):
"""从队列中获取任务并保存到磁盘"""
if not self.queue.empty():
task = self.queue.get()
self.save_to_disk()
logger.info(f"任务 {task['unique_id']} ({task['exec_time']}) 已从队列中取出。")
return task
else:
logger.info("任务队列为空。")
return None
def task_done(self):
"""标记任务完成并保存到磁盘"""
self.queue.task_done()
self.save_to_disk()
logger.info("任务已完成标记。")
def is_empty(self):
"""检查队列是否为空"""
return self.queue.empty()
def empty(self):
"""返回队列是否为空的状态"""
return self.is_empty()
task_queue = PersistentQueue()
def load_tasks_and_execute():
"""
从 tasks.csv 文件中加载任务到持久化队列中,仅包含待执行任务。
支持一个任务多个执行时间,避免重复添加任务。
"""
try:
load_tasks_from_csv() # 从 CSV 文件加载所有任务
now = datetime.now()
# 获取队列中已有任务标识(使用 (unique_id, exec_time) 避免重复添加)
existing_task_keys = set()
# 创建一个临时队列来遍历任务
temp_queue = Queue()
while not task_queue.empty():
task = task_queue.get()
task_key = (task['unique_id'], task['exec_time'])
existing_task_keys.add(task_key)
temp_queue.put(task)
# 将任务放回原队列
while not temp_queue.empty():
task_queue.put(temp_queue.get())
for task in all_tasks.values():
# 检查任务是否过期(超过5分钟)
exec_time_today = datetime.combine(now.date(), datetime.strptime(task['exec_time'], '%H:%M').time())
if (now - exec_time_today) > timedelta(minutes=5):
if task['status'] == '待执行':
error_task_logger.error(f"任务 {task['unique_id']} ({task['exec_time']}) 超过执行窗口5分钟以上,标记为过期。")
update_task_status(task['unique_id'], task['exec_time'], '过期')
continue # 不处理过期任务
# 使用组合键判断任务是否已在队列中
task_key = (task['unique_id'], task['exec_time'])
# 如果任务应该执行、状态是待执行且不在队列中
if (task['should_execute'](now) and
task['status'] == '待执行' and
task_key not in existing_task_keys):
task_queue.put(task)
existing_task_keys.add(task_key)
logger.info(f"任务 {task['unique_id']} ({task['exec_time']}) 加入队列。")
except Exception as e:
error_task_logger.error(f"加载任务时出错: {e}")
# 执行队列中的任务(每次只执行一条)
execute_single_task()
def execute_single_task():
"""从队列中取出并执行单个任务,支持一个任务多个执行时间"""
if not task_queue.empty():
try:
task = task_queue.get()
if task:
# 标记任务为正在执行
update_task_status(task['unique_id'], task['exec_time'], '正在执行')
try:
success = execute_task(task['unique_id'])
if success:
update_task_status(task['unique_id'], task['exec_time'], '已完成')
logger.info(f"任务 {task['unique_id']} ({task['exec_time']}) 执行成功。")
else:
update_task_status(task['unique_id'], task['exec_time'], '失败')
error_task_logger.error(f"任务 {task['unique_id']} ({task['exec_time']}) 执行失败。")
except Exception as e:
error_task_logger.error(f"任务 {task['unique_id']} ({task['exec_time']}) 执行时发生异常: {e}")
update_task_status(task['unique_id'], task['exec_time'], '失败')
task_queue.task_done()
except queue.Empty:
logger.info("队列为空,无任务可执行。")
def load_tasks_from_csv():
"""从 tasks.csv 文件中加载任务到全局任务字典,支持一个任务多个执行时间"""
try:
csv_file = Path('tasks.csv')
if csv_file.is_file():
with open(csv_file, 'r', encoding='utf-8') as f:
reader = csv.DictReader(f)
for row in reader:
# 将字符串字段转换为适当的数据类型
task = {
'unique_id': row['unique_id'],
'exec_time': row['exec_time'],
'is_switch_on': row['is_switch_on'].lower() == 'true',
'status': row['status'],
}
# 动态添加 should_execute 字段
exec_time = datetime.strptime(task['exec_time'], '%H:%M').time()
task['should_execute'] = lambda now, et=exec_time: now.time() >= et
# 使用 (unique_id, exec_time) 作为组合键,支持一个任务多个执行时间
task_key = (task['unique_id'], task['exec_time'])
all_tasks[task_key] = task
logger.info("任务已从磁盘加载到全局任务字典。")
else:
logger.warning("tasks.csv 文件未找到,初始化为空任务字典。")
except Exception as e:
error_task_logger.error(f"加载任务时发生异常: {e}")
def save_tasks_to_csv():
"""将所有任务的状态保存到 tasks.csv 文件,支持一个任务多个执行时间"""
try:
with open('tasks.csv', 'w', newline='', encoding='utf-8') as f:
writer = csv.DictWriter(f, fieldnames=['unique_id', 'exec_time', 'is_switch_on', 'status'])
writer.writeheader()
# 遍历所有任务实例(包括相同unique_id的不同exec_time
for task in all_tasks.values():
task_to_save = {
'unique_id': task['unique_id'],
'exec_time': task['exec_time'],
'is_switch_on': task['is_switch_on'],
'status': task['status'],
}
writer.writerow(task_to_save)
logger.info("所有任务状态已保存到磁盘。")
except Exception as e:
error_task_logger.error(f"保存任务状态时出错: {e}")
def update_task_status(task_id, exec_time, new_status):
"""更新指定任务的状态,并保存到磁盘,支持一个任务多个执行时间"""
task_key = (task_id, exec_time)
if task_key in all_tasks:
all_tasks[task_key]['status'] = new_status
save_tasks_to_csv()
logger.info(f"任务 {task_id} ({exec_time}) 状态已更新为 {new_status}")
else:
error_task_logger.error(f"尝试更新不存在的任务 {task_id} ({exec_time}) 的状态。")
def load_tasks_on_start():
"""程序启动时加载任务"""
logger.info("启动时加载并执行任务...")
load_tasks_and_execute()
logger.info("启动任务加载完成。")