【LLM实操系列07】Agent开发:构建自主AI智能体

在开始之前,建议先完成第04篇(理解ReAct概念)和第03篇(API调用)。你需要理解工具调用和思考-行动-观察循环的基本概念,并安装langchain及相关工具库。

10分钟实现第一个Agent

什么是Agent?

Agent = LLM + 工具 + 记忆 + 规划

最简Agent实现

# pip install langchain openai wikipedia duckduckgo-search

from langchain.agents import initialize_agent, Tool
from langchain.agents import AgentType
from langchain.chat_models import ChatOpenAI
from langchain.tools import DuckDuckGoSearchRun

# 1. 定义工具
search = DuckDuckGoSearchRun()

tools = [
    Tool(
        name="Search",
        func=search.run,
        description="搜索引擎,用于查找实时信息"
    )
]

# 2. 创建Agent
llm = ChatOpenAI(temperature=0)
agent = initialize_agent(
    tools,
    llm,
    agent=AgentType.ZERO_SHOT_REACT_DESCRIPTION,
    verbose=True
)

# 3. 执行任务
result = agent.run("今天上海的天气如何?")
print(result)

ReAct框架:思考→行动→观察

基于第04篇的ReAct完整实现

# 注:第04篇介绍了ReAct的概念,这里实现完整的工具调用

class ReActAgent:
    """ReAct模式Agent实现(完整版)"""

    def __init__(self, llm, tools):
        self.llm = llm
        self.tools = {tool.name: tool for tool in tools}
        self.max_iterations = 5

    def run(self, task: str):
        """执行任务"""
        prompt = f"""
任务:{task}

可用工具:
{self._format_tools()}

使用以下格式:
Thought: 我需要思考下一步做什么
Action: 工具名称
Action Input: 工具输入参数
Observation: 工具返回结果
... (重复N次)
Thought: 我知道最终答案了
Final Answer: 最终答案

开始:
"""

        for i in range(self.max_iterations):
            # 获取LLM输出
            response = self.llm.predict(prompt)

            # 解析行动
            action = self._parse_action(response)

            if action["type"] == "Final Answer":
                return action["content"]

            # 执行工具
            tool = self.tools[action["tool"]]
            observation = tool.func(action["input"])

            # 添加观察结果
            prompt += f"""
Thought: {action["thought"]}
Action: {action["tool"]}
Action Input: {action["input"]}
Observation: {observation}
"""

        return "达到最大迭代次数"

    def _parse_action(self, text: str):
        """解析LLM输出"""
        if "Final Answer:" in text:
            return {
                "type": "Final Answer",
                "content": text.split("Final Answer:")[-1].strip()
            }

        # 提取Action和Input
        lines = text.strip().split('\n')
        thought = ""
        action = ""
        action_input = ""

        for line in lines:
            if line.startswith("Thought:"):
                thought = line[8:].strip()
            elif line.startswith("Action:"):
                action = line[7:].strip()
            elif line.startswith("Action Input:"):
                action_input = line[13:].strip()

        return {
            "type": "Action",
            "thought": thought,
            "tool": action,
            "input": action_input
        }

工具开发与集成

1. 自定义工具

from typing import Optional, Type
from pydantic import BaseModel, Field
from langchain.tools import BaseTool
import requests

class WeatherInput(BaseModel):
    """天气查询输入"""
    city: str = Field(description="城市名称")

class WeatherTool(BaseTool):
    """天气查询工具"""
    name = "weather"
    description = "查询指定城市的天气"
    args_schema: Type[BaseModel] = WeatherInput

    def _run(self, city: str) -> str:
        """同步执行"""
        # 实际调用天气API
        api_key = "YOUR_API_KEY"
        url = f"http://api.weatherapi.com/v1/current.json?key={api_key}&q={city}"
        response = requests.get(url)
        data = response.json()

        return f"{city}天气:{data['current']['condition']['text']},温度:{data['current']['temp_c']}°C"

    async def _arun(self, city: str) -> str:
        """异步执行"""
        # 异步版本实现
        pass

# 代码执行工具
class CodeExecutor(BaseTool):
    """Python代码执行工具"""
    name = "python"
    description = "执行Python代码并返回结果"

    def _run(self, code: str) -> str:
        """安全执行代码"""
        import sys
        from io import StringIO

        # 捕获输出
        old_stdout = sys.stdout
        sys.stdout = StringIO()

        try:
            # 限制执行环境
            exec(code, {"__builtins__": {
                "print": print,
                "len": len,
                "range": range,
                "str": str,
                "int": int,
                "float": float,
            }})
            output = sys.stdout.getvalue()
        except Exception as e:
            output = f"错误:{e}"
        finally:
            sys.stdout = old_stdout

        return output

2. 工具编排

class ToolOrchestrator:
    """工具编排器"""

    def __init__(self):
        self.tools = {}

    def register_tool(self, tool: BaseTool):
        """注册工具"""
        self.tools[tool.name] = tool

    def create_toolkit(self, task_type: str):
        """根据任务类型选择工具集"""
        toolkits = {
            "research": ["search", "wikipedia", "arxiv"],
            "coding": ["python", "shell", "file_ops"],
            "analysis": ["calculator", "pandas", "plot"],
            "communication": ["email", "slack", "calendar"]
        }

        selected_tools = []
        for tool_name in toolkits.get(task_type, []):
            if tool_name in self.tools:
                selected_tools.append(self.tools[tool_name])

        return selected_tools

Function Calling(OpenAI风格)

实现Function Calling

import json
from typing import List, Dict, Any

class FunctionCallingAgent:
    """支持Function Calling的Agent"""

    def __init__(self, model="gpt-3.5-turbo"):
        self.model = model
        self.functions = []

    def register_function(self, func, description: str, parameters: dict):
        """注册函数"""
        self.functions.append({
            "name": func.__name__,
            "description": description,
            "parameters": parameters,
            "func": func
        })

    def run(self, messages: List[Dict]):
        """执行对话"""
        import openai

        # 准备函数定义
        functions_def = [
            {
                "name": f["name"],
                "description": f["description"],
                "parameters": f["parameters"]
            }
            for f in self.functions
        ]

        # 调用OpenAI API
        response = openai.ChatCompletion.create(
            model=self.model,
            messages=messages,
            functions=functions_def,
            function_call="auto"
        )

        message = response.choices[0].message

        # 检查是否需要调用函数
        if message.get("function_call"):
            function_name = message["function_call"]["name"]
            function_args = json.loads(message["function_call"]["arguments"])

            # 执行函数
            func = next(f["func"] for f in self.functions
                       if f["name"] == function_name)
            result = func(**function_args)

            # 将结果返回给模型
            messages.append(message)
            messages.append({
                "role": "function",
                "name": function_name,
                "content": str(result)
            })

            # 获取最终回答
            final_response = openai.ChatCompletion.create(
                model=self.model,
                messages=messages
            )

            return final_response.choices[0].message["content"]

        return message["content"]

# 使用示例
agent = FunctionCallingAgent()

# 注册函数
def get_weather(location: str, unit: str = "celsius"):
    """获取天气信息"""
    return f"{location}的温度是25°{unit[0].upper()}"

agent.register_function(
    get_weather,
    "获取指定地点的天气",
    {
        "type": "object",
        "properties": {
            "location": {"type": "string", "description": "城市名"},
            "unit": {"type": "string", "enum": ["celsius", "fahrenheit"]}
        },
        "required": ["location"]
    }
)

# 执行
result = agent.run([
    {"role": "user", "content": "北京今天天气怎么样?"}
])

多Agent协作系统

1. Agent角色定义

class SpecializedAgent:
    """专业Agent基类"""

    def __init__(self, name: str, role: str, skills: List[str]):
        self.name = name
        self.role = role
        self.skills = skills
        self.llm = ChatOpenAI(temperature=0.7)

    def can_handle(self, task: str) -> bool:
        """判断是否能处理任务"""
        prompt = f"""
作为{self.role},我的技能包括:{', '.join(self.skills)}
判断我是否适合处理以下任务:{task}
回答:是/否
"""
        response = self.llm.predict(prompt)
        return "是" in response

    def process(self, task: str) -> str:
        """处理任务"""
        prompt = f"""
角色:{self.role}
技能:{', '.join(self.skills)}
任务:{task}

请完成任务:
"""
        return self.llm.predict(prompt)

# 创建专业Agent
researcher = SpecializedAgent(
    "研究员",
    "信息研究专家",
    ["搜索", "文献分析", "数据收集"]
)

coder = SpecializedAgent(
    "程序员",
    "软件开发专家",
    ["Python", "调试", "代码优化"]
)

analyst = SpecializedAgent(
    "分析师",
    "数据分析专家",
    ["统计分析", "可视化", "报告撰写"]
)

2. 协作框架

class MultiAgentSystem:
    """多Agent协作系统"""

    def __init__(self):
        self.agents = []
        self.coordinator = ChatOpenAI(temperature=0)
        self.conversation_history = []

    def add_agent(self, agent: SpecializedAgent):
        """添加Agent"""
        self.agents.append(agent)

    def execute(self, task: str):
        """执行复杂任务"""
        # 1. 任务分解
        subtasks = self._decompose_task(task)

        results = {}
        for subtask in subtasks:
            # 2. 分配Agent
            agent = self._assign_agent(subtask)

            if agent:
                # 3. 执行子任务
                result = agent.process(subtask)
                results[subtask] = {
                    "agent": agent.name,
                    "result": result
                }

                # 4. 记录对话
                self.conversation_history.append({
                    "agent": agent.name,
                    "task": subtask,
                    "result": result
                })

        # 5. 整合结果
        return self._synthesize_results(task, results)

    def _decompose_task(self, task: str) -> List[str]:
        """任务分解"""
        prompt = f"""
将以下复杂任务分解为3-5个子任务:
任务:{task}

输出格式:
1. 子任务1
2. 子任务2
...
"""
        response = self.coordinator.predict(prompt)
        subtasks = []
        for line in response.split('\n'):
            if line.strip() and line[0].isdigit():
                subtasks.append(line.split('.', 1)[1].strip())
        return subtasks

    def _assign_agent(self, subtask: str) -> SpecializedAgent:
        """分配最合适的Agent"""
        for agent in self.agents:
            if agent.can_handle(subtask):
                return agent
        return None

    def _synthesize_results(self, task: str, results: Dict):
        """整合结果"""
        prompt = f"""
原始任务:{task}

各Agent完成情况:
{json.dumps(results, ensure_ascii=False, indent=2)}

请整合以上结果,给出最终答案:
"""
        return self.coordinator.predict(prompt)

生产级Agent实战

完整的客服Agent系统

class CustomerServiceAgent:
    """客服Agent系统"""

    def __init__(self):
        self.llm = ChatOpenAI(temperature=0.3)
        self.memory = ConversationBufferWindowMemory(k=5)
        self.knowledge_base = None  # RAG系统
        self.tools = self._init_tools()

    def _init_tools(self):
        """初始化工具集"""
        return [
            Tool(name="查询订单", func=self._query_order),
            Tool(name="退款处理", func=self._process_refund),
            Tool(name="人工转接", func=self._transfer_human),
            Tool(name="知识库", func=self._search_kb),
        ]

    def handle_customer(self, message: str, customer_id: str):
        """处理客户消息"""
        # 1. 意图识别
        intent = self._identify_intent(message)

        # 2. 情绪检测
        emotion = self._detect_emotion(message)

        # 3. 获取客户历史
        history = self._get_customer_history(customer_id)

        # 4. 构建Agent
        agent = initialize_agent(
            self.tools,
            self.llm,
            agent=AgentType.CONVERSATIONAL_REACT_DESCRIPTION,
            memory=self.memory,
            verbose=True
        )

        # 5. 生成回复
        context = f"""
客户ID: {customer_id}
意图: {intent}
情绪: {emotion}
历史问题: {history[-3:] if history else '无'}

客户消息: {message}
"""

        response = agent.run(context)

        # 6. 记录交互
        self._log_interaction(customer_id, message, response)

        return response

    def _identify_intent(self, message: str) -> str:
        """意图识别"""
        intents = ["查询", "投诉", "退款", "咨询", "其他"]
        prompt = f"""
识别客户意图:
消息:{message}
可选意图:{', '.join(intents)}
输出意图:
"""
        return self.llm.predict(prompt).strip()

    def _detect_emotion(self, message: str) -> str:
        """情绪检测"""
        prompt = f"""
检测客户情绪(积极/中性/消极/愤怒):
消息:{message}
情绪:
"""
        return self.llm.predict(prompt).strip()

调试与优化

class AgentDebugger:
    """Agent调试工具"""

    def __init__(self, agent):
        self.agent = agent
        self.traces = []

    def trace_execution(self, task: str):
        """跟踪执行过程"""
        import time

        trace = {
            "task": task,
            "steps": [],
            "start_time": time.time()
        }

        # 劫持Agent的执行
        original_run = self.agent.run

        def traced_run(*args, **kwargs):
            step_start = time.time()
            result = original_run(*args, **kwargs)
            trace["steps"].append({
                "input": args[0] if args else kwargs,
                "output": result,
                "duration": time.time() - step_start
            })
            return result

        self.agent.run = traced_run
        result = self.agent.run(task)
        self.agent.run = original_run

        trace["end_time"] = time.time()
        trace["total_duration"] = trace["end_time"] - trace["start_time"]
        trace["result"] = result

        self.traces.append(trace)
        return trace

    def analyze_performance(self):
        """性能分析"""
        if not self.traces:
            return "无执行记录"

        total_time = sum(t["total_duration"] for t in self.traces)
        avg_time = total_time / len(self.traces)

        # 找出最慢的步骤
        slowest_steps = []
        for trace in self.traces:
            for step in trace["steps"]:
                if step["duration"] > avg_time * 0.5:
                    slowest_steps.append({
                        "task": trace["task"],
                        "step": step["input"],
                        "duration": step["duration"]
                    })

        return {
            "总执行次数": len(self.traces),
            "平均耗时": f"{avg_time:.2f}秒",
            "最慢步骤": slowest_steps[:5]
        }
Logo

火山引擎开发者社区是火山引擎打造的AI技术生态平台,聚焦Agent与大模型开发,提供豆包系列模型(图像/视频/视觉)、智能分析与会话工具,并配套评测集、动手实验室及行业案例库。社区通过技术沙龙、挑战赛等活动促进开发者成长,新用户可领50万Tokens权益,助力构建智能应用。

更多推荐