OpenManus代码详解(六): 代码调用流程
当代理执行完成(或达到最大步数),控制权返回到 main.py 的循环,等待下一个用户输入。
·

往期推荐:
1. OpenManus架构解析
2. OpenManus代码详解(一):app/agent
3. OpenManus代码详解(二):app/flow
4. OpenManus代码详解(三):app/tool
5. OpenManus代码详解(四):app/schema.py
6. OpenManus代码详解(五):app/tool/planning.py
7. OpenManus代码详解(六):代码调用流程
官方地址:https://github.com/mannaandpoem/OpenManus
1. 程序入口与初始化 (main.py)
if __name__ == "__main__":
asyncio.run(main())
asyncio.run(main()):- 创建新的事件循环
- 运行 main() 协程直到完成
- 关闭事件循环
2. 主函数执行 (main.py)
async def main():
# 初始化代理
agent = Manus()
# 进入交互循环
while True:
# ... 用户输入处理逻辑
2.1. 代理初始化详解
当执行 agent = Manus() 时,系统按照继承链依次初始化:
Manus.__init__() → ToolCallAgent.__init__() → ReActAgent.__init__() → BaseAgent.__init__()
具体初始化流程:
-
BaseAgent 初始化:
# 基础属性设置 self.name = "manus" # 来自 Manus 类定义 self.description = "a general-purpose AI assistant" self.system_prompt = "你是一个助手..." self.llm = LLM() self.memory = Memory() self.state = AgentState.IDLE self.max_steps = 10 self.current_step = 0 -
BaseAgent.initialize_agent 验证器:
@model_validator(mode="after") def initialize_agent(self) -> "BaseAgent": # 确保 LLM 实例正确初始化 if self.llm is None or not isinstance(self.llm, LLM): self.llm = LLM(config_name=self.name.lower()) # 确保 Memory 实例正确初始化 if not isinstance(self.memory, Memory): self.memory = Memory() return self -
ReActAgent 初始化:
# 初始化反应系统组件 self.last_tool_calls = [] -
ToolCallAgent 初始化:
# 初始化工具系统 self.tool_calls = [] self.available_tools = ToolCollection(...) self.tool_choices = None -
Manus 特定初始化:
# 初始化特定工具集 self.available_tools = ToolCollection( CreateChatCompletion(), Terminate(), # ... 更多工具 )
3. 用户输入处理 (main.py)
prompt = input("Enter your prompt (or 'exit'/'quit' to quit): ")
prompt_lower = prompt.lower()
# 处理退出命令
if prompt_lower in ["exit", "quit"]:
logger.info("Goodbye!")
break
# 处理空输入
if not prompt.strip():
logger.warning("Skipping empty prompt.")
continue
4. 执行代理任务
await agent.run(prompt)
4.1. BaseAgent.run 方法详解
async def run(self, request: Optional[str] = None) -> str:
# 1. 状态检查
if self.state != AgentState.IDLE:
raise RuntimeError(f"Cannot run agent from state: {self.state}")
# 2. 处理用户请求(核心部分)
if request:
self.update_memory("user", request)
# 3. 初始化结果列表
results: List[str] = []
# 4. 设置运行状态(使用上下文管理器)
async with self.state_context(AgentState.RUNNING):
# 5. 主执行循环
while (self.current_step < self.max_steps and
self.state != AgentState.FINISHED):
# 5.1 步骤计数更新
self.current_step += 1
logger.info(f"Executing step {self.current_step}/{self.max_steps}")
# 5.2 执行单步(关键调用)
step_result = await self.step()
# 5.3 检查是否陷入循环
if self.is_stuck():
self.handle_stuck_state()
# 5.4 记录步骤结果
results.append(f"Step {self.current_step}: {step_result}")
# 6. 达到最大步数处理
if self.current_step >= self.max_steps:
self.current_step = 0 # 重置步数计数器
self.state = AgentState.IDLE # 恢复空闲状态
results.append(f"Terminated: Reached max steps ({self.max_steps})")
# 7. 返回执行结果
return "\n".join(results) if results else "No steps executed"
4.1.1. update_memory 方法详解
def update_memory(self, role: Literal["user", "system", "assistant", "tool"], content: str, **kwargs) -> None:
# 1. 定义消息创建函数映射
message_map = {
"user": Message.user_message,
"system": Message.system_message,
"assistant": Message.assistant_message,
"tool": lambda content, **kw: Message.tool_message(content, **kw),
}
# 2. 验证角色类型
if role not in message_map:
raise ValueError(f"Unsupported message role: {role}")
# 3. 获取消息创建函数
msg_factory = message_map[role]
# 4. 创建消息对象(根据角色区分处理方式)
msg = msg_factory(content, **kwargs) if role == "tool" else msg_factory(content)
# 5. 将消息添加到内存
self.memory.add_message(msg)
4.1.2. 状态上下文管理器
@asynccontextmanager
async def state_context(self, new_state: AgentState):
# 1. 验证状态类型
if not isinstance(new_state, AgentState):
raise ValueError(f"Invalid state: {new_state}")
# 2. 保存之前的状态并设置新状态
previous_state = self.state
self.state = new_state
try:
# 3. 让出控制权,继续执行上下文中的代码
yield
except Exception as e:
# 4. 发生异常时,将状态设为错误
self.state = AgentState.ERROR
raise e
finally:
# 5. 无论如何,最终恢复之前的状态
self.state = previous_state
5. 单步执行 (ReActAgent.step)
async def step(self) -> str:
# 1. 思考阶段 - 决定下一步行动
should_act = await self.think()
# 2. 执行阶段 - 基于思考结果执行操作
if should_act:
return await self.act()
# 3. 如果无需执行,返回信息
return "Thinking complete, no action needed"
5.1. 思考阶段 (ToolCallAgent.think)
async def think(self) -> bool:
# 1. 处理下一步提示
if self.next_step_prompt:
user_msg = Message.user_message(self.next_step_prompt)
self.messages += [user_msg]
self.next_step_prompt = None # 清除已使用的提示
# 2. 准备系统消息
system_msgs = []
if self.system_prompt:
system_msgs = [Message.system_message(self.system_prompt)]
# 3. 准备工具参数
tool_params = self.available_tools.to_params()
# 4. 调用语言模型获取响应
response = await self.llm.ask_tool(
messages=self.messages,
system_msgs=system_msgs,
tools=tool_params,
tool_choice=self.tool_choices,
)
# 5. 处理工具调用
self.tool_calls = response.tool_calls
# 6. 创建并存储助手消息
assistant_msg = Message.from_tool_calls(
content=response.content,
tool_calls=self.tool_calls
)
self.memory.add_message(assistant_msg)
# 7. 返回是否有工具需要执行
return bool(self.tool_calls)
5.1.1. LLM.ask_tool 方法
async def ask_tool(self, messages, system_msgs=None, tools=None, tool_choice=None):
# 1. 准备请求参数
params = {
"model": self.model,
"messages": self._prepare_messages(messages, system_msgs),
"tools": tools,
"tool_choice": tool_choice,
}
# 2. 调用API获取响应
response = await self._post_request("/chat/completions", params)
# 3. 解析返回结果
content = response["choices"][0]["message"]["content"]
tool_calls = response["choices"][0]["message"].get("tool_calls", [])
# 4. 创建并返回响应对象
return ToolResponse(content=content, tool_calls=tool_calls)
5.2. 执行阶段 (ToolCallAgent.act)
async def act(self) -> str:
# 1. 检查工具调用
if not self.tool_calls:
if self.tool_choices == "required":
raise ValueError("Tool call was required but none was provided")
return self.messages[-1].content
# 2. 执行工具调用
results = []
for command in self.tool_calls:
# 2.1 执行单个工具
try:
result = await self.execute_tool(command)
# 2.2 创建工具消息
tool_msg = Message.tool_message(
content=result,
tool_call_id=command.id,
name=command.function.name
)
# 2.3 更新内存
self.memory.add_message(tool_msg)
results.append(result)
except Exception as e:
error_msg = f"Error executing tool {command.function.name}: {str(e)}"
logger.error(error_msg)
results.append(error_msg)
# 3. 合并结果
return "\n\n".join(results)
5.2.1. 工具执行详解
async def execute_tool(self, command: ToolCall) -> str:
# 1. 获取工具名称
name = command.function.name
# 2. 验证工具存在
if not name in self.available_tools.tool_map:
return f"Error: Unknown tool '{name}'"
try:
# 3. 解析工具参数
args = json.loads(command.function.arguments or "{}")
# 4. 执行工具调用
result = await self.available_tools.execute(
name=name,
tool_input=args
)
# 5. 处理特殊工具
await self._handle_special_tool(name=name, result=result)
# 6. 返回执行结果
return f"Observed output of cmd `{name}` executed:\n{str(result)}"
except Exception as e:
# 7. 处理执行错误
logger.error(f"Error executing tool {name}: {str(e)}")
return f"Error executing {name}: {str(e)}"
5.2.2. 特殊工具处理
async def _handle_special_tool(self, name: str, result: Any) -> None:
# 处理终止工具
if name.lower() == "terminate":
self.state = AgentState.FINISHED
logger.info("Agent terminated by tool call")
6. 循环检测与处理
def is_stuck(self) -> bool:
# 1. 检查消息数量
if len(self.memory.messages) < 2:
return False
# 2. 获取最新消息
last_message = self.memory.messages[-1]
if not last_message.content:
return False
# 3. 计算重复次数
duplicate_count = sum(
1
for msg in reversed(self.memory.messages[:-1])
if msg.role == "assistant" and msg.content == last_message.content
)
# 4. 判断是否超过阈值
return duplicate_count >= self.duplicate_threshold
def handle_stuck_state(self):
# 1. 创建提示词鼓励改变策略
stuck_prompt = "\
Observed duplicate responses. Consider new strategies and avoid repeating ineffective paths already attempted."
# 2. 更新下一步提示词
self.next_step_prompt = f"{stuck_prompt}\n{self.next_step_prompt}"
# 3. 记录警告日志
logger.warning(f"Agent detected stuck state. Added prompt: {stuck_prompt}")
7. 执行完成
当代理执行完成(或达到最大步数),控制权返回到 main.py 的循环,等待下一个用户输入。
8. 消息流转全流程
- 用户输入 → main.py
- main.py → BaseAgent.run(prompt)
- BaseAgent.run → update_memory(“user”, prompt)
- run → ReActAgent.step()
- step → ToolCallAgent.think()
- think → LLM.ask_tool()
- think → update_memory(“assistant”, response)
- step → ToolCallAgent.act()
- act → execute_tool(command)
- act → update_memory(“tool”, result)
- 返回结果 → BaseAgent.run
- 返回到 main.py → 下一次循环
9. 错误处理机制
-
用户输入错误:
try: # 处理用户输入 except KeyboardInterrupt: logger.warning("Goodbye!") break -
运行时错误:
async with self.state_context(AgentState.RUNNING): # 如果发生异常,状态将设为 ERROR -
工具执行错误:
try: result = await self.execute_tool(command) except Exception as e: error_msg = f"Error executing tool: {str(e)}" results.append(error_msg)
火山引擎开发者社区是火山引擎打造的AI技术生态平台,聚焦Agent与大模型开发,提供豆包系列模型(图像/视频/视觉)、智能分析与会话工具,并配套评测集、动手实验室及行业案例库。社区通过技术沙龙、挑战赛等活动促进开发者成长,新用户可领50万Tokens权益,助力构建智能应用。
更多推荐
所有评论(0)