"""LangGraph JRXML 生成代理的状态图定义。""" import functools import os from typing import Literal from dotenv import load_dotenv from langgraph.graph import StateGraph, END from agent.state import AgentState from agent.nodes import ( load_session_node, process_input, manage_context, save_state_snapshot, classify_intent, retrieve, generate, generate_skeleton, refine_layout, map_fields, modify_jrxml, handle_consult, handle_undo, handle_reset, save_session_node, validate, explain_error, correct_jrxml, finalize, ) from backend.logger import get_logger load_dotenv(override=True) MAX_RETRY = int(os.getenv("MAX_RETRY", "5")) _graph_log = get_logger("agent") def _log_route(route_name: str): """装饰器:自动记录路由决策。""" def decorator(func): @functools.wraps(func) def wrapper(state: AgentState, *args, **kwargs): target = func(state, *args, **kwargs) _graph_log.info( f"[路由] {route_name} → {target}", extra={ "route": route_name, "target": target, "session_id": state.get("session_id", ""), "intent": state.get("intent", ""), "status": state.get("status", ""), "has_jrxml": bool(state.get("current_jrxml", "").strip()), "retry_count": state.get("retry_count", 0), }, ) return target return wrapper return decorator # ============================================================ # 路由函数 # ============================================================ @_log_route("route_by_intent") def route_by_intent(state: AgentState) -> Literal[ "retrieve", "modify_jrxml", "save_session", "handle_consult", "handle_undo", "handle_reset" ]: """根据 classify_intent 的结果路由到对应的处理节点。""" intent = state.get("intent", "initial_generation") if intent == "initial_generation": return "retrieve" elif intent == "modify_report": return "modify_jrxml" elif intent in ("preview_report", "export_pdf", "export_jrxml"): return "save_session" elif intent == "consult_question": return "handle_consult" elif intent == "undo_modification": return "handle_undo" elif intent == "reset_session": return "handle_reset" else: # 兜底:根据是否有报表判断 if state.get("current_jrxml"): return "modify_jrxml" return "retrieve" @_log_route("route_after_retrieve") def route_after_retrieve(state: AgentState) -> Literal["generate", "generate_skeleton"]: """当 layout_schema 存在时走三层精确生成,否则走原有 1-shot。""" layout_schema = state.get("layout_schema") if layout_schema and isinstance(layout_schema, dict) and layout_schema.get("total_rows", 0) > 0: return "generate_skeleton" return "generate" @_log_route("route_after_generate") def route_after_generate(state: AgentState) -> Literal["save_session"]: return "save_session" @_log_route("route_after_modify") def route_after_modify(state: AgentState) -> Literal["save_session"]: return "save_session" @_log_route("route_after_undo") def route_after_undo(state: AgentState) -> Literal["save_session"]: return "save_session" @_log_route("route_after_save") def route_after_save(state: AgentState) -> Literal["validate", "finalize"]: # 预览/导出意图跳过验证,直接完成 intent = state.get("intent", "") if intent in ("preview_report", "export_pdf", "export_jrxml"): return "finalize" # JRXML 为空时跳过验证/修正循环(生成失败等场景) if not state.get("current_jrxml", "").strip(): return "finalize" return "validate" @_log_route("route_after_validate") def route_after_validate(state: AgentState) -> Literal["finalize", "explain_error"]: if state.get("status") == "pass": return "finalize" # JRXML 为空时跳过 explain→correct 修正循环 if not state.get("current_jrxml", "").strip(): return "finalize" # 验证服务不可用时跳过修正循环,避免对网络错误进行无效修正 if state.get("service_unavailable"): return "finalize" return "explain_error" @_log_route("route_after_explain") def route_after_explain(state: AgentState) -> Literal["correct_jrxml"]: return "correct_jrxml" @_log_route("route_after_correct") def route_after_correct(state: AgentState) -> Literal["validate", "finalize"]: retry = state.get("retry_count", 0) if retry >= MAX_RETRY: return "finalize" return "validate" # ============================================================ # 图构建 # ============================================================ def build_graph(on_node_start=None) -> StateGraph: """构建 LangGraph 状态图。 Args: on_node_start: 可选回调,在每个节点开始执行时调用。 签名: on_node_start(node_name: str) -> None 用于 SSE 流式推送 node_start 事件。 """ workflow = StateGraph(AgentState) def _wrap(name, fn): """包装节点函数,在开始执行时触发 on_node_start 回调。""" if on_node_start is None: return fn @functools.wraps(fn) def wrapped(state, *args, **kwargs): on_node_start(name) return fn(state, *args, **kwargs) return wrapped # 现有节点 workflow.add_node("load_session", _wrap("load_session", load_session_node)) workflow.add_node("process_input", _wrap("process_input", process_input)) workflow.add_node("manage_context", _wrap("manage_context", manage_context)) workflow.add_node("save_session", _wrap("save_session", save_session_node)) workflow.add_node("retrieve", _wrap("retrieve", retrieve)) workflow.add_node("generate", _wrap("generate", generate)) workflow.add_node("modify_jrxml", _wrap("modify_jrxml", modify_jrxml)) workflow.add_node("validate", _wrap("validate", validate)) workflow.add_node("explain_error", _wrap("explain_error", explain_error)) workflow.add_node("correct_jrxml", _wrap("correct_jrxml", correct_jrxml)) workflow.add_node("finalize", _wrap("finalize", finalize)) # 新增节点:意图识别 workflow.add_node("save_state_snapshot", _wrap("save_state_snapshot", save_state_snapshot)) workflow.add_node("classify_intent", _wrap("classify_intent", classify_intent)) workflow.add_node("handle_consult", _wrap("handle_consult", handle_consult)) workflow.add_node("handle_undo", _wrap("handle_undo", handle_undo)) workflow.add_node("handle_reset", _wrap("handle_reset", handle_reset)) # 新增节点:分层精确生成(阶段一~三) workflow.add_node("generate_skeleton", _wrap("generate_skeleton", generate_skeleton)) workflow.add_node("refine_layout", _wrap("refine_layout", refine_layout)) workflow.add_node("map_fields", _wrap("map_fields", map_fields)) # ---- 入口和前置流程 ---- workflow.set_entry_point("load_session") workflow.add_edge("load_session", "process_input") workflow.add_edge("process_input", "manage_context") workflow.add_edge("manage_context", "save_state_snapshot") workflow.add_edge("save_state_snapshot", "classify_intent") # ---- 意图路由 ---- workflow.add_conditional_edges( "classify_intent", route_by_intent, { "retrieve": "retrieve", "modify_jrxml": "modify_jrxml", "save_session": "save_session", "handle_consult": "handle_consult", "handle_undo": "handle_undo", "handle_reset": "handle_reset", }, ) # ---- 初始生成分支 ---- workflow.add_conditional_edges( "retrieve", route_after_retrieve, { "generate": "generate", "generate_skeleton": "generate_skeleton", }, ) # 原有 1-shot 路径 workflow.add_conditional_edges( "generate", route_after_generate, {"save_session": "save_session"}, ) # 分层精确生成 3 阶段路径 workflow.add_edge("generate_skeleton", "refine_layout") workflow.add_edge("refine_layout", "map_fields") workflow.add_conditional_edges( "map_fields", route_after_generate, {"save_session": "save_session"}, ) # ---- 修改分支 ---- workflow.add_conditional_edges( "modify_jrxml", route_after_modify, {"save_session": "save_session"}, ) # ---- 撤销分支 ---- workflow.add_conditional_edges( "handle_undo", route_after_undo, {"save_session": "save_session"}, ) # ---- 保存后进入验证 ---- workflow.add_conditional_edges( "save_session", route_after_save, {"validate": "validate", "finalize": "finalize"}, ) # ---- 验证 → 修正循环 ---- workflow.add_conditional_edges( "validate", route_after_validate, {"finalize": "finalize", "explain_error": "explain_error"}, ) workflow.add_conditional_edges( "explain_error", route_after_explain, {"correct_jrxml": "correct_jrxml"}, ) workflow.add_conditional_edges( "correct_jrxml", route_after_correct, {"validate": "validate", "finalize": "finalize"}, ) # ---- 咨询 / 重置 → 直接结束 ---- workflow.add_edge("handle_consult", "finalize") workflow.add_edge("handle_reset", "finalize") # ---- 结束 ---- workflow.add_edge("finalize", END) return workflow.compile() # ============================================================ # 初始状态 # ============================================================ def create_initial_state() -> AgentState: return AgentState( conversation_history=[], current_jrxml="", user_input="", status="", error_msg="", natural_explanation="", retry_count=0, user_modification_request="", final_jrxml="", stage="initial_generation", retrieved_context="", full_conversation_history=[], compressed_history="", current_token_count=0, session_id="", session_name="", created_at="", updated_at="", intent="", history_states=[], jrxml_versions=[], last_error_case={}, pending_failure_context={}, layout_schema={}, ocr_elements=[], )