From 05bb511aab44ec8ee83c2205f3dec8da29ba5ef5 Mon Sep 17 00:00:00 2001 From: zy187 Date: Fri, 29 May 2026 23:22:18 +0800 Subject: [PATCH] Initial commit: jaspersoft-agent-learn teaching project --- .env.example | 21 ++ README.md | 124 +++++++ step_01_tools/README.md | 82 +++++ step_01_tools/__init__.py | 19 + step_01_tools/concept.py | 592 ++++++++++++++++++++++++++++++ step_01_tools/exercise.py | 223 ++++++++++++ step_01_tools/exercise_answer.py | 264 ++++++++++++++ step_01_tools/main.py | 169 +++++++++ step_02_state/README.md | 54 +++ step_02_state/concept.py | 486 +++++++++++++++++++++++++ step_02_state/exercise.py | 230 ++++++++++++ step_02_state/exercise_answer.py | 274 ++++++++++++++ step_02_state/main.py | 196 ++++++++++ step_03_simple_agent/README.md | 60 +++ step_03_simple_agent/concept.py | 603 +++++++++++++++++++++++++++++++ step_03_simple_agent/main.py | 78 ++++ step_04_memory/README.md | 51 +++ step_04_memory/concept.py | 429 ++++++++++++++++++++++ step_05_07_advanced/README.md | 235 ++++++++++++ step_05_07_advanced/concept.py | 286 +++++++++++++++ 20 files changed, 4476 insertions(+) create mode 100644 .env.example create mode 100644 README.md create mode 100644 step_01_tools/README.md create mode 100644 step_01_tools/__init__.py create mode 100644 step_01_tools/concept.py create mode 100644 step_01_tools/exercise.py create mode 100644 step_01_tools/exercise_answer.py create mode 100644 step_01_tools/main.py create mode 100644 step_02_state/README.md create mode 100644 step_02_state/concept.py create mode 100644 step_02_state/exercise.py create mode 100644 step_02_state/exercise_answer.py create mode 100644 step_02_state/main.py create mode 100644 step_03_simple_agent/README.md create mode 100644 step_03_simple_agent/concept.py create mode 100644 step_03_simple_agent/main.py create mode 100644 step_04_memory/README.md create mode 100644 step_04_memory/concept.py create mode 100644 step_05_07_advanced/README.md create mode 100644 step_05_07_advanced/concept.py diff --git a/.env.example b/.env.example new file mode 100644 index 0000000..3424690 --- /dev/null +++ b/.env.example @@ -0,0 +1,21 @@ +# 环境变量配置 + +# LLM API 配置 +OPENAI_API_KEY=your_openai_api_key_here +ANTHROPIC_API_KEY=your_anthropic_api_key_here + +# LLM 配置 +LLM_PROVIDER=anthropic # 或 openai +LLM_MODEL=MiniMax-M2.7 # 或 gpt-4o +LLM_MAX_TOKENS=8192 + +# RAG 配置 +RAG_CHROMA_PATH=./db/chroma +RAG_COLLECTION_NAME=jrxml_chunks +RAG_EMBED_MODEL=sentence-transformers/all-MiniLM-L6-v2 + +# 验证服务 +VALIDATION_SERVICE_URL=http://localhost:8001 + +# 日志配置 +LOG_LEVEL=INFO diff --git a/README.md b/README.md new file mode 100644 index 0000000..81e68f8 --- /dev/null +++ b/README.md @@ -0,0 +1,124 @@ +# Jaspersoft Learn - AI Agent 开发教学项目 + +> **目标**:手把手教你从零构建一个完整的 AI Agent 系统 +> **教学方式**:代码即文档,每一行代码都有详细解释 +> **前置知识**:Python 基础(2年经验足够) + +--- + +## 🎯 学习路径 + +本项目采用"渐进式构建"方式,从最简单的概念开始,逐步添加复杂度。 + +``` +学习顺序: +━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ + +Step 01 ──▶ Step 02 ──▶ Step 03 ──▶ Step 04 ──▶ Step 05 ──▶ Step 06 ──▶ Step 07 + │ │ │ │ │ │ │ + ▼ ▼ ▼ ▼ ▼ ▼ ▼ +Tool基础 State状态 简单Agent Memory RAG检索 自我修正 Multi-Agent + 管理 记忆系统 +``` + +--- + +## 📚 每个 Step 的内容 + +| Step | 主题 | 核心概念 | 完成后你将理解 | +|------|------|---------|--------------| +| **01** | Tool 工具系统 | 什么是 Tool,为什么需要 Tool | 如何设计可扩展的工具系统 | +| **02** | State 状态管理 | 什么是 Agent State | 如何在 Agent 中传递信息 | +| **03** | 简单 Agent | LLM + Tool + Loop = Agent | Agent 的核心工作原理 | +| **04** | Memory 记忆 | 短期/长期/工作记忆 | 如何让 Agent "记住"上下文 | +| **05** | RAG 知识检索 | 检索增强生成 | 如何让 Agent 知道"私有知识" | +| **06** | Self-Correction | 自我修正循环 | 如何让 Agent 自动修复错误 | +| **07** | Multi-Agent | 多 Agent 协作 | 如何构建 Agent 团队 | + +--- + +## 🚀 快速开始 + +### 环境准备 + +```bash +# 1. 创建虚拟环境 +python -m venv venv +.\venv\Scripts\activate + +# 2. 安装依赖 +pip install langchain langchain-openai langchain-anthropic python-dotenv + +# 3. 配置环境变量 +cp .env.example .env +# 编辑 .env,填入你的 API Key +``` + +### 开始学习 + +```bash +# Step 01: 理解 Tool +cd step_01_tools +python main.py + +# Step 02: 理解 State +cd ../step_02_state +python main.py + +# ...以此类推 +``` + +--- + +## 📖 学习方法 + +### 每一步的结构 + +``` +step_XX_xxx/ +├── README.md # 本步骤的概念讲解 +├── concept.py # 核心概念代码(有详细注释) +├── exercise.py # 练习题 +├── exercise_answer.py # 练习答案(先自己思考再看) +└── main.py # 可运行的示例 +``` + +### 如何使用本项目 + +1. **先读 README**:理解这个 Step 要学什么 +2. **再看 concept.py**:跟着代码理解概念 +3. **做 exercise.py**:巩固理解 +4. **对照答案**:检查自己的理解 +5. **运行 main.py**:看完整示例 + +--- + +## 🎓 最终目标 + +完成所有 Step 后,你将能够: + +- [ ] 理解 Agent 的核心工作原理 +- [ ] 设计可扩展的工具系统 +- [ ] 实现状态管理和记忆系统 +- [ ] 构建完整的 RAG 知识检索 +- [ ] 实现 Agent 自我修正能力 +- [ ] 设计多 Agent 协作系统 +- [ ] 阅读并理解 jaspersoft 项目的源码 + +--- + +## 🔗 参考资料 + +- [LangGraph 官方文档](https://langchain-ai.github.io/langgraph/) +- [LangChain Tool Calling](https://python.langchain.com/docs/how_to/tool_calling/) +- [Prompt Engineering Guide](https://www.promptingguide.ai/zh) + +--- + +## 📝 贡献指南 + +发现错误或有改进建议?欢迎提交 PR! + +--- + +*本项目是学习用途,代码尽量写得清晰易懂,而非追求极致性能。* diff --git a/step_01_tools/README.md b/step_01_tools/README.md new file mode 100644 index 0000000..cc146e9 --- /dev/null +++ b/step_01_tools/README.md @@ -0,0 +1,82 @@ +# Step 01: 理解 Tool - 工具系统基础 + +## 🎯 学习目标 + +- 理解什么是 Tool(工具) +- 理解为什么 Agent 需要 Tool +- 学会设计一个可扩展的工具系统 +- 理解 Tool 的核心要素:name、description、execute + +--- + +## 📖 概念讲解 + +### 什么是 Tool? + +在 AI Agent 系统中,**Tool(工具)** 是 Agent 与外部世界交互的方式。 + +``` +没有 Tool 的 LLM: +┌──────────────┐ +│ 用户输入 │ +└──────┬───────┘ + │ + ▼ +┌──────────────┐ +│ LLM │ ← LLM 只能"想",不能"做" +│ (只能思考) │ +└──────┬───────┘ + │ + ▼ +┌──────────────┐ +│ 返回答案 │ ← 答案可能不准确,没有执行能力 +└──────────────┘ + +有 Tool 的 Agent: +┌──────────────┐ +│ 用户输入 │ +└──────┬───────┘ + │ + ▼ +┌──────────────┐ +│ LLM │ +│ (思考 + 决策) │ +└──────┬───────┘ + │ + ▼ +┌──────────────┐ +│ 需要执行工具? │──── 是 ──▶ 调用 Tool +└──────┬───────┘ │ + │ ▼ + │ 否 ┌──────────┐ + ▼ │ 执行 Tool │ +┌──────────────┐ │ (访问外部) │ +│ 返回答案 │ └─────┬─────┘ +└──────────────┘ │ + ▼ + 把执行结果反馈给 LLM +``` + +### 为什么需要 Tool? + +1. **LLM 不知道最新信息**:Tool 可以搜索网页、查数据库 +2. **LLM 不能执行操作**:Tool 可以写文件、调用 API +3. **LLM 可能有幻觉**:Tool 可以验证信息 +4. **LLM 需要精确计算**:Tool 可以调用计算器 + +### Tool 的核心要素 + +每个 Tool 都有三个核心要素: + +```python +class Tool: + name: str # 工具名称(LLM 通过名字选择工具) + description: str # 工具描述(LLM 通过描述理解何时使用) + execute: function # 执行函数(实际做事的代码) +``` + +--- + +## 💻 代码实现 + +请打开 `concept.py` 查看详细代码注释。 diff --git a/step_01_tools/__init__.py b/step_01_tools/__init__.py new file mode 100644 index 0000000..c0f3b5f --- /dev/null +++ b/step_01_tools/__init__.py @@ -0,0 +1,19 @@ +# Jaspersoft Learn - AI Agent 开发教学项目 + +from .concept import ( + BaseTool, + ToolResult, + ToolRegistry, + CalculatorTool, + SearchTool, + JaspersoftCodeGeneratorTool, +) + +__all__ = [ + "BaseTool", + "ToolResult", + "ToolRegistry", + "CalculatorTool", + "SearchTool", + "JaspersoftCodeGeneratorTool", +] diff --git a/step_01_tools/concept.py b/step_01_tools/concept.py new file mode 100644 index 0000000..533de5c --- /dev/null +++ b/step_01_tools/concept.py @@ -0,0 +1,592 @@ +""" +Step 01: 理解 Tool - 工具系统基础 + +━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ + +🎓 本节内容: + 1. 什么是 Tool? + 2. 如何定义一个 Tool? + 3. 如何设计可扩展的工具系统? + +━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ +""" + +from abc import ABC, abstractmethod +from typing import Any, Dict, Optional +from dataclasses import dataclass +import json + + +# ═══════════════════════════════════════════════════════════════════════════════ +# 第一部分:理解 Tool 的基本结构 +# ═══════════════════════════════════════════════════════════════════════════════ + +""" +在开始写代码之前,我们先理解 Tool 的本质: + + Tool = 名称 + 描述 + 执行逻辑 + +为什么这样设计?因为 LLM 选择工具时只看: + 1. 工具名称(叫什么) + 2. 工具描述(什么时候用) + +然后 LLM 会决定:是否调用这个工具?调用时传什么参数? + +所以一个好的 Tool 设计,必须: + - 名称清晰、一目了然 + - 描述准确、告诉 LLM 何时使用 + - 执行逻辑独立、不影响其他工具 +""" + + +# ═══════════════════════════════════════════════════════════════════════════════ +# 第二部分:定义 Tool 的数据结构 +# ═══════════════════════════════════════════════════════════════════════════════ + +@dataclass # Python 数据类,自动生成 __init__ 等方法 +class ToolResult: + """ + Tool 执行结果的数据结构 + + 为什么需要单独定义这个? + 因为 Tool 执行可能成功,也可能失败,我们需要统一格式来传递结果。 + + 属性说明: + success: 是否执行成功 + result: 执行结果(如果成功) + error: 错误信息(如果失败) + metadata: 额外元数据(如执行时间、使用的参数等) + """ + success: bool + result: Any = None + error: Optional[str] = None + metadata: Dict[str, Any] = None + + def __post_init__(self): + """初始化后确保 metadata 不为 None""" + if self.metadata is None: + self.metadata = {} + + +# ═══════════════════════════════════════════════════════════════════════════════ +# 第三部分:定义抽象 Tool 基类(重要概念) +# ═══════════════════════════════════════════════════════════════════════════════ + +class BaseTool(ABC): + """ + 所有工具的抽象基类 + + 为什么需要抽象基类? + 因为我们希望所有工具都有统一的接口,这样: + 1. 任意 Tool 都可以被统一管理 + 2. 新增 Tool 时不需要修改已有代码(开闭原则) + 3. 可以批量操作所有 Tool + + 抽象基类 = 定义"接口规范",子类负责"具体实现" + """ + + # ============================================================ + # 属性:每个 Tool 必须有的特性 + # ============================================================ + + @property + @abstractmethod + def name(self) -> str: + """ + 工具名称 + + 为什么用 @property? + 因为 name 通常是只读的,我们不希望运行时被随意修改 + + 为什么用 @abstractmethod? + 因为这是一个"抽象方法",所有子类必须实现 + 如果不实现,Python 会报错,强制你提供具体实现 + """ + pass + + @property + @abstractmethod + def description(self) -> str: + """ + 工具描述 + + 这是 LLM 理解"何时使用这个工具"的关键! + 描述越准确,LLM 越能正确选择工具。 + + 好的描述应该包含: + 1. 工具能做什么 + 2. 输入参数是什么 + 3. 输出结果是什么 + 4. 适用的场景 + """ + pass + + # ============================================================ + # 方法:Tool 的核心执行逻辑 + # ============================================================ + + @abstractmethod + def execute(self, **kwargs) -> ToolResult: + """ + 执行工具 + + 参数使用 **kwargs 的原因: + 不同的 Tool 可能需要不同的参数 + 用 **kwargs 可以接受任意参数,具体由子类决定 + + 返回 ToolResult 而不是直接返回值的原因: + 1. 统一成功/失败的判断 + 2. 可以附带错误信息 + 3. 可以附带元数据(执行时间、使用的参数等) + + 为什么是抽象方法? + 因为"如何执行"是每个工具自己的事,基类不知道具体逻辑 + """ + pass + + # ============================================================ + # 辅助方法:让 Tool 更容易使用 + # ============================================================ + + def to_dict(self) -> dict: + """ + 将 Tool 转换为字典格式 + + 这个方法用于: + 1. 给 LLM 展示可用的工具列表 + 2. 序列化/反序列化工具配置 + + 返回格式: + { + "name": "工具名称", + "description": "工具描述", + "parameters": {...} # 可选,参数schema + } + """ + return { + "name": self.name, + "description": self.description, + } + + def __repr__(self) -> str: + """调试时显示工具信息""" + return f"" + + +# ═══════════════════════════════════════════════════════════════════════════════ +# 第四部分:实现具体的 Tool +# ═══════════════════════════════════════════════════════════════════════════════ + +class CalculatorTool(BaseTool): + """ + 计算器工具示例 + + 这个工具演示: + 1. 如何定义一个简单的 Tool + 2. 如何处理输入参数 + 3. 如何返回结果 + 4. 如何处理错误 + """ + + @property + def name(self) -> str: + return "calculator" + + @property + def description(self) -> str: + return """ + 计算器工具,执行数学运算。 + + 输入: + expression: str - 数学表达式,如 "2 + 3" 或 "10 * 5" + + 输出: + 计算结果(数字) + + 示例: + expression="2 + 3" -> 5 + expression="10 / 2" -> 5.0 + expression="2 ** 3" -> 8 + """ + + def execute(self, **kwargs) -> ToolResult: + """ + 执行计算 + + 为什么用 try-except? + 因为 eval() 可能执行恶意代码或语法错误 + 我们需要捕获异常,返回友好的错误信息 + """ + expression = kwargs.get("expression") + + # 参数验证 + if expression is None: + return ToolResult( + success=False, + error="缺少必需参数 'expression'" + ) + + if not isinstance(expression, str): + return ToolResult( + success=False, + error=f"参数 'expression' 应该是字符串,实际是 {type(expression)}" + ) + + try: + # 使用 eval 计算表达式 + # 注意:生产环境中应该用更安全的计算方式 + # 这里用 eval 是为了简化,实际推荐用 ast.literal_eval 或专用计算库 + result = eval(expression, {"__builtins__": {}}, {}) + + return ToolResult( + success=True, + result=result, + metadata={"expression": expression} + ) + + except SyntaxError as e: + return ToolResult( + success=False, + error=f"表达式语法错误: {e}" + ) + + except ZeroDivisionError: + return ToolResult( + success=False, + error="除数不能为零" + ) + + except Exception as e: + return ToolResult( + success=False, + error=f"计算错误: {e}" + ) + + +class SearchTool(BaseTool): + """ + 搜索工具示例 + + 这个工具演示: + 1. 如何模拟一个搜索功能 + 2. 如何返回结构化数据 + 3. 如何设计工具的"真实感" + + 实际应用中,这里会调用真实的搜索 API + """ + + @property + def name(self) -> str: + return "web_search" + + @property + def description(self) -> str: + return """ + 网络搜索工具,在互联网上搜索信息。 + + 输入: + query: str - 搜索关键词 + limit: int - 返回结果数量(默认5条) + + 输出: + 搜索结果列表,每条包含标题、链接、摘要 + + 适用场景: + - 查找最新资讯 + - 搜索技术文档 + - 查找某个问题的答案 + """ + + def execute(self, **kwargs) -> ToolResult: + """ + 执行搜索 + + 注意:这里用模拟数据演示 + 真实场景中,你需要调用 Bing/Google API + """ + query = kwargs.get("query", "") + limit = kwargs.get("limit", 5) + + if not query: + return ToolResult( + success=False, + error="搜索关键词不能为空" + ) + + # 模拟搜索结果 + # 真实场景:这里调用 search_api(query) + mock_results = [ + { + "title": f"关于 '{query}' 的官方文档", + "url": "https://example.com/doc", + "snippet": f"这是关于 {query} 的详细官方文档..." + }, + { + "title": f"{query} 入门教程", + "url": "https://example.com/tutorial", + "snippet": f"学习 {query} 的快速入门指南..." + }, + ] + + return ToolResult( + success=True, + result=mock_results[:limit], + metadata={"query": query, "count": len(mock_results[:limit])} + ) + + +class JaspersoftCodeGeneratorTool(BaseTool): + """ + Jaspersoft JRXML 代码生成工具(对应你的实际项目) + + 这个工具演示: + 1. 如何设计业务相关的 Tool + 2. 如何处理复杂输入 + 3. 如何返回有意义的结果 + + 这个工具的作用: + 用户说"生成一个销售报表" + -> Tool 调用 LLM 生成 JRXML 代码 + -> 返回生成的代码 + """ + + @property + def name(self) -> str: + return "jrxml_generator" + + @property + def description(self) -> str: + return """ + JasperReports JRXML 报表生成工具。 + + 根据用户的自然语言需求,生成 JRXML 报表模板代码。 + + 输入: + requirement: str - 用户的需求描述 + context: str - 额外的上下文信息(如参考模板、数据源信息等) + + 输出: + 生成的 JRXML 代码(字符串) + + 适用场景: + - 从零生成新报表 + - 基于现有模板修改 + - 生成特定格式的报表 + + 注意: + 生成的代码需要通过 jrxml_validator 验证后才能使用 + """ + + def execute(self, **kwargs) -> ToolResult: + """ + 执行 JRXML 生成 + + 参数处理说明: + - 使用 kwargs.get() 安全获取参数,避免 KeyError + - 提供默认值作为后备 + - 类型检查确保参数正确 + """ + requirement = kwargs.get("requirement", "") + context = kwargs.get("context", "") + + if not requirement: + return ToolResult( + success=False, + error="需求描述不能为空" + ) + + # 实际场景中,这里会: + # 1. 构建 Prompt + # 2. 调用 LLM + # 3. 提取生成的 JRXML 代码 + # + # 示例代码: + # prompt = build_generation_prompt(requirement, context) + # response = llm.invoke(prompt) + # jrxml = extract_jrxml(response) + + # 模拟生成结果 + mock_jrxml = f''' + + + '2024-01-01']]> + + + + + + + + Product + + + + $F{{product_name}} + + +''' + + return ToolResult( + success=True, + result=mock_jrxml, + metadata={ + "requirement": requirement, + "context_provided": bool(context), + "note": "这是模拟结果,实际需要调用 LLM" + } + ) + + +# ═══════════════════════════════════════════════════════════════════════════════ +# 第五部分:工具管理系统 +# ═══════════════════════════════════════════════════════════════════════════════ + +class ToolRegistry: + """ + 工具注册表 + + 这个类负责: + 1. 注册所有可用的 Tool + 2. 根据名称查找 Tool + 3. 列出所有可用 Tool(供 LLM 了解能力) + + 为什么需要这个? + 因为一个 Agent 可能有很多 Tool + 需要一个统一的地方来管理它们 + + 设计模式:注册表模式(Registry Pattern) + 核心思想:用一个中心来管理所有实例 + """ + + def __init__(self): + """初始化空的工具注册表""" + self._tools: Dict[str, BaseTool] = {} + + def register(self, tool: BaseTool) -> None: + """ + 注册一个工具 + + 为什么用 tool.name 作为 key? + 因为 name 是工具的唯一标识符 + 同一个 name 的工具只能注册一次 + + 实际应用: + registry = ToolRegistry() + registry.register(CalculatorTool()) + registry.register(SearchTool()) + """ + if tool.name in self._tools: + raise ValueError(f"工具 '{tool.name}' 已经注册过了") + + self._tools[tool.name] = tool + + def get(self, name: str) -> Optional[BaseTool]: + """ + 根据名称获取工具 + + 返回 Optional[BaseTool]: + 如果找到,返回工具实例 + 如果没找到,返回 None + """ + return self._tools.get(name) + + def list_tools(self) -> list[dict]: + """ + 列出所有已注册的工具 + + 返回格式: + [ + {"name": "calculator", "description": "..."}, + {"name": "web_search", "description": "..."}, + ] + + 这个格式是为了方便给 LLM 展示可用工具列表 + """ + return [tool.to_dict() for tool in self._tools.values()] + + def execute(self, tool_name: str, **kwargs) -> ToolResult: + """ + 执行指定名称的工具 + + 这是对用户暴露的统一接口 + 用户不需要知道具体哪个工具,只要说"执行这个" + """ + tool = self.get(tool_name) + if tool is None: + return ToolResult( + success=False, + error=f"工具 '{tool_name}' 不存在" + ) + + return tool.execute(**kwargs) + + +# ═══════════════════════════════════════════════════════════════════════════════ +# 第六部分:演示代码 +# ═══════════════════════════════════════════════════════════════════════════════ + +def demo(): + """ + 演示如何使用工具系统 + + 这个函数展示: + 1. 创建工具注册表 + 2. 注册工具 + 3. 执行工具 + 4. 查看可用工具 + """ + print("=" * 60) + print("Step 01: 理解 Tool - 工具系统演示") + print("=" * 60) + + # 1. 创建工具注册表 + registry = ToolRegistry() + + # 2. 注册工具 + registry.register(CalculatorTool()) + registry.register(SearchTool()) + registry.register(JaspersoftCodeGeneratorTool()) + + print("\n📋 已注册的工具列表:") + for tool_info in registry.list_tools(): + print(f" - {tool_info['name']}: {tool_info['description'][:50]}...") + + # 3. 执行计算器工具 + print("\n🔧 执行计算器工具:") + result = registry.execute("calculator", expression="2 + 3 * 4") + if result.success: + print(f" 结果: {result.result}") + else: + print(f" 错误: {result.error}") + + # 4. 执行搜索工具 + print("\n🔍 执行搜索工具:") + result = registry.execute("web_search", query="JasperReports 教程", limit=2) + if result.success: + for item in result.result: + print(f" - {item['title']}") + else: + print(f" 错误: {result.error}") + + # 5. 执行 JRXML 生成工具 + print("\n📄 执行 JRXML 生成工具:") + result = registry.execute( + "jrxml_generator", + requirement="生成一个销售报表,显示月度汇总" + ) + if result.success: + print(f" 生成成功! (长度: {len(result.result)} 字符)") + print(f" 前100字符: {result.result[:100]}...") + else: + print(f" 错误: {result.error}") + + # 6. 演示错误处理 + print("\n⚠️ 演示错误处理:") + result = registry.execute("calculator", expression="1 / 0") + if not result.success: + print(f" 预期错误: {result.error}") + + +if __name__ == "__main__": + demo() diff --git a/step_01_tools/exercise.py b/step_01_tools/exercise.py new file mode 100644 index 0000000..f4d6d7e --- /dev/null +++ b/step_01_tools/exercise.py @@ -0,0 +1,223 @@ +""" +Step 01 练习题:设计你的第一个 Tool + +━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ + +🎯 练习目标: + 1. 巩固 Tool 的基本结构 + 2. 设计一个业务相关的 Tool + 3. 理解 Tool 注册系统 + +━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ +""" + +# ═══════════════════════════════════════════════════════════════════════════════ +# 练习 1:完善 TextProcessorTool +# ═══════════════════════════════════════════════════════════════════════════════ + +""" +任务: + 完善 TextProcessorTool,这个工具用于处理文本。 + +要求: + 1. 实现 word_count 方法:统计单词数量 + 2. 实现 character_count 方法:统计字符数量(不包括空格) + 3. 实现 sentence_count 方法:统计句子数量(按句号、问号、感叹号分割) + +提示: + - text 可能包含多行 + - 句子分割要考虑常见的句子结束符:. ! ? + - 单词分割可以考虑按空格分割 + +完成后测试: + text = "Hello, world! This is a test. How are you?" + 预期结果: + - word_count: 9 + - character_count: 38 (不包括空格) + - sentence_count: 3 +""" + +from step_01_tools.concept import BaseTool, ToolResult + + +class TextProcessorTool(BaseTool): + """文本处理工具""" + + @property + def name(self) -> str: + return "text_processor" + + @property + def description(self) -> str: + return "文本处理工具,统计文本的各种特征" + + def execute(self, **kwargs) -> ToolResult: + """处理文本请求""" + text = kwargs.get("text", "") + operation = kwargs.get("operation", "word_count") # 默认操作 + + if not text: + return ToolResult(success=False, error="文本不能为空") + + if operation == "word_count": + # TODO: 实现单词统计 + result = self.word_count(text) + elif operation == "character_count": + # TODO: 实现字符统计 + result = self.character_count(text) + elif operation == "sentence_count": + # TODO: 实现句子统计 + result = self.sentence_count(text) + else: + return ToolResult(success=False, error=f"不支持的操作: {operation}") + + return ToolResult(success=True, result=result) + + def word_count(self, text: str) -> int: + """统计单词数量""" + # 提示:按空格分割,过滤空字符串 + # 你的代码: + pass + + def character_count(self, text: str) -> int: + """统计字符数量(不包括空格)""" + # 提示:去除所有空白字符后统计长度 + # 你的代码: + pass + + def sentence_count(self, text: str) -> int: + """统计句子数量""" + # 提示:按 . ! ? 分割 + # 你的代码: + pass + + +# ═══════════════════════════════════════════════════════════════════════════════ +# 练习 2:设计一个 EmailTool +# ═══════════════════════════════════════════════════════════════════════════════ + +""" +任务: + 设计一个 EmailTool,用于处理邮件相关操作。 + +功能要求: + 1. send_email: 发送邮件(收件人、主题、正文) + 2. search_emails: 搜索邮件(关键词) + 3. get_unread_count: 获取未读邮件数量 + +设计提示: + - 工具名称要简洁明了 + - 描述要告诉 LLM 这个工具能做什么 + - execute 方法要处理不同的 operation + +这个练习的目的是让你学会: + - 如何设计多功能的 Tool + - 如何用 operation 参数区分不同功能 + - 如何返回结构化的结果 +""" + + +class EmailTool(BaseTool): + """邮件处理工具""" + + # TODO: 实现属性和方法 + pass + + +# ═══════════════════════════════════════════════════════════════════════════════ +# 练习 3:改进工具注册表 +# ═══════════════════════════════════════════════════════════════════════════════ + +""" +任务: + 给 ToolRegistry 添加一个新功能:根据描述关键词搜索工具 + +方法签名: + def search_tools(self, keyword: str) -> list[dict]: + ''' + 根据关键词搜索工具 + + 参数: + keyword: str - 搜索关键词 + + 返回: + 匹配的工具列表,格式同 list_tools() + + 匹配规则: + 如果关键词出现在工具名称或描述中,就算匹配 + 匹配应该不区分大小写 + ''' + +预期行为: + registry = ToolRegistry() + registry.register(CalculatorTool()) + registry.register(SearchTool()) + registry.register(TextProcessorTool()) # 用你刚完成的 + + results = registry.search_tools("text") + # 应该返回 text_processor + + results = registry.search_tools("calculate") + # 应该返回 calculator + + results = registry.search_tools("web") + # 应该返回 web_search +""" + +# 从 step_01_tools.concept 导入已有的类 +from step_01_tools.concept import ToolRegistry, CalculatorTool, SearchTool + + +def add_search_to_registry(): + """ + 在这里实现 search_tools 方法 + + 提示: + 1. 遍历所有已注册的工具 + 2. 检查 name 或 description 中是否包含 keyword + 3. 收集匹配的工具 + 4. 返回列表 + + 完成后,在 main.py 中测试 + """ + # 你的代码: + + pass + + +# ═══════════════════════════════════════════════════════════════════════════════ +# 运行测试 +# ═══════════════════════════════════════════════════════════════════════════════ + +def test_exercises(): + """测试所有练习""" + print("\n" + "=" * 60) + print("测试练习答案") + print("=" * 60) + + # 测试练习 1 + print("\n📝 练习 1: TextProcessorTool") + tool = TextProcessorTool() + test_text = "Hello, world! This is a test. How are you?" + + print(f"测试文本: {test_text}") + print(f"单词数量: {tool.word_count(test_text)}") # 应该是 9 + print(f"字符数量: {tool.character_count(test_text)}") # 应该是 38 + print(f"句子数量: {tool.sentence_count(test_text)}") # 应该是 3 + + # 测试练习 2 + print("\n📝 练习 2: EmailTool") + # 运行看看你完成了多少 + + # 测试练习 3 + print("\n📝 练习 3: search_tools") + registry = ToolRegistry() + registry.register(CalculatorTool()) + registry.register(SearchTool()) + registry.register(tool) + + # 你的测试代码: + + +if __name__ == "__main__": + test_exercises() diff --git a/step_01_tools/exercise_answer.py b/step_01_tools/exercise_answer.py new file mode 100644 index 0000000..9a9795a --- /dev/null +++ b/step_01_tools/exercise_answer.py @@ -0,0 +1,264 @@ +""" +Step 01 练习题答案 + +⚠️ 先自己思考,再看答案! +⚠️ 答案不是唯一的,这里只是其中一种实现 +""" + +from step_01_tools.concept import BaseTool, ToolResult + + +# ═══════════════════════════════════════════════════════════════════════════════ +# 练习 1 答案:TextProcessorTool +# ═══════════════════════════════════════════════════════════════════════════════ + +class TextProcessorTool(BaseTool): + """文本处理工具""" + + @property + def name(self) -> str: + return "text_processor" + + @property + def description(self) -> str: + return "文本处理工具,统计文本的各种特征" + + def execute(self, **kwargs) -> ToolResult: + """处理文本请求""" + text = kwargs.get("text", "") + operation = kwargs.get("operation", "word_count") + + if not text: + return ToolResult(success=False, error="文本不能为空") + + if operation == "word_count": + result = self.word_count(text) + elif operation == "character_count": + result = self.character_count(text) + elif operation == "sentence_count": + result = self.sentence_count(text) + else: + return ToolResult(success=False, error=f"不支持的操作: {operation}") + + return ToolResult(success=True, result=result) + + def word_count(self, text: str) -> int: + """统计单词数量""" + # 按空格分割,过滤空字符串 + words = [w for w in text.split() if w.strip()] + return len(words) + + def character_count(self, text: str) -> int: + """统计字符数量(不包括空格)""" + # 去除所有空白字符后统计 + return len(text.replace(" ", "").replace("\n", "").replace("\t", "")) + + def sentence_count(self, text: str) -> int: + """统计句子数量""" + # 按常见句子结束符分割 + import re + sentences = re.split(r'[.!?]+', text) + # 过滤空字符串 + sentences = [s for s in sentences if s.strip()] + return len(sentences) + + +# ═══════════════════════════════════════════════════════════════════════════════ +# 练习 2 答案:EmailTool +# ═══════════════════════════════════════════════════════════════════════════════ + +class EmailTool(BaseTool): + """邮件处理工具""" + + @property + def name(self) -> str: + return "email" + + @property + def description(self) -> str: + return """ + 邮件处理工具,用于发送和搜索邮件。 + + 支持的操作: + send_email: 发送邮件 + - to: 收件人邮箱 + - subject: 邮件主题 + - body: 邮件正文 + 返回: {"success": true, "message_id": "xxx"} + + search_emails: 搜索邮件 + - keyword: 搜索关键词 + - limit: 返回数量(默认10) + 返回: [{"from": "...", "subject": "...", "date": "..."}, ...] + + get_unread_count: 获取未读邮件数量 + 无需参数 + 返回: {"count": 5} + """ + + def execute(self, **kwargs) -> ToolResult: + """执行邮件操作""" + operation = kwargs.get("operation", "get_unread_count") + + if operation == "send_email": + return self.send_email( + to=kwargs.get("to", ""), + subject=kwargs.get("subject", ""), + body=kwargs.get("body", "") + ) + elif operation == "search_emails": + return self.search_emails( + keyword=kwargs.get("keyword", ""), + limit=kwargs.get("limit", 10) + ) + elif operation == "get_unread_count": + return self.get_unread_count() + else: + return ToolResult(success=False, error=f"不支持的操作: {operation}") + + def send_email(self, to: str, subject: str, body: str) -> ToolResult: + """发送邮件""" + if not to: + return ToolResult(success=False, error="收件人不能为空") + if not subject: + return ToolResult(success=False, error="主题不能为空") + + # 实际场景:调用邮件发送 API + # 这里用模拟数据 + message_id = f"msg_{hash(to + subject) % 100000}" + + return ToolResult( + success=True, + result={ + "success": True, + "message_id": message_id, + "to": to, + "subject": subject + } + ) + + def search_emails(self, keyword: str, limit: int) -> ToolResult: + """搜索邮件""" + if not keyword: + return ToolResult(success=False, error="搜索关键词不能为空") + + # 模拟搜索结果 + mock_results = [ + { + "from": "boss@company.com", + "subject": f"关于项目的{keyword}", + "date": "2024-01-15", + "snippet": "..." + }, + { + "from": "colleague@company.com", + "subject": f"回复: {keyword}的讨论", + "date": "2024-01-14", + "snippet": "..." + }, + ] + + return ToolResult( + success=True, + result=mock_results[:limit] + ) + + def get_unread_count(self) -> ToolResult: + """获取未读邮件数量""" + # 实际场景:调用邮件 API 获取未读数 + return ToolResult(success=True, result={"count": 5}) + + +# ═══════════════════════════════════════════════════════════════════════════════ +# 练习 3 答案:给 ToolRegistry 添加 search_tools +# ═══════════════════════════════════════════════════════════════════════════════ + +from step_01_tools.concept import ToolRegistry, CalculatorTool, SearchTool + + +def add_search_to_registry(): + """ + 演示如何给 ToolRegistry 添加 search_tools 方法 + """ + + # 方法1:在原类上添加(直接修改原类) + def search_tools_original(self, keyword: str) -> list[dict]: + """根据关键词搜索工具""" + keyword_lower = keyword.lower() + results = [] + + for tool in self._tools.values(): + # 检查名称或描述中是否包含关键词 + name_match = keyword_lower in tool.name.lower() + desc_match = keyword_lower in tool.description.lower() + + if name_match or desc_match: + results.append(tool.to_dict()) + + return results + + # 给 ToolRegistry 添加方法 + ToolRegistry.search_tools = search_tools_original + + # 测试 + registry = ToolRegistry() + registry.register(CalculatorTool()) + registry.register(SearchTool()) + registry.register(TextProcessorTool()) + + print("搜索 'text':") + for t in registry.search_tools("text"): + print(f" - {t['name']}") + + print("搜索 'calculate':") + for t in registry.search_tools("calculate"): + print(f" - {t['name']}") + + print("搜索 'web':") + for t in registry.search_tools("web"): + print(f" - {t['name']}") + + +# ═══════════════════════════════════════════════════════════════════════════════ +# 测试运行 +# ═══════════════════════════════════════════════════════════════════════════════ + +def test_answers(): + """测试答案""" + print("\n" + "=" * 60) + print("测试练习答案") + print("=" * 60) + + # 测试练习 1 + print("\n📝 练习 1: TextProcessorTool") + tool = TextProcessorTool() + test_text = "Hello, world! This is a test. How are you?" + + print(f"测试文本: {test_text}") + print(f"单词数量: {tool.word_count(test_text)}") # 应该是 9 + print(f"字符数量: {tool.character_count(test_text)}") # 应该是 38 + print(f"句子数量: {tool.sentence_count(test_text)}") # 应该是 3 + + # 测试练习 2 + print("\n📝 练习 2: EmailTool") + email_tool = EmailTool() + + print("\n 发送邮件:") + result = email_tool.send_email("test@example.com", "测试", "这是一封测试邮件") + print(f" 结果: {result.result}") + + print("\n 搜索邮件:") + result = email_tool.search_emails("项目", limit=5) + print(f" 结果: {result.result}") + + print("\n 未读邮件数:") + result = email_tool.get_unread_count() + print(f" 结果: {result.result}") + + # 测试练习 3 + print("\n📝 练习 3: search_tools") + add_search_to_registry() + + +if __name__ == "__main__": + test_answers() diff --git a/step_01_tools/main.py b/step_01_tools/main.py new file mode 100644 index 0000000..35ee690 --- /dev/null +++ b/step_01_tools/main.py @@ -0,0 +1,169 @@ +""" +Step 01: 工具系统基础 - 主程序 + +运行方式: + cd step_01_tools + python main.py +""" + +from concept import ( + ToolRegistry, + CalculatorTool, + SearchTool, + JaspersoftCodeGeneratorTool +) + + +def main(): + """演示工具系统的完整使用流程""" + + print("=" * 70) + print(" Step 01: 理解 Tool - 工具系统基础") + print("=" * 70) + print() + + # ═══════════════════════════════════════════════════════════════════════ + # 场景:构建一个 Jaspersoft 报表助手的工具集 + # ═══════════════════════════════════════════════════════════════════════ + + print("📦 场景:构建 Jaspersoft 报表助手") + print("-" * 70) + print(""" +假设我们要构建一个 AI 助手,帮助用户: + 1. 根据需求生成 JRXML 代码 + 2. 验证生成的代码是否正确 + 3. 计算报表的统计数据 + 4. 搜索相关的报表模板 + +我们可以设计以下工具: + - jrxml_generator: 生成 JRXML 代码 + - jrxml_validator: 验证 JRXML 语法 + - calculator: 计算统计数据 + - web_search: 搜索报表模板 +""") + + # ═══════════════════════════════════════════════════════════════════════ + # 步骤 1:创建工具注册表 + # ═══════════════════════════════════════════════════════════════════════ + + print("\n📋 步骤 1: 创建工具注册表") + print("-" * 40) + + registry = ToolRegistry() + print("✓ 创建了空的工具注册表") + print(f" 当前注册工具数量: {len(registry.list_tools())}") + + # ═══════════════════════════════════════════════════════════════════════ + # 步骤 2:注册工具 + # ═══════════════════════════════════════════════════════════════════════ + + print("\n🔧 步骤 2: 注册工具") + print("-" * 40) + + # 注册各种工具 + registry.register(CalculatorTool()) + print("✓ 注册计算器工具") + + registry.register(SearchTool()) + print("✓ 注册搜索工具") + + registry.register(JaspersoftCodeGeneratorTool()) + print("✓ 注册 JRXML 生成工具") + + print(f"\n 当前注册工具数量: {len(registry.list_tools())}") + + # ═══════════════════════════════════════════════════════════════════════ + # 步骤 3:查看可用工具 + # ═══════════════════════════════════════════════════════════════════════ + + print("\n📜 步骤 3: 查看可用工具") + print("-" * 40) + + for tool_info in registry.list_tools(): + print(f"\n [{tool_info['name']}]") + # 截取描述的前100个字符 + desc = tool_info['description'].strip().replace('\n', ' ')[:100] + print(f" {desc}...") + + # ═══════════════════════════════════════════════════════════════════════ + # 步骤 4:执行工具 + # ═══════════════════════════════════════════════════════════════════════ + + print("\n\n⚡ 步骤 4: 执行工具") + print("-" * 40) + + # 场景:用户想要生成一个销售报表 + user_requirement = "生成一个销售报表,显示月度汇总" + + print(f"\n 用户需求: {user_requirement}") + + # 4.1 先搜索相关的模板 + print("\n 4.1 搜索相关模板...") + search_result = registry.execute("web_search", query="JasperReports 销售报表模板") + if search_result.success: + print(f" 找到 {len(search_result.result)} 个相关模板") + for i, item in enumerate(search_result.result[:2], 1): + print(f" {i}. {item['title']}") + + # 4.2 生成 JRXML + print("\n 4.2 生成 JRXML 代码...") + generate_result = registry.execute( + "jrxml_generator", + requirement=user_requirement, + context="参考销售报表模板" + ) + if generate_result.success: + print(f" ✓ 生成成功!") + print(f" 代码长度: {len(generate_result.result)} 字符") + print(f" 代码预览:") + print(" " + "-" * 30) + for line in generate_result.result.split('\n')[:5]: + print(f" {line}") + print(" ...") + + # 4.3 计算统计 + print("\n 4.3 计算统计...") + calc_result = registry.execute("calculator", expression="10 + 20 + 30") + if calc_result.success: + print(f" 10 + 20 + 30 = {calc_result.result}") + + # ═══════════════════════════════════════════════════════════════════════ + # 步骤 5:错误处理 + # ═══════════════════════════════════════════════════════════════════════ + + print("\n\n⚠️ 步骤 5: 错误处理演示") + print("-" * 40) + + # 场景:用户尝试除以零 + print("\n 尝试执行: calculator(expression='10 / 0')") + error_result = registry.execute("calculator", expression="10 / 0") + if not error_result.success: + print(f" ✓ 错误被正确捕获: {error_result.error}") + + # 尝试调用不存在的工具 + print("\n 尝试执行: nonexistent_tool()") + error_result = registry.execute("nonexistent_tool") + if not error_result.success: + print(f" ✓ 错误被正确捕获: {error_result.error}") + + # ═══════════════════════════════════════════════════════════════════════ + # 总结 + # ═══════════════════════════════════════════════════════════════════════ + + print("\n\n" + "=" * 70) + print(" ✅ Step 01 完成!") + print("=" * 70) + print(""" +学到的关键概念: + 1. Tool = name + description + execute + 2. BaseTool 是所有工具的基类 + 3. ToolRegistry 统一管理所有工具 + 4. ToolResult 统一返回结果格式 + +下一步: + 继续 Step 02,学习如何管理 Agent 的状态 +""") + + +if __name__ == "__main__": + main() diff --git a/step_02_state/README.md b/step_02_state/README.md new file mode 100644 index 0000000..59d4c66 --- /dev/null +++ b/step_02_state/README.md @@ -0,0 +1,54 @@ +# Step 02: 理解 State - 状态管理 + +## 🎯 学习目标 + +- 理解什么是 Agent State(代理状态) +- 理解为什么 Agent 需要状态 +- 学会设计合理的状态结构 +- 理解状态在多步骤任务中的作用 + +--- + +## 📖 概念讲解 + +### 什么是 State? + +State(状态)是 Agent 的"记忆"——它记录了: + +1. **当前任务进展**:完成了多少,还剩多少 +2. **历史数据**:用户说过什么,生成过什么 +3. **中间结果**:每个步骤的输出是什么 +4. **工具调用结果**:工具返回了什么 + +``` +没有 State 的 Agent: + 用户: "生成报表" + Agent: 生成报表 + 用户: "把标题改成黑色" + Agent: ??? 我不记得你刚才生成的是什么报表 + +有 State 的 Agent: + 用户: "生成报表" + Agent: 生成报表,记录到 state + state = {current_jrxml: "..."} + + 用户: "把标题改成黑色" + Agent: 从 state 读取 current_jrxml + 修改标题 + 更新 state = {current_jrxml: "新报表"} +``` + +### 为什么需要精心设计 State? + +一个好的 State 设计应该: + +1. **包含所有必要信息**:不遗漏关键数据 +2. **避免信息冗余**:不要重复存储相同数据 +3. **结构清晰**:易于读取和更新 +4. **类型安全**:有类型提示,减少 bug + +--- + +## 💻 代码实现 + +请打开 `concept.py` 查看详细代码注释。 diff --git a/step_02_state/concept.py b/step_02_state/concept.py new file mode 100644 index 0000000..a4c68fe --- /dev/null +++ b/step_02_state/concept.py @@ -0,0 +1,486 @@ +""" +Step 02: 理解 State - 状态管理 + +━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ + +🎓 本节内容: + 1. 什么是 Agent State? + 2. 如何设计 State 结构? + 3. State 如何在多步骤任务中传递? + +━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ +""" + +from typing import TypedDict, List, Dict, Any, Optional +from dataclasses import dataclass, field +from datetime import datetime +import json + + +# ═══════════════════════════════════════════════════════════════════════════════ +# 第一部分:理解为什么需要 State +# ═══════════════════════════════════════════════════════════════════════════════ + +""" +在开始写代码之前,我们先理解 State 的本质。 + +━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ +场景:用户想生成一个报表,然后修改它 +━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ + +❌ 没有 State 的实现(错误): + def handle_request(user_input): + if "生成" in user_input: + jrxml = generate_jrxml(user_input) + return jrxml # 生成后就"丢"了 + + if "修改" in user_input: + # 糟糕!我不知道当前报表是什么 + # 只能让用户重新描述 + return "请重新描述你想要修改的报表" + +✅ 有 State 的实现(正确): + class Agent: + def __init__(self): + self.state = {} # 用一个字典存储状态 + + def handle_request(self, user_input): + if "生成" in user_input: + jrxml = generate_jrxml(user_input) + self.state["current_jrxml"] = jrxml # 保存到状态 + return jrxml + + if "修改" in user_input: + current = self.state.get("current_jrxml") # 从状态读取 + if not current: + return "没有可修改的报表" + modified = modify_jrxml(current, user_input) + self.state["current_jrxml"] = modified # 更新状态 + return modified + +这就是 State 的作用:在多次交互中保持信息! +""" + +# ═══════════════════════════════════════════════════════════════════════════════ +# 第二部分:设计 State 的数据结构 +# ═══════════════════════════════════════════════════════════════════════════════ + +""" +设计 State 时,我们使用 Python 的 TypedDict +这是因为: + 1. 有类型提示,IDE 能帮你检查错误 + 2. 有代码补全,写代码更方便 + 3. 文档化,其他人知道 State 里有什么 +""" + +class AgentState(TypedDict, total=False): + """ + Agent 的状态定义 + + 为什么用 TypedDict 而不是 dataclass? + 因为 TypedDict 更直观,看起来就像一个字典 + 而且 LangGraph 直接支持 TypedDict + + total=False 的含义: + 所有字段都是可选的 + 这样初始化时可以只填需要的字段 + + 每个字段的用途: + - user_input: 当前用户的输入 + - current_jrxml: 当前正在编辑的报表代码 + - conversation_history: 对话历史 + - status: 当前状态(处理中/完成/错误) + - error_msg: 错误信息(如果有) + """ + # === 核心工作字段 === + user_input: str # 用户当前输入 + current_jrxml: str # 当前 JRXML 代码 + status: str # 处理状态: "processing" / "success" / "error" + error_msg: str # 错误信息 + + # === 对话相关 === + conversation_history: List[dict] # 对话历史 [{"role": "user", "content": "..."}] + full_conversation_history: List[dict] # 完整的对话历史(含时间戳) + + # === 生成相关 === + stage: str # 当前阶段: "initial" / "refine" / "mapping" + generated_jrxml: str # 生成的完整 JRXML + is_modified: bool # 是否有未保存的修改 + + # === 验证相关 === + validation_result: dict # 验证结果 + retry_count: int # 重试次数 + + # === 元信息 === + session_id: str # 会话 ID + created_at: str # 创建时间 + updated_at: str # 更新时间 + + +# ═══════════════════════════════════════════════════════════════════════════════ +# 第三部分:实际应用 - Jaspersoft 报表生成的完整状态 +# ═══════════════════════════════════════════════════════════════════════════════ + +class JaspersoftAgentState(TypedDict, total=False): + """ + Jaspersoft 报表生成 Agent 的完整状态 + + 这个状态设计对应了你实际项目中的需求 + 我们逐个解释每个字段的作用 + """ + + # ═══════════════════════════════════════════════════════════════════════ + # 1. 基础信息 + # ═══════════════════════════════════════════════════════════════════════ + + session_id: str # 会话唯一标识 + session_name: str # 会话名称(用户友好) + created_at: str # 创建时间 + updated_at: str # 最后更新时间 + + # ═══════════════════════════════════════════════════════════════════════ + # 2. 用户输入 + # ═══════════════════════════════════════════════════════════════════════ + + user_input: str # 用户当前的输入 + uploaded_file_path: str # 上传的文件路径(如果有) + + # ═══════════════════════════════════════════════════════════════════════ + # 3. 对话历史 + # ═══════════════════════════════════════════════════════════════════════ + + """ + 对话历史的设计考虑: + + 为什么需要两种历史? + 1. conversation_history:精简版,用于发送给 LLM(节省 token) + 2. full_conversation_history:完整版,包含时间戳等元信息(用于审计) + + 为什么不直接保存所有消息? + 因为 LLM 有上下文长度限制 + 当对话很长时,我们只能发送最近的几轮 + 所以需要"精简版"和"完整版"的区分 + """ + conversation_history: List[dict] # 精简对话历史 + full_conversation_history: List[dict] # 完整对话历史 + compressed_history: str # 压缩后的早期对话 + + # ═══════════════════════════════════════════════════════════════════════ + # 4. 报表相关 + # ═══════════════════════════════════════════════════════════════════════ + + """ + 报表相关的状态字段是最核心的部分 + + current_jrxml vs final_jrxml 的区别: + - current_jrxml:正在编辑的版本,可能还没验证通过 + - final_jrxml:经过验证的最终版本,可以导出 + + 为什么需要版本历史? + - 支持撤销操作 + - 用户可能想回退到之前的某个版本 + - 记录每次修改的轨迹 + """ + current_jrxml: str # 当前正在编辑的 JRXML + final_jrxml: str # 最终确认的 JRXML(验证通过) + + # 版本管理 + jrxml_versions: List[dict] # 历史版本列表 + history_states: List[dict] # 历史状态快照(用于撤销) + last_saved_version: int # 最后保存的版本号 + + # ═══════════════════════════════════════════════════════════════════════ + # 5. 生成过程 + # ═══════════════════════════════════════════════════════════════════════ + + """ + 生成过程的中间状态 + + 为什么要记录这些? + 1. 用户可能想知道生成到哪一步了 + 2. 出错时可以定位问题在哪一步 + 3. 方便调试和优化 + """ + stage: str # 当前阶段 + """ + 可能的阶段值: + - "initial_generation": 初始生成 + - "layout_refine": 布局精调 + - "field_mapping": 字段映射 + - "validation": 验证 + - "correction": 修正 + """ + + intent: str # 用户意图 + """ + 可能的意图值: + - "initial_generation": 生成新报表 + - "modify_report": 修改现有报表 + - "preview_report": 预览报表 + - "consult_question": 咨询问题 + """ + + # 生成相关的中间结果 + retrieved_context: str # RAG 检索到的上下文 + layout_schema: dict # OCR 分析出的布局信息 + ocr_extraction_result: dict # OCR 提取的字段 + + # ═══════════════════════════════════════════════════════════════════════ + # 6. 验证和错误处理 + # ═══════════════════════════════════════════════════════════════════════ + + """ + 验证和错误处理是 Agent 可靠性的关键 + + retry_count 的设计: + - 每次生成失败后 +1 + - 达到上限后停止重试 + - 这样避免无限循环 + """ + status: str # 状态:success / error / processing + error_msg: str # 错误信息 + retry_count: int # 当前重试次数 + max_retries: int # 最大重试次数 + + # 错误处理 + pending_failure_context: dict # 待处理的失败上下文 + """ + 这个字段用于"失败恢复" + 当重试耗尽时,我们保存失败信息 + 下次用户输入时,自动注入这个上下文 + """ + + # ═══════════════════════════════════════════════════════════════════════ + # 7. 知识库相关 + # ═══════════════════════════════════════════════════════════════════════ + + """ + 知识库(KB)相关的状态 + + 多租户设计: + - kb_id:当前会话绑定的知识库 ID + - 不同用户/项目可以使用不同的知识库 + """ + kb_id: str # 当前知识库 ID + kb_fields: List[dict] # 知识库中的字段定义 + kb_template_jrxml: str # 知识库中的模板 JRXML + + # ═══════════════════════════════════════════════════════════════════════ + # 8. 用户解释(让用户理解 Agent 在做什么) + # ═══════════════════════════════════════════════════════════════════════ + + """ + natural_explanation 是给用户看的解释 + + 为什么需要这个? + - 用户不只是想知道结果,还想知道 Agent 是怎么想的 + - 如果出错,用户想知道哪里出了问题 + - 这增加了透明度和信任 + """ + natural_explanation: str # 对用户的自然语言解释 + + +# ═══════════════════════════════════════════════════════════════════════════════ +# 第四部分:State 的操作工具函数 +# ═══════════════════════════════════════════════════════════════════════════════ + +def create_initial_state(session_id: str) -> JaspersoftAgentState: + """ + 创建初始状态 + + 这是一个工厂函数,用于生成新的状态实例 + 所有必要的默认值在这里设置 + + 为什么用工厂函数而不是直接初始化? + 1. 确保所有必填字段有默认值 + 2. 统一初始化逻辑 + 3. 方便以后修改默认行为 + """ + now = datetime.now().isoformat() + + return JaspersoftAgentState( + # 基础信息 + session_id=session_id, + session_name="新会话", + created_at=now, + updated_at=now, + + # 对话历史 + conversation_history=[], + full_conversation_history=[], + compressed_history="", + + # 报表相关 + current_jrxml="", + final_jrxml="", + jrxml_versions=[], + history_states=[], + last_saved_version=0, + + # 生成过程 + stage="initial", + intent="initial_generation", + retrieved_context="", + layout_schema={}, + ocr_extraction_result={}, + + # 验证和错误 + status="processing", + error_msg="", + retry_count=0, + max_retries=5, + + # 知识库 + kb_id="", + kb_fields=[], + kb_template_jrxml="", + + # 解释 + natural_explanation="", + ) + + +def update_state(state: JaspersoftAgentState, **updates) -> JaspersoftAgentState: + """ + 更新状态 + + 这是一个辅助函数,用于安全地更新状态字段 + + 为什么需要这个函数? + 1. 自动更新时间戳 + 2. 类型检查(确保字段存在) + 3. 记录更新历史(可选) + + 用法: + state = update_state(state, current_jrxml="new content", status="success") + """ + # 更新指定字段 + for key, value in updates.items(): + if key in state: + state[key] = value + else: + raise KeyError(f"State 没有字段: {key}") + + # 自动更新时间戳 + state["updated_at"] = datetime.now().isoformat() + + return state + + +def save_state_snapshot(state: JaspersoftAgentState) -> dict: + """ + 保存状态快照 + + 这用于"撤销"功能 + 在执行重要操作前,保存当前状态的快照 + 如果操作失败,可以回滚到这个快照 + + 返回的快照包含: + - 报表内容 + - 对话历史 + - 意图 + - 用户请求 + """ + return { + "current_jrxml": state.get("current_jrxml", ""), + "final_jrxml": state.get("final_jrxml", ""), + "status": state.get("status", ""), + "conversation_history": list(state.get("conversation_history", [])), + "user_input": state.get("user_input", ""), + "intent": state.get("intent", ""), + "timestamp": datetime.now().isoformat(), + } + + +def restore_state_snapshot(state: JaspersoftAgentState, snapshot: dict) -> JaspersoftAgentState: + """ + 从快照恢复状态 + + 用于"撤销"操作 + 从历史快照中恢复之前保存的状态 + """ + state["current_jrxml"] = snapshot.get("current_jrxml", "") + state["final_jrxml"] = snapshot.get("final_jrxml", "") + state["status"] = snapshot.get("status", "") + state["conversation_history"] = snapshot.get("conversation_history", []) + state["updated_at"] = datetime.now().isoformat() + + return state + + +# ═══════════════════════════════════════════════════════════════════════════════ +# 第五部分:演示代码 +# ═══════════════════════════════════════════════════════════════════════════════ + +def demo(): + """ + 演示 State 的使用 + """ + print("=" * 60) + print("Step 02: 理解 State - 状态管理演示") + print("=" * 60) + + # 1. 创建初始状态 + print("\n📦 步骤 1: 创建初始状态") + state = create_initial_state("session_001") + print(f" 会话 ID: {state['session_id']}") + print(f" 创建时间: {state['created_at']}") + print(f" 当前状态: {state['status']}") + + # 2. 更新状态 + print("\n🔄 步骤 2: 更新状态") + state = update_state( + state, + user_input="生成一个销售报表", + current_jrxml='', + status="success", + ) + print(f" 用户输入: {state['user_input']}") + print(f" 生成状态: {state['status']}") + print(f" JRXML 长度: {len(state['current_jrxml'])} 字符") + + # 3. 保存快照 + print("\n📸 步骤 3: 保存状态快照") + snapshot = save_state_snapshot(state) + print(f" 快照时间: {snapshot['timestamp']}") + print(f" 快照内容: JRXML ({len(snapshot['current_jrxml'])} 字符)") + + # 4. 修改状态(模拟用户修改) + print("\n✏️ 步骤 4: 修改报表(模拟)") + state["current_jrxml"] = '' + state["status"] = "modified" + print(f" 新状态: {state['status']}") + print(f" 新 JRXML: {state['current_jrxml']}") + + # 5. 撤销(恢复到快照) + print("\n↩️ 步骤 5: 撤销操作") + state = restore_state_snapshot(state, snapshot) + print(f" 恢复状态: {state['status']}") + print(f" 恢复 JRXML: {state['current_jrxml']}") + + # 6. 模拟完整的生成流程 + print("\n🔄 步骤 6: 模拟完整生成流程") + state = create_initial_state("session_002") + + # 阶段 1: 用户输入 + state["user_input"] = "生成一个采购单报表" + state["intent"] = "initial_generation" + print(f" [阶段1] 用户输入: {state['user_input']}") + + # 阶段 2: 生成 + state["current_jrxml"] = "...生成的 JRXML..." + state["stage"] = "initial_generation" + print(f" [阶段2] 生成完成,长度: {len(state['current_jrxml'])}") + + # 阶段 3: 验证 + state["status"] = "success" + state["final_jrxml"] = state["current_jrxml"] + print(f" [阶段3] 验证通过,状态: {state['status']}") + + print("\n" + "=" * 60) + print("✅ State 管理演示完成") + print("=" * 60) + + +if __name__ == "__main__": + demo() diff --git a/step_02_state/exercise.py b/step_02_state/exercise.py new file mode 100644 index 0000000..83945a0 --- /dev/null +++ b/step_02_state/exercise.py @@ -0,0 +1,230 @@ +""" +Step 02 练习题:设计你的第一个 Agent State + +━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ + +🎯 练习目标: + 1. 巩固 State 的基本结构 + 2. 设计一个业务相关的 State + 3. 理解状态在多步骤任务中的作用 + +━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ +""" + +from typing import TypedDict, List, Dict, Any +import json + + +# ═══════════════════════════════════════════════════════════════════════════════ +# 练习 1:完善一个简单的聊天机器人状态 +# ═══════════════════════════════════════════════════════════════════════════════ + +""" +任务: + 设计一个客服聊天机器人的状态 + +要求: + 1. 记录用户信息(用户名、ID) + 2. 记录对话历史 + 3. 记录当前正在处理的问题 + 4. 记录问题解决状态 + 5. 记录用户满意度评分 + +提示: + - 使用 TypedDict 定义状态 + - 考虑哪些字段是必须的,哪些是可选的 +""" + +class CustomerServiceState(TypedDict, total=False): + """ + 客服聊天机器人的状态 + + 请补全以下字段的定义: + """ + # 用户信息 + user_id: str + user_name: str + + # TODO: 添加更多字段... + pass + + +# ═══════════════════════════════════════════════════════════════════════════════ +# 练习 2:设计一个数据分析 Agent 的状态 +# ═══════════════════════════════════════════════════════════════════════════════ + +""" +任务: + 设计一个数据分析 Agent 的状态 + +功能: + 1. 用户提出数据分析需求 + 2. Agent 连接数据源 + 3. Agent 执行查询 + 4. Agent 生成报告 + +请设计状态来支持这个流程,包括: + - 用户需求 + - 数据源配置 + - 查询结果 + - 生成的报告 + - 中间状态 +""" + +class DataAnalysisState(TypedDict, total=False): + """ + 数据分析 Agent 的状态 + + 请补全状态定义... + """ + pass + + +# ═══════════════════════════════════════════════════════════════════════════════ +# 练习 3:实现状态快照和恢复 +# ═══════════════════════════════════════════════════════════════════════════════ + +""" +任务: + 给以下状态实现快照和恢复功能 + +提示: + - 快照应该保存足够的信息来恢复状态 + - 恢复后状态应该是完整的 + - 考虑哪些字段需要保存,哪些不需要 +""" + +def create_snapshot(state: dict) -> dict: + """ + 创建状态快照 + + 应该保存: + - 关键业务数据 + - 不包括临时计算的中间结果 + + 返回格式: + { + "data": {...}, # 快照数据 + "timestamp": "..." # 时间戳 + } + """ + # TODO: 实现这个函数 + pass + + +def restore_from_snapshot(state: dict, snapshot: dict) -> dict: + """ + 从快照恢复状态 + + 参数: + state: 当前状态(会被更新) + snapshot: 之前保存的快照 + + 返回: + 恢复后的状态 + """ + # TODO: 实现这个函数 + pass + + +# ═══════════════════════════════════════════════════════════════════════════════ +# 练习 4:状态验证 +# ═══════════════════════════════════════════════════════════════════════════════ + +""" +任务: + 实现状态验证函数 + +验证规则: + 1. 必填字段不能为空 + 2. 字段类型要正确 + 3. 某些字段有取值范围限制 + +返回: + { + "valid": True/False, + "errors": ["错误1", "错误2", ...] + } +""" + +def validate_state(state: dict, rules: dict) -> dict: + """ + 验证状态 + + 参数: + state: 要验证的状态 + rules: 验证规则,格式: + { + "field_name": { + "type": int/str/list/dict, + "required": True/False, + "min": 0, # 可选,数字最小值 + "max": 100, # 可选,数字最大值 + "choices": ["a", "b"] # 可选,枚举值 + } + } + + 示例: + rules = { + "user_id": {"type": str, "required": True}, + "age": {"type": int, "min": 0, "max": 150}, + "status": {"type": str, "choices": ["active", "inactive"]} + } + """ + # TODO: 实现这个函数 + pass + + +# ═══════════════════════════════════════════════════════════════════════════════ +# 测试 +# ═══════════════════════════════════════════════════════════════════════════════ + +def test_exercises(): + """测试所有练习""" + print("\n" + "=" * 60) + print("测试练习答案") + print("=" * 60) + + # 测试练习 3 + print("\n📝 练习 3: 快照和恢复") + # 示例状态 + sample_state = { + "user_id": "123", + "current_task": "数据分析", + "progress": 50, + "temp_data": ["计算中...", "处理中..."], # 临时数据 + } + + print(f"原始状态: {sample_state}") + + # 创建快照 + snapshot = create_snapshot(sample_state) + print(f"快照: {snapshot}") + + # 修改状态 + sample_state["progress"] = 100 + print(f"修改后状态: {sample_state}") + + # 恢复 + restored = restore_from_snapshot(sample_state, snapshot) + print(f"恢复后状态: {restored}") + + # 测试练习 4 + print("\n📝 练习 4: 状态验证") + rules = { + "user_id": {"type": str, "required": True}, + "age": {"type": int, "min": 0, "max": 150}, + "status": {"type": str, "choices": ["active", "inactive"]} + } + + # 有效状态 + valid_state = {"user_id": "123", "age": 25, "status": "active"} + print(f"验证有效状态: {validate_state(valid_state, rules)}") + + # 无效状态 + invalid_state = {"user_id": "123", "age": 200, "status": "unknown"} + print(f"验证无效状态: {validate_state(invalid_state, rules)}") + + +if __name__ == "__main__": + test_exercises() diff --git a/step_02_state/exercise_answer.py b/step_02_state/exercise_answer.py new file mode 100644 index 0000000..2ef33ce --- /dev/null +++ b/step_02_state/exercise_answer.py @@ -0,0 +1,274 @@ +""" +Step 02 练习题答案 + +⚠️ 先自己思考,再看答案! +⚠️ 答案不是唯一的,这里只是其中一种实现 +""" + +from typing import TypedDict, List, Dict, Any +from datetime import datetime + + +# ═══════════════════════════════════════════════════════════════════════════════ +# 练习 1 答案:客服聊天机器人状态 +# ═══════════════════════════════════════════════════════════════════════════════ + +class CustomerServiceState(TypedDict, total=False): + """客服聊天机器人的状态""" + + # === 用户信息 === + user_id: str # 用户 ID + user_name: str # 用户名 + user_email: str # 用户邮箱(可选) + + # === 对话历史 === + conversation_history: List[dict] # 对话历史 + """ + 格式: [{"role": "user", "content": "..."}, + {"role": "assistant", "content": "..."}] + """ + + # === 问题处理 === + current_issue: str # 当前正在处理的问题描述 + issue_status: str # 问题状态: "open" / "investigating" / "resolved" / "closed" + issue_priority: str # 优先级: "low" / "medium" / "high" / "urgent" + + # === 解决方案 === + proposed_solution: str # 提出的解决方案 + solution_steps: List[str] # 解决步骤列表 + is_resolved: bool # 是否已解决 + + # === 用户反馈 === + satisfaction_rating: int # 满意度评分 1-5 + feedback_comment: str # 反馈意见 + + # === 元信息 === + session_start: str # 会话开始时间 + last_interaction: str # 最后互动时间 + agent_id: str # 处理此会话的客服 ID + + +# ═══════════════════════════════════════════════════════════════════════════════ +# 练习 2 答案:数据分析 Agent 状态 +# ═══════════════════════════════════════════════════════════════════════════════ + +class DataAnalysisState(TypedDict, total=False): + """数据分析 Agent 的状态""" + + # === 用户需求 === + user_request: str # 用户的分析需求 + session_id: str # 会话 ID + + # === 数据源配置 === + data_source_type: str # 数据源类型: "database" / "file" / "api" + data_source_config: dict # 数据源配置(连接信息等) + """ + 示例: + { + "host": "localhost", + "database": "sales_db", + "table": "orders" + } + """ + + # === 查询和结果 === + query: str # 执行的 SQL 或查询条件 + query_result: Any # 查询结果 + result_row_count: int # 结果行数 + result_columns: List[str] # 结果列名 + + # === 分析过程 === + stage: str # 当前阶段 + """ + 阶段值: + - "initial": 初始状态 + - "connecting": 连接数据源 + - "querying": 执行查询 + - "analyzing": 分析数据 + - "generating_report": 生成报告 + - "completed": 完成 + - "error": 出错 + """ + + # === 生成的报告 === + generated_report: str # 生成的报告内容 + report_format: str # 报告格式: "json" / "csv" / "markdown" / "html" + + # === 错误处理 === + error_message: str # 错误信息(如果有) + retry_count: int # 重试次数 + + +# ═══════════════════════════════════════════════════════════════════════════════ +# 练习 3 答案:快照和恢复 +# ═══════════════════════════════════════════════════════════════════════════════ + +def create_snapshot(state: dict) -> dict: + """ + 创建状态快照 + + 策略:只保存"业务关键"数据,不保存临时计算结果 + """ + # 定义需要保存的字段(业务关键数据) + business_fields = [ + "user_id", + "user_name", + "current_task", + "progress", + "status", + # 根据实际情况添加更多... + ] + + snapshot_data = {} + for field in business_fields: + if field in state: + snapshot_data[field] = state[field] + + return { + "data": snapshot_data, + "timestamp": datetime.now().isoformat(), + "version": "1.0" + } + + +def restore_from_snapshot(state: dict, snapshot: dict) -> dict: + """ + 从快照恢复状态 + """ + if not snapshot or "data" not in snapshot: + return state + + # 从快照恢复数据 + snapshot_data = snapshot["data"] + for key, value in snapshot_data.items(): + state[key] = value + + # 更新恢复后的时间戳 + state["_restored_at"] = datetime.now().isoformat() + state["_restored_from"] = snapshot.get("timestamp", "unknown") + + return state + + +# ═══════════════════════════════════════════════════════════════════════════════ +# 练习 4 答案:状态验证 +# ═══════════════════════════════════════════════════════════════════════════════ + +def validate_state(state: dict, rules: dict) -> dict: + """ + 验证状态 + """ + errors = [] + + for field_name, field_rules in rules.items(): + value = state.get(field_name) + field_type = field_rules.get("type") + required = field_rules.get("required", False) + + # 1. 检查必填 + if required and (value is None or value == ""): + errors.append(f"字段 '{field_name}' 是必填的") + continue + + # 如果字段为空且不是必填,跳过后续检查 + if value is None or value == "": + continue + + # 2. 检查类型 + if field_type and not isinstance(value, field_type): + errors.append( + f"字段 '{field_name}' 类型错误: " + f"期望 {field_type.__name__}, 实际 {type(value).__name__}" + ) + continue + + # 3. 检查数值范围 + if isinstance(value, (int, float)): + if "min" in field_rules and value < field_rules["min"]: + errors.append( + f"字段 '{field_name}' 小于最小值: " + f"{value} < {field_rules['min']}" + ) + if "max" in field_rules and value > field_rules["max"]: + errors.append( + f"字段 '{field_name}' 大于最大值: " + f"{value} > {field_rules['max']}" + ) + + # 4. 检查枚举值 + if "choices" in field_rules: + if value not in field_rules["choices"]: + errors.append( + f"字段 '{field_name}' 值不在允许范围内: " + f"{value} not in {field_rules['choices']}" + ) + + return { + "valid": len(errors) == 0, + "errors": errors + } + + +# ═══════════════════════════════════════════════════════════════════════════════ +# 测试 +# ═══════════════════════════════════════════════════════════════════════════════ + +def test_answers(): + """测试答案""" + print("\n" + "=" * 60) + print("测试练习答案") + print("=" * 60) + + # 测试练习 1 + print("\n📝 练习 1: 客服状态") + cs_state: CustomerServiceState = { + "user_id": "user_001", + "user_name": "张三", + "issue_status": "open", + "issue_priority": "high", + } + print(f" 状态: {cs_state}") + + # 测试练习 2 + print("\n📝 练习 2: 数据分析状态") + da_state: DataAnalysisState = { + "user_request": "分析本月销售数据", + "data_source_type": "database", + "stage": "querying", + } + print(f" 状态: {da_state}") + + # 测试练习 3 + print("\n📝 练习 3: 快照和恢复") + sample_state = { + "user_id": "123", + "current_task": "数据分析", + "progress": 50, + "temp_data": ["计算中...", "处理中..."], + } + print(f" 原始状态: {sample_state}") + + snapshot = create_snapshot(sample_state) + print(f" 快照: {snapshot}") + + sample_state["progress"] = 100 + restored = restore_from_snapshot(sample_state, snapshot) + print(f" 恢复后状态: {restored}") + + # 测试练习 4 + print("\n📝 练习 4: 状态验证") + rules = { + "user_id": {"type": str, "required": True}, + "age": {"type": int, "min": 0, "max": 150}, + "status": {"type": str, "choices": ["active", "inactive"]} + } + + valid_state = {"user_id": "123", "age": 25, "status": "active"} + print(f" 有效状态: {validate_state(valid_state, rules)}") + + invalid_state = {"user_id": "123", "age": 200, "status": "unknown"} + print(f" 无效状态: {validate_state(invalid_state, rules)}") + + +if __name__ == "__main__": + test_answers() diff --git a/step_02_state/main.py b/step_02_state/main.py new file mode 100644 index 0000000..b6962fc --- /dev/null +++ b/step_02_state/main.py @@ -0,0 +1,196 @@ +""" +Step 02: State 状态管理 - 主程序 + +运行方式: + cd step_02_state + python main.py +""" + +from concept import ( + JaspersoftAgentState, + create_initial_state, + update_state, + save_state_snapshot, + restore_state_snapshot, +) + + +def main(): + """演示状态管理的完整使用流程""" + + print("=" * 70) + print(" Step 02: 理解 State - 状态管理") + print("=" * 70) + print() + + # ═══════════════════════════════════════════════════════════════════════════════ + # 场景:用户生成并修改报表的完整流程 + # ═══════════════════════════════════════════════════════════════════════════════ + + print("📦 场景:用户生成并修改报表的完整流程") + print("-" * 70) + print(""" +假设用户执行以下操作: + 1. 描述需求:生成一个销售报表 + 2. Agent 生成报表 + 3. Agent 验证通过 + 4. 用户要求修改标题 + 5. 用户要求撤销修改 + +我们需要状态来追踪这个完整的流程。 +""") + + # ═══════════════════════════════════════════════════════════════════════════════ + # 步骤 1:创建初始状态 + # ═══════════════════════════════════════════════════════════════════════════════ + + print("\n📋 步骤 1: 创建初始状态") + print("-" * 40) + + state = create_initial_state("session_001") + print(f" 会话 ID: {state['session_id']}") + print(f" 创建时间: {state['created_at']}") + print(f" 初始状态: {state['status']}") + + # ═══════════════════════════════════════════════════════════════════════════════ + # 步骤 2:用户描述需求 + # ═══════════════════════════════════════════════════════════════════════════════ + + print("\n📝 步骤 2: 用户描述需求") + print("-" * 40) + + state["user_input"] = "生成一个销售报表,显示月度汇总" + state["intent"] = "initial_generation" + state["stage"] = "initial" + + print(f" 用户输入: {state['user_input']}") + print(f" 意图: {state['intent']}") + print(f" 阶段: {state['stage']}") + + # ═══════════════════════════════════════════════════════════════════════════════ + # 步骤 3:生成报表(模拟) + # ═══════════════════════════════════════════════════════════════════════════════ + + print("\n🔧 步骤 3: Agent 生成报表") + print("-" * 40) + + # 模拟生成过程 + state["stage"] = "generation" + generated_jrxml = ''' + + 月度销售汇总报表 + SELECT product, SUM(amount) FROM sales GROUP BY product + + + + 产品 + $F{product} + +''' + + state["current_jrxml"] = generated_jrxml + print(f" 生成完成!") + print(f" JRXML 长度: {len(generated_jrxml)} 字符") + print(f" 当前阶段: {state['stage']}") + + # ═══════════════════════════════════════════════════════════════════════════════ + # 步骤 4:验证通过 + # ═══════════════════════════════════════════════════════════════════════════════ + + print("\n✅ 步骤 4: 验证通过") + print("-" * 40) + + # 保存状态快照(修改前的备份) + state_snapshot = save_state_snapshot(state) + print(f" ✓ 保存状态快照") + print(f" 快照时间: {state_snapshot['timestamp']}") + print(f" 包含内容: current_jrxml, conversation_history 等") + + # 验证通过,更新状态 + state["status"] = "success" + state["final_jrxml"] = state["current_jrxml"] + state["stage"] = "completed" + + print(f" 验证状态: {state['status']}") + print(f" 最终版本: ✓ 已保存") + + # ═══════════════════════════════════════════════════════════════════════════════ + # 步骤 5:用户要求修改标题 + # ═══════════════════════════════════════════════════════════════════════════════ + + print("\n✏️ 步骤 5: 用户要求修改标题") + print("-" * 40) + + # 保存修改前的快照 + pre_modify_snapshot = save_state_snapshot(state) + print(f" ✓ 修改前保存快照") + + # 执行修改 + new_jrxml = state["current_jrxml"].replace("月度销售汇总报表", "2024年销售汇总报表") + state["current_jrxml"] = new_jrxml + state["intent"] = "modify_report" + state["stage"] = "modification" + + print(f" 修改内容: 标题从'月度销售汇总报表'改为'2024年销售汇总报表'") + print(f" 当前意图: {state['intent']}") + + # ═══════════════════════════════════════════════════════════════════════════════ + # 步骤 6:用户撤销修改 + # ═══════════════════════════════════════════════════════════════════════════════ + + print("\n↩️ 步骤 6: 用户撤销修改") + print("-" * 40) + + # 恢复到修改前的状态 + state = restore_state_snapshot(state, pre_modify_snapshot) + print(f" ✓ 撤销成功!") + print(f" 恢复标题: {state['current_jrxml'][:50]}...") + print(f" 当前意图: {state['intent']}") + + # ═══════════════════════════════════════════════════════════════════════════════ + # 展示完整状态 + # ═══════════════════════════════════════════════════════════════════════════════ + + print("\n\n" + "=" * 70) + print("📊 完整状态一览") + print("=" * 70) + + key_fields = [ + ("session_id", "会话ID"), + ("status", "状态"), + ("intent", "意图"), + ("stage", "阶段"), + ("user_input", "用户输入"), + ("current_jrxml", "当前JRXML"), + ("final_jrxml", "最终JRXML"), + ("created_at", "创建时间"), + ("updated_at", "更新时间"), + ] + + for field, desc in key_fields: + value = state.get(field, "") + if field in ["current_jrxml", "final_jrxml"] and value: + value = f"{value[:50]}..." if len(value) > 50 else value + print(f" {desc}: {value}") + + # ═══════════════════════════════════════════════════════════════════════════════ + # 总结 + # ═══════════════════════════════════════════════════════════════════════════════ + + print("\n\n" + "=" * 70) + print(" ✅ Step 02 完成!") + print("=" * 70) + print(""" +学到的关键概念: + 1. State 是 Agent 的"记忆",在多步骤任务中保持信息 + 2. 使用 TypedDict 定义状态,有类型提示更安全 + 3. 状态快照用于"撤销"功能 + 4. 不同字段用于不同目的:业务数据、对话历史、元信息 + +下一步: + 继续 Step 03,学习如何把 Tool + State 组合成简单的 Agent +""") + + +if __name__ == "__main__": + main() diff --git a/step_03_simple_agent/README.md b/step_03_simple_agent/README.md new file mode 100644 index 0000000..feeb565 --- /dev/null +++ b/step_03_simple_agent/README.md @@ -0,0 +1,60 @@ +# Step 03: 构建简单 Agent + +## 🎯 学习目标 + +- 理解什么是 Agent(代理) +- 理解 Agent 的核心循环:思考 → 行动 → 观察 +- 学会构建一个完整的 Agent +- 理解 LangGraph 的基本用法 + +--- + +## 📖 概念讲解 + +### 什么是 Agent? + +**Agent = LLM + Tool + Loop** + +``` +Agent 工作流程: + + ┌─────────────────────────────────────┐ + │ │ + │ ┌───────────┐ │ + │ │ 思考 │ ◀────────────────┼── LLM 决定 + │ └─────┬─────┘ │ 要执行什么 + │ │ │ + │ ▼ │ + │ ┌───────────┐ ┌─────────┐ │ + │ │ 行动 │───▶│ 执行工具 │ │ + │ └───────────┘ └────┬────┘ │ + │ │ │ + │ ▼ │ + │ ┌─────────┐ │ + │ │ 观察结果 │ │ + │ └────┬────┘ │ + │ │ │ + └──────────────────────────┼────────┘ + │ + ┌───────────────┼───────────────┐ + │ │ │ + ▼ ▼ ▼ + 继续思考 结束 结束 +``` + +### Agent 的核心循环 + +``` +while True: + 1. Think(思考):LLM 分析当前状态,决定下一步行动 + 2. Act(行动):执行工具或返回结果 + 3. Observe(观察):获取执行结果,更新状态 + + 如果决定结束 → break +``` + +--- + +## 💻 代码实现 + +请打开 `concept.py` 查看详细代码注释。 diff --git a/step_03_simple_agent/concept.py b/step_03_simple_agent/concept.py new file mode 100644 index 0000000..ac907e7 --- /dev/null +++ b/step_03_simple_agent/concept.py @@ -0,0 +1,603 @@ +""" +Step 03: 构建简单 Agent + +━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ + +🎓 本节内容: + 1. 什么是 Agent?(LLM + Tool + Loop) + 2. Agent 的核心循环:Think → Act → Observe + 3. 如何构建一个完整的 Agent + 4. LangGraph 的基本用法 + +━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ +""" + +from typing import TypedDict, List, Dict, Any, Literal, Optional +from dataclasses import dataclass +from abc import ABC, abstractmethod +import json + + +# ═══════════════════════════════════════════════════════════════════════════════ +# 第一部分:理解 Agent 的本质 +# ═══════════════════════════════════════════════════════════════════════════════ + +""" +Agent 的核心公式: + + Agent = LLM + Tool + Loop + +为什么这个公式很重要? + + 1. LLM:负责"思考"——理解用户意图,决定下一步行动 + 2. Tool:负责"行动"——执行具体的操作 + 3. Loop:负责"循环"——不断执行直到任务完成 + +没有 LLM,Agent 只能机械地执行预设命令 +没有 Tool,LLM 只能思考不能行动 +没有 Loop,Agent 只能执行一步 + +只有三者结合,才是真正的"智能代理" +""" + +# ═══════════════════════════════════════════════════════════════════════════════ +# 第二部分:定义 Agent State +# ═══════════════════════════════════════════════════════════════════════════════ + +class SimpleAgentState(TypedDict, total=False): + """ + 简单 Agent 的状态 + + 相比 Step 02 的状态,这个状态更精简,适合教学 + """ + + # 用户输入 + user_input: str + + # 对话历史 + messages: List[dict] + """ + 格式: + [ + {"role": "user", "content": "用户说了什么"}, + {"role": "assistant", "content": "助手说了什么"}, + {"role": "system", "content": "系统做了什么"}, + ] + """ + + # 当前行动 + current_action: str + """ + 当前正在执行的动作: + - "thinking": LLM 正在思考 + - "using_tool": 正在使用工具 + - "finished": 完成 + """ + + # 工具调用相关 + tool_calls: List[dict] # 记录所有工具调用 + last_tool_result: Any # 最后一个工具的执行结果 + tool_result: Any # 当前工具执行结果 + + # 生成结果 + final_output: str # 最终输出 + generation: str # 当前正在生成的文本 + + # 状态 + status: str + """ + 状态值: + - "input": 等待输入 + - "thinking": 思考中 + - "acting": 执行中 + - "finished": 完成 + - "error": 错误 + """ + error: str + + +# ═══════════════════════════════════════════════════════════════════════════════ +# 第三部分:定义 Tool 接口 +# ═══════════════════════════════════════════════════════════════════════════════ + +@dataclass +class ToolCall: + """工具调用的数据结构""" + name: str # 工具名称 + arguments: dict # 传递给工具的参数 + result: Any = None # 工具执行结果 + error: str = None # 错误信息 + + +class BaseTool(ABC): + """ + 工具基类(简化版,来自 Step 01) + """ + + @property + @abstractmethod + def name(self) -> str: + """工具名称""" + pass + + @property + @abstractmethod + def description(self) -> str: + """工具描述""" + pass + + @abstractmethod + def execute(self, **kwargs) -> Any: + """执行工具""" + pass + + +# ═══════════════════════════════════════════════════════════════════════════════ +# 第四部分:实现具体的 Tool +# ═══════════════════════════════════════════════════════════════════════════════ + +class CalculatorTool(BaseTool): + """计算器工具""" + + @property + def name(self) -> str: + return "calculator" + + @property + def description(self) -> str: + return "执行数学计算。输入一个数学表达式,返回计算结果。" + + def execute(self, expression: str) -> str: + """执行计算""" + try: + result = eval(expression, {"__builtins__": {}}, {}) + return str(result) + except Exception as e: + return f"计算错误: {e}" + + +class JRXMLGeneratorTool(BaseTool): + """JRXML 生成工具""" + + @property + def name(self) -> str: + return "jrxml_generator" + + @property + def description(self) -> str: + return "生成 JasperReports JRXML 报表模板。根据用户需求生成 XML 代码。" + + def execute(self, requirement: str, context: str = "") -> str: + """ + 执行 JRXML 生成 + + 实际应用中,这里会调用 LLM + 这里用模拟数据演示 + """ + # 模拟生成 + template = f''' + + {requirement} + + + + + + + + + 名称 + + + + $F{{name}} + + +''' + + return template + + +class TemplateSearchTool(BaseTool): + """模板搜索工具""" + + @property + def name(self) -> str: + return "template_search" + + @property + def description(self) -> str: + return "搜索 JRXML 报表模板。在知识库中搜索相关的报表模板作为参考。" + + def execute(self, keyword: str, limit: int = 3) -> str: + """搜索模板""" + # 模拟搜索结果 + results = [ + f"模板1: 关于 '{keyword}' 的基础报表模板", + f"模板2: '{keyword}' 的高级报表模板", + f"模板3: '{keyword}' 的数据透视表模板", + ] + return "\n".join(results[:limit]) + + +class JRXMLValidatorTool(BaseTool): + """JRXML 验证工具""" + + @property + def name(self) -> str: + return "jrxml_validator" + + @property + def description(self) -> str: + return "验证 JRXML 报表模板的正确性。检查 XML 语法和 JasperReports 规范。" + + def execute(self, jrxml: str) -> dict: + """验证 JRXML""" + # 模拟验证 + issues = [] + + # 检查基本结构 + if " 根元素") + + if " dict: + """ + 决定下一步行动 + + 返回: + { + "action": "use_tool" | "respond" | "finish", + "tool_name": str, # 如果 action == "use_tool" + "tool_args": dict, # 如果 action == "use_tool" + "response": str, # 如果 action == "respond" + } + """ + user_input = state.get("user_input", "").lower() + messages = state.get("messages", []) + tool_result = state.get("tool_result") + + # 如果有工具执行结果 + if tool_result is not None: + return { + "action": "respond", + "response": f"工具执行完成,结果:{tool_result}" + } + + # 根据用户输入决定行动 + if "生成" in user_input or "报表" in user_input: + return { + "action": "use_tool", + "tool_name": "jrxml_generator", + "tool_args": { + "requirement": state.get("user_input", ""), + "context": "" + } + } + + elif "搜索" in user_input or "模板" in user_input: + return { + "action": "use_tool", + "tool_name": "template_search", + "tool_args": { + "keyword": user_input.replace("搜索", "").replace("模板", "").strip(), + "limit": 3 + } + } + + elif "验证" in user_input and "<" in user_input: + return { + "action": "use_tool", + "tool_name": "jrxml_validator", + "tool_args": { + "jrxml": user_input + } + } + + elif any(op in user_input for op in ["计算", "+", "-", "*", "/"]): + # 提取表达式 + import re + expr = re.search(r'[\d\+\-\*\/\(\)\.]+', user_input) + if expr: + return { + "action": "use_tool", + "tool_name": "calculator", + "tool_args": { + "expression": expr.group() + } + } + + else: + # 默认:直接回答 + return { + "action": "respond", + "response": f"我理解你的需求:{state.get('user_input', '')}。请问具体想做什么?" + } + + +# ═══════════════════════════════════════════════════════════════════════════════ +# 第六部分:构建 Agent +# ═══════════════════════════════════════════════════════════════════════════════ + +class SimpleAgent: + """ + 简单 Agent 实现 + + 这个 Agent 的工作流程: + 1. 接收用户输入 + 2. LLM(大脑)决定下一步行动 + 3. 执行行动(使用工具或直接回答) + 4. 返回结果 + 5. 循环直到用户结束 + """ + + def __init__(self): + # 初始化工具 + self.tools = { + "calculator": CalculatorTool(), + "jrxml_generator": JRXMLGeneratorTool(), + "template_search": TemplateSearchTool(), + "jrxml_validator": JRXMLValidatorTool(), + } + + # 初始化大脑 + self.brain = AgentBrain() + + # 初始化状态 + self.state = SimpleAgentState( + messages=[], + tool_calls=[], + status="input" + ) + + def reset(self): + """重置 Agent 状态""" + self.state = SimpleAgentState( + messages=[], + tool_calls=[], + status="input" + ) + + def get_available_tools(self) -> list[dict]: + """获取可用工具列表""" + return [ + {"name": tool.name, "description": tool.description} + for tool in self.tools.values() + ] + + def execute_tool(self, tool_name: str, **kwargs) -> Any: + """ + 执行工具 + + 封装了工具执行的逻辑,包括: + - 查找工具 + - 执行工具 + - 记录调用 + - 处理错误 + """ + if tool_name not in self.tools: + return {"error": f"工具 '{tool_name}' 不存在"} + + tool = self.tools[tool_name] + try: + result = tool.execute(**kwargs) + + # 记录工具调用 + self.state["tool_calls"].append({ + "tool": tool_name, + "args": kwargs, + "result": result + }) + + return result + + except Exception as e: + error_result = {"error": str(e)} + self.state["tool_calls"].append({ + "tool": tool_name, + "args": kwargs, + "error": str(e) + }) + return error_result + + def process(self, user_input: str) -> str: + """ + 处理用户输入 + + 这是 Agent 的主入口 + 每次用户输入,调用这个方法来处理 + """ + # 1. 保存用户输入 + self.state["user_input"] = user_input + self.state["messages"].append({ + "role": "user", + "content": user_input + }) + self.state["status"] = "thinking" + + # 2. 大脑决定行动 + decision = self.brain.decide(self.state) + + # 3. 执行行动 + if decision["action"] == "use_tool": + # 使用工具 + self.state["status"] = "acting" + tool_name = decision["tool_name"] + tool_args = decision["tool_args"] + + result = self.execute_tool(tool_name, **tool_args) + + # 保存结果到状态 + self.state["tool_result"] = result + self.state["current_action"] = f"使用了工具: {tool_name}" + + # 再次调用大脑,决定下一步 + decision = self.brain.decide(self.state) + + if decision["action"] == "respond": + response = decision["response"] + self.state["messages"].append({ + "role": "assistant", + "content": response + }) + self.state["final_output"] = response + self.state["status"] = "finished" + return response + + elif decision["action"] == "finish": + self.state["status"] = "finished" + return self.state.get("final_output", "任务完成") + + # 默认返回当前状态 + return self.state.get("current_action", "处理中...") + + def get_history(self) -> list[dict]: + """获取对话历史""" + return self.state.get("messages", []) + + +# ═══════════════════════════════════════════════════════════════════════════════ +# 第七部分:使用 LangGraph 重构(可选进阶内容) +# ═══════════════════════════════════════════════════════════════════════════════ + +""" +LangGraph 是 LangChain 提供的图结构框架 + +核心概念: + 1. StateGraph:状态图 + 2. Node:节点(执行函数) + 3. Edge:边(状态转换) + 4. Conditional Edge:条件边(根据状态决定下一步) + +为什么使用 LangGraph? + 1. 代码结构更清晰 + 2. 内置状态管理 + 3. 支持复杂的条件分支 + 4. 内置循环和回退 + +如果你想学习 LangGraph: + 请参考 jaspersoft 项目中的实际实现: + - agent/graph.py:状态图定义 + - agent/nodes.py:节点实现 + - agent/state.py:状态定义 + +这里的 SimpleAgent 展示的是核心原理 +LangGraph 只是实现方式之一 +""" + + +# ═══════════════════════════════════════════════════════════════════════════════ +# 第八部分:演示代码 +# ═══════════════════════════════════════════════════════════════════════════════ + +def demo(): + """演示简单 Agent 的使用""" + print("=" * 60) + print("Step 03: 构建简单 Agent - 演示") + print("=" * 60) + + # 创建 Agent + agent = SimpleAgent() + + print("\n📋 可用工具:") + for tool in agent.get_available_tools(): + print(f" - {tool['name']}: {tool['description']}") + + # 场景 1:生成报表 + print("\n\n🔄 场景 1: 生成报表") + print("-" * 40) + print("用户: 生成一个销售报表") + + response = agent.process("生成一个销售报表") + print(f"Agent: {response}") + + # 场景 2:搜索模板 + print("\n\n🔄 场景 2: 搜索模板") + print("-" * 40) + print("用户: 搜索采购相关的模板") + + response = agent.process("搜索采购相关的模板") + print(f"Agent: {response}") + + # 场景 3:计算 + print("\n\n🔄 场景 3: 数学计算") + print("-" * 40) + print("用户: 计算 100 + 200 * 3") + + response = agent.process("计算 100 + 200 * 3") + print(f"Agent: {response}") + + # 场景 4:多轮对话 + print("\n\n🔄 场景 4: 多轮对话") + print("-" * 40) + print("用户: 我想生成一个报表") + + response1 = agent.process("我想生成一个报表") + print(f"Agent: {response1}") + + print("用户: 显示月度汇总数据") + + response2 = agent.process("显示月度汇总数据") + print(f"Agent: {response2}") + + # 展示对话历史 + print("\n\n📜 对话历史:") + for msg in agent.get_history(): + print(f" [{msg['role']}]: {msg['content'][:50]}...") + + # 展示工具调用记录 + print("\n\n🔧 工具调用记录:") + for call in agent.state.get("tool_calls", []): + if "error" in call: + print(f" - {call['tool']}: ❌ {call['error']}") + else: + result = str(call.get("result", ""))[:30] + print(f" - {call['tool']}: ✓ {result}...") + + print("\n" + "=" * 60) + print("✅ 演示完成") + print("=" * 60) + + +if __name__ == "__main__": + demo() diff --git a/step_03_simple_agent/main.py b/step_03_simple_agent/main.py new file mode 100644 index 0000000..6ec5d8c --- /dev/null +++ b/step_03_simple_agent/main.py @@ -0,0 +1,78 @@ +""" +Step 03: 构建简单 Agent - 主程序 + +运行方式: + cd step_03_simple_agent + python main.py +""" + +from concept import SimpleAgent + + +def main(): + """交互式演示 Agent""" + + print("=" * 70) + print(" Step 03: 构建简单 Agent - 交互式演示") + print("=" * 70) + print(""" +欢迎使用 Jaspersoft 报表助手! + +我可以帮你: + - 生成 JRXML 报表模板 + - 搜索相关报表模板 + - 验证报表代码 + - 执行数学计算 + +输入 'quit' 或 '退出' 结束对话 +""") + + # 创建 Agent + agent = SimpleAgent() + + # 显示可用工具 + print("\n📋 我的能力:") + for tool in agent.get_available_tools(): + print(f" • {tool['name']}: {tool['description']}") + + print("\n" + "-" * 70) + print("开始对话:") + print("-" * 70) + + # 交互式循环 + while True: + try: + user_input = input("\n👤 你: ").strip() + + if not user_input: + continue + + if user_input.lower() in ["quit", "退出", "exit", "q"]: + print("\n👋 再见!") + break + + # 处理输入 + response = agent.process(user_input) + + # 显示响应 + print(f"\n🤖 Agent: {response}") + + except KeyboardInterrupt: + print("\n\n👋 再见!") + break + except Exception as e: + print(f"\n❌ 出错了: {e}") + + # 显示会话统计 + print("\n\n" + "=" * 70) + print("📊 会话统计") + print("=" * 70) + print(f" 对话轮次: {len(agent.get_history()) // 2}") + print(f" 工具调用: {len(agent.state.get('tool_calls', []))}") + print("\n💡 继续学习:") + print(" Step 04: 添加 Memory - 记忆系统") + print(" Step 05: 添加 RAG - 知识检索") + + +if __name__ == "__main__": + main() diff --git a/step_04_memory/README.md b/step_04_memory/README.md new file mode 100644 index 0000000..4e22da6 --- /dev/null +++ b/step_04_memory/README.md @@ -0,0 +1,51 @@ +# Step 04: 添加 Memory - 记忆系统 + +## 🎯 学习目标 + +- 理解 Agent 为何需要记忆系统 +- 理解多层记忆架构:短期 / 长期 / 工作记忆 +- 学会实现上下文窗口管理 +- 理解对话压缩技术 + +--- + +## 📖 核心概念 + +### 为什么 Agent 需要 Memory? + +``` +没有 Memory: + 用户: "我叫张三" + Agent: 好的,张三先生 + 用户: "我叫啥?" + Agent: ??? 我不记得了 + +有 Memory: + 用户: "我叫张三" + Agent: 好的,张三先生 + ↓ 保存到 Memory + 用户: "我叫啥?" + Agent: 你叫张三 + ↓ 从 Memory 读取 +``` + +### 多层记忆架构 + +``` +┌─────────────────────────────────────────────────────┐ +│ Memory System │ +├─────────────────────────────────────────────────────┤ +│ │ +│ ┌─────────────┐ ┌─────────────┐ ┌────────────┐ │ +│ │ Working │ │ Short │ │ Long │ │ +│ │ Memory │ │ Term │ │ Term │ │ +│ ├─────────────┤ ├─────────────┤ ├────────────┤ │ +│ │ 当前状态 │ │ 对话历史 │ │ 知识库 │ │ +│ │ 正在处理 │ │ 最近几轮 │ │ RAG检索 │ │ +│ │ 的任务 │ │ 对话 │ │ 持久记忆 │ │ +│ └─────────────┘ └─────────────┘ └────────────┘ │ +│ │ +└─────────────────────────────────────────────────────┘ +``` + +请打开 `concept.py` 查看实现。 diff --git a/step_04_memory/concept.py b/step_04_memory/concept.py new file mode 100644 index 0000000..4f0887f --- /dev/null +++ b/step_04_memory/concept.py @@ -0,0 +1,429 @@ +""" +Step 04: Memory - 记忆系统 + +━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ + +🎓 本节内容: + 1. 为什么 Agent 需要 Memory? + 2. 多层记忆架构 + 3. 上下文窗口管理 + 4. 对话压缩技术 + +━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ +""" + +from typing import TypedDict, List, Dict, Any, Optional +from dataclasses import dataclass +from datetime import datetime +import json + + +# ═══════════════════════════════════════════════════════════════════════════════ +# 第一部分:理解为什么需要 Memory +# ═══════════════════════════════════════════════════════════════════════════════ + +""" +回顾 Step 03 的 SimpleAgent: + + 问题:对话历史都存在 state['messages'] 里 + 问题:如果对话很长,发送给 LLM 的 token 会越来越多 + 问题:LLM 有上下文长度限制,不能无限增长 + +解决方案:多层 Memory 系统 + + 1. Working Memory(工作记忆):当前正在处理的任务 + 2. Short-Term Memory(短期记忆):最近的对话轮次 + 3. Long-Term Memory(长期记忆):持久化的知识 + +核心思想: + - 不是所有信息都需要保留 + - 重要的信息保留,不重要的压缩或丢弃 + - 平衡"记住"和"效率" +""" + +# ═══════════════════════════════════════════════════════════════════════════════ +# 第二部分:Memory 的数据类型 +# ═══════════════════════════════════════════════════════════════════════════════ + +@dataclass +class Message: + """对话消息""" + role: str # "user" / "assistant" / "system" + content: str + timestamp: str = "" + metadata: Dict[str, Any] = None + + def __post_init__(self): + if not self.timestamp: + self.timestamp = datetime.now().isoformat() + if self.metadata is None: + self.metadata = {} + + def to_dict(self) -> dict: + return { + "role": self.role, + "content": self.content, + "timestamp": self.timestamp, + "metadata": self.metadata, + } + + +@dataclass +class MemorySnapshot: + """记忆快照 - 用于保存和恢复状态""" + state: Dict[str, Any] # 关键状态 + messages: List[Message] # 消息历史 + key_info: Dict[str, Any] # 关键信息摘要 + timestamp: str = "" + + def __post_init__(self): + if not self.timestamp: + self.timestamp = datetime.now().isoformat() + + +# ═══════════════════════════════════════════════════════════════════════════════ +# 第三部分:Working Memory(工作记忆) +# ═══════════════════════════════════════════════════════════════════════════════ + +class WorkingMemory: + """ + 工作记忆 - 当前正在处理的任务 + + 特点: + - 容量小,只有几项 + - 频繁读写 + - 断电即失 + - 存放当前任务的关键信息 + """ + + def __init__(self, capacity: int = 7): + """ + Args: + capacity: 最大容量,超过后自动清理 + """ + self.capacity = capacity + self._data: Dict[str, Any] = {} + + def remember(self, key: str, value: Any) -> None: + """记住新信息""" + self._data[key] = { + "value": value, + "timestamp": datetime.now().isoformat() + } + self._cleanup() + + def recall(self, key: str, default: Any = None) -> Any: + """回忆信息""" + item = self._data.get(key) + if item: + return item["value"] + return default + + def forget(self, key: str) -> None: + """忘记信息""" + if key in self._data: + del self._data[key] + + def clear(self) -> None: + """清空所有记忆""" + self._data = {} + + def get_all(self) -> Dict[str, Any]: + """获取所有记忆""" + return {k: v["value"] for k, v in self._data.items()} + + def _cleanup(self) -> None: + """当容量超限时,清理最旧的信息""" + if len(self._data) > self.capacity: + # 按时间排序,删除最旧的 + sorted_items = sorted( + self._data.items(), + key=lambda x: x[1]["timestamp"] + ) + # 删除最早的 1/3 + delete_count = self.capacity // 3 + for key, _ in sorted_items[:delete_count]: + del self._data[key] + + +# ═══════════════════════════════════════════════════════════════════════════════ +# 第四部分:Short-Term Memory(短期记忆) +# ═══════════════════════════════════════════════════════════════════════════════ + +class ShortTermMemory: + """ + 短期记忆 - 最近的对对话 + + 特点: + - 保存最近 N 轮对话 + - 超出部分要么压缩,要么丢弃 + - 容易被遗忘 + - 模拟人类的短期记忆 + """ + + def __init__(self, max_messages: int = 20): + """ + Args: + max_messages: 最大保存消息数 + """ + self.max_messages = max_messages + self.messages: List[Message] = [] + + def add(self, role: str, content: str, **metadata) -> None: + """添加消息""" + msg = Message(role=role, content=content, metadata=metadata) + self.messages.append(msg) + self._trim() + + def get_recent(self, n: int = 10) -> List[Message]: + """获取最近 N 条消息""" + return self.messages[-n:] + + def get_all(self) -> List[Message]: + """获取所有消息""" + return self.messages.copy() + + def summarize_older(self, keep_recent: int = 5) -> str: + """ + 将较早的消息压缩为摘要 + + Args: + keep_recent: 最近多少条保持不变 + + Returns: + 压缩后的摘要文本 + """ + if len(self.messages) <= keep_recent: + return "" + + older = self.messages[:-keep_recent] + # 简化为摘要 + summary_parts = [] + for msg in older: + role_label = "用户" if msg.role == "user" else "助手" + content_preview = msg.content[:50] + "..." if len(msg.content) > 50 else msg.content + summary_parts.append(f"{role_label}: {content_preview}") + + return "\n".join(summary_parts) + + def clear(self) -> None: + """清空记忆""" + self.messages = [] + + def _trim(self) -> None: + """超出容量时,删除最旧的消息""" + while len(self.messages) > self.max_messages: + self.messages.pop(0) + + +# ═══════════════════════════════════════════════════════════════════════════════ +# 第五部分:Long-Term Memory(长期记忆) +# ═══════════════════════════════════════════════════════════════════════════════ + +class LongTermMemory: + """ + 长期记忆 - 持久化的知识 + + 特点: + - 存储在磁盘或数据库 + - 持久保存 + - 需要检索才能获取 + - 容量大 + - 对应 RAG(后续 Step 05) + """ + + def __init__(self): + """ + 初始化长期记忆 + + 实际应用中,这里会初始化: + - 向量数据库(ChromaDB/Pinecone) + - 键值存储(Redis) + - 图数据库(Neo4j) + """ + self._storage: Dict[str, Any] = {} + + def memorize(self, key: str, value: Any, tags: List[str] = None) -> None: + """ + 存储信息到长期记忆 + + Args: + key: 唯一标识 + value: 要存储的内容 + tags: 标签,用于分类和检索 + """ + self._storage[key] = { + "value": value, + "tags": tags or [], + "created_at": datetime.now().isoformat(), + "accessed_at": datetime.now().isoformat(), + } + + def recall(self, key: str) -> Optional[Any]: + """ + 从长期记忆中检索 + + Args: + key: 唯一标识 + + Returns: + 存储的内容,如果不存在返回 None + """ + if key in self._storage: + # 更新访问时间 + self._storage[key]["accessed_at"] = datetime.now().isoformat() + return self._storage[key]["value"] + return None + + def search(self, query: str, tags: List[str] = None) -> List[Any]: + """ + 搜索长期记忆 + + 实际应用中,这里会使用向量相似度搜索 + 这里用简单的标签匹配演示 + """ + results = [] + for item in self._storage.values(): + if tags: + # 标签匹配 + if any(tag in item["tags"] for tag in tags): + results.append(item["value"]) + elif query in str(item["value"]): + results.append(item["value"]) + return results + + def forget(self, key: str) -> None: + """删除长期记忆""" + if key in self._storage: + del self._storage[key] + + +# ═══════════════════════════════════════════════════════════════════════════════ +# 第六部分:完整的 Memory System +# ═══════════════════════════════════════════════════════════════════════════════ + +class MemorySystem: + """ + 完整的记忆系统 + + 整合三种记忆: + - Working Memory: 当前任务 + - Short-Term Memory: 最近对话 + - Long-Term Memory: 持久知识 + """ + + def __init__(self): + self.working = WorkingMemory(capacity=7) + self.short_term = ShortTermMemory(max_messages=20) + self.long_term = LongTermMemory() + + def remember_task(self, task_id: str, task_info: Dict) -> None: + """记住当前任务""" + self.working.remember("current_task", task_info) + + def get_current_task(self) -> Optional[Dict]: + """获取当前任务""" + return self.working.recall("current_task") + + def add_message(self, role: str, content: str) -> None: + """添加对话消息""" + self.short_term.add(role, content) + + def get_context(self, include_older_summary: bool = True) -> str: + """ + 获取发送给 LLM 的上下文 + + 整合所有记忆,形成完整的上下文 + """ + context_parts = [] + + # 当前任务 + current_task = self.get_current_task() + if current_task: + context_parts.append(f"[当前任务]\n{json.dumps(current_task, ensure_ascii=False)}") + + # 最近对话 + recent = self.short_term.get_recent(10) + if recent: + context_parts.append("[最近对话]") + for msg in recent: + context_parts.append(f"- {msg.role}: {msg.content[:100]}") + + # 较早对话摘要 + if include_older_summary: + older_summary = self.short_term.summarize_older(keep_recent=5) + if older_summary: + context_parts.append(f"[较早对话摘要]\n{older_summary}") + + return "\n\n".join(context_parts) + + def clear_session(self) -> None: + """清除会话相关的记忆(保留长期记忆)""" + self.working.clear() + self.short_term.clear() + + +# ═══════════════════════════════════════════════════════════════════════════════ +# 演示代码 +# ═══════════════════════════════════════════════════════════════════════════════ + +def demo(): + """演示记忆系统""" + print("=" * 60) + print("Step 04: Memory - 记忆系统演示") + print("=" * 60) + + # 创建记忆系统 + memory = MemorySystem() + + # 1. 记住当前任务 + print("\n📝 记住当前任务") + memory.remember_task("task_001", { + "type": "生成报表", + "requirement": "销售月报", + "status": "进行中" + }) + print(f" 当前任务: {memory.get_current_task()}") + + # 2. 添加对话 + print("\n💬 添加对话消息") + messages = [ + ("user", "帮我生成一个销售报表"), + ("assistant", "好的,请问你需要显示哪些数据?"), + ("user", "显示月度汇总,包括销售额和数量"), + ("assistant", "明白了,正在生成..."), + ("user", "再添加一个增长率"), + ("assistant", "好的,正在添加..."), + ] + + for role, content in messages: + memory.add_message(role, content) + print(f" 添加: [{role}] {content[:30]}...") + + # 3. 获取上下文 + print("\n📋 获取完整上下文") + context = memory.get_context() + print(context) + + # 4. 获取较早对话摘要 + print("\n📝 较早对话摘要") + summary = memory.short_term.summarize_older(keep_recent=3) + print(summary) + + # 5. 长期记忆 + print("\n💾 添加长期记忆") + memory.long_term.memorize( + key="user_preferences", + value={"name": "张三", "default_report": "销售报表"}, + tags=["用户信息", "偏好"] + ) + result = memory.long_term.recall("user_preferences") + print(f" 检索结果: {result}") + + print("\n" + "=" * 60) + print("✅ 演示完成") + print("=" * 60) + + +if __name__ == "__main__": + demo() diff --git a/step_05_07_advanced/README.md b/step_05_07_advanced/README.md new file mode 100644 index 0000000..9c689df --- /dev/null +++ b/step_05_07_advanced/README.md @@ -0,0 +1,235 @@ +# Step 05-07: RAG / Self-Correction / Multi-Agent + +> 这些步骤是进阶内容,包含核心概念和实现代码。 + +## Step 05: RAG - 知识检索 + +### 核心概念 + +RAG = Retrieval-Augmented Generation(检索增强生成) + +``` +┌─────────────────────────────────────────────────────┐ +│ RAG 流程 │ +├─────────────────────────────────────────────────────┤ +│ │ +│ 用户问题 ──▶ 编码为向量 ──▶ 向量数据库检索 │ +│ │ │ +│ ▼ │ +│ 找到最相关的文档 │ +│ │ │ +│ ▼ │ +│ 把文档和问题一起发送给 LLM │ +│ │ │ +│ ▼ │ +│ 生成答案 │ +│ │ +└─────────────────────────────────────────────────────┘ +``` + +### 为什么需要 RAG? + +1. LLM 的知识有截止日期 +2. LLM 不知道你私有的数据 +3. RAG 让 LLM 能"查阅"外部知识 + +### 关键组件 + +| 组件 | 作用 | +|------|------| +| Embedding Model | 把文本变成向量 | +| Vector Database | 存储和检索向量 | +| Retrieval | 找到最相关的文档 | +| Generation | 用检索结果生成答案 | + +### 简化实现 + +```python +class SimpleRAG: + """简化版 RAG 系统""" + + def __init__(self): + # 文档存储 + self.documents = [] + # 向量存储(简化版,用关键词) + self.vectors = {} + + def add_document(self, text: str, metadata: dict = None): + """添加文档""" + self.documents.append({ + "text": text, + "metadata": metadata or {} + }) + + def retrieve(self, query: str, top_k: int = 3) -> list: + """检索相关文档""" + # 简化版:基于关键词匹配 + results = [] + for doc in self.documents: + # 计算简单相关性分数 + score = sum(1 for word in query if word in doc["text"].lower()) + if score > 0: + results.append((score, doc)) + # 排序并返回 top_k + results.sort(key=lambda x: x[0], reverse=True) + return [doc for _, doc in results[:top_k]] + + def generate(self, query: str, llm) -> str: + """RAG 生成""" + docs = self.retrieve(query) + context = "\n".join([d["text"] for d in docs]) + + prompt = f""" +根据以下上下文回答问题: + +上下文: +{context} + +问题:{query} + +答案: +""" + return llm.invoke(prompt) +``` + +--- + +## Step 06: Self-Correction - 自我修正 + +### 核心概念 + +Self-Correction = 让 Agent 能够自我发现并修复错误 + +``` +┌─────────────────────────────────────────────────────┐ +│ Self-Correction 流程 │ +├─────────────────────────────────────────────────────┤ +│ │ +│ 生成结果 ──▶ 验证 ──▶ 有问题? │ +│ │ │ +│ ┌────┴────┐ │ +│ │ │ │ +│ 是 否 │ +│ │ │ │ +│ ▼ ▼ │ +│ 分析错误 返回结果 │ +│ │ │ +│ ▼ │ +│ 生成修复方案 │ +│ │ │ +│ ▼ │ +│ 重新生成 ──▶ 再次验证 │ +│ │ +└─────────────────────────────────────────────────────┘ +``` + +### 实现要点 + +```python +class SelfCorrectingAgent: + """自我修正 Agent""" + + def __init__(self): + self.max_retries = 3 + + def try_generate(self, requirement: str) -> str: + """带自我修正的生成""" + for attempt in range(self.max_retries): + # 1. 生成 + result = self.generate(requirement) + + # 2. 验证 + validation = self.validate(result) + + # 3. 检查是否通过 + if validation["passed"]: + return result + + # 4. 分析错误 + error = validation["error"] + print(f"尝试 {attempt + 1} 失败: {error}") + + # 5. 准备修复 + requirement = self.prepare_fix(requirement, error, result) + + return f"经过 {self.max_retries} 次尝试仍失败" +``` + +--- + +## Step 07: Multi-Agent - 多 Agent 协作 + +### 核心概念 + +Multi-Agent = 多个专门的 Agent 协同工作 + +``` +┌─────────────────────────────────────────────────────┐ +│ Multi-Agent 架构 │ +├─────────────────────────────────────────────────────┤ +│ │ +│ ┌──────────────┐ │ +│ │ Orchestrator │ │ +│ │ (协调者) │ │ +│ └──────┬───────┘ │ +│ │ │ +│ ┌───────────────────┼───────────────────┐ │ +│ │ │ │ │ +│ ▼ ▼ ▼ │ +│ ┌──────────┐ ┌──────────┐ ┌──────────┐ │ +│ │ Generator│ │ Validator│ │ Searcher │ │ +│ │ (生成者) │ │ (验证者) │ │ (搜索者) │ │ +│ └──────────┘ └──────────┘ └──────────┘ │ +│ │ +└─────────────────────────────────────────────────────┘ +``` + +### 协作模式 + +| 模式 | 说明 | 适用场景 | +|------|------|---------| +| 串行 | A → B → C 依次执行 | 步骤有依赖 | +| 并行 | A / B / C 同时执行 | 步骤独立 | +| 循环 | A → B → A → B 循环 | 需要反复验证 | + +### 简化实现 + +```python +class MultiAgentSystem: + """多 Agent 协作系统""" + + def __init__(self): + # 注册各个 Agent + self.agents = { + "generator": GeneratorAgent(), + "validator": ValidatorAgent(), + "searcher": SearcherAgent(), + } + # 协调器 + self.orchestrator = Orchestrator(self.agents) + + def process(self, requirement: str) -> str: + """协调多个 Agent 处理请求""" + # 1. 搜索相关知识 + context = self.agents["searcher"].search(requirement) + + # 2. 生成(可能需要多轮) + for attempt in range(3): + draft = self.agents["generator"].generate(requirement, context) + + # 3. 验证 + validation = self.agents["validator"].validate(draft) + + if validation["passed"]: + return validation["result"] + + return "处理失败" +``` + +--- + +## 📚 学习资源 + +- [LangGraph 文档](https://langchain-ai.github.io/langgraph/) +- [RAG 最佳实践](https://www.pinecone.io/learn/rag/) +- [Multi-Agent 系统设计](https://arxiv.org/abs/2308.03688) diff --git a/step_05_07_advanced/concept.py b/step_05_07_advanced/concept.py new file mode 100644 index 0000000..baafb96 --- /dev/null +++ b/step_05_07_advanced/concept.py @@ -0,0 +1,286 @@ +""" +Step 05-07: RAG / Self-Correction / Multi-Agent + +进阶内容代码示例 +""" + +# ═══════════════════════════════════════════════════════════════════════════════════════ +# RAG 实现 +# ═══════════════════════════════════════════════════════════════════════════════════════ + +class SimpleRAG: + """ + 简化版 RAG 系统 + + 实际应用中请使用: + - ChromaDB / Pinecone / Weaviate(向量数据库) + - sentence-transformers / OpenAI Embeddings(向量模型) + """ + + def __init__(self): + self.documents = [] + + def add_document(self, text: str, metadata: dict = None): + """添加文档""" + self.documents.append({ + "text": text, + "metadata": metadata or {}, + "id": len(self.documents) + }) + + def retrieve(self, query: str, top_k: int = 3) -> list: + """检索相关文档(简化版:基于关键词)""" + results = [] + query_words = set(query.lower().split()) + + for doc in self.documents: + doc_words = set(doc["text"].lower().split()) + # 简单的 Jaccard 相似度 + intersection = query_words & doc_words + union = query_words | doc_words + if union: + score = len(intersection) / len(union) + results.append((score, doc)) + + results.sort(key=lambda x: x[0], reverse=True) + return [doc for _, doc in results[:top_k]] + + def generate(self, query: str, context_only: bool = False): + """ + 生成答案 + + 如果 context_only=True,只返回检索到的上下文 + 否则进行 RAG 生成(需要接入 LLM) + """ + docs = self.retrieve(query) + context = "\n\n".join([ + f"[来源: {d['metadata'].get('source', '未知')}]\n{d['text']}" + for d in docs + ]) + return context + + +# ═══════════════════════════════════════════════════════════════════════════════════════ +# Self-Correction 实现 +# ═══════════════════════════════════════════════════════════════════════════════════════ + +@dataclass +class ValidationResult: + """验证结果""" + passed: bool + score: float + issues: List[str] + suggestion: str = "" + + +class SelfCorrectingAgent: + """ + 自我修正 Agent + + 工作流程: + 1. 生成初始结果 + 2. 验证结果 + 3. 如果有问题,分析错误并修复 + 4. 循环直到通过或达到最大重试次数 + """ + + def __init__(self, generator, validator): + self.generator = generator + self.validator = validator + self.max_retries = 3 + + def generate_with_correction(self, requirement: str) -> dict: + """带自我修正的生成""" + history = [] + current_requirement = requirement + + for attempt in range(self.max_retries): + # 1. 生成 + result = self.generator.generate(current_requirement) + history.append({ + "attempt": attempt + 1, + "requirement": current_requirement, + "result": result + }) + + # 2. 验证 + validation = self.validator.validate(result) + history[-1]["validation"] = validation + + if validation.passed: + return { + "success": True, + "result": result, + "attempts": attempt + 1, + "history": history + } + + # 3. 分析错误,准备修复 + print(f"尝试 {attempt + 1} 失败: {validation.issues}") + current_requirement = self._prepare_fix( + requirement, + validation, + result + ) + + return { + "success": False, + "error": "达到最大重试次数", + "history": history + } + + def _prepare_fix(self, original: str, validation: ValidationResult, result) -> str: + """准备修复提示""" + issues_text = "\n".join(f"- {issue}" for issue in validation.issues) + + return f""" +原始需求:{original} + +上次生成结果: +{result} + +验证发现的问题: +{issues_text} + +验证建议:{validation.suggestion} + +请根据以上信息,修正生成结果。 +""" + + +# ═══════════════════════════════════════════════════════════════════════════════════════ +# Multi-Agent 实现 +# ═══════════════════════════════════════════════════════════════════════════════════════ + +from dataclasses import dataclass, field +from typing import Dict, List, Callable + + +@dataclass +class AgentMessage: + """Agent 之间的消息""" + from_agent: str + to_agent: str + content: Any + message_type: str # "request" / "response" / "broadcast" + + +class Agent: + """基础 Agent 类""" + name: str + + def process(self, input_data: Any) -> Any: + """处理输入,返回结果""" + raise NotImplementedError + + +class MultiAgentSystem: + """ + 多 Agent 协作系统 + + 组件: + - agents: 注册的 Agent 字典 + - orchestrator: 协调器,决定消息路由 + - message_queue: 消息队列 + """ + + def __init__(self): + self.agents: Dict[str, Agent] = {} + self.message_queue: List[AgentMessage] = [] + self.history: List[AgentMessage] = [] + + def register(self, agent: Agent): + """注册 Agent""" + self.agents[agent.name] = agent + + def send_message(self, from_agent: str, to_agent: str, content: Any, + msg_type: str = "request"): + """发送消息""" + msg = AgentMessage( + from_agent=from_agent, + to_agent=to_agent, + content=content, + message_type=msg_type + ) + self.message_queue.append(msg) + + def broadcast(self, from_agent: str, content: Any): + """广播消息给所有 Agent""" + for agent_name in self.agents: + if agent_name != from_agent: + self.send_message(from_agent, agent_name, content, "broadcast") + + def process(self, requirement: str) -> Any: + """ + 处理请求 + + 简化实现:顺序执行各个 Agent + """ + # 1. 搜索 + searcher = self.agents.get("searcher") + context = searcher.process(requirement) if searcher else "" + + # 2. 生成 + generator = self.agents.get("generator") + result = generator.process({"requirement": requirement, "context": context}) if generator else requirement + + # 3. 验证 + validator = self.agents.get("validator") + validation = validator.process(result) if validator else {"passed": True} + + if not validation.get("passed", True): + return {"error": "验证失败", "validation": validation} + + return result + + +def demo(): + """演示""" + print("=" * 60) + print("Step 05-07: 进阶功能演示") + print("=" * 60) + + # RAG 演示 + print("\n📚 RAG 演示") + rag = SimpleRAG() + rag.add_document("JasperReports 是一个 Java 报表库", {"source": "文档1"}) + rag.add_document("JRXML 是 JasperReports 的报表模板格式", {"source": "文档2"}) + rag.add_document("可以使用 LLM 生成 JRXML 代码", {"source": "文档3"}) + + result = rag.retrieve("JasperReports 是什么") + print(f" 查询 'JasperReports 是什么'") + for doc in result: + print(f" - {doc['text']} (来源: {doc['metadata']['source']})") + + # Multi-Agent 演示 + print("\n\n🤖 Multi-Agent 演示") + + class DemoSearcher(Agent): + name = "searcher" + def process(self, input_data): + print(f" [{self.name}] 搜索相关资料...") + return "找到相关模板和文档" + + class DemoGenerator(Agent): + name = "generator" + def process(self, input_data): + print(f" [{self.name}] 生成报表...") + return "生成的报表" + + class DemoValidator(Agent): + name = "validator" + def process(self, input_data): + print(f" [{self.name}] 验证结果...") + return {"passed": True} + + system = MultiAgentSystem() + system.register(DemoSearcher()) + system.register(DemoGenerator()) + system.register(DemoValidator()) + + result = system.process("生成销售报表") + print(f"\n 最终结果: {result[:50]}...") + + +if __name__ == "__main__": + demo()