161 lines
7.0 KiB
Python
161 lines
7.0 KiB
Python
"""
|
|
Step 03 练习题答案
|
|
|
|
⚠️ 先自己思考,再看答案!
|
|
⚠️ 答案不是唯一的,这里只是其中一种实现
|
|
"""
|
|
|
|
import copy
|
|
import re
|
|
from datetime import datetime
|
|
|
|
from step_03_simple_agent.concept import (
|
|
BaseTool,
|
|
SimpleAgent,
|
|
ToolResult,
|
|
)
|
|
|
|
|
|
# ═══════════════════════════════════════════════════════════════════════════════
|
|
# 练习 1 答案:DateTimeTool
|
|
# ═══════════════════════════════════════════════════════════════════════════════
|
|
|
|
class DateTimeTool(BaseTool):
|
|
"""日期时间工具"""
|
|
|
|
@property
|
|
def name(self) -> str:
|
|
return "datetime"
|
|
|
|
@property
|
|
def description(self) -> str:
|
|
return "日期时间工具,支持 now(当前时间)/ today(今天日期)/ weekday(星期几)"
|
|
|
|
def execute(self, **kwargs) -> ToolResult:
|
|
operation = kwargs.get("operation", "now")
|
|
now = datetime.now()
|
|
if operation == "now":
|
|
return ToolResult(success=True, result=now.strftime("%Y-%m-%d %H:%M:%S"))
|
|
if operation == "today":
|
|
return ToolResult(success=True, result=now.strftime("%Y-%m-%d"))
|
|
if operation == "weekday":
|
|
names = ["星期一", "星期二", "星期三", "星期四", "星期五", "星期六", "星期日"]
|
|
return ToolResult(success=True, result=names[now.weekday()])
|
|
return ToolResult(success=False, error=f"不支持的操作: {operation}")
|
|
|
|
|
|
# ═══════════════════════════════════════════════════════════════════════════════
|
|
# 练习 2 答案:稳健的 Brain 路由
|
|
# ═══════════════════════════════════════════════════════════════════════════════
|
|
|
|
EXPR_PATTERN = re.compile(r"-?\d+(?:\.\d+)?\s*[+\-*/]\s*-?\d+(?:\.\d+)?")
|
|
|
|
|
|
def improved_brain_decide(self, state):
|
|
"""把 AgentBrain.decide 替换为更稳健的版本。"""
|
|
user_input = state.get("user_input", "")
|
|
tool_result = state.get("tool_result")
|
|
|
|
# 工具结果回灌:直接产出回应
|
|
if tool_result is not None:
|
|
if isinstance(tool_result, dict) and not tool_result.get("success", True):
|
|
return {
|
|
"action": "respond",
|
|
"response": f"工具执行失败: {tool_result.get('error', '未知错误')}",
|
|
}
|
|
return {"action": "respond", "response": f"工具执行完成,结果:{tool_result}"}
|
|
|
|
text = user_input.lower()
|
|
|
|
# 明确的算式(如 "1 + 2" / "10*3")才走 calculator
|
|
expr_match = EXPR_PATTERN.search(user_input)
|
|
if expr_match and re.fullmatch(r"[\d\s+\-*/().]+", expr_match.group()):
|
|
return {
|
|
"action": "use_tool",
|
|
"tool_name": "calculator",
|
|
"tool_args": {"expression": expr_match.group().strip()},
|
|
}
|
|
|
|
# 日期时间路由
|
|
if any(kw in text for kw in ["现在几点", "现在时间", "今天", "日期", "星期"]):
|
|
op = "weekday" if "星期" in text else ("today" if "今天" in text or "日期" in text else "now")
|
|
return {"action": "use_tool", "tool_name": "datetime", "tool_args": {"operation": op}}
|
|
|
|
# 其他交给模板搜索
|
|
if any(kw in user_input for kw in ["报表", "模板", "jrxml", "jasper"]):
|
|
return {
|
|
"action": "use_tool",
|
|
"tool_name": "template_search",
|
|
"tool_args": {"keyword": user_input},
|
|
}
|
|
|
|
return {
|
|
"action": "respond",
|
|
"response": f"我收到了你的输入:{user_input}(暂无可用工具直接处理)",
|
|
}
|
|
|
|
|
|
# ═══════════════════════════════════════════════════════════════════════════════
|
|
# 练习 3 答案:snapshot / restore
|
|
# ═══════════════════════════════════════════════════════════════════════════════
|
|
|
|
def install_snapshot_methods(agent: SimpleAgent) -> None:
|
|
"""给 SimpleAgent 实例挂上 snapshot/restore 方法。"""
|
|
|
|
def snapshot(self):
|
|
return {
|
|
"messages": copy.deepcopy(self.state["messages"]),
|
|
"tool_result": copy.deepcopy(self.state.get("tool_result")),
|
|
"current_action": self.state.get("current_action"),
|
|
"round_count": self.round_count,
|
|
}
|
|
|
|
def restore(self, snap):
|
|
self.state["messages"] = copy.deepcopy(snap["messages"])
|
|
self.state["tool_result"] = copy.deepcopy(snap.get("tool_result"))
|
|
self.state["current_action"] = snap.get("current_action")
|
|
self.round_count = snap.get("round_count", 0)
|
|
|
|
SimpleAgent.snapshot = snapshot
|
|
SimpleAgent.restore = restore
|
|
|
|
|
|
# ═══════════════════════════════════════════════════════════════════════════════
|
|
# 测试
|
|
# ═══════════════════════════════════════════════════════════════════════════════
|
|
|
|
def test_answers():
|
|
print("\n" + "=" * 60)
|
|
print("Step 03 练习答案测试")
|
|
print("=" * 60)
|
|
|
|
agent = SimpleAgent()
|
|
|
|
# 注册新工具 + 替换 brain
|
|
agent.tools["datetime"] = DateTimeTool()
|
|
agent.brain.decide = improved_brain_decide.__get__(agent.brain)
|
|
|
|
print("\n📝 练习 1: DateTimeTool")
|
|
print(" 工具列表:", list(agent.tools.keys()))
|
|
print(" now ->", agent.tools["datetime"].execute(operation="now").result)
|
|
print(" weekday ->", agent.tools["datetime"].execute(operation="weekday").result)
|
|
|
|
print("\n📝 练习 2: 改进的 Brain 路由")
|
|
for q in ["1 + 2", "iOS / Android 兼容性", "现在几点", "今天星期几"]:
|
|
decision = agent.brain.decide({"user_input": q, "tool_result": None})
|
|
print(f" '{q}' -> {decision['action']} / {decision.get('tool_name', decision.get('response'))}")
|
|
|
|
print("\n📝 练习 3: snapshot / restore")
|
|
install_snapshot_methods(agent)
|
|
agent.process("1 + 2")
|
|
snap = agent.snapshot()
|
|
print(" snapshot round_count =", snap["round_count"])
|
|
agent.process("3 * 4")
|
|
print(" after 2 rounds, round_count =", agent.round_count)
|
|
agent.restore(snap)
|
|
print(" after restore, round_count =", agent.round_count)
|
|
|
|
|
|
if __name__ == "__main__":
|
|
test_answers()
|