Fixes Situations Where TCP Connections Might be Refused

This commit is contained in:
马一丁
2025-11-22 02:06:53 +08:00
parent 56b6bdd1b4
commit a22be6d7dd
3 changed files with 98 additions and 7 deletions
+57 -6
View File
@@ -34,6 +34,8 @@ task_lock = threading.Lock()
# 通过有界deque缓存最近的事件,方便SSE断线后快速补发
MAX_TASK_HISTORY = 5
STREAM_HEARTBEAT_INTERVAL = 15 # 心跳间隔秒
STREAM_IDLE_TIMEOUT = 120 # 终态后最长保活时间,避免孤儿SSE阻塞
STREAM_TERMINAL_STATUSES = {"completed", "error", "cancelled"}
stream_lock = threading.Lock()
stream_subscribers = defaultdict(list)
tasks_registry: Dict[str, 'ReportTask'] = {}
@@ -717,6 +719,19 @@ def stream_task(task_id: str):
except ValueError:
last_event_id = None
def client_disconnected() -> bool:
"""
尽早探测客户端是否已经断开,避免继续写入触发BrokenPipe。
eventlet 在 Windows 上会在关闭连接时抛出 ConnectionAbortedError
提前退出生成器可以缩减无意义的日志。
"""
try:
env_input = request.environ.get('wsgi.input')
return bool(getattr(env_input, 'closed', False))
except Exception:
return False
def event_generator():
"""
SSE事件生成器。
@@ -726,22 +741,29 @@ def stream_task(task_id: str):
- 周期性发送心跳并在任务结束后自动注销监听。
"""
queue = _register_stream(task_id)
last_data_ts = time.time()
try:
# 断线重连场景下,先补发历史事件,保证界面状态一致
history = task.history_since(last_event_id)
for event in history:
yield _format_sse(event)
if event.get('type') != 'heartbeat':
last_data_ts = time.time()
finished = task.status in ("completed", "error", "cancelled")
finished = task.status in STREAM_TERMINAL_STATUSES
while True:
if finished:
break
if client_disconnected():
logger.info(f"SSE客户端已断开,停止推送: {task_id}")
break
event = None
try:
event = queue.get(timeout=STREAM_HEARTBEAT_INTERVAL)
yield _format_sse(event)
if event.get('type') in ("completed", "error"):
finished = True
except Empty:
if task.status in STREAM_TERMINAL_STATUSES:
logger.info(f"任务 {task_id} 已结束且无新事件,SSE自动收口")
break
heartbeat = {
'id': f"hb-{int(time.time() * 1000)}",
'type': 'heartbeat',
@@ -749,8 +771,37 @@ def stream_task(task_id: str):
'timestamp': datetime.utcnow().isoformat() + 'Z',
'payload': {'status': task.status}
}
yield _format_sse(heartbeat)
finished = task.status in ("completed", "error", "cancelled")
event = heartbeat
if event is None:
logger.warning(f"SSE推送获取事件失败(task {task_id}),提前结束")
break
try:
yield _format_sse(event)
if event.get('type') != 'heartbeat':
last_data_ts = time.time()
except GeneratorExit:
logger.info(f"SSE生成器关闭,停止任务 {task_id} 推送")
break
except (ConnectionResetError, ConnectionAbortedError, BrokenPipeError) as exc:
logger.warning(f"SSE连接被客户端中断(task {task_id}: {exc}")
break
except Exception as exc:
event_type = event.get('type') if isinstance(event, dict) else 'unknown'
logger.exception(f"SSE推送失败(task {task_id}, event {event_type}: {exc}")
break
if event.get('type') in ("completed", "error", "cancelled"):
finished = True
else:
finished = finished or task.status in STREAM_TERMINAL_STATUSES
# 终态下最多保活一段时间,防止前端早已结束但后台循环未退出
if task.status in STREAM_TERMINAL_STATUSES:
idle_for = time.time() - last_data_ts
if idle_for > STREAM_IDLE_TIMEOUT:
logger.info(f"任务 {task_id} 已终态且空闲 {int(idle_for)}s,主动关闭SSE")
break
finally:
_unregister_stream(task_id, queue)