Initial commit: jaspersoft-agent-learn teaching project

This commit is contained in:
zy187
2026-05-29 23:22:18 +08:00
commit 05bb511aab
20 changed files with 4476 additions and 0 deletions
+21
View File
@@ -0,0 +1,21 @@
# 环境变量配置
# LLM API 配置
OPENAI_API_KEY=your_openai_api_key_here
ANTHROPIC_API_KEY=your_anthropic_api_key_here
# LLM 配置
LLM_PROVIDER=anthropic # 或 openai
LLM_MODEL=MiniMax-M2.7 # 或 gpt-4o
LLM_MAX_TOKENS=8192
# RAG 配置
RAG_CHROMA_PATH=./db/chroma
RAG_COLLECTION_NAME=jrxml_chunks
RAG_EMBED_MODEL=sentence-transformers/all-MiniLM-L6-v2
# 验证服务
VALIDATION_SERVICE_URL=http://localhost:8001
# 日志配置
LOG_LEVEL=INFO
+124
View File
@@ -0,0 +1,124 @@
# Jaspersoft Learn - AI Agent 开发教学项目
> **目标**:手把手教你从零构建一个完整的 AI Agent 系统
> **教学方式**:代码即文档,每一行代码都有详细解释
> **前置知识**:Python 基础(2年经验足够)
---
## 🎯 学习路径
本项目采用"渐进式构建"方式,从最简单的概念开始,逐步添加复杂度。
```
学习顺序:
━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
Step 01 ──▶ Step 02 ──▶ Step 03 ──▶ Step 04 ──▶ Step 05 ──▶ Step 06 ──▶ Step 07
│ │ │ │ │ │ │
▼ ▼ ▼ ▼ ▼ ▼ ▼
Tool基础 State状态 简单Agent Memory RAG检索 自我修正 Multi-Agent
管理 记忆系统
```
---
## 📚 每个 Step 的内容
| Step | 主题 | 核心概念 | 完成后你将理解 |
|------|------|---------|--------------|
| **01** | Tool 工具系统 | 什么是 Tool,为什么需要 Tool | 如何设计可扩展的工具系统 |
| **02** | State 状态管理 | 什么是 Agent State | 如何在 Agent 中传递信息 |
| **03** | 简单 Agent | LLM + Tool + Loop = Agent | Agent 的核心工作原理 |
| **04** | Memory 记忆 | 短期/长期/工作记忆 | 如何让 Agent "记住"上下文 |
| **05** | RAG 知识检索 | 检索增强生成 | 如何让 Agent 知道"私有知识" |
| **06** | Self-Correction | 自我修正循环 | 如何让 Agent 自动修复错误 |
| **07** | Multi-Agent | 多 Agent 协作 | 如何构建 Agent 团队 |
---
## 🚀 快速开始
### 环境准备
```bash
# 1. 创建虚拟环境
python -m venv venv
.\venv\Scripts\activate
# 2. 安装依赖
pip install langchain langchain-openai langchain-anthropic python-dotenv
# 3. 配置环境变量
cp .env.example .env
# 编辑 .env,填入你的 API Key
```
### 开始学习
```bash
# Step 01: 理解 Tool
cd step_01_tools
python main.py
# Step 02: 理解 State
cd ../step_02_state
python main.py
# ...以此类推
```
---
## 📖 学习方法
### 每一步的结构
```
step_XX_xxx/
├── README.md # 本步骤的概念讲解
├── concept.py # 核心概念代码(有详细注释)
├── exercise.py # 练习题
├── exercise_answer.py # 练习答案(先自己思考再看)
└── main.py # 可运行的示例
```
### 如何使用本项目
1. **先读 README**:理解这个 Step 要学什么
2. **再看 concept.py**:跟着代码理解概念
3. **做 exercise.py**:巩固理解
4. **对照答案**:检查自己的理解
5. **运行 main.py**:看完整示例
---
## 🎓 最终目标
完成所有 Step 后,你将能够:
- [ ] 理解 Agent 的核心工作原理
- [ ] 设计可扩展的工具系统
- [ ] 实现状态管理和记忆系统
- [ ] 构建完整的 RAG 知识检索
- [ ] 实现 Agent 自我修正能力
- [ ] 设计多 Agent 协作系统
- [ ] 阅读并理解 jaspersoft 项目的源码
---
## 🔗 参考资料
- [LangGraph 官方文档](https://langchain-ai.github.io/langgraph/)
- [LangChain Tool Calling](https://python.langchain.com/docs/how_to/tool_calling/)
- [Prompt Engineering Guide](https://www.promptingguide.ai/zh)
---
## 📝 贡献指南
发现错误或有改进建议?欢迎提交 PR!
---
*本项目是学习用途,代码尽量写得清晰易懂,而非追求极致性能。*
+82
View File
@@ -0,0 +1,82 @@
# Step 01: 理解 Tool - 工具系统基础
## 🎯 学习目标
- 理解什么是 Tool(工具)
- 理解为什么 Agent 需要 Tool
- 学会设计一个可扩展的工具系统
- 理解 Tool 的核心要素:name、description、execute
---
## 📖 概念讲解
### 什么是 Tool
在 AI Agent 系统中,**Tool(工具)** 是 Agent 与外部世界交互的方式。
```
没有 Tool 的 LLM
┌──────────────┐
│ 用户输入 │
└──────┬───────┘
┌──────────────┐
│ LLM │ ← LLM 只能"想",不能"做"
│ (只能思考) │
└──────┬───────┘
┌──────────────┐
│ 返回答案 │ ← 答案可能不准确,没有执行能力
└──────────────┘
有 Tool 的 Agent
┌──────────────┐
│ 用户输入 │
└──────┬───────┘
┌──────────────┐
│ LLM │
│ (思考 + 决策) │
└──────┬───────┘
┌──────────────┐
│ 需要执行工具? │──── 是 ──▶ 调用 Tool
└──────┬───────┘ │
│ ▼
│ 否 ┌──────────┐
▼ │ 执行 Tool │
┌──────────────┐ │ (访问外部) │
│ 返回答案 │ └─────┬─────┘
└──────────────┘ │
把执行结果反馈给 LLM
```
### 为什么需要 Tool
1. **LLM 不知道最新信息**Tool 可以搜索网页、查数据库
2. **LLM 不能执行操作**Tool 可以写文件、调用 API
3. **LLM 可能有幻觉**Tool 可以验证信息
4. **LLM 需要精确计算**Tool 可以调用计算器
### Tool 的核心要素
每个 Tool 都有三个核心要素:
```python
class Tool:
name: str # 工具名称(LLM 通过名字选择工具)
description: str # 工具描述(LLM 通过描述理解何时使用)
execute: function # 执行函数(实际做事的代码)
```
---
## 💻 代码实现
请打开 `concept.py` 查看详细代码注释。
+19
View File
@@ -0,0 +1,19 @@
# Jaspersoft Learn - AI Agent 开发教学项目
from .concept import (
BaseTool,
ToolResult,
ToolRegistry,
CalculatorTool,
SearchTool,
JaspersoftCodeGeneratorTool,
)
__all__ = [
"BaseTool",
"ToolResult",
"ToolRegistry",
"CalculatorTool",
"SearchTool",
"JaspersoftCodeGeneratorTool",
]
+592
View File
@@ -0,0 +1,592 @@
"""
Step 01: 理解 Tool - 工具系统基础
━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
🎓 本节内容:
1. 什么是 Tool
2. 如何定义一个 Tool
3. 如何设计可扩展的工具系统?
━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
"""
from abc import ABC, abstractmethod
from typing import Any, Dict, Optional
from dataclasses import dataclass
import json
# ═══════════════════════════════════════════════════════════════════════════════
# 第一部分:理解 Tool 的基本结构
# ═══════════════════════════════════════════════════════════════════════════════
"""
在开始写代码之前,我们先理解 Tool 的本质:
Tool = 名称 + 描述 + 执行逻辑
为什么这样设计?因为 LLM 选择工具时只看:
1. 工具名称(叫什么)
2. 工具描述(什么时候用)
然后 LLM 会决定:是否调用这个工具?调用时传什么参数?
所以一个好的 Tool 设计,必须:
- 名称清晰、一目了然
- 描述准确、告诉 LLM 何时使用
- 执行逻辑独立、不影响其他工具
"""
# ═══════════════════════════════════════════════════════════════════════════════
# 第二部分:定义 Tool 的数据结构
# ═══════════════════════════════════════════════════════════════════════════════
@dataclass # Python 数据类,自动生成 __init__ 等方法
class ToolResult:
"""
Tool 执行结果的数据结构
为什么需要单独定义这个?
因为 Tool 执行可能成功,也可能失败,我们需要统一格式来传递结果。
属性说明:
success: 是否执行成功
result: 执行结果(如果成功)
error: 错误信息(如果失败)
metadata: 额外元数据(如执行时间、使用的参数等)
"""
success: bool
result: Any = None
error: Optional[str] = None
metadata: Dict[str, Any] = None
def __post_init__(self):
"""初始化后确保 metadata 不为 None"""
if self.metadata is None:
self.metadata = {}
# ═══════════════════════════════════════════════════════════════════════════════
# 第三部分:定义抽象 Tool 基类(重要概念)
# ═══════════════════════════════════════════════════════════════════════════════
class BaseTool(ABC):
"""
所有工具的抽象基类
为什么需要抽象基类?
因为我们希望所有工具都有统一的接口,这样:
1. 任意 Tool 都可以被统一管理
2. 新增 Tool 时不需要修改已有代码(开闭原则)
3. 可以批量操作所有 Tool
抽象基类 = 定义"接口规范",子类负责"具体实现"
"""
# ============================================================
# 属性:每个 Tool 必须有的特性
# ============================================================
@property
@abstractmethod
def name(self) -> str:
"""
工具名称
为什么用 @property
因为 name 通常是只读的,我们不希望运行时被随意修改
为什么用 @abstractmethod
因为这是一个"抽象方法",所有子类必须实现
如果不实现,Python 会报错,强制你提供具体实现
"""
pass
@property
@abstractmethod
def description(self) -> str:
"""
工具描述
这是 LLM 理解"何时使用这个工具"的关键!
描述越准确,LLM 越能正确选择工具。
好的描述应该包含:
1. 工具能做什么
2. 输入参数是什么
3. 输出结果是什么
4. 适用的场景
"""
pass
# ============================================================
# 方法:Tool 的核心执行逻辑
# ============================================================
@abstractmethod
def execute(self, **kwargs) -> ToolResult:
"""
执行工具
参数使用 **kwargs 的原因:
不同的 Tool 可能需要不同的参数
用 **kwargs 可以接受任意参数,具体由子类决定
返回 ToolResult 而不是直接返回值的原因:
1. 统一成功/失败的判断
2. 可以附带错误信息
3. 可以附带元数据(执行时间、使用的参数等)
为什么是抽象方法?
因为"如何执行"是每个工具自己的事,基类不知道具体逻辑
"""
pass
# ============================================================
# 辅助方法:让 Tool 更容易使用
# ============================================================
def to_dict(self) -> dict:
"""
将 Tool 转换为字典格式
这个方法用于:
1. 给 LLM 展示可用的工具列表
2. 序列化/反序列化工具配置
返回格式:
{
"name": "工具名称",
"description": "工具描述",
"parameters": {...} # 可选,参数schema
}
"""
return {
"name": self.name,
"description": self.description,
}
def __repr__(self) -> str:
"""调试时显示工具信息"""
return f"<Tool: {self.name}>"
# ═══════════════════════════════════════════════════════════════════════════════
# 第四部分:实现具体的 Tool
# ═══════════════════════════════════════════════════════════════════════════════
class CalculatorTool(BaseTool):
"""
计算器工具示例
这个工具演示:
1. 如何定义一个简单的 Tool
2. 如何处理输入参数
3. 如何返回结果
4. 如何处理错误
"""
@property
def name(self) -> str:
return "calculator"
@property
def description(self) -> str:
return """
计算器工具,执行数学运算。
输入:
expression: str - 数学表达式,如 "2 + 3""10 * 5"
输出:
计算结果(数字)
示例:
expression="2 + 3" -> 5
expression="10 / 2" -> 5.0
expression="2 ** 3" -> 8
"""
def execute(self, **kwargs) -> ToolResult:
"""
执行计算
为什么用 try-except
因为 eval() 可能执行恶意代码或语法错误
我们需要捕获异常,返回友好的错误信息
"""
expression = kwargs.get("expression")
# 参数验证
if expression is None:
return ToolResult(
success=False,
error="缺少必需参数 'expression'"
)
if not isinstance(expression, str):
return ToolResult(
success=False,
error=f"参数 'expression' 应该是字符串,实际是 {type(expression)}"
)
try:
# 使用 eval 计算表达式
# 注意:生产环境中应该用更安全的计算方式
# 这里用 eval 是为了简化,实际推荐用 ast.literal_eval 或专用计算库
result = eval(expression, {"__builtins__": {}}, {})
return ToolResult(
success=True,
result=result,
metadata={"expression": expression}
)
except SyntaxError as e:
return ToolResult(
success=False,
error=f"表达式语法错误: {e}"
)
except ZeroDivisionError:
return ToolResult(
success=False,
error="除数不能为零"
)
except Exception as e:
return ToolResult(
success=False,
error=f"计算错误: {e}"
)
class SearchTool(BaseTool):
"""
搜索工具示例
这个工具演示:
1. 如何模拟一个搜索功能
2. 如何返回结构化数据
3. 如何设计工具的"真实感"
实际应用中,这里会调用真实的搜索 API
"""
@property
def name(self) -> str:
return "web_search"
@property
def description(self) -> str:
return """
网络搜索工具,在互联网上搜索信息。
输入:
query: str - 搜索关键词
limit: int - 返回结果数量(默认5条)
输出:
搜索结果列表,每条包含标题、链接、摘要
适用场景:
- 查找最新资讯
- 搜索技术文档
- 查找某个问题的答案
"""
def execute(self, **kwargs) -> ToolResult:
"""
执行搜索
注意:这里用模拟数据演示
真实场景中,你需要调用 Bing/Google API
"""
query = kwargs.get("query", "")
limit = kwargs.get("limit", 5)
if not query:
return ToolResult(
success=False,
error="搜索关键词不能为空"
)
# 模拟搜索结果
# 真实场景:这里调用 search_api(query)
mock_results = [
{
"title": f"关于 '{query}' 的官方文档",
"url": "https://example.com/doc",
"snippet": f"这是关于 {query} 的详细官方文档..."
},
{
"title": f"{query} 入门教程",
"url": "https://example.com/tutorial",
"snippet": f"学习 {query} 的快速入门指南..."
},
]
return ToolResult(
success=True,
result=mock_results[:limit],
metadata={"query": query, "count": len(mock_results[:limit])}
)
class JaspersoftCodeGeneratorTool(BaseTool):
"""
Jaspersoft JRXML 代码生成工具(对应你的实际项目)
这个工具演示:
1. 如何设计业务相关的 Tool
2. 如何处理复杂输入
3. 如何返回有意义的结果
这个工具的作用:
用户说"生成一个销售报表"
-> Tool 调用 LLM 生成 JRXML 代码
-> 返回生成的代码
"""
@property
def name(self) -> str:
return "jrxml_generator"
@property
def description(self) -> str:
return """
JasperReports JRXML 报表生成工具。
根据用户的自然语言需求,生成 JRXML 报表模板代码。
输入:
requirement: str - 用户的需求描述
context: str - 额外的上下文信息(如参考模板、数据源信息等)
输出:
生成的 JRXML 代码(字符串)
适用场景:
- 从零生成新报表
- 基于现有模板修改
- 生成特定格式的报表
注意:
生成的代码需要通过 jrxml_validator 验证后才能使用
"""
def execute(self, **kwargs) -> ToolResult:
"""
执行 JRXML 生成
参数处理说明:
- 使用 kwargs.get() 安全获取参数,避免 KeyError
- 提供默认值作为后备
- 类型检查确保参数正确
"""
requirement = kwargs.get("requirement", "")
context = kwargs.get("context", "")
if not requirement:
return ToolResult(
success=False,
error="需求描述不能为空"
)
# 实际场景中,这里会:
# 1. 构建 Prompt
# 2. 调用 LLM
# 3. 提取生成的 JRXML 代码
#
# 示例代码:
# prompt = build_generation_prompt(requirement, context)
# response = llm.invoke(prompt)
# jrxml = extract_jrxml(response)
# 模拟生成结果
mock_jrxml = f'''<?xml version="1.0" encoding="UTF-8"?>
<jasperReport xmlns="http://jasperreports.sourceforge.net/jasperreports"
name="GeneratedReport" pageWidth="595" pageHeight="842">
<queryString>
<![CDATA[SELECT * FROM sales WHERE date > '2024-01-01']]>
</queryString>
<field name="product_name" class="java.lang.String"/>
<field name="quantity" class="java.lang.Integer"/>
<field name="price" class="java.math.BigDecimal"/>
<band height="50">
<staticText>
<reportElement x="0" y="0" width="100" height="20"/>
<text>Product</text>
</staticText>
<textField>
<reportElement x="0" y="20" width="100" height="20"/>
<textFieldExpression>$F{{product_name}}</textFieldExpression>
</textField>
</band>
</jasperReport>'''
return ToolResult(
success=True,
result=mock_jrxml,
metadata={
"requirement": requirement,
"context_provided": bool(context),
"note": "这是模拟结果,实际需要调用 LLM"
}
)
# ═══════════════════════════════════════════════════════════════════════════════
# 第五部分:工具管理系统
# ═══════════════════════════════════════════════════════════════════════════════
class ToolRegistry:
"""
工具注册表
这个类负责:
1. 注册所有可用的 Tool
2. 根据名称查找 Tool
3. 列出所有可用 Tool(供 LLM 了解能力)
为什么需要这个?
因为一个 Agent 可能有很多 Tool
需要一个统一的地方来管理它们
设计模式:注册表模式(Registry Pattern
核心思想:用一个中心来管理所有实例
"""
def __init__(self):
"""初始化空的工具注册表"""
self._tools: Dict[str, BaseTool] = {}
def register(self, tool: BaseTool) -> None:
"""
注册一个工具
为什么用 tool.name 作为 key
因为 name 是工具的唯一标识符
同一个 name 的工具只能注册一次
实际应用:
registry = ToolRegistry()
registry.register(CalculatorTool())
registry.register(SearchTool())
"""
if tool.name in self._tools:
raise ValueError(f"工具 '{tool.name}' 已经注册过了")
self._tools[tool.name] = tool
def get(self, name: str) -> Optional[BaseTool]:
"""
根据名称获取工具
返回 Optional[BaseTool]
如果找到,返回工具实例
如果没找到,返回 None
"""
return self._tools.get(name)
def list_tools(self) -> list[dict]:
"""
列出所有已注册的工具
返回格式:
[
{"name": "calculator", "description": "..."},
{"name": "web_search", "description": "..."},
]
这个格式是为了方便给 LLM 展示可用工具列表
"""
return [tool.to_dict() for tool in self._tools.values()]
def execute(self, tool_name: str, **kwargs) -> ToolResult:
"""
执行指定名称的工具
这是对用户暴露的统一接口
用户不需要知道具体哪个工具,只要说"执行这个"
"""
tool = self.get(tool_name)
if tool is None:
return ToolResult(
success=False,
error=f"工具 '{tool_name}' 不存在"
)
return tool.execute(**kwargs)
# ═══════════════════════════════════════════════════════════════════════════════
# 第六部分:演示代码
# ═══════════════════════════════════════════════════════════════════════════════
def demo():
"""
演示如何使用工具系统
这个函数展示:
1. 创建工具注册表
2. 注册工具
3. 执行工具
4. 查看可用工具
"""
print("=" * 60)
print("Step 01: 理解 Tool - 工具系统演示")
print("=" * 60)
# 1. 创建工具注册表
registry = ToolRegistry()
# 2. 注册工具
registry.register(CalculatorTool())
registry.register(SearchTool())
registry.register(JaspersoftCodeGeneratorTool())
print("\n📋 已注册的工具列表:")
for tool_info in registry.list_tools():
print(f" - {tool_info['name']}: {tool_info['description'][:50]}...")
# 3. 执行计算器工具
print("\n🔧 执行计算器工具:")
result = registry.execute("calculator", expression="2 + 3 * 4")
if result.success:
print(f" 结果: {result.result}")
else:
print(f" 错误: {result.error}")
# 4. 执行搜索工具
print("\n🔍 执行搜索工具:")
result = registry.execute("web_search", query="JasperReports 教程", limit=2)
if result.success:
for item in result.result:
print(f" - {item['title']}")
else:
print(f" 错误: {result.error}")
# 5. 执行 JRXML 生成工具
print("\n📄 执行 JRXML 生成工具:")
result = registry.execute(
"jrxml_generator",
requirement="生成一个销售报表,显示月度汇总"
)
if result.success:
print(f" 生成成功! (长度: {len(result.result)} 字符)")
print(f" 前100字符: {result.result[:100]}...")
else:
print(f" 错误: {result.error}")
# 6. 演示错误处理
print("\n⚠️ 演示错误处理:")
result = registry.execute("calculator", expression="1 / 0")
if not result.success:
print(f" 预期错误: {result.error}")
if __name__ == "__main__":
demo()
+223
View File
@@ -0,0 +1,223 @@
"""
Step 01 练习题:设计你的第一个 Tool
━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
🎯 练习目标:
1. 巩固 Tool 的基本结构
2. 设计一个业务相关的 Tool
3. 理解 Tool 注册系统
━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
"""
# ═══════════════════════════════════════════════════════════════════════════════
# 练习 1:完善 TextProcessorTool
# ═══════════════════════════════════════════════════════════════════════════════
"""
任务:
完善 TextProcessorTool,这个工具用于处理文本。
要求:
1. 实现 word_count 方法:统计单词数量
2. 实现 character_count 方法:统计字符数量(不包括空格)
3. 实现 sentence_count 方法:统计句子数量(按句号、问号、感叹号分割)
提示:
- text 可能包含多行
- 句子分割要考虑常见的句子结束符:. ! ?
- 单词分割可以考虑按空格分割
完成后测试:
text = "Hello, world! This is a test. How are you?"
预期结果:
- word_count: 9
- character_count: 38 (不包括空格)
- sentence_count: 3
"""
from step_01_tools.concept import BaseTool, ToolResult
class TextProcessorTool(BaseTool):
"""文本处理工具"""
@property
def name(self) -> str:
return "text_processor"
@property
def description(self) -> str:
return "文本处理工具,统计文本的各种特征"
def execute(self, **kwargs) -> ToolResult:
"""处理文本请求"""
text = kwargs.get("text", "")
operation = kwargs.get("operation", "word_count") # 默认操作
if not text:
return ToolResult(success=False, error="文本不能为空")
if operation == "word_count":
# TODO: 实现单词统计
result = self.word_count(text)
elif operation == "character_count":
# TODO: 实现字符统计
result = self.character_count(text)
elif operation == "sentence_count":
# TODO: 实现句子统计
result = self.sentence_count(text)
else:
return ToolResult(success=False, error=f"不支持的操作: {operation}")
return ToolResult(success=True, result=result)
def word_count(self, text: str) -> int:
"""统计单词数量"""
# 提示:按空格分割,过滤空字符串
# 你的代码:
pass
def character_count(self, text: str) -> int:
"""统计字符数量(不包括空格)"""
# 提示:去除所有空白字符后统计长度
# 你的代码:
pass
def sentence_count(self, text: str) -> int:
"""统计句子数量"""
# 提示:按 . ! ? 分割
# 你的代码:
pass
# ═══════════════════════════════════════════════════════════════════════════════
# 练习 2:设计一个 EmailTool
# ═══════════════════════════════════════════════════════════════════════════════
"""
任务:
设计一个 EmailTool,用于处理邮件相关操作。
功能要求:
1. send_email: 发送邮件(收件人、主题、正文)
2. search_emails: 搜索邮件(关键词)
3. get_unread_count: 获取未读邮件数量
设计提示:
- 工具名称要简洁明了
- 描述要告诉 LLM 这个工具能做什么
- execute 方法要处理不同的 operation
这个练习的目的是让你学会:
- 如何设计多功能的 Tool
- 如何用 operation 参数区分不同功能
- 如何返回结构化的结果
"""
class EmailTool(BaseTool):
"""邮件处理工具"""
# TODO: 实现属性和方法
pass
# ═══════════════════════════════════════════════════════════════════════════════
# 练习 3:改进工具注册表
# ═══════════════════════════════════════════════════════════════════════════════
"""
任务:
给 ToolRegistry 添加一个新功能:根据描述关键词搜索工具
方法签名:
def search_tools(self, keyword: str) -> list[dict]:
'''
根据关键词搜索工具
参数:
keyword: str - 搜索关键词
返回:
匹配的工具列表,格式同 list_tools()
匹配规则:
如果关键词出现在工具名称或描述中,就算匹配
匹配应该不区分大小写
'''
预期行为:
registry = ToolRegistry()
registry.register(CalculatorTool())
registry.register(SearchTool())
registry.register(TextProcessorTool()) # 用你刚完成的
results = registry.search_tools("text")
# 应该返回 text_processor
results = registry.search_tools("calculate")
# 应该返回 calculator
results = registry.search_tools("web")
# 应该返回 web_search
"""
# 从 step_01_tools.concept 导入已有的类
from step_01_tools.concept import ToolRegistry, CalculatorTool, SearchTool
def add_search_to_registry():
"""
在这里实现 search_tools 方法
提示:
1. 遍历所有已注册的工具
2. 检查 name 或 description 中是否包含 keyword
3. 收集匹配的工具
4. 返回列表
完成后,在 main.py 中测试
"""
# 你的代码:
pass
# ═══════════════════════════════════════════════════════════════════════════════
# 运行测试
# ═══════════════════════════════════════════════════════════════════════════════
def test_exercises():
"""测试所有练习"""
print("\n" + "=" * 60)
print("测试练习答案")
print("=" * 60)
# 测试练习 1
print("\n📝 练习 1: TextProcessorTool")
tool = TextProcessorTool()
test_text = "Hello, world! This is a test. How are you?"
print(f"测试文本: {test_text}")
print(f"单词数量: {tool.word_count(test_text)}") # 应该是 9
print(f"字符数量: {tool.character_count(test_text)}") # 应该是 38
print(f"句子数量: {tool.sentence_count(test_text)}") # 应该是 3
# 测试练习 2
print("\n📝 练习 2: EmailTool")
# 运行看看你完成了多少
# 测试练习 3
print("\n📝 练习 3: search_tools")
registry = ToolRegistry()
registry.register(CalculatorTool())
registry.register(SearchTool())
registry.register(tool)
# 你的测试代码:
if __name__ == "__main__":
test_exercises()
+264
View File
@@ -0,0 +1,264 @@
"""
Step 01 练习题答案
⚠️ 先自己思考,再看答案!
⚠️ 答案不是唯一的,这里只是其中一种实现
"""
from step_01_tools.concept import BaseTool, ToolResult
# ═══════════════════════════════════════════════════════════════════════════════
# 练习 1 答案:TextProcessorTool
# ═══════════════════════════════════════════════════════════════════════════════
class TextProcessorTool(BaseTool):
"""文本处理工具"""
@property
def name(self) -> str:
return "text_processor"
@property
def description(self) -> str:
return "文本处理工具,统计文本的各种特征"
def execute(self, **kwargs) -> ToolResult:
"""处理文本请求"""
text = kwargs.get("text", "")
operation = kwargs.get("operation", "word_count")
if not text:
return ToolResult(success=False, error="文本不能为空")
if operation == "word_count":
result = self.word_count(text)
elif operation == "character_count":
result = self.character_count(text)
elif operation == "sentence_count":
result = self.sentence_count(text)
else:
return ToolResult(success=False, error=f"不支持的操作: {operation}")
return ToolResult(success=True, result=result)
def word_count(self, text: str) -> int:
"""统计单词数量"""
# 按空格分割,过滤空字符串
words = [w for w in text.split() if w.strip()]
return len(words)
def character_count(self, text: str) -> int:
"""统计字符数量(不包括空格)"""
# 去除所有空白字符后统计
return len(text.replace(" ", "").replace("\n", "").replace("\t", ""))
def sentence_count(self, text: str) -> int:
"""统计句子数量"""
# 按常见句子结束符分割
import re
sentences = re.split(r'[.!?]+', text)
# 过滤空字符串
sentences = [s for s in sentences if s.strip()]
return len(sentences)
# ═══════════════════════════════════════════════════════════════════════════════
# 练习 2 答案:EmailTool
# ═══════════════════════════════════════════════════════════════════════════════
class EmailTool(BaseTool):
"""邮件处理工具"""
@property
def name(self) -> str:
return "email"
@property
def description(self) -> str:
return """
邮件处理工具,用于发送和搜索邮件。
支持的操作:
send_email: 发送邮件
- to: 收件人邮箱
- subject: 邮件主题
- body: 邮件正文
返回: {"success": true, "message_id": "xxx"}
search_emails: 搜索邮件
- keyword: 搜索关键词
- limit: 返回数量(默认10
返回: [{"from": "...", "subject": "...", "date": "..."}, ...]
get_unread_count: 获取未读邮件数量
无需参数
返回: {"count": 5}
"""
def execute(self, **kwargs) -> ToolResult:
"""执行邮件操作"""
operation = kwargs.get("operation", "get_unread_count")
if operation == "send_email":
return self.send_email(
to=kwargs.get("to", ""),
subject=kwargs.get("subject", ""),
body=kwargs.get("body", "")
)
elif operation == "search_emails":
return self.search_emails(
keyword=kwargs.get("keyword", ""),
limit=kwargs.get("limit", 10)
)
elif operation == "get_unread_count":
return self.get_unread_count()
else:
return ToolResult(success=False, error=f"不支持的操作: {operation}")
def send_email(self, to: str, subject: str, body: str) -> ToolResult:
"""发送邮件"""
if not to:
return ToolResult(success=False, error="收件人不能为空")
if not subject:
return ToolResult(success=False, error="主题不能为空")
# 实际场景:调用邮件发送 API
# 这里用模拟数据
message_id = f"msg_{hash(to + subject) % 100000}"
return ToolResult(
success=True,
result={
"success": True,
"message_id": message_id,
"to": to,
"subject": subject
}
)
def search_emails(self, keyword: str, limit: int) -> ToolResult:
"""搜索邮件"""
if not keyword:
return ToolResult(success=False, error="搜索关键词不能为空")
# 模拟搜索结果
mock_results = [
{
"from": "boss@company.com",
"subject": f"关于项目的{keyword}",
"date": "2024-01-15",
"snippet": "..."
},
{
"from": "colleague@company.com",
"subject": f"回复: {keyword}的讨论",
"date": "2024-01-14",
"snippet": "..."
},
]
return ToolResult(
success=True,
result=mock_results[:limit]
)
def get_unread_count(self) -> ToolResult:
"""获取未读邮件数量"""
# 实际场景:调用邮件 API 获取未读数
return ToolResult(success=True, result={"count": 5})
# ═══════════════════════════════════════════════════════════════════════════════
# 练习 3 答案:给 ToolRegistry 添加 search_tools
# ═══════════════════════════════════════════════════════════════════════════════
from step_01_tools.concept import ToolRegistry, CalculatorTool, SearchTool
def add_search_to_registry():
"""
演示如何给 ToolRegistry 添加 search_tools 方法
"""
# 方法1:在原类上添加(直接修改原类)
def search_tools_original(self, keyword: str) -> list[dict]:
"""根据关键词搜索工具"""
keyword_lower = keyword.lower()
results = []
for tool in self._tools.values():
# 检查名称或描述中是否包含关键词
name_match = keyword_lower in tool.name.lower()
desc_match = keyword_lower in tool.description.lower()
if name_match or desc_match:
results.append(tool.to_dict())
return results
# 给 ToolRegistry 添加方法
ToolRegistry.search_tools = search_tools_original
# 测试
registry = ToolRegistry()
registry.register(CalculatorTool())
registry.register(SearchTool())
registry.register(TextProcessorTool())
print("搜索 'text':")
for t in registry.search_tools("text"):
print(f" - {t['name']}")
print("搜索 'calculate':")
for t in registry.search_tools("calculate"):
print(f" - {t['name']}")
print("搜索 'web':")
for t in registry.search_tools("web"):
print(f" - {t['name']}")
# ═══════════════════════════════════════════════════════════════════════════════
# 测试运行
# ═══════════════════════════════════════════════════════════════════════════════
def test_answers():
"""测试答案"""
print("\n" + "=" * 60)
print("测试练习答案")
print("=" * 60)
# 测试练习 1
print("\n📝 练习 1: TextProcessorTool")
tool = TextProcessorTool()
test_text = "Hello, world! This is a test. How are you?"
print(f"测试文本: {test_text}")
print(f"单词数量: {tool.word_count(test_text)}") # 应该是 9
print(f"字符数量: {tool.character_count(test_text)}") # 应该是 38
print(f"句子数量: {tool.sentence_count(test_text)}") # 应该是 3
# 测试练习 2
print("\n📝 练习 2: EmailTool")
email_tool = EmailTool()
print("\n 发送邮件:")
result = email_tool.send_email("test@example.com", "测试", "这是一封测试邮件")
print(f" 结果: {result.result}")
print("\n 搜索邮件:")
result = email_tool.search_emails("项目", limit=5)
print(f" 结果: {result.result}")
print("\n 未读邮件数:")
result = email_tool.get_unread_count()
print(f" 结果: {result.result}")
# 测试练习 3
print("\n📝 练习 3: search_tools")
add_search_to_registry()
if __name__ == "__main__":
test_answers()
+169
View File
@@ -0,0 +1,169 @@
"""
Step 01: 工具系统基础 - 主程序
运行方式:
cd step_01_tools
python main.py
"""
from concept import (
ToolRegistry,
CalculatorTool,
SearchTool,
JaspersoftCodeGeneratorTool
)
def main():
"""演示工具系统的完整使用流程"""
print("=" * 70)
print(" Step 01: 理解 Tool - 工具系统基础")
print("=" * 70)
print()
# ═══════════════════════════════════════════════════════════════════════
# 场景:构建一个 Jaspersoft 报表助手的工具集
# ═══════════════════════════════════════════════════════════════════════
print("📦 场景:构建 Jaspersoft 报表助手")
print("-" * 70)
print("""
假设我们要构建一个 AI 助手,帮助用户:
1. 根据需求生成 JRXML 代码
2. 验证生成的代码是否正确
3. 计算报表的统计数据
4. 搜索相关的报表模板
我们可以设计以下工具:
- jrxml_generator: 生成 JRXML 代码
- jrxml_validator: 验证 JRXML 语法
- calculator: 计算统计数据
- web_search: 搜索报表模板
""")
# ═══════════════════════════════════════════════════════════════════════
# 步骤 1:创建工具注册表
# ═══════════════════════════════════════════════════════════════════════
print("\n📋 步骤 1: 创建工具注册表")
print("-" * 40)
registry = ToolRegistry()
print("✓ 创建了空的工具注册表")
print(f" 当前注册工具数量: {len(registry.list_tools())}")
# ═══════════════════════════════════════════════════════════════════════
# 步骤 2:注册工具
# ═══════════════════════════════════════════════════════════════════════
print("\n🔧 步骤 2: 注册工具")
print("-" * 40)
# 注册各种工具
registry.register(CalculatorTool())
print("✓ 注册计算器工具")
registry.register(SearchTool())
print("✓ 注册搜索工具")
registry.register(JaspersoftCodeGeneratorTool())
print("✓ 注册 JRXML 生成工具")
print(f"\n 当前注册工具数量: {len(registry.list_tools())}")
# ═══════════════════════════════════════════════════════════════════════
# 步骤 3:查看可用工具
# ═══════════════════════════════════════════════════════════════════════
print("\n📜 步骤 3: 查看可用工具")
print("-" * 40)
for tool_info in registry.list_tools():
print(f"\n [{tool_info['name']}]")
# 截取描述的前100个字符
desc = tool_info['description'].strip().replace('\n', ' ')[:100]
print(f" {desc}...")
# ═══════════════════════════════════════════════════════════════════════
# 步骤 4:执行工具
# ═══════════════════════════════════════════════════════════════════════
print("\n\n⚡ 步骤 4: 执行工具")
print("-" * 40)
# 场景:用户想要生成一个销售报表
user_requirement = "生成一个销售报表,显示月度汇总"
print(f"\n 用户需求: {user_requirement}")
# 4.1 先搜索相关的模板
print("\n 4.1 搜索相关模板...")
search_result = registry.execute("web_search", query="JasperReports 销售报表模板")
if search_result.success:
print(f" 找到 {len(search_result.result)} 个相关模板")
for i, item in enumerate(search_result.result[:2], 1):
print(f" {i}. {item['title']}")
# 4.2 生成 JRXML
print("\n 4.2 生成 JRXML 代码...")
generate_result = registry.execute(
"jrxml_generator",
requirement=user_requirement,
context="参考销售报表模板"
)
if generate_result.success:
print(f" ✓ 生成成功!")
print(f" 代码长度: {len(generate_result.result)} 字符")
print(f" 代码预览:")
print(" " + "-" * 30)
for line in generate_result.result.split('\n')[:5]:
print(f" {line}")
print(" ...")
# 4.3 计算统计
print("\n 4.3 计算统计...")
calc_result = registry.execute("calculator", expression="10 + 20 + 30")
if calc_result.success:
print(f" 10 + 20 + 30 = {calc_result.result}")
# ═══════════════════════════════════════════════════════════════════════
# 步骤 5:错误处理
# ═══════════════════════════════════════════════════════════════════════
print("\n\n⚠️ 步骤 5: 错误处理演示")
print("-" * 40)
# 场景:用户尝试除以零
print("\n 尝试执行: calculator(expression='10 / 0')")
error_result = registry.execute("calculator", expression="10 / 0")
if not error_result.success:
print(f" ✓ 错误被正确捕获: {error_result.error}")
# 尝试调用不存在的工具
print("\n 尝试执行: nonexistent_tool()")
error_result = registry.execute("nonexistent_tool")
if not error_result.success:
print(f" ✓ 错误被正确捕获: {error_result.error}")
# ═══════════════════════════════════════════════════════════════════════
# 总结
# ═══════════════════════════════════════════════════════════════════════
print("\n\n" + "=" * 70)
print(" ✅ Step 01 完成!")
print("=" * 70)
print("""
学到的关键概念:
1. Tool = name + description + execute
2. BaseTool 是所有工具的基类
3. ToolRegistry 统一管理所有工具
4. ToolResult 统一返回结果格式
下一步:
继续 Step 02,学习如何管理 Agent 的状态
""")
if __name__ == "__main__":
main()
+54
View File
@@ -0,0 +1,54 @@
# Step 02: 理解 State - 状态管理
## 🎯 学习目标
- 理解什么是 Agent State(代理状态)
- 理解为什么 Agent 需要状态
- 学会设计合理的状态结构
- 理解状态在多步骤任务中的作用
---
## 📖 概念讲解
### 什么是 State
State(状态)是 Agent 的"记忆"——它记录了:
1. **当前任务进展**:完成了多少,还剩多少
2. **历史数据**:用户说过什么,生成过什么
3. **中间结果**:每个步骤的输出是什么
4. **工具调用结果**:工具返回了什么
```
没有 State 的 Agent
用户: "生成报表"
Agent: 生成报表
用户: "把标题改成黑色"
Agent: ??? 我不记得你刚才生成的是什么报表
有 State 的 Agent
用户: "生成报表"
Agent: 生成报表,记录到 state
state = {current_jrxml: "..."}
用户: "把标题改成黑色"
Agent: 从 state 读取 current_jrxml
修改标题
更新 state = {current_jrxml: "新报表"}
```
### 为什么需要精心设计 State?
一个好的 State 设计应该:
1. **包含所有必要信息**:不遗漏关键数据
2. **避免信息冗余**:不要重复存储相同数据
3. **结构清晰**:易于读取和更新
4. **类型安全**:有类型提示,减少 bug
---
## 💻 代码实现
请打开 `concept.py` 查看详细代码注释。
+486
View File
@@ -0,0 +1,486 @@
"""
Step 02: 理解 State - 状态管理
━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
🎓 本节内容:
1. 什么是 Agent State
2. 如何设计 State 结构?
3. State 如何在多步骤任务中传递?
━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
"""
from typing import TypedDict, List, Dict, Any, Optional
from dataclasses import dataclass, field
from datetime import datetime
import json
# ═══════════════════════════════════════════════════════════════════════════════
# 第一部分:理解为什么需要 State
# ═══════════════════════════════════════════════════════════════════════════════
"""
在开始写代码之前,我们先理解 State 的本质。
━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
场景:用户想生成一个报表,然后修改它
━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
❌ 没有 State 的实现(错误):
def handle_request(user_input):
if "生成" in user_input:
jrxml = generate_jrxml(user_input)
return jrxml # 生成后就""
if "修改" in user_input:
# 糟糕!我不知道当前报表是什么
# 只能让用户重新描述
return "请重新描述你想要修改的报表"
✅ 有 State 的实现(正确):
class Agent:
def __init__(self):
self.state = {} # 用一个字典存储状态
def handle_request(self, user_input):
if "生成" in user_input:
jrxml = generate_jrxml(user_input)
self.state["current_jrxml"] = jrxml # 保存到状态
return jrxml
if "修改" in user_input:
current = self.state.get("current_jrxml") # 从状态读取
if not current:
return "没有可修改的报表"
modified = modify_jrxml(current, user_input)
self.state["current_jrxml"] = modified # 更新状态
return modified
这就是 State 的作用:在多次交互中保持信息!
"""
# ═══════════════════════════════════════════════════════════════════════════════
# 第二部分:设计 State 的数据结构
# ═══════════════════════════════════════════════════════════════════════════════
"""
设计 State 时,我们使用 Python 的 TypedDict
这是因为:
1. 有类型提示,IDE 能帮你检查错误
2. 有代码补全,写代码更方便
3. 文档化,其他人知道 State 里有什么
"""
class AgentState(TypedDict, total=False):
"""
Agent 的状态定义
为什么用 TypedDict 而不是 dataclass
因为 TypedDict 更直观,看起来就像一个字典
而且 LangGraph 直接支持 TypedDict
total=False 的含义:
所有字段都是可选的
这样初始化时可以只填需要的字段
每个字段的用途:
- user_input: 当前用户的输入
- current_jrxml: 当前正在编辑的报表代码
- conversation_history: 对话历史
- status: 当前状态(处理中/完成/错误)
- error_msg: 错误信息(如果有)
"""
# === 核心工作字段 ===
user_input: str # 用户当前输入
current_jrxml: str # 当前 JRXML 代码
status: str # 处理状态: "processing" / "success" / "error"
error_msg: str # 错误信息
# === 对话相关 ===
conversation_history: List[dict] # 对话历史 [{"role": "user", "content": "..."}]
full_conversation_history: List[dict] # 完整的对话历史(含时间戳)
# === 生成相关 ===
stage: str # 当前阶段: "initial" / "refine" / "mapping"
generated_jrxml: str # 生成的完整 JRXML
is_modified: bool # 是否有未保存的修改
# === 验证相关 ===
validation_result: dict # 验证结果
retry_count: int # 重试次数
# === 元信息 ===
session_id: str # 会话 ID
created_at: str # 创建时间
updated_at: str # 更新时间
# ═══════════════════════════════════════════════════════════════════════════════
# 第三部分:实际应用 - Jaspersoft 报表生成的完整状态
# ═══════════════════════════════════════════════════════════════════════════════
class JaspersoftAgentState(TypedDict, total=False):
"""
Jaspersoft 报表生成 Agent 的完整状态
这个状态设计对应了你实际项目中的需求
我们逐个解释每个字段的作用
"""
# ═══════════════════════════════════════════════════════════════════════
# 1. 基础信息
# ═══════════════════════════════════════════════════════════════════════
session_id: str # 会话唯一标识
session_name: str # 会话名称(用户友好)
created_at: str # 创建时间
updated_at: str # 最后更新时间
# ═══════════════════════════════════════════════════════════════════════
# 2. 用户输入
# ═══════════════════════════════════════════════════════════════════════
user_input: str # 用户当前的输入
uploaded_file_path: str # 上传的文件路径(如果有)
# ═══════════════════════════════════════════════════════════════════════
# 3. 对话历史
# ═══════════════════════════════════════════════════════════════════════
"""
对话历史的设计考虑:
为什么需要两种历史?
1. conversation_history:精简版,用于发送给 LLM(节省 token)
2. full_conversation_history:完整版,包含时间戳等元信息(用于审计)
为什么不直接保存所有消息?
因为 LLM 有上下文长度限制
当对话很长时,我们只能发送最近的几轮
所以需要"精简版""完整版"的区分
"""
conversation_history: List[dict] # 精简对话历史
full_conversation_history: List[dict] # 完整对话历史
compressed_history: str # 压缩后的早期对话
# ═══════════════════════════════════════════════════════════════════════
# 4. 报表相关
# ═══════════════════════════════════════════════════════════════════════
"""
报表相关的状态字段是最核心的部分
current_jrxml vs final_jrxml 的区别:
- current_jrxml:正在编辑的版本,可能还没验证通过
- final_jrxml:经过验证的最终版本,可以导出
为什么需要版本历史?
- 支持撤销操作
- 用户可能想回退到之前的某个版本
- 记录每次修改的轨迹
"""
current_jrxml: str # 当前正在编辑的 JRXML
final_jrxml: str # 最终确认的 JRXML(验证通过)
# 版本管理
jrxml_versions: List[dict] # 历史版本列表
history_states: List[dict] # 历史状态快照(用于撤销)
last_saved_version: int # 最后保存的版本号
# ═══════════════════════════════════════════════════════════════════════
# 5. 生成过程
# ═══════════════════════════════════════════════════════════════════════
"""
生成过程的中间状态
为什么要记录这些?
1. 用户可能想知道生成到哪一步了
2. 出错时可以定位问题在哪一步
3. 方便调试和优化
"""
stage: str # 当前阶段
"""
可能的阶段值:
- "initial_generation": 初始生成
- "layout_refine": 布局精调
- "field_mapping": 字段映射
- "validation": 验证
- "correction": 修正
"""
intent: str # 用户意图
"""
可能的意图值:
- "initial_generation": 生成新报表
- "modify_report": 修改现有报表
- "preview_report": 预览报表
- "consult_question": 咨询问题
"""
# 生成相关的中间结果
retrieved_context: str # RAG 检索到的上下文
layout_schema: dict # OCR 分析出的布局信息
ocr_extraction_result: dict # OCR 提取的字段
# ═══════════════════════════════════════════════════════════════════════
# 6. 验证和错误处理
# ═══════════════════════════════════════════════════════════════════════
"""
验证和错误处理是 Agent 可靠性的关键
retry_count 的设计:
- 每次生成失败后 +1
- 达到上限后停止重试
- 这样避免无限循环
"""
status: str # 状态:success / error / processing
error_msg: str # 错误信息
retry_count: int # 当前重试次数
max_retries: int # 最大重试次数
# 错误处理
pending_failure_context: dict # 待处理的失败上下文
"""
这个字段用于"失败恢复"
当重试耗尽时,我们保存失败信息
下次用户输入时,自动注入这个上下文
"""
# ═══════════════════════════════════════════════════════════════════════
# 7. 知识库相关
# ═══════════════════════════════════════════════════════════════════════
"""
知识库(KB)相关的状态
多租户设计:
- kb_id:当前会话绑定的知识库 ID
- 不同用户/项目可以使用不同的知识库
"""
kb_id: str # 当前知识库 ID
kb_fields: List[dict] # 知识库中的字段定义
kb_template_jrxml: str # 知识库中的模板 JRXML
# ═══════════════════════════════════════════════════════════════════════
# 8. 用户解释(让用户理解 Agent 在做什么)
# ═══════════════════════════════════════════════════════════════════════
"""
natural_explanation 是给用户看的解释
为什么需要这个?
- 用户不只是想知道结果,还想知道 Agent 是怎么想的
- 如果出错,用户想知道哪里出了问题
- 这增加了透明度和信任
"""
natural_explanation: str # 对用户的自然语言解释
# ═══════════════════════════════════════════════════════════════════════════════
# 第四部分:State 的操作工具函数
# ═══════════════════════════════════════════════════════════════════════════════
def create_initial_state(session_id: str) -> JaspersoftAgentState:
"""
创建初始状态
这是一个工厂函数,用于生成新的状态实例
所有必要的默认值在这里设置
为什么用工厂函数而不是直接初始化?
1. 确保所有必填字段有默认值
2. 统一初始化逻辑
3. 方便以后修改默认行为
"""
now = datetime.now().isoformat()
return JaspersoftAgentState(
# 基础信息
session_id=session_id,
session_name="新会话",
created_at=now,
updated_at=now,
# 对话历史
conversation_history=[],
full_conversation_history=[],
compressed_history="",
# 报表相关
current_jrxml="",
final_jrxml="",
jrxml_versions=[],
history_states=[],
last_saved_version=0,
# 生成过程
stage="initial",
intent="initial_generation",
retrieved_context="",
layout_schema={},
ocr_extraction_result={},
# 验证和错误
status="processing",
error_msg="",
retry_count=0,
max_retries=5,
# 知识库
kb_id="",
kb_fields=[],
kb_template_jrxml="",
# 解释
natural_explanation="",
)
def update_state(state: JaspersoftAgentState, **updates) -> JaspersoftAgentState:
"""
更新状态
这是一个辅助函数,用于安全地更新状态字段
为什么需要这个函数?
1. 自动更新时间戳
2. 类型检查(确保字段存在)
3. 记录更新历史(可选)
用法:
state = update_state(state, current_jrxml="new content", status="success")
"""
# 更新指定字段
for key, value in updates.items():
if key in state:
state[key] = value
else:
raise KeyError(f"State 没有字段: {key}")
# 自动更新时间戳
state["updated_at"] = datetime.now().isoformat()
return state
def save_state_snapshot(state: JaspersoftAgentState) -> dict:
"""
保存状态快照
这用于"撤销"功能
在执行重要操作前,保存当前状态的快照
如果操作失败,可以回滚到这个快照
返回的快照包含:
- 报表内容
- 对话历史
- 意图
- 用户请求
"""
return {
"current_jrxml": state.get("current_jrxml", ""),
"final_jrxml": state.get("final_jrxml", ""),
"status": state.get("status", ""),
"conversation_history": list(state.get("conversation_history", [])),
"user_input": state.get("user_input", ""),
"intent": state.get("intent", ""),
"timestamp": datetime.now().isoformat(),
}
def restore_state_snapshot(state: JaspersoftAgentState, snapshot: dict) -> JaspersoftAgentState:
"""
从快照恢复状态
用于"撤销"操作
从历史快照中恢复之前保存的状态
"""
state["current_jrxml"] = snapshot.get("current_jrxml", "")
state["final_jrxml"] = snapshot.get("final_jrxml", "")
state["status"] = snapshot.get("status", "")
state["conversation_history"] = snapshot.get("conversation_history", [])
state["updated_at"] = datetime.now().isoformat()
return state
# ═══════════════════════════════════════════════════════════════════════════════
# 第五部分:演示代码
# ═══════════════════════════════════════════════════════════════════════════════
def demo():
"""
演示 State 的使用
"""
print("=" * 60)
print("Step 02: 理解 State - 状态管理演示")
print("=" * 60)
# 1. 创建初始状态
print("\n📦 步骤 1: 创建初始状态")
state = create_initial_state("session_001")
print(f" 会话 ID: {state['session_id']}")
print(f" 创建时间: {state['created_at']}")
print(f" 当前状态: {state['status']}")
# 2. 更新状态
print("\n🔄 步骤 2: 更新状态")
state = update_state(
state,
user_input="生成一个销售报表",
current_jrxml='<?xml version="1.0"?><jasperReport/>',
status="success",
)
print(f" 用户输入: {state['user_input']}")
print(f" 生成状态: {state['status']}")
print(f" JRXML 长度: {len(state['current_jrxml'])} 字符")
# 3. 保存快照
print("\n📸 步骤 3: 保存状态快照")
snapshot = save_state_snapshot(state)
print(f" 快照时间: {snapshot['timestamp']}")
print(f" 快照内容: JRXML ({len(snapshot['current_jrxml'])} 字符)")
# 4. 修改状态(模拟用户修改)
print("\n✏️ 步骤 4: 修改报表(模拟)")
state["current_jrxml"] = '<?xml version="1.0"?><jasperReport modified="true"/>'
state["status"] = "modified"
print(f" 新状态: {state['status']}")
print(f" 新 JRXML: {state['current_jrxml']}")
# 5. 撤销(恢复到快照)
print("\n↩️ 步骤 5: 撤销操作")
state = restore_state_snapshot(state, snapshot)
print(f" 恢复状态: {state['status']}")
print(f" 恢复 JRXML: {state['current_jrxml']}")
# 6. 模拟完整的生成流程
print("\n🔄 步骤 6: 模拟完整生成流程")
state = create_initial_state("session_002")
# 阶段 1: 用户输入
state["user_input"] = "生成一个采购单报表"
state["intent"] = "initial_generation"
print(f" [阶段1] 用户输入: {state['user_input']}")
# 阶段 2: 生成
state["current_jrxml"] = "<?xml>...生成的 JRXML...</xml>"
state["stage"] = "initial_generation"
print(f" [阶段2] 生成完成,长度: {len(state['current_jrxml'])}")
# 阶段 3: 验证
state["status"] = "success"
state["final_jrxml"] = state["current_jrxml"]
print(f" [阶段3] 验证通过,状态: {state['status']}")
print("\n" + "=" * 60)
print("✅ State 管理演示完成")
print("=" * 60)
if __name__ == "__main__":
demo()
+230
View File
@@ -0,0 +1,230 @@
"""
Step 02 练习题:设计你的第一个 Agent State
━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
🎯 练习目标:
1. 巩固 State 的基本结构
2. 设计一个业务相关的 State
3. 理解状态在多步骤任务中的作用
━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
"""
from typing import TypedDict, List, Dict, Any
import json
# ═══════════════════════════════════════════════════════════════════════════════
# 练习 1:完善一个简单的聊天机器人状态
# ═══════════════════════════════════════════════════════════════════════════════
"""
任务:
设计一个客服聊天机器人的状态
要求:
1. 记录用户信息(用户名、ID)
2. 记录对话历史
3. 记录当前正在处理的问题
4. 记录问题解决状态
5. 记录用户满意度评分
提示:
- 使用 TypedDict 定义状态
- 考虑哪些字段是必须的,哪些是可选的
"""
class CustomerServiceState(TypedDict, total=False):
"""
客服聊天机器人的状态
请补全以下字段的定义:
"""
# 用户信息
user_id: str
user_name: str
# TODO: 添加更多字段...
pass
# ═══════════════════════════════════════════════════════════════════════════════
# 练习 2:设计一个数据分析 Agent 的状态
# ═══════════════════════════════════════════════════════════════════════════════
"""
任务:
设计一个数据分析 Agent 的状态
功能:
1. 用户提出数据分析需求
2. Agent 连接数据源
3. Agent 执行查询
4. Agent 生成报告
请设计状态来支持这个流程,包括:
- 用户需求
- 数据源配置
- 查询结果
- 生成的报告
- 中间状态
"""
class DataAnalysisState(TypedDict, total=False):
"""
数据分析 Agent 的状态
请补全状态定义...
"""
pass
# ═══════════════════════════════════════════════════════════════════════════════
# 练习 3:实现状态快照和恢复
# ═══════════════════════════════════════════════════════════════════════════════
"""
任务:
给以下状态实现快照和恢复功能
提示:
- 快照应该保存足够的信息来恢复状态
- 恢复后状态应该是完整的
- 考虑哪些字段需要保存,哪些不需要
"""
def create_snapshot(state: dict) -> dict:
"""
创建状态快照
应该保存:
- 关键业务数据
- 不包括临时计算的中间结果
返回格式:
{
"data": {...}, # 快照数据
"timestamp": "..." # 时间戳
}
"""
# TODO: 实现这个函数
pass
def restore_from_snapshot(state: dict, snapshot: dict) -> dict:
"""
从快照恢复状态
参数:
state: 当前状态(会被更新)
snapshot: 之前保存的快照
返回:
恢复后的状态
"""
# TODO: 实现这个函数
pass
# ═══════════════════════════════════════════════════════════════════════════════
# 练习 4:状态验证
# ═══════════════════════════════════════════════════════════════════════════════
"""
任务:
实现状态验证函数
验证规则:
1. 必填字段不能为空
2. 字段类型要正确
3. 某些字段有取值范围限制
返回:
{
"valid": True/False,
"errors": ["错误1", "错误2", ...]
}
"""
def validate_state(state: dict, rules: dict) -> dict:
"""
验证状态
参数:
state: 要验证的状态
rules: 验证规则,格式:
{
"field_name": {
"type": int/str/list/dict,
"required": True/False,
"min": 0, # 可选,数字最小值
"max": 100, # 可选,数字最大值
"choices": ["a", "b"] # 可选,枚举值
}
}
示例:
rules = {
"user_id": {"type": str, "required": True},
"age": {"type": int, "min": 0, "max": 150},
"status": {"type": str, "choices": ["active", "inactive"]}
}
"""
# TODO: 实现这个函数
pass
# ═══════════════════════════════════════════════════════════════════════════════
# 测试
# ═══════════════════════════════════════════════════════════════════════════════
def test_exercises():
"""测试所有练习"""
print("\n" + "=" * 60)
print("测试练习答案")
print("=" * 60)
# 测试练习 3
print("\n📝 练习 3: 快照和恢复")
# 示例状态
sample_state = {
"user_id": "123",
"current_task": "数据分析",
"progress": 50,
"temp_data": ["计算中...", "处理中..."], # 临时数据
}
print(f"原始状态: {sample_state}")
# 创建快照
snapshot = create_snapshot(sample_state)
print(f"快照: {snapshot}")
# 修改状态
sample_state["progress"] = 100
print(f"修改后状态: {sample_state}")
# 恢复
restored = restore_from_snapshot(sample_state, snapshot)
print(f"恢复后状态: {restored}")
# 测试练习 4
print("\n📝 练习 4: 状态验证")
rules = {
"user_id": {"type": str, "required": True},
"age": {"type": int, "min": 0, "max": 150},
"status": {"type": str, "choices": ["active", "inactive"]}
}
# 有效状态
valid_state = {"user_id": "123", "age": 25, "status": "active"}
print(f"验证有效状态: {validate_state(valid_state, rules)}")
# 无效状态
invalid_state = {"user_id": "123", "age": 200, "status": "unknown"}
print(f"验证无效状态: {validate_state(invalid_state, rules)}")
if __name__ == "__main__":
test_exercises()
+274
View File
@@ -0,0 +1,274 @@
"""
Step 02 练习题答案
⚠️ 先自己思考,再看答案!
⚠️ 答案不是唯一的,这里只是其中一种实现
"""
from typing import TypedDict, List, Dict, Any
from datetime import datetime
# ═══════════════════════════════════════════════════════════════════════════════
# 练习 1 答案:客服聊天机器人状态
# ═══════════════════════════════════════════════════════════════════════════════
class CustomerServiceState(TypedDict, total=False):
"""客服聊天机器人的状态"""
# === 用户信息 ===
user_id: str # 用户 ID
user_name: str # 用户名
user_email: str # 用户邮箱(可选)
# === 对话历史 ===
conversation_history: List[dict] # 对话历史
"""
格式: [{"role": "user", "content": "..."},
{"role": "assistant", "content": "..."}]
"""
# === 问题处理 ===
current_issue: str # 当前正在处理的问题描述
issue_status: str # 问题状态: "open" / "investigating" / "resolved" / "closed"
issue_priority: str # 优先级: "low" / "medium" / "high" / "urgent"
# === 解决方案 ===
proposed_solution: str # 提出的解决方案
solution_steps: List[str] # 解决步骤列表
is_resolved: bool # 是否已解决
# === 用户反馈 ===
satisfaction_rating: int # 满意度评分 1-5
feedback_comment: str # 反馈意见
# === 元信息 ===
session_start: str # 会话开始时间
last_interaction: str # 最后互动时间
agent_id: str # 处理此会话的客服 ID
# ═══════════════════════════════════════════════════════════════════════════════
# 练习 2 答案:数据分析 Agent 状态
# ═══════════════════════════════════════════════════════════════════════════════
class DataAnalysisState(TypedDict, total=False):
"""数据分析 Agent 的状态"""
# === 用户需求 ===
user_request: str # 用户的分析需求
session_id: str # 会话 ID
# === 数据源配置 ===
data_source_type: str # 数据源类型: "database" / "file" / "api"
data_source_config: dict # 数据源配置(连接信息等)
"""
示例:
{
"host": "localhost",
"database": "sales_db",
"table": "orders"
}
"""
# === 查询和结果 ===
query: str # 执行的 SQL 或查询条件
query_result: Any # 查询结果
result_row_count: int # 结果行数
result_columns: List[str] # 结果列名
# === 分析过程 ===
stage: str # 当前阶段
"""
阶段值:
- "initial": 初始状态
- "connecting": 连接数据源
- "querying": 执行查询
- "analyzing": 分析数据
- "generating_report": 生成报告
- "completed": 完成
- "error": 出错
"""
# === 生成的报告 ===
generated_report: str # 生成的报告内容
report_format: str # 报告格式: "json" / "csv" / "markdown" / "html"
# === 错误处理 ===
error_message: str # 错误信息(如果有)
retry_count: int # 重试次数
# ═══════════════════════════════════════════════════════════════════════════════
# 练习 3 答案:快照和恢复
# ═══════════════════════════════════════════════════════════════════════════════
def create_snapshot(state: dict) -> dict:
"""
创建状态快照
策略:只保存"业务关键"数据,不保存临时计算结果
"""
# 定义需要保存的字段(业务关键数据)
business_fields = [
"user_id",
"user_name",
"current_task",
"progress",
"status",
# 根据实际情况添加更多...
]
snapshot_data = {}
for field in business_fields:
if field in state:
snapshot_data[field] = state[field]
return {
"data": snapshot_data,
"timestamp": datetime.now().isoformat(),
"version": "1.0"
}
def restore_from_snapshot(state: dict, snapshot: dict) -> dict:
"""
从快照恢复状态
"""
if not snapshot or "data" not in snapshot:
return state
# 从快照恢复数据
snapshot_data = snapshot["data"]
for key, value in snapshot_data.items():
state[key] = value
# 更新恢复后的时间戳
state["_restored_at"] = datetime.now().isoformat()
state["_restored_from"] = snapshot.get("timestamp", "unknown")
return state
# ═══════════════════════════════════════════════════════════════════════════════
# 练习 4 答案:状态验证
# ═══════════════════════════════════════════════════════════════════════════════
def validate_state(state: dict, rules: dict) -> dict:
"""
验证状态
"""
errors = []
for field_name, field_rules in rules.items():
value = state.get(field_name)
field_type = field_rules.get("type")
required = field_rules.get("required", False)
# 1. 检查必填
if required and (value is None or value == ""):
errors.append(f"字段 '{field_name}' 是必填的")
continue
# 如果字段为空且不是必填,跳过后续检查
if value is None or value == "":
continue
# 2. 检查类型
if field_type and not isinstance(value, field_type):
errors.append(
f"字段 '{field_name}' 类型错误: "
f"期望 {field_type.__name__}, 实际 {type(value).__name__}"
)
continue
# 3. 检查数值范围
if isinstance(value, (int, float)):
if "min" in field_rules and value < field_rules["min"]:
errors.append(
f"字段 '{field_name}' 小于最小值: "
f"{value} < {field_rules['min']}"
)
if "max" in field_rules and value > field_rules["max"]:
errors.append(
f"字段 '{field_name}' 大于最大值: "
f"{value} > {field_rules['max']}"
)
# 4. 检查枚举值
if "choices" in field_rules:
if value not in field_rules["choices"]:
errors.append(
f"字段 '{field_name}' 值不在允许范围内: "
f"{value} not in {field_rules['choices']}"
)
return {
"valid": len(errors) == 0,
"errors": errors
}
# ═══════════════════════════════════════════════════════════════════════════════
# 测试
# ═══════════════════════════════════════════════════════════════════════════════
def test_answers():
"""测试答案"""
print("\n" + "=" * 60)
print("测试练习答案")
print("=" * 60)
# 测试练习 1
print("\n📝 练习 1: 客服状态")
cs_state: CustomerServiceState = {
"user_id": "user_001",
"user_name": "张三",
"issue_status": "open",
"issue_priority": "high",
}
print(f" 状态: {cs_state}")
# 测试练习 2
print("\n📝 练习 2: 数据分析状态")
da_state: DataAnalysisState = {
"user_request": "分析本月销售数据",
"data_source_type": "database",
"stage": "querying",
}
print(f" 状态: {da_state}")
# 测试练习 3
print("\n📝 练习 3: 快照和恢复")
sample_state = {
"user_id": "123",
"current_task": "数据分析",
"progress": 50,
"temp_data": ["计算中...", "处理中..."],
}
print(f" 原始状态: {sample_state}")
snapshot = create_snapshot(sample_state)
print(f" 快照: {snapshot}")
sample_state["progress"] = 100
restored = restore_from_snapshot(sample_state, snapshot)
print(f" 恢复后状态: {restored}")
# 测试练习 4
print("\n📝 练习 4: 状态验证")
rules = {
"user_id": {"type": str, "required": True},
"age": {"type": int, "min": 0, "max": 150},
"status": {"type": str, "choices": ["active", "inactive"]}
}
valid_state = {"user_id": "123", "age": 25, "status": "active"}
print(f" 有效状态: {validate_state(valid_state, rules)}")
invalid_state = {"user_id": "123", "age": 200, "status": "unknown"}
print(f" 无效状态: {validate_state(invalid_state, rules)}")
if __name__ == "__main__":
test_answers()
+196
View File
@@ -0,0 +1,196 @@
"""
Step 02: State 状态管理 - 主程序
运行方式:
cd step_02_state
python main.py
"""
from concept import (
JaspersoftAgentState,
create_initial_state,
update_state,
save_state_snapshot,
restore_state_snapshot,
)
def main():
"""演示状态管理的完整使用流程"""
print("=" * 70)
print(" Step 02: 理解 State - 状态管理")
print("=" * 70)
print()
# ═══════════════════════════════════════════════════════════════════════════════
# 场景:用户生成并修改报表的完整流程
# ═══════════════════════════════════════════════════════════════════════════════
print("📦 场景:用户生成并修改报表的完整流程")
print("-" * 70)
print("""
假设用户执行以下操作:
1. 描述需求:生成一个销售报表
2. Agent 生成报表
3. Agent 验证通过
4. 用户要求修改标题
5. 用户要求撤销修改
我们需要状态来追踪这个完整的流程。
""")
# ═══════════════════════════════════════════════════════════════════════════════
# 步骤 1:创建初始状态
# ═══════════════════════════════════════════════════════════════════════════════
print("\n📋 步骤 1: 创建初始状态")
print("-" * 40)
state = create_initial_state("session_001")
print(f" 会话 ID: {state['session_id']}")
print(f" 创建时间: {state['created_at']}")
print(f" 初始状态: {state['status']}")
# ═══════════════════════════════════════════════════════════════════════════════
# 步骤 2:用户描述需求
# ═══════════════════════════════════════════════════════════════════════════════
print("\n📝 步骤 2: 用户描述需求")
print("-" * 40)
state["user_input"] = "生成一个销售报表,显示月度汇总"
state["intent"] = "initial_generation"
state["stage"] = "initial"
print(f" 用户输入: {state['user_input']}")
print(f" 意图: {state['intent']}")
print(f" 阶段: {state['stage']}")
# ═══════════════════════════════════════════════════════════════════════════════
# 步骤 3:生成报表(模拟)
# ═══════════════════════════════════════════════════════════════════════════════
print("\n🔧 步骤 3: Agent 生成报表")
print("-" * 40)
# 模拟生成过程
state["stage"] = "generation"
generated_jrxml = '''<?xml version="1.0" encoding="UTF-8"?>
<jasperReport name="SalesReport">
<title>月度销售汇总报表</title>
<queryString>SELECT product, SUM(amount) FROM sales GROUP BY product</queryString>
<field name="product" class="java.lang.String"/>
<field name="amount" class="java.math.BigDecimal"/>
<band height="100">
<staticText><text>产品</text></staticText>
<textField><textFieldExpression>$F{product}</textFieldExpression></textField>
</band>
</jasperReport>'''
state["current_jrxml"] = generated_jrxml
print(f" 生成完成!")
print(f" JRXML 长度: {len(generated_jrxml)} 字符")
print(f" 当前阶段: {state['stage']}")
# ═══════════════════════════════════════════════════════════════════════════════
# 步骤 4:验证通过
# ═══════════════════════════════════════════════════════════════════════════════
print("\n✅ 步骤 4: 验证通过")
print("-" * 40)
# 保存状态快照(修改前的备份)
state_snapshot = save_state_snapshot(state)
print(f" ✓ 保存状态快照")
print(f" 快照时间: {state_snapshot['timestamp']}")
print(f" 包含内容: current_jrxml, conversation_history 等")
# 验证通过,更新状态
state["status"] = "success"
state["final_jrxml"] = state["current_jrxml"]
state["stage"] = "completed"
print(f" 验证状态: {state['status']}")
print(f" 最终版本: ✓ 已保存")
# ═══════════════════════════════════════════════════════════════════════════════
# 步骤 5:用户要求修改标题
# ═══════════════════════════════════════════════════════════════════════════════
print("\n✏️ 步骤 5: 用户要求修改标题")
print("-" * 40)
# 保存修改前的快照
pre_modify_snapshot = save_state_snapshot(state)
print(f" ✓ 修改前保存快照")
# 执行修改
new_jrxml = state["current_jrxml"].replace("月度销售汇总报表", "2024年销售汇总报表")
state["current_jrxml"] = new_jrxml
state["intent"] = "modify_report"
state["stage"] = "modification"
print(f" 修改内容: 标题从'月度销售汇总报表'改为'2024年销售汇总报表'")
print(f" 当前意图: {state['intent']}")
# ═══════════════════════════════════════════════════════════════════════════════
# 步骤 6:用户撤销修改
# ═══════════════════════════════════════════════════════════════════════════════
print("\n↩️ 步骤 6: 用户撤销修改")
print("-" * 40)
# 恢复到修改前的状态
state = restore_state_snapshot(state, pre_modify_snapshot)
print(f" ✓ 撤销成功!")
print(f" 恢复标题: {state['current_jrxml'][:50]}...")
print(f" 当前意图: {state['intent']}")
# ═══════════════════════════════════════════════════════════════════════════════
# 展示完整状态
# ═══════════════════════════════════════════════════════════════════════════════
print("\n\n" + "=" * 70)
print("📊 完整状态一览")
print("=" * 70)
key_fields = [
("session_id", "会话ID"),
("status", "状态"),
("intent", "意图"),
("stage", "阶段"),
("user_input", "用户输入"),
("current_jrxml", "当前JRXML"),
("final_jrxml", "最终JRXML"),
("created_at", "创建时间"),
("updated_at", "更新时间"),
]
for field, desc in key_fields:
value = state.get(field, "")
if field in ["current_jrxml", "final_jrxml"] and value:
value = f"{value[:50]}..." if len(value) > 50 else value
print(f" {desc}: {value}")
# ═══════════════════════════════════════════════════════════════════════════════
# 总结
# ═══════════════════════════════════════════════════════════════════════════════
print("\n\n" + "=" * 70)
print(" ✅ Step 02 完成!")
print("=" * 70)
print("""
学到的关键概念:
1. State 是 Agent 的"记忆",在多步骤任务中保持信息
2. 使用 TypedDict 定义状态,有类型提示更安全
3. 状态快照用于"撤销"功能
4. 不同字段用于不同目的:业务数据、对话历史、元信息
下一步:
继续 Step 03,学习如何把 Tool + State 组合成简单的 Agent
""")
if __name__ == "__main__":
main()
+60
View File
@@ -0,0 +1,60 @@
# Step 03: 构建简单 Agent
## 🎯 学习目标
- 理解什么是 Agent(代理)
- 理解 Agent 的核心循环:思考 → 行动 → 观察
- 学会构建一个完整的 Agent
- 理解 LangGraph 的基本用法
---
## 📖 概念讲解
### 什么是 Agent
**Agent = LLM + Tool + Loop**
```
Agent 工作流程:
┌─────────────────────────────────────┐
│ │
│ ┌───────────┐ │
│ │ 思考 │ ◀────────────────┼── LLM 决定
│ └─────┬─────┘ │ 要执行什么
│ │ │
│ ▼ │
│ ┌───────────┐ ┌─────────┐ │
│ │ 行动 │───▶│ 执行工具 │ │
│ └───────────┘ └────┬────┘ │
│ │ │
│ ▼ │
│ ┌─────────┐ │
│ │ 观察结果 │ │
│ └────┬────┘ │
│ │ │
└──────────────────────────┼────────┘
┌───────────────┼───────────────┐
│ │ │
▼ ▼ ▼
继续思考 结束 结束
```
### Agent 的核心循环
```
while True:
1. Think(思考):LLM 分析当前状态,决定下一步行动
2. Act(行动):执行工具或返回结果
3. Observe(观察):获取执行结果,更新状态
如果决定结束 → break
```
---
## 💻 代码实现
请打开 `concept.py` 查看详细代码注释。
+603
View File
@@ -0,0 +1,603 @@
"""
Step 03: 构建简单 Agent
━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
🎓 本节内容:
1. 什么是 Agent?(LLM + Tool + Loop
2. Agent 的核心循环:Think → Act → Observe
3. 如何构建一个完整的 Agent
4. LangGraph 的基本用法
━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
"""
from typing import TypedDict, List, Dict, Any, Literal, Optional
from dataclasses import dataclass
from abc import ABC, abstractmethod
import json
# ═══════════════════════════════════════════════════════════════════════════════
# 第一部分:理解 Agent 的本质
# ═══════════════════════════════════════════════════════════════════════════════
"""
Agent 的核心公式:
Agent = LLM + Tool + Loop
为什么这个公式很重要?
1. LLM:负责"思考"——理解用户意图,决定下一步行动
2. Tool:负责"行动"——执行具体的操作
3. Loop:负责"循环"——不断执行直到任务完成
没有 LLM,Agent 只能机械地执行预设命令
没有 Tool,LLM 只能思考不能行动
没有 Loop,Agent 只能执行一步
只有三者结合,才是真正的"智能代理"
"""
# ═══════════════════════════════════════════════════════════════════════════════
# 第二部分:定义 Agent State
# ═══════════════════════════════════════════════════════════════════════════════
class SimpleAgentState(TypedDict, total=False):
"""
简单 Agent 的状态
相比 Step 02 的状态,这个状态更精简,适合教学
"""
# 用户输入
user_input: str
# 对话历史
messages: List[dict]
"""
格式:
[
{"role": "user", "content": "用户说了什么"},
{"role": "assistant", "content": "助手说了什么"},
{"role": "system", "content": "系统做了什么"},
]
"""
# 当前行动
current_action: str
"""
当前正在执行的动作:
- "thinking": LLM 正在思考
- "using_tool": 正在使用工具
- "finished": 完成
"""
# 工具调用相关
tool_calls: List[dict] # 记录所有工具调用
last_tool_result: Any # 最后一个工具的执行结果
tool_result: Any # 当前工具执行结果
# 生成结果
final_output: str # 最终输出
generation: str # 当前正在生成的文本
# 状态
status: str
"""
状态值:
- "input": 等待输入
- "thinking": 思考中
- "acting": 执行中
- "finished": 完成
- "error": 错误
"""
error: str
# ═══════════════════════════════════════════════════════════════════════════════
# 第三部分:定义 Tool 接口
# ═══════════════════════════════════════════════════════════════════════════════
@dataclass
class ToolCall:
"""工具调用的数据结构"""
name: str # 工具名称
arguments: dict # 传递给工具的参数
result: Any = None # 工具执行结果
error: str = None # 错误信息
class BaseTool(ABC):
"""
工具基类(简化版,来自 Step 01)
"""
@property
@abstractmethod
def name(self) -> str:
"""工具名称"""
pass
@property
@abstractmethod
def description(self) -> str:
"""工具描述"""
pass
@abstractmethod
def execute(self, **kwargs) -> Any:
"""执行工具"""
pass
# ═══════════════════════════════════════════════════════════════════════════════
# 第四部分:实现具体的 Tool
# ═══════════════════════════════════════════════════════════════════════════════
class CalculatorTool(BaseTool):
"""计算器工具"""
@property
def name(self) -> str:
return "calculator"
@property
def description(self) -> str:
return "执行数学计算。输入一个数学表达式,返回计算结果。"
def execute(self, expression: str) -> str:
"""执行计算"""
try:
result = eval(expression, {"__builtins__": {}}, {})
return str(result)
except Exception as e:
return f"计算错误: {e}"
class JRXMLGeneratorTool(BaseTool):
"""JRXML 生成工具"""
@property
def name(self) -> str:
return "jrxml_generator"
@property
def description(self) -> str:
return "生成 JasperReports JRXML 报表模板。根据用户需求生成 XML 代码。"
def execute(self, requirement: str, context: str = "") -> str:
"""
执行 JRXML 生成
实际应用中,这里会调用 LLM
这里用模拟数据演示
"""
# 模拟生成
template = f'''<?xml version="1.0" encoding="UTF-8"?>
<jasperReport xmlns="http://jasperreports.sourceforge.net/jasperreports"
name="{requirement}" pageWidth="595" pageHeight="842">
<title>{requirement}</title>
<queryString>
<![CDATA[SELECT * FROM data WHERE 1=1]]>
</queryString>
<field name="id" class="java.lang.Integer"/>
<field name="name" class="java.lang.String"/>
<band height="50">
<staticText>
<reportElement x="0" y="0" width="100" height="20"/>
<text>名称</text>
</staticText>
<textField>
<reportElement x="0" y="20" width="100" height="20"/>
<textFieldExpression>$F{{name}}</textFieldExpression>
</textField>
</band>
</jasperReport>'''
return template
class TemplateSearchTool(BaseTool):
"""模板搜索工具"""
@property
def name(self) -> str:
return "template_search"
@property
def description(self) -> str:
return "搜索 JRXML 报表模板。在知识库中搜索相关的报表模板作为参考。"
def execute(self, keyword: str, limit: int = 3) -> str:
"""搜索模板"""
# 模拟搜索结果
results = [
f"模板1: 关于 '{keyword}' 的基础报表模板",
f"模板2: '{keyword}' 的高级报表模板",
f"模板3: '{keyword}' 的数据透视表模板",
]
return "\n".join(results[:limit])
class JRXMLValidatorTool(BaseTool):
"""JRXML 验证工具"""
@property
def name(self) -> str:
return "jrxml_validator"
@property
def description(self) -> str:
return "验证 JRXML 报表模板的正确性。检查 XML 语法和 JasperReports 规范。"
def execute(self, jrxml: str) -> dict:
"""验证 JRXML"""
# 模拟验证
issues = []
# 检查基本结构
if "<?xml" not in jrxml:
issues.append("缺少 XML 声明")
if "<jasperReport" not in jrxml:
issues.append("缺少 <jasperReport> 根元素")
if "<field" not in jrxml and "<band" not in jrxml:
issues.append("报表中没有定义字段或波段")
return {
"valid": len(issues) == 0,
"issues": issues
}
# ═══════════════════════════════════════════════════════════════════════════════
# 第五部分:构建 Agent 的大脑(LLM 决策)
# ═══════════════════════════════════════════════════════════════════════════════
class AgentBrain:
"""
Agent 的大脑——负责决定下一步做什么
在真实系统中,这里会调用 LLM
这里用规则引擎模拟 LLM 的决策过程
决策逻辑:
1. 如果用户要求生成报表 → 调用 jrxml_generator
2. 如果用户要求搜索模板 → 调用 template_search
3. 如果用户要求验证 → 调用 jrxml_validator
4. 如果用户只是聊天 → 直接回答
5. 如果工具执行完成 → 返回结果或继续调用
"""
def __init__(self):
# 工具描述(用于告诉 LLM 有哪些工具可用)
self.tools = {
"calculator": "计算器,执行数学表达式,如 2+3*4",
"jrxml_generator": "生成 JasperReports JRXML 报表模板",
"template_search": "搜索报表模板",
"jrxml_validator": "验证 JRXML 报表的正确性",
}
def decide(self, state: SimpleAgentState) -> dict:
"""
决定下一步行动
返回:
{
"action": "use_tool" | "respond" | "finish",
"tool_name": str, # 如果 action == "use_tool"
"tool_args": dict, # 如果 action == "use_tool"
"response": str, # 如果 action == "respond"
}
"""
user_input = state.get("user_input", "").lower()
messages = state.get("messages", [])
tool_result = state.get("tool_result")
# 如果有工具执行结果
if tool_result is not None:
return {
"action": "respond",
"response": f"工具执行完成,结果:{tool_result}"
}
# 根据用户输入决定行动
if "生成" in user_input or "报表" in user_input:
return {
"action": "use_tool",
"tool_name": "jrxml_generator",
"tool_args": {
"requirement": state.get("user_input", ""),
"context": ""
}
}
elif "搜索" in user_input or "模板" in user_input:
return {
"action": "use_tool",
"tool_name": "template_search",
"tool_args": {
"keyword": user_input.replace("搜索", "").replace("模板", "").strip(),
"limit": 3
}
}
elif "验证" in user_input and "<" in user_input:
return {
"action": "use_tool",
"tool_name": "jrxml_validator",
"tool_args": {
"jrxml": user_input
}
}
elif any(op in user_input for op in ["计算", "+", "-", "*", "/"]):
# 提取表达式
import re
expr = re.search(r'[\d\+\-\*\/\(\)\.]+', user_input)
if expr:
return {
"action": "use_tool",
"tool_name": "calculator",
"tool_args": {
"expression": expr.group()
}
}
else:
# 默认:直接回答
return {
"action": "respond",
"response": f"我理解你的需求:{state.get('user_input', '')}。请问具体想做什么?"
}
# ═══════════════════════════════════════════════════════════════════════════════
# 第六部分:构建 Agent
# ═══════════════════════════════════════════════════════════════════════════════
class SimpleAgent:
"""
简单 Agent 实现
这个 Agent 的工作流程:
1. 接收用户输入
2. LLM(大脑)决定下一步行动
3. 执行行动(使用工具或直接回答)
4. 返回结果
5. 循环直到用户结束
"""
def __init__(self):
# 初始化工具
self.tools = {
"calculator": CalculatorTool(),
"jrxml_generator": JRXMLGeneratorTool(),
"template_search": TemplateSearchTool(),
"jrxml_validator": JRXMLValidatorTool(),
}
# 初始化大脑
self.brain = AgentBrain()
# 初始化状态
self.state = SimpleAgentState(
messages=[],
tool_calls=[],
status="input"
)
def reset(self):
"""重置 Agent 状态"""
self.state = SimpleAgentState(
messages=[],
tool_calls=[],
status="input"
)
def get_available_tools(self) -> list[dict]:
"""获取可用工具列表"""
return [
{"name": tool.name, "description": tool.description}
for tool in self.tools.values()
]
def execute_tool(self, tool_name: str, **kwargs) -> Any:
"""
执行工具
封装了工具执行的逻辑,包括:
- 查找工具
- 执行工具
- 记录调用
- 处理错误
"""
if tool_name not in self.tools:
return {"error": f"工具 '{tool_name}' 不存在"}
tool = self.tools[tool_name]
try:
result = tool.execute(**kwargs)
# 记录工具调用
self.state["tool_calls"].append({
"tool": tool_name,
"args": kwargs,
"result": result
})
return result
except Exception as e:
error_result = {"error": str(e)}
self.state["tool_calls"].append({
"tool": tool_name,
"args": kwargs,
"error": str(e)
})
return error_result
def process(self, user_input: str) -> str:
"""
处理用户输入
这是 Agent 的主入口
每次用户输入,调用这个方法来处理
"""
# 1. 保存用户输入
self.state["user_input"] = user_input
self.state["messages"].append({
"role": "user",
"content": user_input
})
self.state["status"] = "thinking"
# 2. 大脑决定行动
decision = self.brain.decide(self.state)
# 3. 执行行动
if decision["action"] == "use_tool":
# 使用工具
self.state["status"] = "acting"
tool_name = decision["tool_name"]
tool_args = decision["tool_args"]
result = self.execute_tool(tool_name, **tool_args)
# 保存结果到状态
self.state["tool_result"] = result
self.state["current_action"] = f"使用了工具: {tool_name}"
# 再次调用大脑,决定下一步
decision = self.brain.decide(self.state)
if decision["action"] == "respond":
response = decision["response"]
self.state["messages"].append({
"role": "assistant",
"content": response
})
self.state["final_output"] = response
self.state["status"] = "finished"
return response
elif decision["action"] == "finish":
self.state["status"] = "finished"
return self.state.get("final_output", "任务完成")
# 默认返回当前状态
return self.state.get("current_action", "处理中...")
def get_history(self) -> list[dict]:
"""获取对话历史"""
return self.state.get("messages", [])
# ═══════════════════════════════════════════════════════════════════════════════
# 第七部分:使用 LangGraph 重构(可选进阶内容)
# ═══════════════════════════════════════════════════════════════════════════════
"""
LangGraph 是 LangChain 提供的图结构框架
核心概念:
1. StateGraph:状态图
2. Node:节点(执行函数)
3. Edge:边(状态转换)
4. Conditional Edge:条件边(根据状态决定下一步)
为什么使用 LangGraph
1. 代码结构更清晰
2. 内置状态管理
3. 支持复杂的条件分支
4. 内置循环和回退
如果你想学习 LangGraph
请参考 jaspersoft 项目中的实际实现:
- agent/graph.py:状态图定义
- agent/nodes.py:节点实现
- agent/state.py:状态定义
这里的 SimpleAgent 展示的是核心原理
LangGraph 只是实现方式之一
"""
# ═══════════════════════════════════════════════════════════════════════════════
# 第八部分:演示代码
# ═══════════════════════════════════════════════════════════════════════════════
def demo():
"""演示简单 Agent 的使用"""
print("=" * 60)
print("Step 03: 构建简单 Agent - 演示")
print("=" * 60)
# 创建 Agent
agent = SimpleAgent()
print("\n📋 可用工具:")
for tool in agent.get_available_tools():
print(f" - {tool['name']}: {tool['description']}")
# 场景 1:生成报表
print("\n\n🔄 场景 1: 生成报表")
print("-" * 40)
print("用户: 生成一个销售报表")
response = agent.process("生成一个销售报表")
print(f"Agent: {response}")
# 场景 2:搜索模板
print("\n\n🔄 场景 2: 搜索模板")
print("-" * 40)
print("用户: 搜索采购相关的模板")
response = agent.process("搜索采购相关的模板")
print(f"Agent: {response}")
# 场景 3:计算
print("\n\n🔄 场景 3: 数学计算")
print("-" * 40)
print("用户: 计算 100 + 200 * 3")
response = agent.process("计算 100 + 200 * 3")
print(f"Agent: {response}")
# 场景 4:多轮对话
print("\n\n🔄 场景 4: 多轮对话")
print("-" * 40)
print("用户: 我想生成一个报表")
response1 = agent.process("我想生成一个报表")
print(f"Agent: {response1}")
print("用户: 显示月度汇总数据")
response2 = agent.process("显示月度汇总数据")
print(f"Agent: {response2}")
# 展示对话历史
print("\n\n📜 对话历史:")
for msg in agent.get_history():
print(f" [{msg['role']}]: {msg['content'][:50]}...")
# 展示工具调用记录
print("\n\n🔧 工具调用记录:")
for call in agent.state.get("tool_calls", []):
if "error" in call:
print(f" - {call['tool']}: ❌ {call['error']}")
else:
result = str(call.get("result", ""))[:30]
print(f" - {call['tool']}: ✓ {result}...")
print("\n" + "=" * 60)
print("✅ 演示完成")
print("=" * 60)
if __name__ == "__main__":
demo()
+78
View File
@@ -0,0 +1,78 @@
"""
Step 03: 构建简单 Agent - 主程序
运行方式:
cd step_03_simple_agent
python main.py
"""
from concept import SimpleAgent
def main():
"""交互式演示 Agent"""
print("=" * 70)
print(" Step 03: 构建简单 Agent - 交互式演示")
print("=" * 70)
print("""
欢迎使用 Jaspersoft 报表助手!
我可以帮你:
- 生成 JRXML 报表模板
- 搜索相关报表模板
- 验证报表代码
- 执行数学计算
输入 'quit''退出' 结束对话
""")
# 创建 Agent
agent = SimpleAgent()
# 显示可用工具
print("\n📋 我的能力:")
for tool in agent.get_available_tools():
print(f"{tool['name']}: {tool['description']}")
print("\n" + "-" * 70)
print("开始对话:")
print("-" * 70)
# 交互式循环
while True:
try:
user_input = input("\n👤 你: ").strip()
if not user_input:
continue
if user_input.lower() in ["quit", "退出", "exit", "q"]:
print("\n👋 再见!")
break
# 处理输入
response = agent.process(user_input)
# 显示响应
print(f"\n🤖 Agent: {response}")
except KeyboardInterrupt:
print("\n\n👋 再见!")
break
except Exception as e:
print(f"\n❌ 出错了: {e}")
# 显示会话统计
print("\n\n" + "=" * 70)
print("📊 会话统计")
print("=" * 70)
print(f" 对话轮次: {len(agent.get_history()) // 2}")
print(f" 工具调用: {len(agent.state.get('tool_calls', []))}")
print("\n💡 继续学习:")
print(" Step 04: 添加 Memory - 记忆系统")
print(" Step 05: 添加 RAG - 知识检索")
if __name__ == "__main__":
main()
+51
View File
@@ -0,0 +1,51 @@
# Step 04: 添加 Memory - 记忆系统
## 🎯 学习目标
- 理解 Agent 为何需要记忆系统
- 理解多层记忆架构:短期 / 长期 / 工作记忆
- 学会实现上下文窗口管理
- 理解对话压缩技术
---
## 📖 核心概念
### 为什么 Agent 需要 Memory
```
没有 Memory
用户: "我叫张三"
Agent: 好的,张三先生
用户: "我叫啥?"
Agent: ??? 我不记得了
有 Memory
用户: "我叫张三"
Agent: 好的,张三先生
↓ 保存到 Memory
用户: "我叫啥?"
Agent: 你叫张三
↓ 从 Memory 读取
```
### 多层记忆架构
```
┌─────────────────────────────────────────────────────┐
│ Memory System │
├─────────────────────────────────────────────────────┤
│ │
│ ┌─────────────┐ ┌─────────────┐ ┌────────────┐ │
│ │ Working │ │ Short │ │ Long │ │
│ │ Memory │ │ Term │ │ Term │ │
│ ├─────────────┤ ├─────────────┤ ├────────────┤ │
│ │ 当前状态 │ │ 对话历史 │ │ 知识库 │ │
│ │ 正在处理 │ │ 最近几轮 │ │ RAG检索 │ │
│ │ 的任务 │ │ 对话 │ │ 持久记忆 │ │
│ └─────────────┘ └─────────────┘ └────────────┘ │
│ │
└─────────────────────────────────────────────────────┘
```
请打开 `concept.py` 查看实现。
+429
View File
@@ -0,0 +1,429 @@
"""
Step 04: Memory - 记忆系统
━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
🎓 本节内容:
1. 为什么 Agent 需要 Memory
2. 多层记忆架构
3. 上下文窗口管理
4. 对话压缩技术
━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
"""
from typing import TypedDict, List, Dict, Any, Optional
from dataclasses import dataclass
from datetime import datetime
import json
# ═══════════════════════════════════════════════════════════════════════════════
# 第一部分:理解为什么需要 Memory
# ═══════════════════════════════════════════════════════════════════════════════
"""
回顾 Step 03 的 SimpleAgent
问题:对话历史都存在 state['messages'] 里
问题:如果对话很长,发送给 LLM 的 token 会越来越多
问题:LLM 有上下文长度限制,不能无限增长
解决方案:多层 Memory 系统
1. Working Memory(工作记忆):当前正在处理的任务
2. Short-Term Memory(短期记忆):最近的对话轮次
3. Long-Term Memory(长期记忆):持久化的知识
核心思想:
- 不是所有信息都需要保留
- 重要的信息保留,不重要的压缩或丢弃
- 平衡"记住""效率"
"""
# ═══════════════════════════════════════════════════════════════════════════════
# 第二部分:Memory 的数据类型
# ═══════════════════════════════════════════════════════════════════════════════
@dataclass
class Message:
"""对话消息"""
role: str # "user" / "assistant" / "system"
content: str
timestamp: str = ""
metadata: Dict[str, Any] = None
def __post_init__(self):
if not self.timestamp:
self.timestamp = datetime.now().isoformat()
if self.metadata is None:
self.metadata = {}
def to_dict(self) -> dict:
return {
"role": self.role,
"content": self.content,
"timestamp": self.timestamp,
"metadata": self.metadata,
}
@dataclass
class MemorySnapshot:
"""记忆快照 - 用于保存和恢复状态"""
state: Dict[str, Any] # 关键状态
messages: List[Message] # 消息历史
key_info: Dict[str, Any] # 关键信息摘要
timestamp: str = ""
def __post_init__(self):
if not self.timestamp:
self.timestamp = datetime.now().isoformat()
# ═══════════════════════════════════════════════════════════════════════════════
# 第三部分:Working Memory(工作记忆)
# ═══════════════════════════════════════════════════════════════════════════════
class WorkingMemory:
"""
工作记忆 - 当前正在处理的任务
特点:
- 容量小,只有几项
- 频繁读写
- 断电即失
- 存放当前任务的关键信息
"""
def __init__(self, capacity: int = 7):
"""
Args:
capacity: 最大容量,超过后自动清理
"""
self.capacity = capacity
self._data: Dict[str, Any] = {}
def remember(self, key: str, value: Any) -> None:
"""记住新信息"""
self._data[key] = {
"value": value,
"timestamp": datetime.now().isoformat()
}
self._cleanup()
def recall(self, key: str, default: Any = None) -> Any:
"""回忆信息"""
item = self._data.get(key)
if item:
return item["value"]
return default
def forget(self, key: str) -> None:
"""忘记信息"""
if key in self._data:
del self._data[key]
def clear(self) -> None:
"""清空所有记忆"""
self._data = {}
def get_all(self) -> Dict[str, Any]:
"""获取所有记忆"""
return {k: v["value"] for k, v in self._data.items()}
def _cleanup(self) -> None:
"""当容量超限时,清理最旧的信息"""
if len(self._data) > self.capacity:
# 按时间排序,删除最旧的
sorted_items = sorted(
self._data.items(),
key=lambda x: x[1]["timestamp"]
)
# 删除最早的 1/3
delete_count = self.capacity // 3
for key, _ in sorted_items[:delete_count]:
del self._data[key]
# ═══════════════════════════════════════════════════════════════════════════════
# 第四部分:Short-Term Memory(短期记忆)
# ═══════════════════════════════════════════════════════════════════════════════
class ShortTermMemory:
"""
短期记忆 - 最近的对对话
特点:
- 保存最近 N 轮对话
- 超出部分要么压缩,要么丢弃
- 容易被遗忘
- 模拟人类的短期记忆
"""
def __init__(self, max_messages: int = 20):
"""
Args:
max_messages: 最大保存消息数
"""
self.max_messages = max_messages
self.messages: List[Message] = []
def add(self, role: str, content: str, **metadata) -> None:
"""添加消息"""
msg = Message(role=role, content=content, metadata=metadata)
self.messages.append(msg)
self._trim()
def get_recent(self, n: int = 10) -> List[Message]:
"""获取最近 N 条消息"""
return self.messages[-n:]
def get_all(self) -> List[Message]:
"""获取所有消息"""
return self.messages.copy()
def summarize_older(self, keep_recent: int = 5) -> str:
"""
将较早的消息压缩为摘要
Args:
keep_recent: 最近多少条保持不变
Returns:
压缩后的摘要文本
"""
if len(self.messages) <= keep_recent:
return ""
older = self.messages[:-keep_recent]
# 简化为摘要
summary_parts = []
for msg in older:
role_label = "用户" if msg.role == "user" else "助手"
content_preview = msg.content[:50] + "..." if len(msg.content) > 50 else msg.content
summary_parts.append(f"{role_label}: {content_preview}")
return "\n".join(summary_parts)
def clear(self) -> None:
"""清空记忆"""
self.messages = []
def _trim(self) -> None:
"""超出容量时,删除最旧的消息"""
while len(self.messages) > self.max_messages:
self.messages.pop(0)
# ═══════════════════════════════════════════════════════════════════════════════
# 第五部分:Long-Term Memory(长期记忆)
# ═══════════════════════════════════════════════════════════════════════════════
class LongTermMemory:
"""
长期记忆 - 持久化的知识
特点:
- 存储在磁盘或数据库
- 持久保存
- 需要检索才能获取
- 容量大
- 对应 RAG(后续 Step 05
"""
def __init__(self):
"""
初始化长期记忆
实际应用中,这里会初始化:
- 向量数据库(ChromaDB/Pinecone
- 键值存储(Redis)
- 图数据库(Neo4j)
"""
self._storage: Dict[str, Any] = {}
def memorize(self, key: str, value: Any, tags: List[str] = None) -> None:
"""
存储信息到长期记忆
Args:
key: 唯一标识
value: 要存储的内容
tags: 标签,用于分类和检索
"""
self._storage[key] = {
"value": value,
"tags": tags or [],
"created_at": datetime.now().isoformat(),
"accessed_at": datetime.now().isoformat(),
}
def recall(self, key: str) -> Optional[Any]:
"""
从长期记忆中检索
Args:
key: 唯一标识
Returns:
存储的内容,如果不存在返回 None
"""
if key in self._storage:
# 更新访问时间
self._storage[key]["accessed_at"] = datetime.now().isoformat()
return self._storage[key]["value"]
return None
def search(self, query: str, tags: List[str] = None) -> List[Any]:
"""
搜索长期记忆
实际应用中,这里会使用向量相似度搜索
这里用简单的标签匹配演示
"""
results = []
for item in self._storage.values():
if tags:
# 标签匹配
if any(tag in item["tags"] for tag in tags):
results.append(item["value"])
elif query in str(item["value"]):
results.append(item["value"])
return results
def forget(self, key: str) -> None:
"""删除长期记忆"""
if key in self._storage:
del self._storage[key]
# ═══════════════════════════════════════════════════════════════════════════════
# 第六部分:完整的 Memory System
# ═══════════════════════════════════════════════════════════════════════════════
class MemorySystem:
"""
完整的记忆系统
整合三种记忆:
- Working Memory: 当前任务
- Short-Term Memory: 最近对话
- Long-Term Memory: 持久知识
"""
def __init__(self):
self.working = WorkingMemory(capacity=7)
self.short_term = ShortTermMemory(max_messages=20)
self.long_term = LongTermMemory()
def remember_task(self, task_id: str, task_info: Dict) -> None:
"""记住当前任务"""
self.working.remember("current_task", task_info)
def get_current_task(self) -> Optional[Dict]:
"""获取当前任务"""
return self.working.recall("current_task")
def add_message(self, role: str, content: str) -> None:
"""添加对话消息"""
self.short_term.add(role, content)
def get_context(self, include_older_summary: bool = True) -> str:
"""
获取发送给 LLM 的上下文
整合所有记忆,形成完整的上下文
"""
context_parts = []
# 当前任务
current_task = self.get_current_task()
if current_task:
context_parts.append(f"[当前任务]\n{json.dumps(current_task, ensure_ascii=False)}")
# 最近对话
recent = self.short_term.get_recent(10)
if recent:
context_parts.append("[最近对话]")
for msg in recent:
context_parts.append(f"- {msg.role}: {msg.content[:100]}")
# 较早对话摘要
if include_older_summary:
older_summary = self.short_term.summarize_older(keep_recent=5)
if older_summary:
context_parts.append(f"[较早对话摘要]\n{older_summary}")
return "\n\n".join(context_parts)
def clear_session(self) -> None:
"""清除会话相关的记忆(保留长期记忆)"""
self.working.clear()
self.short_term.clear()
# ═══════════════════════════════════════════════════════════════════════════════
# 演示代码
# ═══════════════════════════════════════════════════════════════════════════════
def demo():
"""演示记忆系统"""
print("=" * 60)
print("Step 04: Memory - 记忆系统演示")
print("=" * 60)
# 创建记忆系统
memory = MemorySystem()
# 1. 记住当前任务
print("\n📝 记住当前任务")
memory.remember_task("task_001", {
"type": "生成报表",
"requirement": "销售月报",
"status": "进行中"
})
print(f" 当前任务: {memory.get_current_task()}")
# 2. 添加对话
print("\n💬 添加对话消息")
messages = [
("user", "帮我生成一个销售报表"),
("assistant", "好的,请问你需要显示哪些数据?"),
("user", "显示月度汇总,包括销售额和数量"),
("assistant", "明白了,正在生成..."),
("user", "再添加一个增长率"),
("assistant", "好的,正在添加..."),
]
for role, content in messages:
memory.add_message(role, content)
print(f" 添加: [{role}] {content[:30]}...")
# 3. 获取上下文
print("\n📋 获取完整上下文")
context = memory.get_context()
print(context)
# 4. 获取较早对话摘要
print("\n📝 较早对话摘要")
summary = memory.short_term.summarize_older(keep_recent=3)
print(summary)
# 5. 长期记忆
print("\n💾 添加长期记忆")
memory.long_term.memorize(
key="user_preferences",
value={"name": "张三", "default_report": "销售报表"},
tags=["用户信息", "偏好"]
)
result = memory.long_term.recall("user_preferences")
print(f" 检索结果: {result}")
print("\n" + "=" * 60)
print("✅ 演示完成")
print("=" * 60)
if __name__ == "__main__":
demo()
+235
View File
@@ -0,0 +1,235 @@
# Step 05-07: RAG / Self-Correction / Multi-Agent
> 这些步骤是进阶内容,包含核心概念和实现代码。
## Step 05: RAG - 知识检索
### 核心概念
RAG = Retrieval-Augmented Generation(检索增强生成)
```
┌─────────────────────────────────────────────────────┐
│ RAG 流程 │
├─────────────────────────────────────────────────────┤
│ │
│ 用户问题 ──▶ 编码为向量 ──▶ 向量数据库检索 │
│ │ │
│ ▼ │
│ 找到最相关的文档 │
│ │ │
│ ▼ │
│ 把文档和问题一起发送给 LLM │
│ │ │
│ ▼ │
│ 生成答案 │
│ │
└─────────────────────────────────────────────────────┘
```
### 为什么需要 RAG
1. LLM 的知识有截止日期
2. LLM 不知道你私有的数据
3. RAG 让 LLM 能"查阅"外部知识
### 关键组件
| 组件 | 作用 |
|------|------|
| Embedding Model | 把文本变成向量 |
| Vector Database | 存储和检索向量 |
| Retrieval | 找到最相关的文档 |
| Generation | 用检索结果生成答案 |
### 简化实现
```python
class SimpleRAG:
"""简化版 RAG 系统"""
def __init__(self):
# 文档存储
self.documents = []
# 向量存储(简化版,用关键词)
self.vectors = {}
def add_document(self, text: str, metadata: dict = None):
"""添加文档"""
self.documents.append({
"text": text,
"metadata": metadata or {}
})
def retrieve(self, query: str, top_k: int = 3) -> list:
"""检索相关文档"""
# 简化版:基于关键词匹配
results = []
for doc in self.documents:
# 计算简单相关性分数
score = sum(1 for word in query if word in doc["text"].lower())
if score > 0:
results.append((score, doc))
# 排序并返回 top_k
results.sort(key=lambda x: x[0], reverse=True)
return [doc for _, doc in results[:top_k]]
def generate(self, query: str, llm) -> str:
"""RAG 生成"""
docs = self.retrieve(query)
context = "\n".join([d["text"] for d in docs])
prompt = f"""
根据以下上下文回答问题:
上下文:
{context}
问题:{query}
答案:
"""
return llm.invoke(prompt)
```
---
## Step 06: Self-Correction - 自我修正
### 核心概念
Self-Correction = 让 Agent 能够自我发现并修复错误
```
┌─────────────────────────────────────────────────────┐
│ Self-Correction 流程 │
├─────────────────────────────────────────────────────┤
│ │
│ 生成结果 ──▶ 验证 ──▶ 有问题? │
│ │ │
│ ┌────┴────┐ │
│ │ │ │
│ 是 否 │
│ │ │ │
│ ▼ ▼ │
│ 分析错误 返回结果 │
│ │ │
│ ▼ │
│ 生成修复方案 │
│ │ │
│ ▼ │
│ 重新生成 ──▶ 再次验证 │
│ │
└─────────────────────────────────────────────────────┘
```
### 实现要点
```python
class SelfCorrectingAgent:
"""自我修正 Agent"""
def __init__(self):
self.max_retries = 3
def try_generate(self, requirement: str) -> str:
"""带自我修正的生成"""
for attempt in range(self.max_retries):
# 1. 生成
result = self.generate(requirement)
# 2. 验证
validation = self.validate(result)
# 3. 检查是否通过
if validation["passed"]:
return result
# 4. 分析错误
error = validation["error"]
print(f"尝试 {attempt + 1} 失败: {error}")
# 5. 准备修复
requirement = self.prepare_fix(requirement, error, result)
return f"经过 {self.max_retries} 次尝试仍失败"
```
---
## Step 07: Multi-Agent - 多 Agent 协作
### 核心概念
Multi-Agent = 多个专门的 Agent 协同工作
```
┌─────────────────────────────────────────────────────┐
│ Multi-Agent 架构 │
├─────────────────────────────────────────────────────┤
│ │
│ ┌──────────────┐ │
│ │ Orchestrator │ │
│ │ (协调者) │ │
│ └──────┬───────┘ │
│ │ │
│ ┌───────────────────┼───────────────────┐ │
│ │ │ │ │
│ ▼ ▼ ▼ │
│ ┌──────────┐ ┌──────────┐ ┌──────────┐ │
│ │ Generator│ │ Validator│ │ Searcher │ │
│ │ (生成者) │ │ (验证者) │ │ (搜索者) │ │
│ └──────────┘ └──────────┘ └──────────┘ │
│ │
└─────────────────────────────────────────────────────┘
```
### 协作模式
| 模式 | 说明 | 适用场景 |
|------|------|---------|
| 串行 | A → B → C 依次执行 | 步骤有依赖 |
| 并行 | A / B / C 同时执行 | 步骤独立 |
| 循环 | A → B → A → B 循环 | 需要反复验证 |
### 简化实现
```python
class MultiAgentSystem:
"""多 Agent 协作系统"""
def __init__(self):
# 注册各个 Agent
self.agents = {
"generator": GeneratorAgent(),
"validator": ValidatorAgent(),
"searcher": SearcherAgent(),
}
# 协调器
self.orchestrator = Orchestrator(self.agents)
def process(self, requirement: str) -> str:
"""协调多个 Agent 处理请求"""
# 1. 搜索相关知识
context = self.agents["searcher"].search(requirement)
# 2. 生成(可能需要多轮)
for attempt in range(3):
draft = self.agents["generator"].generate(requirement, context)
# 3. 验证
validation = self.agents["validator"].validate(draft)
if validation["passed"]:
return validation["result"]
return "处理失败"
```
---
## 📚 学习资源
- [LangGraph 文档](https://langchain-ai.github.io/langgraph/)
- [RAG 最佳实践](https://www.pinecone.io/learn/rag/)
- [Multi-Agent 系统设计](https://arxiv.org/abs/2308.03688)
+286
View File
@@ -0,0 +1,286 @@
"""
Step 05-07: RAG / Self-Correction / Multi-Agent
进阶内容代码示例
"""
# ═══════════════════════════════════════════════════════════════════════════════════════
# RAG 实现
# ═══════════════════════════════════════════════════════════════════════════════════════
class SimpleRAG:
"""
简化版 RAG 系统
实际应用中请使用:
- ChromaDB / Pinecone / Weaviate(向量数据库)
- sentence-transformers / OpenAI Embeddings(向量模型)
"""
def __init__(self):
self.documents = []
def add_document(self, text: str, metadata: dict = None):
"""添加文档"""
self.documents.append({
"text": text,
"metadata": metadata or {},
"id": len(self.documents)
})
def retrieve(self, query: str, top_k: int = 3) -> list:
"""检索相关文档(简化版:基于关键词)"""
results = []
query_words = set(query.lower().split())
for doc in self.documents:
doc_words = set(doc["text"].lower().split())
# 简单的 Jaccard 相似度
intersection = query_words & doc_words
union = query_words | doc_words
if union:
score = len(intersection) / len(union)
results.append((score, doc))
results.sort(key=lambda x: x[0], reverse=True)
return [doc for _, doc in results[:top_k]]
def generate(self, query: str, context_only: bool = False):
"""
生成答案
如果 context_only=True,只返回检索到的上下文
否则进行 RAG 生成(需要接入 LLM)
"""
docs = self.retrieve(query)
context = "\n\n".join([
f"[来源: {d['metadata'].get('source', '未知')}]\n{d['text']}"
for d in docs
])
return context
# ═══════════════════════════════════════════════════════════════════════════════════════
# Self-Correction 实现
# ═══════════════════════════════════════════════════════════════════════════════════════
@dataclass
class ValidationResult:
"""验证结果"""
passed: bool
score: float
issues: List[str]
suggestion: str = ""
class SelfCorrectingAgent:
"""
自我修正 Agent
工作流程:
1. 生成初始结果
2. 验证结果
3. 如果有问题,分析错误并修复
4. 循环直到通过或达到最大重试次数
"""
def __init__(self, generator, validator):
self.generator = generator
self.validator = validator
self.max_retries = 3
def generate_with_correction(self, requirement: str) -> dict:
"""带自我修正的生成"""
history = []
current_requirement = requirement
for attempt in range(self.max_retries):
# 1. 生成
result = self.generator.generate(current_requirement)
history.append({
"attempt": attempt + 1,
"requirement": current_requirement,
"result": result
})
# 2. 验证
validation = self.validator.validate(result)
history[-1]["validation"] = validation
if validation.passed:
return {
"success": True,
"result": result,
"attempts": attempt + 1,
"history": history
}
# 3. 分析错误,准备修复
print(f"尝试 {attempt + 1} 失败: {validation.issues}")
current_requirement = self._prepare_fix(
requirement,
validation,
result
)
return {
"success": False,
"error": "达到最大重试次数",
"history": history
}
def _prepare_fix(self, original: str, validation: ValidationResult, result) -> str:
"""准备修复提示"""
issues_text = "\n".join(f"- {issue}" for issue in validation.issues)
return f"""
原始需求:{original}
上次生成结果:
{result}
验证发现的问题:
{issues_text}
验证建议:{validation.suggestion}
请根据以上信息,修正生成结果。
"""
# ═══════════════════════════════════════════════════════════════════════════════════════
# Multi-Agent 实现
# ═══════════════════════════════════════════════════════════════════════════════════════
from dataclasses import dataclass, field
from typing import Dict, List, Callable
@dataclass
class AgentMessage:
"""Agent 之间的消息"""
from_agent: str
to_agent: str
content: Any
message_type: str # "request" / "response" / "broadcast"
class Agent:
"""基础 Agent 类"""
name: str
def process(self, input_data: Any) -> Any:
"""处理输入,返回结果"""
raise NotImplementedError
class MultiAgentSystem:
"""
多 Agent 协作系统
组件:
- agents: 注册的 Agent 字典
- orchestrator: 协调器,决定消息路由
- message_queue: 消息队列
"""
def __init__(self):
self.agents: Dict[str, Agent] = {}
self.message_queue: List[AgentMessage] = []
self.history: List[AgentMessage] = []
def register(self, agent: Agent):
"""注册 Agent"""
self.agents[agent.name] = agent
def send_message(self, from_agent: str, to_agent: str, content: Any,
msg_type: str = "request"):
"""发送消息"""
msg = AgentMessage(
from_agent=from_agent,
to_agent=to_agent,
content=content,
message_type=msg_type
)
self.message_queue.append(msg)
def broadcast(self, from_agent: str, content: Any):
"""广播消息给所有 Agent"""
for agent_name in self.agents:
if agent_name != from_agent:
self.send_message(from_agent, agent_name, content, "broadcast")
def process(self, requirement: str) -> Any:
"""
处理请求
简化实现:顺序执行各个 Agent
"""
# 1. 搜索
searcher = self.agents.get("searcher")
context = searcher.process(requirement) if searcher else ""
# 2. 生成
generator = self.agents.get("generator")
result = generator.process({"requirement": requirement, "context": context}) if generator else requirement
# 3. 验证
validator = self.agents.get("validator")
validation = validator.process(result) if validator else {"passed": True}
if not validation.get("passed", True):
return {"error": "验证失败", "validation": validation}
return result
def demo():
"""演示"""
print("=" * 60)
print("Step 05-07: 进阶功能演示")
print("=" * 60)
# RAG 演示
print("\n📚 RAG 演示")
rag = SimpleRAG()
rag.add_document("JasperReports 是一个 Java 报表库", {"source": "文档1"})
rag.add_document("JRXML 是 JasperReports 的报表模板格式", {"source": "文档2"})
rag.add_document("可以使用 LLM 生成 JRXML 代码", {"source": "文档3"})
result = rag.retrieve("JasperReports 是什么")
print(f" 查询 'JasperReports 是什么'")
for doc in result:
print(f" - {doc['text']} (来源: {doc['metadata']['source']})")
# Multi-Agent 演示
print("\n\n🤖 Multi-Agent 演示")
class DemoSearcher(Agent):
name = "searcher"
def process(self, input_data):
print(f" [{self.name}] 搜索相关资料...")
return "找到相关模板和文档"
class DemoGenerator(Agent):
name = "generator"
def process(self, input_data):
print(f" [{self.name}] 生成报表...")
return "<jasperReport>生成的报表</jasperReport>"
class DemoValidator(Agent):
name = "validator"
def process(self, input_data):
print(f" [{self.name}] 验证结果...")
return {"passed": True}
system = MultiAgentSystem()
system.register(DemoSearcher())
system.register(DemoGenerator())
system.register(DemoValidator())
result = system.process("生成销售报表")
print(f"\n 最终结果: {result[:50]}...")
if __name__ == "__main__":
demo()