87ead4fa6a
- 对话区域: st.file_uploader + 全局 paste/drop 事件监听 + sessionStorage 桥接 - 文件预览芯片: 上传后显示在对话区域,可逐文件移除 - OCR 双层解析全面接入: file_parser(文字) + ocr_extractor(字段提取) - XLSX 解析: openpyxl 逐工作表/逐行读取 - 修复: create_session 强制写入 agent_state.session_id - 修复: load_session_node 不再从磁盘覆盖 session_id - 修复: 切换会话 _last_switched_to 哨兵防止无限 rerun Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
245 lines
7.4 KiB
Python
245 lines
7.4 KiB
Python
"""文件解析器:将上传文件转为文本,供 LLM 处理。
|
||
|
||
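Supported inputs:
- Images (.png/.jpg/.jpeg/.bmp/.webp) → text extracted via OCR
- PDF (.pdf) → text extraction
- Word (.docx) → text extraction (paragraphs and table rows)
- Excel (.xlsx) → text extraction, sheet by sheet and row by row
- Plain text (.txt/.csv/.json/.xml and any other extension) → read directly

Strategy:
- Native multimodal: pass the file straight to the model when it supports images (MiniMax currently does not, so this falls back to text conversion automatically)
- Text conversion: every file is converted to UTF-8 text and injected into the prompt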
|
||
"""
|
||
|
||
import os
|
||
import io
|
||
from pathlib import Path
|
||
from typing import Optional
|
||
|
||
import PIL.Image
|
||
|
||
# Lowercase name fragments of model families that accept image input.
# can_use_vision() looks for each fragment as a substring of the model name.
MODELS_WITH_VISION = {
    "gpt-4o",
    "gpt-4-turbo",
    "gpt-4-vision-preview",
    "claude-3",
    "claude-3.5",
    "claude-4",
    "gemini-1.5",
    "gemini-2",
}
|
||
|
||
|
||
def can_use_vision(model: str = "") -> bool:
    """Report whether a model supports native multimodal input (direct image upload).

    Args:
        model: Model name to check. When empty, the ``LLM_MODEL`` environment
            variable is used instead; an unset variable counts as "".

    Returns:
        True when any entry of ``MODELS_WITH_VISION`` occurs as a substring of
        the lowercased model name, otherwise False.
    """
    name = model if model else os.getenv("LLM_MODEL", "")
    lowered = name.lower()
    for marker in MODELS_WITH_VISION:
        if marker in lowered:
            return True
    return False
|
||
|
||
|
||
def parse_file(file_path: str, file_type: str = "") -> dict:
    """Parse an arbitrary file into text.

    Args:
        file_path: Location of the file on disk.
        file_type: Optional extension override (for example ".pdf"). It is
            matched case-insensitively and the leading dot may be omitted.
            When empty, the extension of ``file_path`` is used.

    Returns:
        {"text": str, "file_type": str, "method": str, "error": Optional[str]}
        A missing file yields empty text, method "none" and an error message.
        Extensions without a dedicated parser are read as plain text.
    """
    path = Path(file_path)
    if not path.exists():
        return {"text": "", "file_type": file_type, "method": "none", "error": "文件不存在"}

    # Normalize the selector so "PNG", "png" and ".PNG" all reach the image
    # parser instead of silently falling through to the plain-text reader.
    suffix = (file_type or path.suffix).strip().lower()
    if suffix and not suffix.startswith("."):
        suffix = f".{suffix}"

    parsers = {
        ".png": _parse_image,
        ".jpg": _parse_image,
        ".jpeg": _parse_image,
        ".bmp": _parse_image,
        ".webp": _parse_image,
        ".pdf": _parse_pdf,
        ".docx": _parse_docx,
        ".xlsx": _parse_xlsx,
    }

    # Anything without a dedicated parser is treated as plain text.
    parser = parsers.get(suffix, _parse_text)
    return parser(path)
|
||
|
||
|
||
# ---------------------------------------------------------------------------
# Per-format parsers
# ---------------------------------------------------------------------------
|
||
|
||
def _parse_image(path: Path) -> dict:
|
||
"""OCR 提取图片中的文字。优先 EasyOCR,回退 PaddleOCR。"""
|
||
try:
|
||
img = PIL.Image.open(path)
|
||
info = f"[图片: {img.size[0]}x{img.size[1]}, {img.mode}]"
|
||
except Exception:
|
||
info = "[图片: 无法读取元数据]"
|
||
|
||
# 优先 EasyOCR(Windows 兼容性更好)
|
||
try:
|
||
import easyocr
|
||
import numpy as np
|
||
reader = easyocr.Reader(["ch_sim", "en"], gpu=False, verbose=False)
|
||
result = reader.readtext(np.array(img))
|
||
lines = [text.strip() for (_, text, _) in result if text.strip()]
|
||
if lines:
|
||
return {
|
||
"text": f"{info}\n识别文本:\n" + "\n".join(lines),
|
||
"file_type": "image",
|
||
"method": "easyocr",
|
||
"error": None,
|
||
}
|
||
except ImportError:
|
||
pass
|
||
except Exception:
|
||
pass
|
||
|
||
# 回退 PaddleOCR
|
||
try:
|
||
from paddleocr import PaddleOCR
|
||
ocr = PaddleOCR(lang="ch")
|
||
result = ocr.ocr(str(path))
|
||
lines = []
|
||
if result and result[0]:
|
||
for line in result[0]:
|
||
text = line[1][0] if len(line) > 1 else ""
|
||
if text.strip():
|
||
lines.append(text.strip())
|
||
if lines:
|
||
return {
|
||
"text": f"{info}\n识别文本:\n" + "\n".join(lines),
|
||
"file_type": "image",
|
||
"method": "paddleocr",
|
||
"error": None,
|
||
}
|
||
except ImportError:
|
||
pass
|
||
except Exception:
|
||
pass
|
||
|
||
# OCR 不可用 → 返回图片元信息 + 安装提示
|
||
return {
|
||
"text": f"{info}\n(如需 OCR 文字识别,请安装: pip install easyocr)",
|
||
"file_type": "image",
|
||
"method": "metadata_only",
|
||
"error": "OCR 引擎未安装,已返回图片元信息",
|
||
}
|
||
|
||
|
||
def _parse_pdf(path: Path) -> dict:
|
||
"""提取 PDF 中的文本。"""
|
||
try:
|
||
import pdfplumber
|
||
with pdfplumber.open(path) as pdf:
|
||
pages = []
|
||
for page in pdf.pages:
|
||
text = page.extract_text()
|
||
if text:
|
||
pages.append(text)
|
||
full = "\n\n".join(pages)
|
||
return {
|
||
"text": full,
|
||
"file_type": "pdf",
|
||
"method": "pdfplumber",
|
||
"error": None,
|
||
}
|
||
except ImportError:
|
||
pass
|
||
except Exception as e:
|
||
pass
|
||
|
||
# Fallback: 尝试 PyMuPDF
|
||
try:
|
||
import fitz
|
||
doc = fitz.open(path)
|
||
pages = []
|
||
for page in doc:
|
||
pages.append(page.get_text())
|
||
doc.close()
|
||
return {
|
||
"text": "\n\n".join(pages),
|
||
"file_type": "pdf",
|
||
"method": "pymupdf",
|
||
"error": None,
|
||
}
|
||
except ImportError:
|
||
pass
|
||
except Exception:
|
||
pass
|
||
|
||
return {"text": "", "file_type": "pdf", "method": "none",
|
||
"error": "PDF 解析需要安装 pdfplumber 或 PyMuPDF"}
|
||
|
||
|
||
def _parse_docx(path: Path) -> dict:
|
||
"""提取 Word 文档中的文本。"""
|
||
try:
|
||
from docx import Document
|
||
doc = Document(path)
|
||
paragraphs = [p.text for p in doc.paragraphs if p.text.strip()]
|
||
# 同时提取表格内容
|
||
for table in doc.tables:
|
||
for row in table.rows:
|
||
cells = [cell.text for cell in row.cells if cell.text.strip()]
|
||
if cells:
|
||
paragraphs.append(" | ".join(cells))
|
||
return {
|
||
"text": "\n\n".join(paragraphs),
|
||
"file_type": "docx",
|
||
"method": "python-docx",
|
||
"error": None,
|
||
}
|
||
except ImportError:
|
||
pass
|
||
except Exception as e:
|
||
pass
|
||
|
||
return {"text": "", "file_type": "docx", "method": "none",
|
||
"error": "DOCX 解析需要安装 python-docx"}
|
||
|
||
|
||
def _parse_xlsx(path: Path) -> dict:
|
||
"""提取 Excel (.xlsx) 表格内容为文本。"""
|
||
try:
|
||
import openpyxl
|
||
wb = openpyxl.load_workbook(path, read_only=True, data_only=True)
|
||
sheets_text = []
|
||
for sheet_name in wb.sheetnames:
|
||
ws = wb[sheet_name]
|
||
rows = []
|
||
for row in ws.iter_rows(values_only=True):
|
||
cells = [str(c) if c is not None else "" for c in row]
|
||
if any(c.strip() for c in cells):
|
||
rows.append(" | ".join(cells))
|
||
if rows:
|
||
sheets_text.append(f"--- 工作表: {sheet_name} ---\n" + "\n".join(rows))
|
||
wb.close()
|
||
if sheets_text:
|
||
return {
|
||
"text": "\n\n".join(sheets_text),
|
||
"file_type": "xlsx",
|
||
"method": "openpyxl",
|
||
"error": None,
|
||
}
|
||
except ImportError:
|
||
pass
|
||
except Exception:
|
||
pass
|
||
return {"text": "", "file_type": "xlsx", "method": "none",
|
||
"error": "XLSX 解析需要安装 openpyxl"}
|
||
|
||
|
||
def _parse_text(path: Path) -> dict:
|
||
"""读取纯文本文件。"""
|
||
try:
|
||
text = path.read_text(encoding="utf-8")
|
||
return {"text": text, "file_type": path.suffix, "method": "direct", "error": None}
|
||
except UnicodeDecodeError:
|
||
try:
|
||
text = path.read_text(encoding="gbk")
|
||
return {"text": text, "file_type": path.suffix, "method": "direct_gbk", "error": None}
|
||
except Exception:
|
||
return {"text": "", "file_type": path.suffix, "method": "none",
|
||
"error": "无法解码文件"}
|
||
except Exception:
|
||
return {"text": "", "file_type": path.suffix, "method": "none",
|
||
"error": "读取失败"}
|