DeerFlow多智能体项目分析-Agent模块实现源码分析
摘要:文中用 `@tool` 定义了 `handoff_to_planner(research_topic: Annotated[str, "要移交的研究任务主题"], locale: Annotated[str, "用户检测到的语言区域设置"])`(移交给规划器智能体进行计划制定)与 `handoff_after_clarification(locale: Annotated[str, "用户检测到的语言区域设置"])`(澄清轮次完成后移交给规划器)两个工具;专业化智能体系统:每个智能体专注特定领域。
·
DeerFlow Agent模块实现逻辑代码分析
概述
DeerFlow的Agent模块是基于LangGraph构建的多智能体协作系统,采用状态驱动的工作流架构。该模块实现了协调器、规划器、研究员、编程员、报告员等多个专业化智能体,通过统一的工厂模式创建和管理。
项目地址:https://github.com/bytedance/deer-flow
LangGraph中文在线文档:https://github.langchain.ac.cn/langgraph/agents/agents/
1. 核心架构设计
1.1 Agent工厂模式
def create_agent(
    agent_name: str,
    agent_type: str,
    tools: list,
    prompt_template: str,
    pre_model_hook: callable = None,
):
    """Build a ReAct agent whose model and prompt are derived from its type.

    Args:
        agent_name: Name given to the resulting agent.
        agent_type: Key into ``AGENT_LLM_MAP`` selecting the LLM tier.
        tools: Tools the agent may call.
        prompt_template: Template name rendered against the graph state on
            every invocation.
        pre_model_hook: Optional callable run before each model call
            (e.g. context compression).

    Returns:
        Whatever ``create_react_agent`` returns for these settings.
    """
    llm = get_llm_by_type(AGENT_LLM_MAP[agent_type])

    def render_prompt(state):
        # Rendered per call so the prompt always reflects the current state.
        return apply_prompt_template(prompt_template, state)

    return create_react_agent(
        name=agent_name,
        model=llm,
        tools=tools,
        prompt=render_prompt,
        pre_model_hook=pre_model_hook,
    )
1.2 Agent-LLM映射配置
# Maps each agent role to the LLM tier it runs on; every role currently uses "basic".
AGENT_LLM_MAP: dict[str, LLMType] = {
    # Core research-workflow roles
    "coordinator": "basic",  # user interaction and workflow routing
    "planner": "basic",  # research plan generation
    "researcher": "basic",  # information search and analysis
    "coder": "basic",  # code analysis and execution
    "reporter": "basic",  # final report generation
    # Content-generation roles
    "podcast_script_writer": "basic",  # podcast script writing
    "ppt_composer": "basic",  # PPT composition
    "prose_writer": "basic",  # prose writing
    "prompt_enhancer": "basic",  # prompt enhancement
}
2. 核心节点实现
2.1 协调器节点 (Coordinator Node)
协调器是系统的入口点,负责用户交互管理和工作流路由:
def coordinator_node(state: State, config: RunnableConfig):
    """Coordinator node: manage user interaction and optional clarification.

    Renders the coordinator prompt and chooses which hand-off tools the LLM may
    call. With clarification disabled, a system message tells the model to
    call ``handoff_to_planner`` immediately. With it enabled, a multi-round
    clarification dialogue is supported and ``handoff_after_clarification``
    is also offered.

    NOTE(review): this is an excerpt. The LLM call, routing and the returned
    state update are not shown, so ``messages``/``tools`` are unused here.
    """
    enable_clarification = state.get("enable_clarification", False)
    # Both modes start from the rendered coordinator prompt. Previously it was
    # only built in the disabled branch, so the clarification branch raised
    # NameError on ``messages.append``.
    messages = apply_prompt_template("coordinator", state)

    if not enable_clarification:
        # Legacy mode: clarification disabled, force an immediate hand-off.
        messages.append({
            "role": "system",
            "content": "澄清功能已禁用。必须立即调用handoff_to_planner工具。",
        })
        tools = [handoff_to_planner]
    else:
        # Clarification mode: multi-round dialogue.
        clarification_rounds = state.get("clarification_rounds", 0)
        # Read here but not yet used in this excerpt.
        max_clarification_rounds = state.get("max_clarification_rounds", 3)
        if clarification_rounds == 0:
            # First round: tell the model that clarification is enabled.
            messages.append({
                "role": "system",
                "content": "澄清模式已启用。遵循澄清过程指导原则。",
            })
        elif clarification_rounds > 0:
            # Continuing round: record the user's latest reply.
            clarification_history = state.get("clarification_history", [])
            last_message = state.get("messages", [])[-1] if state.get("messages") else None
            if last_message and hasattr(last_message, "content"):
                # NOTE(review): appends to the list held in state, in place.
                clarification_history.append(last_message.content)
        tools = [handoff_to_planner, handoff_after_clarification]
澄清功能工具定义
# Hand-off tool offered to the coordinator's LLM. The coordinator's system
# message instructs the model to call it; the body itself does nothing.
# NOTE(review): presumably LangChain's ``@tool``, which exposes the docstring
# and the ``Annotated`` descriptions as the LLM-facing tool schema. Treat
# those strings as runtime text, not ordinary comments. Confirm the import.
@tool
def handoff_to_planner(
    research_topic: Annotated[str, "要移交的研究任务主题"],
    locale: Annotated[str, "用户检测到的语言区域设置"],
):
    """移交给规划器智能体进行计划制定"""
    # Intentionally empty: the tool call (and its arguments) is the signal;
    # presumably the caller reads those arguments. Verify.
    return
# Hand-off tool offered to the coordinator's LLM only in clarification mode,
# alongside handoff_to_planner. The body is empty, so calling it has no effect
# beyond the tool call itself.
# NOTE(review): presumably LangChain's ``@tool``, which exposes the docstring
# and the ``Annotated`` description as the LLM-facing schema. Treat them as
# runtime text. Confirm the import.
@tool
def handoff_after_clarification(
    locale: Annotated[str, "用户检测到的语言区域设置"],
):
    """澄清轮次完成后移交给规划器"""
    # Intentionally returns None; only the tool call matters.
    return
2.2 规划器节点 (Planner Node)
规划器负责生成和管理研究计划:
def planner_node(state: State, config: RunnableConfig):
    """Planner node: generate (or regenerate) the research plan.

    Routes to the reporter once ``max_plan_iterations`` has been reached.
    Otherwise it builds the planner prompt, using only the clarified question
    when clarification produced one, and appends background-investigation
    results when enabled. It then selects the LLM.

    NOTE(review): excerpt. The LLM invocation and plan parsing that follow are
    omitted, so ``messages``/``llm`` are unused here.
    """
    configurable = Configuration.from_runnable_config(config)
    plan_iterations = state.get("plan_iterations", 0)

    # Iteration limit is checked first so no prompt is rendered and no LLM is
    # constructed when the node is going to hand off to the reporter anyway.
    if plan_iterations >= configurable.max_plan_iterations:
        return Command(goto="reporter")

    if state.get("enable_clarification") and state.get("clarified_question"):
        # Clarification mode: plan from the clarified question alone,
        # not from the full conversation history.
        clean_state = {
            "messages": [{"role": "user", "content": state["clarified_question"]}],
            "locale": state.get("locale", "en-US"),
            "research_topic": state["clarified_question"],
        }
        messages = apply_prompt_template("planner", clean_state, configurable)
    else:
        # Normal mode: use the full conversation history.
        messages = apply_prompt_template("planner", state, configurable)

    # Feed background-investigation results to the planner when available.
    if state.get("enable_background_investigation") and state.get("background_investigation_results"):
        messages += [{
            "role": "user",
            "content": f"背景调查结果:\n{state['background_investigation_results']}\n",
        }]

    # LLM selection: reasoning model for deep thinking; the basic model is
    # asked for JSON output structured as ``Plan``.
    if configurable.enable_deep_thinking:
        llm = get_llm_by_type("reasoning")
    elif AGENT_LLM_MAP["planner"] == "basic":
        llm = get_llm_by_type("basic").with_structured_output(Plan, method="json_mode")
    else:
        llm = get_llm_by_type(AGENT_LLM_MAP["planner"])
2.3 背景调查节点
def background_investigation_node(state: State, config: RunnableConfig):
    """Background-investigation node: run a preliminary search on the topic.

    Searches for ``state["research_topic"]`` and returns
    ``{"background_investigation_results": <str>}``:

    - Tavily: one ``## title`` / content section per result, joined with a
      blank line. An unexpected non-list payload yields an empty string.
    - Other engines: the raw search result serialized as JSON.

    The return value is always a dict. Previously a non-list Tavily payload
    fell off the end of the function and returned ``None``.
    """
    configurable = Configuration.from_runnable_config(config)
    query = state.get("research_topic")

    if SELECTED_SEARCH_ENGINE == SearchEngine.TAVILY.value:
        searched_content = LoggedTavilySearch(
            max_results=configurable.max_search_results
        ).invoke(query)
        if isinstance(searched_content, list):
            background_investigation_results = [
                f"## {elem['title']}\n\n{elem['content']}"
                for elem in searched_content
            ]
            return {
                "background_investigation_results": "\n\n".join(background_investigation_results)
            }
        # Malformed Tavily response: return an empty result so the state update
        # is still a dict. The planner only uses these results when truthy.
        logger.warning("Tavily返回了非列表结果,已忽略背景调查结果")
        return {"background_investigation_results": ""}

    background_investigation_results = get_web_search_tool(
        configurable.max_search_results
    ).invoke(query)
    return {
        "background_investigation_results": json.dumps(
            background_investigation_results, ensure_ascii=False
        )
    }
2.4 人机协作节点
def human_feedback_node(state):
    """Human-in-the-loop node: review the current plan, then parse it.

    Unless ``auto_accepted_plan`` is set, the graph is interrupted for review:

    - Feedback starting with ``[EDIT_PLAN]`` (case-insensitive) goes back to
      the planner, carrying the feedback as a message.
    - Feedback starting with ``[ACCEPTED]`` continues.
    - Anything else, including empty feedback, raises ``TypeError``.

    The plan text is then repaired and parsed as JSON. If parsing fails, the
    flow goes to the reporter (after the first iteration) or ends. On success
    the validated ``Plan``, the incremented iteration count and the plan's
    locale are stored, and the research team takes over.
    """
    if not state.get("auto_accepted_plan", False):
        feedback = interrupt("请审核计划。")
        # Falsy feedback normalizes to "" and therefore matches neither tag.
        tag_source = str(feedback).upper() if feedback else ""
        if tag_source.startswith("[EDIT_PLAN]"):
            return Command(
                update={"messages": [HumanMessage(content=feedback, name="feedback")]},
                goto="planner",
            )
        if not tag_source.startswith("[ACCEPTED]"):
            raise TypeError(f"不支持的中断值:{feedback}")
        logger.info("用户接受了计划。")

    plan_iterations = state.get("plan_iterations", 0) + 1
    try:
        new_plan = json.loads(repair_json_output(state.get("current_plan", "")))
    except json.JSONDecodeError:
        logger.warning("规划器响应不是有效的JSON")
        # After the first iteration, fall back to reporting; otherwise end the run.
        fallback_target = "reporter" if plan_iterations > 1 else "__end__"
        return Command(goto=fallback_target)

    return Command(
        update={
            "current_plan": Plan.model_validate(new_plan),
            "plan_iterations": plan_iterations,
            "locale": new_plan["locale"],
        },
        goto="research_team",
    )
3. 研究团队节点架构
3.1 动态工具配置系统
async def _setup_and_execute_agent_step(
    state: State,
    config: RunnableConfig,
    agent_type: str,
    default_tools: list,
):
    """Assemble the tool set for ``agent_type`` and run one plan step with it.

    Starts from ``default_tools``. When MCP servers are configured for this
    agent type, their enabled tools are appended, each with a description
    prefixed by the providing server. The agent always gets a
    context-compression pre-model hook sized to its LLM's token limit.

    Previously the agent was only created inside the MCP branch, so with no
    MCP servers ``loaded_tools`` was undefined and no step was executed.

    Returns:
        The result of ``_execute_agent_step`` for the created agent.
    """
    configurable = Configuration.from_runnable_config(config)
    mcp_servers = {}
    enabled_tools = {}  # tool name -> name of the MCP server providing it

    # Collect MCP servers that enable at least one tool and target this agent.
    if configurable.mcp_settings:
        servers = configurable.mcp_settings.get("servers", {})
        for server_name, server_config in servers.items():
            tool_names = server_config.get("enabled_tools") or []
            if tool_names and agent_type in server_config.get("add_to_agents", []):
                # Forward only connection-related keys to the MCP client.
                mcp_servers[server_name] = {
                    k: v for k, v in server_config.items()
                    if k in ("transport", "command", "args", "url", "env", "headers")
                }
                for tool_name in tool_names:
                    enabled_tools[tool_name] = server_name

    # Copy so the caller's default tool list is never mutated.
    loaded_tools = default_tools[:]
    if mcp_servers:
        client = MultiServerMCPClient(mcp_servers)
        # ``mcp_tool`` (not ``tool``) avoids shadowing the ``@tool`` decorator.
        for mcp_tool in await client.get_tools():
            if mcp_tool.name in enabled_tools:
                mcp_tool.description = f"由'{enabled_tools[mcp_tool.name]}'提供支持。\n{mcp_tool.description}"
                loaded_tools.append(mcp_tool)

    # Context-compression hook sized to this agent's LLM token limit.
    llm_token_limit = get_llm_token_limit_by_type(AGENT_LLM_MAP[agent_type])
    pre_model_hook = partial(ContextManager(llm_token_limit, 3).compress_messages)
    agent = create_agent(agent_type, agent_type, loaded_tools, agent_type, pre_model_hook)
    return await _execute_agent_step(state, agent, agent_type)
3.2 研究员节点
async def researcher_node(state: State, config: RunnableConfig):
    """Researcher node: run the current research step with search tools.

    The tool list is web search plus the crawler. A retriever over
    ``state["resources"]`` is listed first when one is available.
    """
    configurable = Configuration.from_runnable_config(config)
    web_search_tool = get_web_search_tool(configurable.max_search_results)
    retriever_tool = get_retriever_tool(state.get("resources", []))
    leading_tools = [retriever_tool] if retriever_tool else []
    return await _setup_and_execute_agent_step(
        state, config, "researcher", [*leading_tools, web_search_tool, crawl_tool]
    )
3.3 编程员节点
async def coder_node(state: State, config: RunnableConfig):
    """Coder node: run the current processing step with the Python REPL tool."""
    coder_tools = [python_repl_tool]
    return await _setup_and_execute_agent_step(state, config, "coder", coder_tools)
3.4 步骤执行引擎
def _get_recursion_limit(default: int = 25) -> int:
    """Return the positive ``AGENT_RECURSION_LIMIT`` env value, else ``default``.

    Unset, non-integer or non-positive values fall back to ``default``
    instead of raising, so a bad environment setting cannot crash a step.
    """
    raw = os.getenv("AGENT_RECURSION_LIMIT")
    if raw is None:
        return default
    try:
        value = int(raw)
    except ValueError:
        return default
    return value if value > 0 else default


async def _execute_agent_step(state: State, agent, agent_name: str):
    """Run ``agent`` on the first unexecuted step of the current plan.

    Steps before that step are treated as completed, and their findings are
    passed to the agent as context. The agent's final message is stored on
    the step (``execution_res``), appended to ``observations`` and emitted as
    a message named ``agent_name``. If every step already has a result, the
    node just routes back to ``research_team``.
    """
    current_plan = state.get("current_plan")
    observations = state.get("observations", [])

    # Find the first step without a result; everything before it is done.
    current_step = None
    completed_steps = []
    for step in current_plan.steps:
        if not step.execution_res:
            current_step = step
            break
        completed_steps.append(step)

    if not current_step:
        return Command(goto="research_team")

    # Summarize completed steps (built with join rather than repeated +=).
    completed_steps_info = ""
    if completed_steps:
        completed_steps_info = "# 已完成的研究步骤\n\n" + "".join(
            f"## 已完成步骤 {i + 1}: {done.title}\n\n"
            f"<finding>\n{done.execution_res}\n</finding>\n\n"
            for i, done in enumerate(completed_steps)
        )

    agent_input = {
        "messages": [HumanMessage(
            content=f"# 研究主题\n\n{current_plan.title}\n\n"
            f"{completed_steps_info}"
            f"# 当前步骤\n\n## 标题\n\n{current_step.title}\n\n"
            f"## 描述\n\n{current_step.description}\n\n"
            f"## 语言区域\n\n{state.get('locale', 'en-US')}"
        )]
    }

    result = await agent.ainvoke(
        input=agent_input,
        config={"recursion_limit": _get_recursion_limit()},
    )

    response_content = result["messages"][-1].content
    # Recorded on the plan step in place; routing picks the next step whose
    # execution_res is still empty.
    current_step.execution_res = response_content
    return Command(
        update={
            "messages": [HumanMessage(content=response_content, name=agent_name)],
            "observations": observations + [response_content],
        },
        goto="research_team",
    )
4. 报告员节点
def reporter_node(state: State, config: RunnableConfig):
    """Reporter node: write the final research report.

    Builds the reporter prompt from the plan's title and thought, appends a
    formatting reminder and one message per observation (compressed to the
    reporter LLM's token limit), then invokes the reporter LLM.

    Returns:
        ``{"final_report": <LLM response content>}``.
    """
    configurable = Configuration.from_runnable_config(config)
    current_plan = state.get("current_plan")
    # current_plan is not always a parsed ``Plan``: human_feedback_node routes
    # here when the plan JSON fails to parse, leaving the raw text in state.
    # Fall back to the research topic instead of raising AttributeError.
    if hasattr(current_plan, "title"):
        plan_title = current_plan.title
        plan_thought = current_plan.thought
    else:
        plan_title = state.get("research_topic", "")
        plan_thought = ""

    input_ = {
        "messages": [HumanMessage(
            f"# 研究要求\n\n## 任务\n\n{plan_title}\n\n"
            f"## 描述\n\n{plan_thought}"
        )],
        "locale": state.get("locale", "en-US"),
    }
    invoke_messages = apply_prompt_template("reporter", input_, configurable)
    observations = state.get("observations", [])

    # Formatting and citation reminder.
    invoke_messages.append(HumanMessage(
        content="重要提示:按照提示中的格式构建报告。记住包含:\n\n"
        "1. 关键要点 - 最重要发现的要点列表\n"
        "2. 概述 - 主题的简要介绍\n"
        "3. 详细分析 - 组织成逻辑部分\n"
        "4. 调查说明(可选)- 用于更全面的报告\n"
        "5. 关键引用 - 在末尾列出所有参考文献\n\n"
        "优先使用MARKDOWN表格进行数据展示和比较。",
        name="system",
    ))

    observation_messages = [
        HumanMessage(
            content=f"以下是研究任务的一些观察结果:\n\n{observation}",
            name="observation",
        )
        for observation in observations
    ]

    # Compress observations so they fit the reporter LLM's context window.
    llm_token_limit = get_llm_token_limit_by_type(AGENT_LLM_MAP["reporter"])
    compressed_state = ContextManager(llm_token_limit).compress_messages(
        {"messages": observation_messages}
    )
    invoke_messages += compressed_state.get("messages", [])

    response = get_llm_by_type(AGENT_LLM_MAP["reporter"]).invoke(invoke_messages)
    return {"final_report": response.content}
5. 工作流控制系统
5.1 状态驱动路由
def continue_to_running_research_team(state: State):
    """Pick the research-team node that should handle the next pending step.

    Returns ``"researcher"`` for a pending RESEARCH step and ``"coder"`` for a
    pending PROCESSING step, checking pending steps in order. Returns
    ``"planner"`` when there is no plan, the plan has no steps, every step
    already has a result, or no pending step has a recognized type.
    """
    plan = state.get("current_plan")
    if not plan or not plan.steps:
        return "planner"

    pending_steps = [step for step in plan.steps if not step.execution_res]
    if not pending_steps:
        return "planner"

    for step in pending_steps:
        if step.step_type == StepType.RESEARCH:
            return "researcher"
        if step.step_type == StepType.PROCESSING:
            return "coder"
    return "planner"
5.2 澄清功能控制
def needs_clarification(state: dict) -> bool:
    """Return True when the clarification dialogue should continue.

    Requires clarification to be enabled, at least one round to have started,
    the dialogue not to be marked complete, and the round count not to exceed
    ``max_clarification_rounds`` (default 3). The comparison is inclusive, so
    the final allowed round still counts.
    """
    if not state.get("enable_clarification", False):
        return False

    rounds = state.get("clarification_rounds", 0)
    if rounds <= 0 or state.get("is_clarification_complete", False):
        return False

    return rounds <= state.get("max_clarification_rounds", 3)
6. 图构建器
def _build_base_graph():
    """Create the workflow ``StateGraph`` with all nodes and edges, uncompiled.

    Only the static and conditional edges are declared here. Nodes without
    edges listed below (e.g. ``human_feedback``) are presumably reached via
    ``Command(goto=...)`` returned by other nodes; verify against the
    planner's full implementation.
    """
    graph_builder = StateGraph(State)
    graph_builder.add_edge(START, "coordinator")

    node_table = (
        ("coordinator", coordinator_node),
        ("background_investigator", background_investigation_node),
        ("planner", planner_node),
        ("reporter", reporter_node),
        ("research_team", research_team_node),
        ("researcher", researcher_node),
        ("coder", coder_node),
        ("human_feedback", human_feedback_node),
    )
    for node_name, node_fn in node_table:
        graph_builder.add_node(node_name, node_fn)

    # Static edges.
    graph_builder.add_edge("background_investigator", "planner")
    graph_builder.add_edge("reporter", END)

    # Research team dispatches by the type of the next pending plan step.
    graph_builder.add_conditional_edges(
        "research_team",
        continue_to_running_research_team,
        ["planner", "researcher", "coder"],
    )
    # Coordinator routes on state["goto"], defaulting to the planner.
    graph_builder.add_conditional_edges(
        "coordinator",
        lambda state: state.get("goto", "planner"),
        ["planner", "background_investigator", "coordinator", END],
    )
    return graph_builder
def build_graph():
    """Compile the base workflow graph without memory (no ``compile`` arguments)."""
    return _build_base_graph().compile()
7. 架构优势
7.1 模块化设计
- 职责分离:每个智能体专注于特定任务
- 松耦合:节点间通过状态通信,易于维护
- 可扩展性:新增智能体只需添加节点和配置
7.2 状态驱动工作流
- 智能路由:基于当前状态自动决定下一步
- 条件分支:支持复杂的业务逻辑
- 错误恢复:具备基本的容错处理,例如计划JSON解析失败时转向报告员节点或直接结束流程
7.3 工具集成能力
- MCP协议支持:无缝集成外部工具和服务
- 动态工具加载:运行时配置工具集合
- 工具描述增强:自动标注工具来源
7.4 人机协作
- 计划审核:支持人工审核和修改研究计划
- 澄清对话:多轮对话澄清模糊需求
- 中断恢复:支持工作流中断和恢复
8. 使用示例
import asyncio


async def main():
    """Example: create an agent and run the full workflow once.

    A top-level ``await`` is a SyntaxError in a regular module, so the calls
    are wrapped in a coroutine driven by ``asyncio.run``.

    Returns:
        The final graph state returned by ``graph.ainvoke``.
    """
    # Standalone agent creation (illustrative; the graph creates its own
    # agents internally via create_agent).
    # NOTE(review): search_tool and context_compressor are placeholders not
    # defined in this article; supply real objects before running.
    agent = create_agent(
        agent_name="researcher",
        agent_type="researcher",
        tools=[search_tool, crawl_tool],
        prompt_template="researcher",
        pre_model_hook=context_compressor,
    )

    # Build the workflow graph.
    graph = build_graph()

    # Execute the workflow.
    result = await graph.ainvoke({
        "messages": [HumanMessage(content="研究人工智能的发展趋势")],
        "research_topic": "人工智能发展趋势",
        "locale": "zh-CN",
    })
    return result


if __name__ == "__main__":
    asyncio.run(main())
总结
DeerFlow的Agent模块通过精心设计的架构实现了:
- 专业化智能体系统:每个智能体专注特定领域
- 灵活的工作流控制:状态驱动的智能路由
- 强大的工具集成:MCP协议和动态工具加载
- 优秀的人机协作:澄清对话和计划审核
- 高度可扩展性:模块化设计易于扩展
这种架构使得DeerFlow能够处理复杂的研究任务,同时保持系统的灵活性和可维护性。
火山引擎开发者社区是火山引擎打造的AI技术生态平台,聚焦Agent与大模型开发,提供豆包系列模型(图像/视频/视觉)、智能分析与会话工具,并配套评测集、动手实验室及行业案例库。社区通过技术沙龙、挑战赛等活动促进开发者成长,新用户可领50万Tokens权益,助力构建智能应用。
更多推荐
所有评论(0)