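Previous posts in this series:

1. OpenManus architecture analysis
2. OpenManus code walkthrough (1): app/agent
3. OpenManus code walkthrough (2): app/flow
4. OpenManus code walkthrough (3): app/tool
5. OpenManus code walkthrough (4): app/schema.py
6. OpenManus code walkthrough (5): app/tool/planning.py
7. OpenManus code walkthrough (6): code call flow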

Official repository: https://github.com/mannaandpoem/OpenManus


1. Program entry point and initialization (main.py)

if __name__ == "__main__":
    asyncio.run(main())
  • asyncio.run(main()):
    • creates a new event loop
    • runs the main() coroutine until it completes
    • closes the event loop
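The three bullets can be checked with a small standard-library sketch. The `main` below is a hypothetical stand-in, not OpenManus's `main()`. It also shows a related rule: `asyncio.run` refuses to start while an event loop is already running in the same thread.

```python
import asyncio


async def main() -> str:
    # Hypothetical stand-in for main(): yield to the event loop once, then finish.
    await asyncio.sleep(0)
    return "done"


async def nested() -> str:
    # asyncio.run() refuses to start a second loop while this one is running.
    coro = main()
    try:
        asyncio.run(coro)
    except RuntimeError as exc:
        return str(exc)
    finally:
        coro.close()  # the coroutine never ran; close it to avoid a warning
    return "no error"


result = asyncio.run(main())  # new loop -> run main() to completion -> close loop
nested_error = asyncio.run(nested())
print(result)
print(nested_error)
```

This is why main.py calls `asyncio.run` exactly once, at the top level, and everything else is awaited inside that single loop.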

2. Main function (main.py)

async def main():
    # Create the agent
    agent = Manus()
    
    # Enter the interactive loop
    while True:
        # ... user input handling logic

2.1. Agent initialization in detail

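When agent = Manus() runs, the agent is built along its inheritance chain:

Manus → ToolCallAgent → ReActAgent → BaseAgent

These agents are Pydantic models, so "initialization" means Pydantic filling in every field from the defaults declared along this chain (a subclass's default overrides its parent's) and then running the mode="after" validator once all fields are set. The fields are listed below, grouped by the class that declares them: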
  1. BaseAgent initialization:

    # Basic attributes
    self.name = "manus"  # defined in the Manus class
    self.description = "a general-purpose AI assistant"
    self.system_prompt = "You are an assistant..."
    self.llm = LLM()
    self.memory = Memory()
    self.state = AgentState.IDLE
    self.max_steps = 10
    self.current_step = 0
    
  2. The BaseAgent.initialize_agent validator:

    @model_validator(mode="after")
    def initialize_agent(self) -> "BaseAgent":
        # Make sure a valid LLM instance exists
        if self.llm is None or not isinstance(self.llm, LLM):
            self.llm = LLM(config_name=self.name.lower())
        # Make sure a valid Memory instance exists
        if not isinstance(self.memory, Memory):
            self.memory = Memory()
        return self
    
  3. ReActAgent initialization:

    # Initialize ReAct loop components
    self.last_tool_calls = []
    
  4. ToolCallAgent initialization:

    # Initialize the tool system
    self.tool_calls = []
    self.available_tools = ToolCollection(...)
    self.tool_choices = None
    
  5. Manus-specific initialization:

    # Override with Manus's own tool set
    self.available_tools = ToolCollection(
        CreateChatCompletion(),
        Terminate(),
        # ... more tools
    )
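To make "defaults resolved along the chain, then an after-validator" concrete, here is a sketch that mimics it with standard-library `dataclasses` instead of Pydantic. Using dataclasses is an assumption made to keep the example dependency-free: `__post_init__` plays the role of `initialize_agent`, and every class name, tool name and value below is hypothetical.

```python
from dataclasses import dataclass, field
from typing import List, Optional


@dataclass
class FakeLLM:
    """Hypothetical stand-in for OpenManus's LLM class."""
    config_name: str = "default"


@dataclass
class BaseAgentSketch:
    name: str = "base"
    max_steps: int = 10
    current_step: int = 0
    llm: Optional[FakeLLM] = None
    memory: List[str] = field(default_factory=list)

    def __post_init__(self) -> None:
        # Plays the role of the mode="after" validator: runs once every field,
        # including subclass overrides, has been assigned.
        if self.llm is None:
            self.llm = FakeLLM(config_name=self.name.lower())


@dataclass
class ReActAgentSketch(BaseAgentSketch):
    pass


@dataclass
class ToolCallAgentSketch(ReActAgentSketch):
    tools: List[str] = field(default_factory=lambda: ["create_chat_completion", "terminate"])
    max_steps: int = 30  # hypothetical override of the parent default


@dataclass
class ManusSketch(ToolCallAgentSketch):
    name: str = "Manus"  # overrides BaseAgentSketch.name
    tools: List[str] = field(default_factory=lambda: ["extra_tool", "terminate"])


agent = ManusSketch()
print(agent.name, agent.max_steps, agent.llm, agent.tools)
```

The most derived class wins for each field. The validator-like hook sees the final `name` ("Manus"), which is why the LLM config name comes out as "manus". An explicitly passed `llm` is left untouched.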
    

3. User input handling (main.py)

while True:  # the interactive loop in main()
    prompt = input("Enter your prompt (or 'exit'/'quit' to quit): ")
    prompt_lower = prompt.lower()

    # Handle exit commands
    if prompt_lower in ["exit", "quit"]:
        logger.info("Goodbye!")
        break

    # Skip empty input
    if not prompt.strip():
        logger.warning("Skipping empty prompt.")
        continue
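The loop's two checks can be factored into a pure function and exercised without reading stdin. `PromptAction` and `classify_prompt` are hypothetical names for this sketch, not part of OpenManus.

```python
from enum import Enum


class PromptAction(Enum):
    EXIT = "exit"
    SKIP = "skip"
    RUN = "run"


def classify_prompt(prompt: str) -> PromptAction:
    """Mirror the checks in main.py's input loop."""
    # Exit commands are matched case-insensitively, as with prompt.lower().
    if prompt.lower() in ["exit", "quit"]:
        return PromptAction.EXIT
    # Whitespace-only input is skipped instead of being sent to the agent.
    if not prompt.strip():
        return PromptAction.SKIP
    return PromptAction.RUN


for text in ["QUIT", "   ", "summarize this file", " exit"]:
    print(repr(text), classify_prompt(text).value)
```

The exit check compares `prompt.lower()` without stripping, so an input such as " exit" with a leading space is treated as an ordinary prompt.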

4. Running the agent task

await agent.run(prompt)

4.1. BaseAgent.run in detail

async def run(self, request: Optional[str] = None) -> str:
    # 1. State check: only an idle agent may start
    if self.state != AgentState.IDLE:
        raise RuntimeError(f"Cannot run agent from state: {self.state}")

    # 2. Record the user request in memory (the key step)
    if request:
        self.update_memory("user", request)

    # 3. Collect per-step results
    results: List[str] = []
    
    # 4. Switch to RUNNING for the duration of the loop (via a context manager)
    async with self.state_context(AgentState.RUNNING):
        # 5. Main loop
        while (self.current_step < self.max_steps and 
               self.state != AgentState.FINISHED):
            
            # 5.1 Advance the step counter
            self.current_step += 1
            logger.info(f"Executing step {self.current_step}/{self.max_steps}")
            
            # 5.2 Execute one step (the key call)
            step_result = await self.step()

            # 5.3 Check whether the agent is stuck in a loop
            if self.is_stuck():
                self.handle_stuck_state()

            # 5.4 Record the step result
            results.append(f"Step {self.current_step}: {step_result}")

        # 6. Handle reaching the step limit
        if self.current_step >= self.max_steps:
            self.current_step = 0  # reset the step counter
            self.state = AgentState.IDLE  # back to idle
            results.append(f"Terminated: Reached max steps ({self.max_steps})")

    # 7. Return the collected results
    return "\n".join(results) if results else "No steps executed"
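The loop can end in two ways: the step limit is reached, or the agent becomes FINISHED. Here is a reduced sketch of both cases. It assumes a fake `step()` that sets FINISHED at a chosen step in place of the terminate tool. `LoopAgent` is hypothetical, and `state_context` is replaced by saving and restoring the state directly.

```python
import asyncio
from enum import Enum
from typing import List, Optional


class AgentState(Enum):
    IDLE = "IDLE"
    RUNNING = "RUNNING"
    FINISHED = "FINISHED"


class LoopAgent:
    """Hypothetical agent that keeps only the control flow of BaseAgent.run."""

    def __init__(self, max_steps: int, finish_at: Optional[int] = None) -> None:
        self.max_steps = max_steps
        self.finish_at = finish_at  # step at which a fake "terminate" fires
        self.current_step = 0
        self.state = AgentState.IDLE

    async def step(self) -> str:
        if self.current_step == self.finish_at:
            self.state = AgentState.FINISHED  # what the terminate tool does
        return "ok"

    async def run(self) -> str:
        if self.state != AgentState.IDLE:
            raise RuntimeError(f"Cannot run agent from state: {self.state}")
        results: List[str] = []
        previous_state, self.state = self.state, AgentState.RUNNING
        try:
            while self.current_step < self.max_steps and self.state != AgentState.FINISHED:
                self.current_step += 1
                results.append(f"Step {self.current_step}: {await self.step()}")
            if self.current_step >= self.max_steps:
                self.current_step = 0
                results.append(f"Terminated: Reached max steps ({self.max_steps})")
        finally:
            self.state = previous_state  # mirrors state_context's finally block
        return "\n".join(results) if results else "No steps executed"


capped = asyncio.run(LoopAgent(max_steps=3).run())
early = LoopAgent(max_steps=5, finish_at=2)
finished = asyncio.run(early.run())
print(capped)
print(finished)
```

In this sketch, as in the code above, `current_step` is reset only when the step limit is hit. After the early finish, `early.current_step` is still 2.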

4.1.1. update_memory in detail

def update_memory(self, role: Literal["user", "system", "assistant", "tool"], content: str, **kwargs) -> None:
    # 1. Map each role to its message factory
    message_map = {
        "user": Message.user_message,
        "system": Message.system_message,
        "assistant": Message.assistant_message,
        "tool": lambda content, **kw: Message.tool_message(content, **kw),
    }

    # 2. Validate the role
    if role not in message_map:
        raise ValueError(f"Unsupported message role: {role}")

    # 3. Look up the factory
    msg_factory = message_map[role]
    
    # 4. Build the message (only tool messages receive extra kwargs such as tool_call_id)
    msg = msg_factory(content, **kwargs) if role == "tool" else msg_factory(content)
    
    # 5. Append the message to memory
    self.memory.add_message(msg)
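Here is a self-contained sketch of the role-to-factory dispatch. It assumes a hypothetical minimal `Message` dataclass (OpenManus's real `Message` class has more fields and differs in detail) and models memory as a plain list.

```python
from dataclasses import dataclass
from typing import List, Optional


@dataclass
class Message:
    """Hypothetical minimal message used only for this sketch."""
    role: str
    content: str
    tool_call_id: Optional[str] = None

    @classmethod
    def user_message(cls, content: str) -> "Message":
        return cls("user", content)

    @classmethod
    def system_message(cls, content: str) -> "Message":
        return cls("system", content)

    @classmethod
    def assistant_message(cls, content: str) -> "Message":
        return cls("assistant", content)

    @classmethod
    def tool_message(cls, content: str, tool_call_id: str) -> "Message":
        return cls("tool", content, tool_call_id)


def update_memory(memory: List[Message], role: str, content: str, **kwargs) -> None:
    # Role -> factory dispatch table, as in BaseAgent.update_memory.
    message_map = {
        "user": Message.user_message,
        "system": Message.system_message,
        "assistant": Message.assistant_message,
        "tool": Message.tool_message,
    }
    if role not in message_map:
        raise ValueError(f"Unsupported message role: {role}")
    factory = message_map[role]
    # Only tool messages accept extra keyword arguments such as tool_call_id.
    msg = factory(content, **kwargs) if role == "tool" else factory(content)
    memory.append(msg)


memory: List[Message] = []
update_memory(memory, "user", "list files")
update_memory(memory, "tool", "a.txt b.txt", tool_call_id="call_1")
print([(m.role, m.tool_call_id) for m in memory])
```

Passing `Message.tool_message` directly is equivalent to the lambda in the original code, because that lambda only forwards its arguments.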

4.1.2. The state context manager

@asynccontextmanager
async def state_context(self, new_state: AgentState):
    # 1. Validate the state type
    if not isinstance(new_state, AgentState):
        raise ValueError(f"Invalid state: {new_state}")

    # 2. Save the previous state and switch to the new one
    previous_state = self.state
    self.state = new_state
    
    try:
        # 3. Hand control to the body of the async with block
        yield
    except Exception:
        # 4. On an exception, mark the agent as ERROR and re-raise it
        self.state = AgentState.ERROR
        raise
    finally:
        # 5. Always restore the previous state (this also overwrites ERROR)
        self.state = previous_state
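The save/switch/restore pattern can be run in isolation. This sketch uses a hypothetical `MiniAgent` that records every state it passes through. The record shows one consequence of the finally block: ERROR is set only momentarily, and the agent ends up back in its previous state while the exception propagates to the caller.

```python
import asyncio
from contextlib import asynccontextmanager
from enum import Enum
from typing import List


class AgentState(Enum):
    IDLE = "IDLE"
    RUNNING = "RUNNING"
    FINISHED = "FINISHED"
    ERROR = "ERROR"


class MiniAgent:
    """Hypothetical stripped-down agent that only tracks its state."""

    def __init__(self) -> None:
        self.state = AgentState.IDLE
        self.history: List[AgentState] = []  # every state entered, in order

    def _set(self, state: AgentState) -> None:
        self.state = state
        self.history.append(state)

    @asynccontextmanager
    async def state_context(self, new_state: AgentState):
        if not isinstance(new_state, AgentState):
            raise ValueError(f"Invalid state: {new_state}")
        previous_state = self.state
        self._set(new_state)
        try:
            yield
        except Exception:
            self._set(AgentState.ERROR)  # transient: overwritten by finally
            raise
        finally:
            self._set(previous_state)


async def demo() -> MiniAgent:
    agent = MiniAgent()
    try:
        async with agent.state_context(AgentState.RUNNING):
            raise RuntimeError("tool failed")
    except RuntimeError:
        pass  # the exception still reaches the caller
    return agent


agent = asyncio.run(demo())
print(agent.state, agent.history)
```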

5. Single-step execution (ReActAgent.step)

async def step(self) -> str:
    # 1. Think: decide what to do next
    should_act = await self.think()
    
    # 2. Act: carry out the decision
    if should_act:
        return await self.act()
    
    # 3. Nothing to do in this step
    return "Thinking complete, no action needed"
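ReActAgent.step is a template method. Subclasses supply `think()` and `act()`, and `step()` only wires them together. The sketch below reproduces that split with a hypothetical `ScriptedAgent` whose "model" follows a fixed plan. In OpenManus, the concrete think/act come from ToolCallAgent.

```python
import asyncio
from abc import ABC, abstractmethod
from typing import List


class ReActSketch(ABC):
    """Hypothetical base class reproducing ReActAgent.step's think/act split."""

    async def step(self) -> str:
        should_act = await self.think()  # decide whether any action is needed
        if should_act:
            return await self.act()  # perform the chosen actions
        return "Thinking complete, no action needed"

    @abstractmethod
    async def think(self) -> bool: ...

    @abstractmethod
    async def act(self) -> str: ...


class ScriptedAgent(ReActSketch):
    """Pretends the model requests a tool on some turns and not on others."""

    def __init__(self, plan: List[bool]) -> None:
        self.plan = plan
        self.turn = 0

    async def think(self) -> bool:
        decision = self.plan[self.turn]
        self.turn += 1
        return decision

    async def act(self) -> str:
        return f"acted on turn {self.turn}"


async def demo() -> List[str]:
    agent = ScriptedAgent([True, False])
    return [await agent.step(), await agent.step()]


outputs = asyncio.run(demo())
print(outputs)
```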

5.1. Think phase (ToolCallAgent.think)

async def think(self) -> bool:
    # 1. Append the pending next-step prompt, if any, as a user message
    if self.next_step_prompt:
        user_msg = Message.user_message(self.next_step_prompt)
        self.messages += [user_msg]
        self.next_step_prompt = None  # clear the prompt once it has been used

    # 2. Prepare the system message
    system_msgs = []
    if self.system_prompt:
        system_msgs = [Message.system_message(self.system_prompt)]
    
    # 3. Convert the available tools into API parameters
    tool_params = self.available_tools.to_params()
    
    # 4. Ask the language model, offering it the tools
    response = await self.llm.ask_tool(
        messages=self.messages,
        system_msgs=system_msgs,
        tools=tool_params,
        tool_choice=self.tool_choices,
    )
    
    # 5. Keep the requested tool calls
    self.tool_calls = response.tool_calls
    
    # 6. Build the assistant message and store it in memory
    assistant_msg = Message.from_tool_calls(
        content=response.content, 
        tool_calls=self.tool_calls
    )
    self.memory.add_message(assistant_msg)
    
    # 7. Act only if the model requested at least one tool
    return bool(self.tool_calls)

5.1.1. The LLM.ask_tool method (simplified)

async def ask_tool(self, messages, system_msgs=None, tools=None, tool_choice=None):
    # 1. Build the request parameters
    params = {
        "model": self.model,
        "messages": self._prepare_messages(messages, system_msgs),
        "tools": tools,
        "tool_choice": tool_choice,
    }
    
    # 2. Call the API
    response = await self._post_request("/chat/completions", params)
    
    # 3. Parse the reply ("tool_calls" may be absent or null)
    content = response["choices"][0]["message"]["content"]
    tool_calls = response["choices"][0]["message"].get("tool_calls") or []
    
    # 4. Wrap and return the result
    return ToolResponse(content=content, tool_calls=tool_calls)
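The request and response shapes handled by ask_tool follow the OpenAI Chat Completions format:

- `tools` is a list of `{"type": "function", "function": {...}}` entries with JSON Schema parameters.
- `tool_choice` is one of "none", "auto" or "required".
- Each entry in `tool_calls` has an `arguments` field holding a JSON string.

The sketch below builds and parses these payloads offline. `build_request`, `parse_response`, the model name and the sample reply are all illustrative assumptions, not OpenManus code.

```python
import json
from typing import Any, Dict, List, Optional

# Illustrative tool schema in the OpenAI "function" tool format.
TERMINATE_TOOL = {
    "type": "function",
    "function": {
        "name": "terminate",
        "description": "Finish the interaction.",
        "parameters": {
            "type": "object",
            "properties": {"status": {"type": "string"}},
            "required": ["status"],
        },
    },
}


def build_request(model: str, messages: List[Dict[str, Any]],
                  system_msgs: Optional[List[Dict[str, Any]]] = None,
                  tools: Optional[List[Dict[str, Any]]] = None,
                  tool_choice: str = "auto") -> Dict[str, Any]:
    # System messages come first, followed by the conversation history.
    params: Dict[str, Any] = {"model": model, "messages": (system_msgs or []) + messages}
    if tools:  # tool_choice is only sent when tools are offered
        params["tools"] = tools
        params["tool_choice"] = tool_choice
    return params


def parse_response(response: Dict[str, Any]) -> Dict[str, Any]:
    message = response["choices"][0]["message"]
    # "tool_calls" may be missing or null when the model replies with plain text.
    return {"content": message.get("content"),
            "tool_calls": message.get("tool_calls") or []}


request = build_request(
    "example-model",
    [{"role": "user", "content": "We are done, stop."}],
    system_msgs=[{"role": "system", "content": "You are an assistant."}],
    tools=[TERMINATE_TOOL],
)
# Hand-written sample reply, not real model output.
sample_reply = {"choices": [{"message": {
    "role": "assistant",
    "content": None,
    "tool_calls": [{"id": "call_1", "type": "function",
                    "function": {"name": "terminate",
                                 "arguments": json.dumps({"status": "success"})}}],
}}]}
parsed = parse_response(sample_reply)
call = parsed["tool_calls"][0]
print(call["function"]["name"], json.loads(call["function"]["arguments"]))
```

The sketch leaves out `tool_choice` when no tools are offered, because the API accepts `tool_choice` only together with `tools`.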

5.2. Act phase (ToolCallAgent.act)

async def act(self) -> str:
    # 1. No tool calls: fail if a tool was required, otherwise return the last message
    if not self.tool_calls:
        if self.tool_choices == "required":
            raise ValueError("Tool call was required but none was provided")
        return self.messages[-1].content

    # 2. Execute each requested tool call in order
    results = []
    for command in self.tool_calls:
        # 2.1 Execute one tool
        try:
            result = await self.execute_tool(command)
            
            # 2.2 Wrap the output in a tool message tied to the call id
            tool_msg = Message.tool_message(
                content=result,
                tool_call_id=command.id,
                name=command.function.name
            )
            
            # 2.3 Store it in memory
            self.memory.add_message(tool_msg)
            results.append(result)
            
        except Exception as e:
            error_msg = f"Error executing tool {command.function.name}: {str(e)}"
            logger.error(error_msg)
            results.append(error_msg)

    # 3. Join the results
    return "\n\n".join(results)

5.2.1. Tool execution in detail

async def execute_tool(self, command: ToolCall) -> str:
    # 1. Get the tool name
    name = command.function.name
    
    # 2. Make sure the tool exists
    if name not in self.available_tools.tool_map:
        return f"Error: Unknown tool '{name}'"

    try:
        # 3. Parse the JSON-encoded arguments
        args = json.loads(command.function.arguments or "{}")
        
        # 4. Run the tool
        result = await self.available_tools.execute(
            name=name,
            tool_input=args
        )
        
        # 5. Handle special tools (e.g. terminate)
        await self._handle_special_tool(name=name, result=result)
        
        # 6. Return the formatted output
        return f"Observed output of cmd `{name}` executed:\n{str(result)}"
    except Exception as e:
        # 7. Report errors (including invalid JSON) as a string instead of raising
        logger.error(f"Error executing tool {name}: {str(e)}")
        return f"Error executing {name}: {str(e)}"

5.2.2. Special tool handling

async def _handle_special_tool(self, name: str, result: Any) -> None:
    # The terminate tool marks the agent FINISHED, which ends the loop in run()
    if name.lower() == "terminate":
        self.state = AgentState.FINISHED
        logger.info("Agent terminated by tool call")
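execute_tool and _handle_special_tool can be sketched together, with plain coroutine functions standing in for OpenManus tool objects. `ToolRunner`, `FunctionCall`, `ToolCall`, `add` and the dict-based tool map are hypothetical simplifications; the real code calls ToolCollection.execute and sets AgentState.FINISHED. The point is that unknown tools, malformed JSON and tool failures all come back as strings rather than exceptions.

```python
import asyncio
import json
from dataclasses import dataclass
from typing import Any, Awaitable, Callable, Dict, List, Tuple


@dataclass
class FunctionCall:
    name: str
    arguments: str  # JSON-encoded, as returned by the model


@dataclass
class ToolCall:
    id: str
    function: FunctionCall


async def add(a: int, b: int) -> int:
    return a + b


async def terminate(status: str) -> str:
    return f"terminated: {status}"


class ToolRunner:
    """Hypothetical reduction of execute_tool plus terminate handling."""

    def __init__(self, tools: Dict[str, Callable[..., Awaitable[Any]]]) -> None:
        self.tool_map = tools
        self.finished = False  # stands in for AgentState.FINISHED

    async def execute_tool(self, command: ToolCall) -> str:
        name = command.function.name
        if name not in self.tool_map:
            return f"Error: Unknown tool '{name}'"
        try:
            args = json.loads(command.function.arguments or "{}")
            result = await self.tool_map[name](**args)
            if name.lower() == "terminate":  # special tool: stop the run loop
                self.finished = True
            return f"Observed output of cmd `{name}` executed:\n{result}"
        except Exception as e:  # bad JSON, bad arguments, or a failing tool
            return f"Error executing {name}: {e}"


async def demo() -> Tuple[ToolRunner, List[str]]:
    runner = ToolRunner({"add": add, "terminate": terminate})
    calls = [
        ToolCall("c1", FunctionCall("add", '{"a": 2, "b": 3}')),
        ToolCall("c2", FunctionCall("search", "{}")),
        ToolCall("c3", FunctionCall("add", "{not json")),
        ToolCall("c4", FunctionCall("terminate", '{"status": "success"}')),
    ]
    return runner, [await runner.execute_tool(c) for c in calls]


runner, outputs = asyncio.run(demo())
for line in outputs:
    print(line)
```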

6. Stuck-loop detection and handling

def is_stuck(self) -> bool:
    # 1. Need at least two messages to compare
    if len(self.memory.messages) < 2:
        return False

    # 2. Take the latest message
    last_message = self.memory.messages[-1]
    if not last_message.content:
        return False

    # 3. Count earlier assistant messages with identical content
    duplicate_count = sum(
        1
        for msg in reversed(self.memory.messages[:-1])
        if msg.role == "assistant" and msg.content == last_message.content
    )

    # 4. Stuck once the count reaches the threshold
    return duplicate_count >= self.duplicate_threshold

def handle_stuck_state(self):
    # 1. Prompt that nudges the model toward a different strategy
    stuck_prompt = (
        "Observed duplicate responses. Consider new strategies and "
        "avoid repeating ineffective paths already attempted."
    )
    
    # 2. Prepend it to the next-step prompt (think() may have reset that to None)
    if self.next_step_prompt:
        self.next_step_prompt = f"{stuck_prompt}\n{self.next_step_prompt}"
    else:
        self.next_step_prompt = stuck_prompt
    
    # 3. Log a warning
    logger.warning(f"Agent detected stuck state. Added prompt: {stuck_prompt}")
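The duplicate-count rule is easy to test on a plain list. In this sketch, messages are hypothetical (role, content) tuples. The default threshold of 2 is an assumption standing in for `self.duplicate_threshold`.

```python
from typing import List, Tuple

Msg = Tuple[str, str]  # hypothetical simplified message: (role, content)


def is_stuck(messages: List[Msg], duplicate_threshold: int = 2) -> bool:
    if len(messages) < 2:
        return False
    _, last_content = messages[-1]
    if not last_content:
        return False
    # Count earlier assistant messages whose content equals the latest message.
    duplicate_count = sum(
        1 for role, content in messages[:-1]
        if role == "assistant" and content == last_content
    )
    return duplicate_count >= duplicate_threshold


history = [
    ("user", "task"),
    ("assistant", "I will try X"), ("tool", "failed"),
    ("assistant", "I will try X"), ("tool", "failed"),
    ("assistant", "I will try X"),
]
print(is_stuck(history[:4]), is_stuck(history))
```

With the first four messages, only one earlier duplicate exists, so the agent is not yet stuck. The third identical reply reaches the threshold.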

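7. Completion

When the agent finishes (through the terminate tool or by reaching max_steps), run() returns and control goes back to the loop in main.py, which waits for the next user input.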

8. End-to-end message flow

  1. User input → main.py
  2. main.py → BaseAgent.run(prompt)
  3. BaseAgent.run → update_memory("user", prompt)
  4. run → ReActAgent.step()
  5. step → ToolCallAgent.think()
  6. think → LLM.ask_tool()
  7. think → memory.add_message(assistant message built by Message.from_tool_calls)
  8. step → ToolCallAgent.act()
  9. act → execute_tool(command)
  10. act → memory.add_message(tool message holding the result)
  11. Result returned → BaseAgent.run (steps 4 to 10 repeat until terminate or max_steps)
  12. Back to main.py → next loop iteration

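9. Error handling

  1. Interrupted input (Ctrl+C):

    while True:
        try:
            ...  # read and process user input
        except KeyboardInterrupt:
            logger.warning("Goodbye!")
            break
    
  2. Runtime errors:

    async with self.state_context(AgentState.RUNNING):
        ...  # an exception here sets ERROR, then finally restores the previous state and the exception propagates
    
  3. Tool execution errors:

    try:
        result = await self.execute_tool(command)
    except Exception as e:
        error_msg = f"Error executing tool: {str(e)}"
        results.append(error_msg)

    Each tool call is guarded separately, so one failure does not stop the remaining calls. execute_tool itself also returns error strings instead of raising for unknown tools and failed calls.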
