diff --git a/ReportEngine/flask_interface.py b/ReportEngine/flask_interface.py index 2fd30f2..db28fa2 100644 --- a/ReportEngine/flask_interface.py +++ b/ReportEngine/flask_interface.py @@ -34,6 +34,8 @@ task_lock = threading.Lock() # 通过有界deque缓存最近的事件,方便SSE断线后快速补发 MAX_TASK_HISTORY = 5 STREAM_HEARTBEAT_INTERVAL = 15 # 心跳间隔秒 +STREAM_IDLE_TIMEOUT = 120 # 终态后最长保活时间,避免孤儿SSE阻塞 +STREAM_TERMINAL_STATUSES = {"completed", "error", "cancelled"} stream_lock = threading.Lock() stream_subscribers = defaultdict(list) tasks_registry: Dict[str, 'ReportTask'] = {} @@ -717,6 +719,19 @@ def stream_task(task_id: str): except ValueError: last_event_id = None + def client_disconnected() -> bool: + """ + 尽早探测客户端是否已经断开,避免继续写入触发BrokenPipe。 + + eventlet 在 Windows 上会在关闭连接时抛出 ConnectionAbortedError, + 提前退出生成器可以缩减无意义的日志。 + """ + try: + env_input = request.environ.get('wsgi.input') + return bool(getattr(env_input, 'closed', False)) + except Exception: + return False + def event_generator(): """ SSE事件生成器。 @@ -726,22 +741,29 @@ def stream_task(task_id: str): - 周期性发送心跳并在任务结束后自动注销监听。 """ queue = _register_stream(task_id) + last_data_ts = time.time() try: # 断线重连场景下,先补发历史事件,保证界面状态一致 history = task.history_since(last_event_id) for event in history: yield _format_sse(event) + if event.get('type') != 'heartbeat': + last_data_ts = time.time() - finished = task.status in ("completed", "error", "cancelled") + finished = task.status in STREAM_TERMINAL_STATUSES while True: if finished: break + if client_disconnected(): + logger.info(f"SSE客户端已断开,停止推送: {task_id}") + break + event = None try: event = queue.get(timeout=STREAM_HEARTBEAT_INTERVAL) - yield _format_sse(event) - if event.get('type') in ("completed", "error"): - finished = True except Empty: + if task.status in STREAM_TERMINAL_STATUSES: + logger.info(f"任务 {task_id} 已结束且无新事件,SSE自动收口") + break heartbeat = { 'id': f"hb-{int(time.time() * 1000)}", 'type': 'heartbeat', @@ -749,8 +771,37 @@ def stream_task(task_id: str): 'timestamp': datetime.utcnow().isoformat() + 'Z', 'payload': {'status': task.status} } - yield _format_sse(heartbeat) - finished = task.status in ("completed", "error", "cancelled") + event = heartbeat + if event is None: + logger.warning(f"SSE推送获取事件失败(task {task_id}),提前结束") + break + + try: + yield _format_sse(event) + if event.get('type') != 'heartbeat': + last_data_ts = time.time() + except GeneratorExit: + logger.info(f"SSE生成器关闭,停止任务 {task_id} 推送") + break + except (ConnectionResetError, ConnectionAbortedError, BrokenPipeError) as exc: + logger.warning(f"SSE连接被客户端中断(task {task_id}): {exc}") + break + except Exception as exc: + event_type = event.get('type') if isinstance(event, dict) else 'unknown' + logger.exception(f"SSE推送失败(task {task_id}, event {event_type}): {exc}") + break + + if event.get('type') in ("completed", "error", "cancelled"): + finished = True + else: + finished = finished or task.status in STREAM_TERMINAL_STATUSES + + # 终态下最多保活一段时间,防止前端早已结束但后台循环未退出 + if task.status in STREAM_TERMINAL_STATUSES: + idle_for = time.time() - last_data_ts + if idle_for > STREAM_IDLE_TIMEOUT: + logger.info(f"任务 {task_id} 已终态且空闲 {int(idle_for)}s,主动关闭SSE") + break finally: _unregister_stream(task_id, queue) diff --git a/app.py b/app.py index c60f3e2..da8abba 100644 --- a/app.py +++ b/app.py @@ -36,6 +36,39 @@ app = Flask(__name__) app.config['SECRET_KEY'] = 'Dedicated-to-creating-a-concise-and-versatile-public-opinion-analysis-platform' socketio = SocketIO(app, cors_allowed_origins="*") +# eventlet 在客户端主动断开时偶尔会抛出 ConnectionAbortedError,这里做一次防御性包裹, +# 避免无意义的堆栈污染日志(仅在 eventlet 可用时启用)。 +def _patch_eventlet_disconnect_logging(): + try: + import eventlet.wsgi # type: ignore + except Exception as exc: # pragma: no cover - 仅在生产环境有效 + logger.debug(f"eventlet 不可用,跳过断开补丁: {exc}") + return + + try: + original_finish = eventlet.wsgi.HttpProtocol.finish # type: ignore[attr-defined] + except Exception as exc: # pragma: no cover + logger.debug(f"eventlet 缺少 HttpProtocol.finish,跳过断开补丁: {exc}") + return + + def _safe_finish(self, *args, **kwargs): # pragma: no cover - 运行时才会触发 + try: + return original_finish(self, *args, **kwargs) + except (BrokenPipeError, ConnectionResetError, ConnectionAbortedError) as exc: + try: + environ = getattr(self, 'environ', {}) or {} + method = environ.get('REQUEST_METHOD', '') + path = environ.get('PATH_INFO', '') + logger.warning(f"客户端已主动断开,忽略异常: {method} {path} ({exc})") + except Exception: + logger.warning(f"客户端已主动断开,忽略异常: {exc}") + return + + eventlet.wsgi.HttpProtocol.finish = _safe_finish # type: ignore[attr-defined] + logger.info("已对 eventlet 连接中断进行安全防护") + +_patch_eventlet_disconnect_logging() + # 注册ReportEngine Blueprint if REPORT_ENGINE_AVAILABLE: app.register_blueprint(report_bp, url_prefix='/api/report') @@ -1113,4 +1146,4 @@ if __name__ == '__main__': logger.info("\n正在关闭应用...") cleanup_processes() - \ No newline at end of file + diff --git a/templates/index.html b/templates/index.html index 68e547f..ef7d3dc 100644 --- a/templates/index.html +++ b/templates/index.html @@ -2562,6 +2562,7 @@ // 切换应用 function switchToApp(app) { if (app === currentApp) return; + const previousApp = currentApp; // 检查Report Engine是否被锁定 if (app === 'report') { @@ -2636,6 +2637,12 @@ } else { // 【修复】切换离开Report Engine时停止日志刷新,节省资源 reportLogManager.stop(); + + // 离开Report且无任务运行时,关闭SSE避免后台悬挂 + if (previousApp === 'report' && !reportTaskId && reportEventSource) { + safeCloseReportStream(true); + stopProgressPolling(); + } } }