287 lines
9.2 KiB
Python
287 lines
9.2 KiB
Python
"""
|
|
Step 05-07: RAG / Self-Correction / Multi-Agent
|
|
|
|
进阶内容代码示例
|
|
"""
|
|
|
|
from dataclasses import dataclass
|
|
from typing import Any, Dict, List
|
|
|
|
# ═══════════════════════════════════════════════════════════════════════════════════════
|
|
# RAG 实现
|
|
# ═══════════════════════════════════════════════════════════════════════════════════════
|
|
|
|
class SimpleRAG:
|
|
"""
|
|
简化版 RAG 系统
|
|
|
|
实际应用中请使用:
|
|
- ChromaDB / Pinecone / Weaviate(向量数据库)
|
|
- sentence-transformers / OpenAI Embeddings(向量模型)
|
|
"""
|
|
|
|
def __init__(self):
|
|
self.documents = []
|
|
|
|
def add_document(self, text: str, metadata: dict = None):
|
|
"""添加文档"""
|
|
self.documents.append({
|
|
"text": text,
|
|
"metadata": metadata or {},
|
|
"id": len(self.documents)
|
|
})
|
|
|
|
def retrieve(self, query: str, top_k: int = 3) -> list:
|
|
"""检索相关文档(简化版:基于关键词)"""
|
|
results = []
|
|
query_words = set(query.lower().split())
|
|
|
|
for doc in self.documents:
|
|
doc_words = set(doc["text"].lower().split())
|
|
# 简单的 Jaccard 相似度
|
|
intersection = query_words & doc_words
|
|
union = query_words | doc_words
|
|
if union:
|
|
score = len(intersection) / len(union)
|
|
results.append((score, doc))
|
|
|
|
results.sort(key=lambda x: x[0], reverse=True)
|
|
return [doc for _, doc in results[:top_k]]
|
|
|
|
def generate(self, query: str, context_only: bool = False):
|
|
"""
|
|
生成答案
|
|
|
|
如果 context_only=True,只返回检索到的上下文
|
|
否则进行 RAG 生成(需要接入 LLM)
|
|
"""
|
|
docs = self.retrieve(query)
|
|
context = "\n\n".join([
|
|
f"[来源: {d['metadata'].get('source', '未知')}]\n{d['text']}"
|
|
for d in docs
|
|
])
|
|
return context
|
|
|
|
|
|
# ═══════════════════════════════════════════════════════════════════════════════════════
|
|
# Self-Correction 实现
|
|
# ═══════════════════════════════════════════════════════════════════════════════════════
|
|
|
|
@dataclass
|
|
class ValidationResult:
|
|
"""验证结果"""
|
|
passed: bool
|
|
score: float
|
|
issues: List[str]
|
|
suggestion: str = ""
|
|
|
|
|
|
class SelfCorrectingAgent:
|
|
"""
|
|
自我修正 Agent
|
|
|
|
工作流程:
|
|
1. 生成初始结果
|
|
2. 验证结果
|
|
3. 如果有问题,分析错误并修复
|
|
4. 循环直到通过或达到最大重试次数
|
|
"""
|
|
|
|
def __init__(self, generator, validator):
|
|
self.generator = generator
|
|
self.validator = validator
|
|
self.max_retries = 3
|
|
|
|
def generate_with_correction(self, requirement: str) -> dict:
|
|
"""带自我修正的生成"""
|
|
history = []
|
|
current_requirement = requirement
|
|
|
|
for attempt in range(self.max_retries):
|
|
# 1. 生成
|
|
result = self.generator.generate(current_requirement)
|
|
history.append({
|
|
"attempt": attempt + 1,
|
|
"requirement": current_requirement,
|
|
"result": result
|
|
})
|
|
|
|
# 2. 验证
|
|
validation = self.validator.validate(result)
|
|
history[-1]["validation"] = validation
|
|
|
|
if validation.passed:
|
|
return {
|
|
"success": True,
|
|
"result": result,
|
|
"attempts": attempt + 1,
|
|
"history": history
|
|
}
|
|
|
|
# 3. 分析错误,准备修复
|
|
print(f"尝试 {attempt + 1} 失败: {validation.issues}")
|
|
current_requirement = self._prepare_fix(
|
|
requirement,
|
|
validation,
|
|
result
|
|
)
|
|
|
|
return {
|
|
"success": False,
|
|
"error": "达到最大重试次数",
|
|
"history": history
|
|
}
|
|
|
|
def _prepare_fix(self, original: str, validation: ValidationResult, result) -> str:
|
|
"""准备修复提示"""
|
|
issues_text = "\n".join(f"- {issue}" for issue in validation.issues)
|
|
|
|
return f"""
|
|
原始需求:{original}
|
|
|
|
上次生成结果:
|
|
{result}
|
|
|
|
验证发现的问题:
|
|
{issues_text}
|
|
|
|
验证建议:{validation.suggestion}
|
|
|
|
请根据以上信息,修正生成结果。
|
|
"""
|
|
|
|
|
|
# ═══════════════════════════════════════════════════════════════════════════════════════
|
|
# Multi-Agent 实现
|
|
# ═══════════════════════════════════════════════════════════════════════════════════════
|
|
|
|
|
|
@dataclass
|
|
class AgentMessage:
|
|
"""Agent 之间的消息"""
|
|
from_agent: str
|
|
to_agent: str
|
|
content: Any
|
|
message_type: str # "request" / "response" / "broadcast"
|
|
|
|
|
|
class Agent:
|
|
"""基础 Agent 类"""
|
|
name: str
|
|
|
|
def process(self, input_data: Any) -> Any:
|
|
"""处理输入,返回结果"""
|
|
raise NotImplementedError
|
|
|
|
|
|
class MultiAgentSystem:
|
|
"""
|
|
多 Agent 协作系统
|
|
|
|
组件:
|
|
- agents: 注册的 Agent 字典
|
|
- orchestrator: 协调器,决定消息路由
|
|
- message_queue: 消息队列
|
|
"""
|
|
|
|
def __init__(self):
|
|
self.agents: Dict[str, Agent] = {}
|
|
self.message_queue: List[AgentMessage] = []
|
|
self.history: List[AgentMessage] = []
|
|
|
|
def register(self, agent: Agent):
|
|
"""注册 Agent"""
|
|
self.agents[agent.name] = agent
|
|
|
|
def send_message(self, from_agent: str, to_agent: str, content: Any,
|
|
msg_type: str = "request"):
|
|
"""发送消息"""
|
|
msg = AgentMessage(
|
|
from_agent=from_agent,
|
|
to_agent=to_agent,
|
|
content=content,
|
|
message_type=msg_type
|
|
)
|
|
self.message_queue.append(msg)
|
|
|
|
def broadcast(self, from_agent: str, content: Any):
|
|
"""广播消息给所有 Agent"""
|
|
for agent_name in self.agents:
|
|
if agent_name != from_agent:
|
|
self.send_message(from_agent, agent_name, content, "broadcast")
|
|
|
|
def process(self, requirement: str) -> Any:
|
|
"""
|
|
处理请求
|
|
|
|
简化实现:顺序执行各个 Agent
|
|
"""
|
|
# 1. 搜索
|
|
searcher = self.agents.get("searcher")
|
|
context = searcher.process(requirement) if searcher else ""
|
|
|
|
# 2. 生成
|
|
generator = self.agents.get("generator")
|
|
result = generator.process({"requirement": requirement, "context": context}) if generator else requirement
|
|
|
|
# 3. 验证
|
|
validator = self.agents.get("validator")
|
|
validation = validator.process(result) if validator else {"passed": True}
|
|
|
|
if not validation.get("passed", True):
|
|
return {"error": "验证失败", "validation": validation}
|
|
|
|
return result
|
|
|
|
|
|
def demo():
|
|
"""演示"""
|
|
print("=" * 60)
|
|
print("Step 05-07: 进阶功能演示")
|
|
print("=" * 60)
|
|
|
|
# RAG 演示
|
|
print("\n📚 RAG 演示")
|
|
rag = SimpleRAG()
|
|
rag.add_document("JasperReports 是一个 Java 报表库", {"source": "文档1"})
|
|
rag.add_document("JRXML 是 JasperReports 的报表模板格式", {"source": "文档2"})
|
|
rag.add_document("可以使用 LLM 生成 JRXML 代码", {"source": "文档3"})
|
|
|
|
result = rag.retrieve("JasperReports 是什么")
|
|
print(f" 查询 'JasperReports 是什么'")
|
|
for doc in result:
|
|
print(f" - {doc['text']} (来源: {doc['metadata']['source']})")
|
|
|
|
# Multi-Agent 演示
|
|
print("\n\n🤖 Multi-Agent 演示")
|
|
|
|
class DemoSearcher(Agent):
|
|
name = "searcher"
|
|
def process(self, input_data):
|
|
print(f" [{self.name}] 搜索相关资料...")
|
|
return "找到相关模板和文档"
|
|
|
|
class DemoGenerator(Agent):
|
|
name = "generator"
|
|
def process(self, input_data):
|
|
print(f" [{self.name}] 生成报表...")
|
|
return "<jasperReport>生成的报表</jasperReport>"
|
|
|
|
class DemoValidator(Agent):
|
|
name = "validator"
|
|
def process(self, input_data):
|
|
print(f" [{self.name}] 验证结果...")
|
|
return {"passed": True}
|
|
|
|
system = MultiAgentSystem()
|
|
system.register(DemoSearcher())
|
|
system.register(DemoGenerator())
|
|
system.register(DemoValidator())
|
|
|
|
result = system.process("生成销售报表")
|
|
print(f"\n 最终结果: {result[:50]}...")
|
|
|
|
|
|
if __name__ == "__main__":
|
|
demo()
|