极简LLM入门指南 7
本文介绍了如何开发基于LLM的智能Agent系统。主要内容包括: 最简Agent实现:使用LangChain快速构建具备搜索引擎功能的Agent ReAct框架实现:详细展示了思考-行动-观察循环的完整代码实现 工具开发与集成:演示如何创建自定义工具(如天气查询和代码执行工具)并进行工具编排 文章提供了可直接运行的代码示例,帮助开发者快速上手Agent开发,构建能够自主决策、调用工具完成复杂任务的
·
【LLM实操系列07】Agent开发:构建自主AI智能体
在开始之前,建议先完成第04篇(理解ReAct概念)和第03篇(API调用)。你需要理解工具调用和思考-行动-观察循环的基本概念,并安装langchain及相关工具库。
10分钟实现第一个Agent
什么是Agent?
Agent = LLM + 工具 + 记忆 + 规划
最简Agent实现
# pip install langchain openai wikipedia duckduckgo-search
from langchain.agents import initialize_agent, Tool
from langchain.agents import AgentType
from langchain.chat_models import ChatOpenAI
from langchain.tools import DuckDuckGoSearchRun
# 1. 定义工具
search = DuckDuckGoSearchRun()
tools = [
Tool(
name="Search",
func=search.run,
description="搜索引擎,用于查找实时信息"
)
]
# 2. 创建Agent
llm = ChatOpenAI(temperature=0)
agent = initialize_agent(
tools,
llm,
agent=AgentType.ZERO_SHOT_REACT_DESCRIPTION,
verbose=True
)
# 3. 执行任务
result = agent.run("今天上海的天气如何?")
print(result)
ReAct框架:思考→行动→观察
基于第04篇的ReAct完整实现
# 注:第04篇介绍了ReAct的概念,这里实现完整的工具调用
class ReActAgent:
"""ReAct模式Agent实现(完整版)"""
def __init__(self, llm, tools):
self.llm = llm
self.tools = {tool.name: tool for tool in tools}
self.max_iterations = 5
def run(self, task: str):
"""执行任务"""
prompt = f"""
任务:{task}
可用工具:
{self._format_tools()}
使用以下格式:
Thought: 我需要思考下一步做什么
Action: 工具名称
Action Input: 工具输入参数
Observation: 工具返回结果
... (重复N次)
Thought: 我知道最终答案了
Final Answer: 最终答案
开始:
"""
for i in range(self.max_iterations):
# 获取LLM输出
response = self.llm.predict(prompt)
# 解析行动
action = self._parse_action(response)
if action["type"] == "Final Answer":
return action["content"]
# 执行工具
tool = self.tools[action["tool"]]
observation = tool.func(action["input"])
# 添加观察结果
prompt += f"""
Thought: {action["thought"]}
Action: {action["tool"]}
Action Input: {action["input"]}
Observation: {observation}
"""
return "达到最大迭代次数"
def _parse_action(self, text: str):
"""解析LLM输出"""
if "Final Answer:" in text:
return {
"type": "Final Answer",
"content": text.split("Final Answer:")[-1].strip()
}
# 提取Action和Input
lines = text.strip().split('\n')
thought = ""
action = ""
action_input = ""
for line in lines:
if line.startswith("Thought:"):
thought = line[8:].strip()
elif line.startswith("Action:"):
action = line[7:].strip()
elif line.startswith("Action Input:"):
action_input = line[13:].strip()
return {
"type": "Action",
"thought": thought,
"tool": action,
"input": action_input
}
工具开发与集成
1. 自定义工具
from typing import Optional, Type
from pydantic import BaseModel, Field
from langchain.tools import BaseTool
import requests
class WeatherInput(BaseModel):
"""天气查询输入"""
city: str = Field(description="城市名称")
class WeatherTool(BaseTool):
"""天气查询工具"""
name = "weather"
description = "查询指定城市的天气"
args_schema: Type[BaseModel] = WeatherInput
def _run(self, city: str) -> str:
"""同步执行"""
# 实际调用天气API
api_key = "YOUR_API_KEY"
url = f"http://api.weatherapi.com/v1/current.json?key={api_key}&q={city}"
response = requests.get(url)
data = response.json()
return f"{city}天气:{data['current']['condition']['text']},温度:{data['current']['temp_c']}°C"
async def _arun(self, city: str) -> str:
"""异步执行"""
# 异步版本实现
pass
# 代码执行工具
class CodeExecutor(BaseTool):
"""Python代码执行工具"""
name = "python"
description = "执行Python代码并返回结果"
def _run(self, code: str) -> str:
"""安全执行代码"""
import sys
from io import StringIO
# 捕获输出
old_stdout = sys.stdout
sys.stdout = StringIO()
try:
# 限制执行环境
exec(code, {"__builtins__": {
"print": print,
"len": len,
"range": range,
"str": str,
"int": int,
"float": float,
}})
output = sys.stdout.getvalue()
except Exception as e:
output = f"错误:{e}"
finally:
sys.stdout = old_stdout
return output
2. 工具编排
class ToolOrchestrator:
"""工具编排器"""
def __init__(self):
self.tools = {}
def register_tool(self, tool: BaseTool):
"""注册工具"""
self.tools[tool.name] = tool
def create_toolkit(self, task_type: str):
"""根据任务类型选择工具集"""
toolkits = {
"research": ["search", "wikipedia", "arxiv"],
"coding": ["python", "shell", "file_ops"],
"analysis": ["calculator", "pandas", "plot"],
"communication": ["email", "slack", "calendar"]
}
selected_tools = []
for tool_name in toolkits.get(task_type, []):
if tool_name in self.tools:
selected_tools.append(self.tools[tool_name])
return selected_tools
Function Calling(OpenAI风格)
实现Function Calling
import json
from typing import List, Dict, Any
class FunctionCallingAgent:
"""支持Function Calling的Agent"""
def __init__(self, model="gpt-3.5-turbo"):
self.model = model
self.functions = []
def register_function(self, func, description: str, parameters: dict):
"""注册函数"""
self.functions.append({
"name": func.__name__,
"description": description,
"parameters": parameters,
"func": func
})
def run(self, messages: List[Dict]):
"""执行对话"""
import openai
# 准备函数定义
functions_def = [
{
"name": f["name"],
"description": f["description"],
"parameters": f["parameters"]
}
for f in self.functions
]
# 调用OpenAI API
response = openai.ChatCompletion.create(
model=self.model,
messages=messages,
functions=functions_def,
function_call="auto"
)
message = response.choices[0].message
# 检查是否需要调用函数
if message.get("function_call"):
function_name = message["function_call"]["name"]
function_args = json.loads(message["function_call"]["arguments"])
# 执行函数
func = next(f["func"] for f in self.functions
if f["name"] == function_name)
result = func(**function_args)
# 将结果返回给模型
messages.append(message)
messages.append({
"role": "function",
"name": function_name,
"content": str(result)
})
# 获取最终回答
final_response = openai.ChatCompletion.create(
model=self.model,
messages=messages
)
return final_response.choices[0].message["content"]
return message["content"]
# 使用示例
agent = FunctionCallingAgent()
# 注册函数
def get_weather(location: str, unit: str = "celsius"):
"""获取天气信息"""
return f"{location}的温度是25°{unit[0].upper()}"
agent.register_function(
get_weather,
"获取指定地点的天气",
{
"type": "object",
"properties": {
"location": {"type": "string", "description": "城市名"},
"unit": {"type": "string", "enum": ["celsius", "fahrenheit"]}
},
"required": ["location"]
}
)
# 执行
result = agent.run([
{"role": "user", "content": "北京今天天气怎么样?"}
])
多Agent协作系统
1. Agent角色定义
class SpecializedAgent:
"""专业Agent基类"""
def __init__(self, name: str, role: str, skills: List[str]):
self.name = name
self.role = role
self.skills = skills
self.llm = ChatOpenAI(temperature=0.7)
def can_handle(self, task: str) -> bool:
"""判断是否能处理任务"""
prompt = f"""
作为{self.role},我的技能包括:{', '.join(self.skills)}
判断我是否适合处理以下任务:{task}
回答:是/否
"""
response = self.llm.predict(prompt)
return "是" in response
def process(self, task: str) -> str:
"""处理任务"""
prompt = f"""
角色:{self.role}
技能:{', '.join(self.skills)}
任务:{task}
请完成任务:
"""
return self.llm.predict(prompt)
# 创建专业Agent
researcher = SpecializedAgent(
"研究员",
"信息研究专家",
["搜索", "文献分析", "数据收集"]
)
coder = SpecializedAgent(
"程序员",
"软件开发专家",
["Python", "调试", "代码优化"]
)
analyst = SpecializedAgent(
"分析师",
"数据分析专家",
["统计分析", "可视化", "报告撰写"]
)
2. 协作框架
class MultiAgentSystem:
"""多Agent协作系统"""
def __init__(self):
self.agents = []
self.coordinator = ChatOpenAI(temperature=0)
self.conversation_history = []
def add_agent(self, agent: SpecializedAgent):
"""添加Agent"""
self.agents.append(agent)
def execute(self, task: str):
"""执行复杂任务"""
# 1. 任务分解
subtasks = self._decompose_task(task)
results = {}
for subtask in subtasks:
# 2. 分配Agent
agent = self._assign_agent(subtask)
if agent:
# 3. 执行子任务
result = agent.process(subtask)
results[subtask] = {
"agent": agent.name,
"result": result
}
# 4. 记录对话
self.conversation_history.append({
"agent": agent.name,
"task": subtask,
"result": result
})
# 5. 整合结果
return self._synthesize_results(task, results)
def _decompose_task(self, task: str) -> List[str]:
"""任务分解"""
prompt = f"""
将以下复杂任务分解为3-5个子任务:
任务:{task}
输出格式:
1. 子任务1
2. 子任务2
...
"""
response = self.coordinator.predict(prompt)
subtasks = []
for line in response.split('\n'):
if line.strip() and line[0].isdigit():
subtasks.append(line.split('.', 1)[1].strip())
return subtasks
def _assign_agent(self, subtask: str) -> SpecializedAgent:
"""分配最合适的Agent"""
for agent in self.agents:
if agent.can_handle(subtask):
return agent
return None
def _synthesize_results(self, task: str, results: Dict):
"""整合结果"""
prompt = f"""
原始任务:{task}
各Agent完成情况:
{json.dumps(results, ensure_ascii=False, indent=2)}
请整合以上结果,给出最终答案:
"""
return self.coordinator.predict(prompt)
生产级Agent实战
完整的客服Agent系统
class CustomerServiceAgent:
"""客服Agent系统"""
def __init__(self):
self.llm = ChatOpenAI(temperature=0.3)
self.memory = ConversationBufferWindowMemory(k=5)
self.knowledge_base = None # RAG系统
self.tools = self._init_tools()
def _init_tools(self):
"""初始化工具集"""
return [
Tool(name="查询订单", func=self._query_order),
Tool(name="退款处理", func=self._process_refund),
Tool(name="人工转接", func=self._transfer_human),
Tool(name="知识库", func=self._search_kb),
]
def handle_customer(self, message: str, customer_id: str):
"""处理客户消息"""
# 1. 意图识别
intent = self._identify_intent(message)
# 2. 情绪检测
emotion = self._detect_emotion(message)
# 3. 获取客户历史
history = self._get_customer_history(customer_id)
# 4. 构建Agent
agent = initialize_agent(
self.tools,
self.llm,
agent=AgentType.CONVERSATIONAL_REACT_DESCRIPTION,
memory=self.memory,
verbose=True
)
# 5. 生成回复
context = f"""
客户ID: {customer_id}
意图: {intent}
情绪: {emotion}
历史问题: {history[-3:] if history else '无'}
客户消息: {message}
"""
response = agent.run(context)
# 6. 记录交互
self._log_interaction(customer_id, message, response)
return response
def _identify_intent(self, message: str) -> str:
"""意图识别"""
intents = ["查询", "投诉", "退款", "咨询", "其他"]
prompt = f"""
识别客户意图:
消息:{message}
可选意图:{', '.join(intents)}
输出意图:
"""
return self.llm.predict(prompt).strip()
def _detect_emotion(self, message: str) -> str:
"""情绪检测"""
prompt = f"""
检测客户情绪(积极/中性/消极/愤怒):
消息:{message}
情绪:
"""
return self.llm.predict(prompt).strip()
调试与优化
class AgentDebugger:
"""Agent调试工具"""
def __init__(self, agent):
self.agent = agent
self.traces = []
def trace_execution(self, task: str):
"""跟踪执行过程"""
import time
trace = {
"task": task,
"steps": [],
"start_time": time.time()
}
# 劫持Agent的执行
original_run = self.agent.run
def traced_run(*args, **kwargs):
step_start = time.time()
result = original_run(*args, **kwargs)
trace["steps"].append({
"input": args[0] if args else kwargs,
"output": result,
"duration": time.time() - step_start
})
return result
self.agent.run = traced_run
result = self.agent.run(task)
self.agent.run = original_run
trace["end_time"] = time.time()
trace["total_duration"] = trace["end_time"] - trace["start_time"]
trace["result"] = result
self.traces.append(trace)
return trace
def analyze_performance(self):
"""性能分析"""
if not self.traces:
return "无执行记录"
total_time = sum(t["total_duration"] for t in self.traces)
avg_time = total_time / len(self.traces)
# 找出最慢的步骤
slowest_steps = []
for trace in self.traces:
for step in trace["steps"]:
if step["duration"] > avg_time * 0.5:
slowest_steps.append({
"task": trace["task"],
"step": step["input"],
"duration": step["duration"]
})
return {
"总执行次数": len(self.traces),
"平均耗时": f"{avg_time:.2f}秒",
"最慢步骤": slowest_steps[:5]
}
火山引擎开发者社区是火山引擎打造的AI技术生态平台,聚焦Agent与大模型开发,提供豆包系列模型(图像/视频/视觉)、智能分析与会话工具,并配套评测集、动手实验室及行业案例库。社区通过技术沙龙、挑战赛等活动促进开发者成长,新用户可领50万Tokens权益,助力构建智能应用。
更多推荐
所有评论(0)