""" 应用工具模块 本模块提供应用级的工具类,包括: - 日志记录器配置(支持日志轮转) - 后台任务调度器 - 任务队列管理 - 请求头解码工具 这些工具在整个应用中被广泛使用。 """ import logging import os from logging.handlers import RotatingFileHandler from queue import Queue import threading from apscheduler.schedulers.background import BackgroundScheduler from urllib.parse import unquote class AppTools: """ 应用级工具集合类 提供应用级别的工具功能,包括: - 初始化轮转日志记录器 - 初始化后台调度器(进程退出时自动关闭) - 维护一个简单的任务队列与后台处理线程 属性: config: 配置对象 task_queue: 任务队列 logger: 日志记录器 scheduler: 后台调度器 """ def __init__(self, config): """ 初始化应用工具 Args: config: 配置对象,包含日志、目录等配置信息 """ self.config = config self.task_queue = Queue() self.logger = self._setup_logger() self.scheduler = self._setup_scheduler() self._start_task_thread() def _setup_logger(self): """ 配置带轮转的文件日志记录器 创建支持日志轮转的文件日志记录器,避免重复添加相同文件处理器。 Returns: logging.Logger: 配置好的日志记录器 注意: - 日志文件最大 5MB - 保留 5 个备份文件 - 使用 UTF-8 编码 """ log_dir = self.config.LOGS_DIRECTORY if not os.path.exists(log_dir): os.makedirs(log_dir) log_file = self.config.LOG_FILE logger = logging.getLogger("app") logger.setLevel(logging.INFO) # 若未绑定目标日志文件的 RotatingFileHandler,则创建并绑定 if not any(isinstance(h, RotatingFileHandler) and getattr(h, 'baseFilename', None) == str(log_file) for h in logger.handlers): file_handler = RotatingFileHandler(log_file, maxBytes=1024 * 1024 * 5, backupCount=5, encoding='utf-8') file_handler.setLevel(logging.INFO) formatter = logging.Formatter('%(asctime)s %(levelname)s:%(name)s:%(message)s') file_handler.setFormatter(formatter) logger.addHandler(file_handler) return logger def _setup_scheduler(self): """ 初始化后台调度器 创建后台调度器,并在进程退出时优雅关闭,防止资源泄漏。 Returns: BackgroundScheduler: 后台调度器实例 注意: 调度器会在进程退出时自动关闭 """ scheduler = BackgroundScheduler() import atexit atexit.register(lambda: scheduler.shutdown(wait=False)) return scheduler def _start_task_thread(self): """ 启动后台任务处理线程 启动一个守护线程来处理任务队列中的任务,守护模式运行,随主线程退出。 """ task_thread = threading.Thread(target=self.process_tasks, daemon=True) task_thread.start() def process_tasks(self): """ 后台消费队列中的任务: 任务结构为 {'handler': callable, 'data': any, 'response': Queue} - 正常执行时将 handler(data) 的结果放入 response 队列 - 发生异常时记录错误并将失败信息放入 response 队列 - 每次任务完成后调用 task_done() """ while True: task = self.task_queue.get() if task is None: # 外部以 None 作为结束信号 self.logger.error("任务处理线程已终止") break try: result = task['handler'](task['data']) task['response'].put(result) except Exception as e: self.logger.error(f"任务执行失败: {str(e)}") task['response'].put({'msg': f'任务执行失败: {str(e)}'}) finally: self.task_queue.task_done() self.logger.info("任务处理完成") def enqueue_task(self, handler, data): """ 将任务入队 将任务放入任务队列,并返回一个响应队列,调用方可以从响应队列中获取执行结果。 Args: handler: 处理函数,执行具体业务逻辑 data: 传递给处理函数的数据 Returns: Queue: 响应队列,用于获取任务执行结果 """ response_queue = Queue() self.task_queue.put({ 'handler': handler, 'data': data, 'response': response_queue }) return response_queue @staticmethod def decode_headers(headers): """ 解码请求头 对请求头字典进行 URL 解码(UTF-8),返回解码后的副本。 主要用于处理包含中文字符的请求头。 Args: headers: 请求头字典 Returns: dict: 解码后的请求头字典 """ return {key: unquote(value, encoding='utf-8') for key, value in headers.items()} # 全局日志记录器变量 # 用于存储全局日志记录器实例,避免重复初始化 logger = None def setup_global_logger(config): """ 设置全局日志记录器 懒加载并返回全局 logger,若未初始化则构建 AppTools 并复用其中的 logger。 避免重复初始化日志处理器。 Args: config: 配置对象 Returns: logging.Logger: 全局日志记录器实例 """ global logger if logger is None: app_tools = AppTools(config) logger = app_tools.logger return logger