""" Jaspersoft E2E 测试脚本 用法: python scripts/run_e2e.py [--user-text "请根据图片生成结算单模板"] 输出: - tmp/e2e_events_{HHMMSS}.json 完整事件流 - tmp/e2e_log_{HHMMSS}.txt 节点日志 """ import sys, os sys.path.insert(0, os.path.dirname(os.path.dirname(__file__))) sys.stdout = os.fdopen(sys.stdout.fileno(), 'w', buffering=1) import requests, json, time, uuid from pathlib import Path BASE_URL = "http://localhost:8000" TEST_IMAGE = Path(__file__).parent.parent / "test_image.jpg" USER_TEXT = "请根据图片信息生成结算单模板" ts = time.strftime("%H%M%S") out_path = Path(__file__).parent.parent / "tmp" / f"e2e_events_{ts}.json" log_path = Path(__file__).parent.parent / "tmp" / f"e2e_log_{ts}.txt" out_path.parent.mkdir(parents=True, exist_ok=True) def log(msg): print(msg, flush=True) with open(log_path, "a", encoding="utf-8") as f: f.write(msg + "\n") def run(): log("=" * 60) log(f"E2E 测试开始 {time.strftime('%H:%M:%S')}") # 1. 创建会话 sid_resp = requests.post(f"{BASE_URL}/api/sessions", json={"session_id": "test"}, timeout=10) sid = sid_resp.json()["session_id"] log(f"[会话] {sid}") # 2. 上传图片 with open(TEST_IMAGE, "rb") as f: up_resp = requests.post( f"{BASE_URL}/api/upload", files={"file": ("test_image.jpg", f, "image/jpeg")}, data={"session_id": sid}, timeout=30, ) fid = up_resp.json()["file_id"] log(f"[上传] file_id={fid}") # 3. 发送对话 log(f"[对话] 开始 pipeline...") start = time.time() events = [] node_times = {} error_events = [] r = requests.post( f"{BASE_URL}/api/sessions/{sid}/chat", json={"text": USER_TEXT, "file_ids": [fid]}, stream=True, timeout=600, ) log(f"[状态] HTTP {r.status_code}") for line in r.iter_lines(): if not line: continue line = line.decode("utf-8", errors="replace") if line.startswith("data:"): try: data = json.loads(line[5:].strip()) events.append(data) evt = data.get("event", "") d = data.get("data", {}) node = d.get("node", "") if evt == "node_start": node_times.setdefault(node, {"start": time.time() - start, "complete": None}) log(f" [开始] {node}") if evt == "node_complete": if node in node_times and node_times[node]["complete"] is None: dur = time.time() - start - node_times[node]["start"] node_times[node]["complete"] = time.time() - start detail = d.get("detail", "")[:80] log(f" [完成] {node} ({dur:.1f}s) — {detail}") if evt == "error": msg = d.get("message", str(data))[:200] log(f" [错误] {msg}") error_events.append(data) if evt == "result": result = data.get("data", {}) elapsed = time.time() - start jrxml = result.get("jrxml", "") log(f"\n{'='*50}") log(f"[完成] 耗时 {elapsed:.1f}s") log(f" status: {result.get('status', 'N/A')}") log(f" jrxml_length: {len(jrxml)}") log(f" error: {result.get('error', 'None')[:200]}") if evt == "done": log(f"\n[SSE Done]") except json.JSONDecodeError: pass elapsed_total = time.time() - start log(f"\n总耗时: {elapsed_total:.1f}s") log(f"共 {len(events)} 个事件,{len(node_times)} 个节点,{len(error_events)} 个错误") # 保存 with open(out_path, "w", encoding="utf-8") as f: json.dump({ "session_id": sid, "elapsed": elapsed_total, "events": events, "node_times": node_times, "error_events": error_events, }, f, ensure_ascii=False, indent=2) log(f"事件已保存: {out_path}") log(f"日志已保存: {log_path}") if __name__ == "__main__": run()