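DeerFlow Agent Module: Implementation Logic and Code Analysis

Overview

DeerFlow's Agent module is a multi-agent collaboration system built on LangGraph around a state-driven workflow. It implements several specialized agents (coordinator, planner, researcher, coder, reporter, and others), all created through a single factory function so that they share one configuration path.
Project repository: https://github.com/bytedance/deer-flow
LangGraph documentation (Chinese translation): https://github.langchain.ac.cn/langgraph/agents/agents/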

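1. Core Architecture

1.1 Agent Factory Pattern

from typing import Callable, Optional

from langgraph.prebuilt import create_react_agent

# get_llm_by_type, AGENT_LLM_MAP and apply_prompt_template are DeerFlow's own helpers


def create_agent(
    agent_name: str,
    agent_type: str,
    tools: list,
    prompt_template: str,
    pre_model_hook: Optional[Callable] = None,
):
    """Factory function that creates agents with a consistent configuration."""
    return create_react_agent(
        name=agent_name,
        # Model tier is looked up from AGENT_LLM_MAP (section 1.2)
        model=get_llm_by_type(AGENT_LLM_MAP[agent_type]),
        tools=tools,
        # The prompt is rendered from the template on every call, using the current state
        prompt=lambda state: apply_prompt_template(prompt_template, state),
        # Optional hook that runs before each model call (e.g. context compression)
        pre_model_hook=pre_model_hook,
    )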
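
The factory's value is that every agent is assembled the same way from a few inputs. The sketch below reproduces that shape without LangGraph so it runs anywhere: `AgentSpec`, `fake_apply_prompt_template` and the two-entry tier table are hypothetical stand-ins, not DeerFlow or LangGraph APIs, and the spec object simply records what `create_react_agent` would receive.

```python
from dataclasses import dataclass
from typing import Callable, Optional

# Hypothetical tier table standing in for AGENT_LLM_MAP
AGENT_LLM_MAP = {"researcher": "basic", "planner": "basic"}


def fake_apply_prompt_template(template: str, state: dict) -> list[dict]:
    """Hypothetical: prepend a system prompt named after the template to the state's messages."""
    return [{"role": "system", "content": f"<{template} prompt>"}] + state.get("messages", [])


@dataclass
class AgentSpec:
    name: str
    llm_type: str
    tools: list
    prompt: Callable[[dict], list[dict]]
    pre_model_hook: Optional[Callable] = None


def create_agent(agent_name, agent_type, tools, prompt_template, pre_model_hook=None) -> AgentSpec:
    # Every agent is built the same way; only the inputs differ
    return AgentSpec(
        name=agent_name,
        llm_type=AGENT_LLM_MAP[agent_type],  # KeyError for an unknown agent type
        tools=list(tools),
        # Closure over the template name: the prompt is rebuilt from state on each call
        prompt=lambda state: fake_apply_prompt_template(prompt_template, state),
        pre_model_hook=pre_model_hook,
    )


spec = create_agent("researcher", "researcher", ["web_search"], "researcher")
msgs = spec.prompt({"messages": [{"role": "user", "content": "hi"}]})
```

Because the model tier comes from one lookup table, an unknown `agent_type` fails immediately with a `KeyError` instead of silently falling back to some default model.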

1.2 Agent-to-LLM Mapping

AGENT_LLM_MAP: dict[str, LLMType] = {
    "coordinator": "basic",            # Coordinator: manages user interaction
    "planner": "basic",                # Planner: generates the research plan
    "researcher": "basic",             # Researcher: searches for and analyzes information
    "coder": "basic",                  # Coder: analyzes and executes code
    "reporter": "basic",               # Reporter: writes the final report
    "podcast_script_writer": "basic",  # Podcast script writing
    "ppt_composer": "basic",           # PPT composition
    "prose_writer": "basic",           # Prose writing
    "prompt_enhancer": "basic",        # Prompt enhancement
}
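
`LLMType` is not defined in the excerpt above. A minimal sketch, assuming it is a `typing.Literal` of model tiers, shows how such a map could be checked at import time so a typo like `"basci"` fails early. Only `"basic"` and `"reasoning"` appear in this article; the `validate_agent_llm_map` helper is hypothetical.

```python
from typing import Literal, get_args

# Assumption: model tiers are modeled as a Literal (only the two tiers used in this article)
LLMType = Literal["basic", "reasoning"]

AGENT_LLM_MAP: dict[str, LLMType] = {
    "coordinator": "basic",
    "planner": "basic",
    "researcher": "basic",
    "coder": "basic",
    "reporter": "basic",
}


def validate_agent_llm_map(mapping: dict[str, str]) -> None:
    """Raise ValueError if any agent is mapped to an unknown model tier."""
    allowed = set(get_args(LLMType))
    bad = {agent: tier for agent, tier in mapping.items() if tier not in allowed}
    if bad:
        raise ValueError(f"unknown LLM tiers: {bad}")


validate_agent_llm_map(AGENT_LLM_MAP)  # passes silently for a valid map
```

A static type checker already flags bad literals in source code; the runtime check additionally covers maps that are loaded from configuration files.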

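2. Core Node Implementations

2.1 Coordinator Node

The coordinator is the system's entry point. It manages user interaction and routes the workflow:

from langchain_core.runnables import RunnableConfig


def coordinator_node(state: State, config: RunnableConfig):
    """Coordinator node: handles user interaction and optional clarification."""
    enable_clarification = state.get("enable_clarification", False)
    # Build the base prompt once; both modes append instructions to it
    messages = apply_prompt_template("coordinator", state)

    # Legacy mode: clarification disabled
    if not enable_clarification:
        messages.append({
            "role": "system",
            "content": "Clarification is DISABLED. You MUST immediately call the handoff_to_planner tool.",
        })
        tools = [handoff_to_planner]

    # Clarification mode: multi-turn dialogue
    else:
        clarification_rounds = state.get("clarification_rounds", 0)
        max_clarification_rounds = state.get("max_clarification_rounds", 3)

        if clarification_rounds == 0:
            # First clarification round
            messages.append({
                "role": "system",
                "content": "Clarification mode is ENABLED. Follow the clarification process guidelines.",
            })
        else:
            # Continuing the dialogue: record the user's latest reply (copy, do not mutate state)
            clarification_history = list(state.get("clarification_history", []))
            last_message = state["messages"][-1] if state.get("messages") else None
            if last_message and hasattr(last_message, "content"):
                clarification_history.append(last_message.content)

        tools = [handoff_to_planner, handoff_after_clarification]

    # ... (remainder of the function omitted in this excerpt)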
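
Clarification tool definitions

from typing import Annotated

from langchain_core.tools import tool


@tool
def handoff_to_planner(
    research_topic: Annotated[str, "The topic of the research task to be handed off."],
    locale: Annotated[str, "The user's detected language locale (e.g. en-US, zh-CN)."],
):
    """Hand off to the planner agent to create a plan."""
    # No return value: the coordinator inspects the tool call's arguments, not its result
    return


@tool
def handoff_after_clarification(
    locale: Annotated[str, "The user's detected language locale (e.g. en-US, zh-CN)."],
):
    """Hand off to the planner after the clarification rounds are complete."""
    return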
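
The excerpt stops before the coordinator acts on the model's tool calls. A hedged sketch of that routing step, assuming (1) a call to either handoff tool sends the flow on to planning, through `background_investigator` first when `enable_background_investigation` is set, and (2) no handoff call ends the run. The `route_coordinator` helper and the tool-call dict shape (`name`, `args`) are hypothetical; the returned `goto` key is what the coordinator's conditional edge in section 6 reads.

```python
END = "__end__"
HANDOFF_TOOLS = {"handoff_to_planner", "handoff_after_clarification"}


def route_coordinator(tool_calls: list[dict], enable_background_investigation: bool) -> dict:
    """Hypothetical: turn the coordinator LLM's tool calls into a state update with a 'goto' key."""
    for call in tool_calls:
        if call.get("name") in HANDOFF_TOOLS:
            args = call.get("args", {})
            goto = "background_investigator" if enable_background_investigation else "planner"
            return {
                "goto": goto,
                # handoff_after_clarification carries no research_topic, so default to ""
                "research_topic": args.get("research_topic", ""),
                "locale": args.get("locale", "en-US"),
            }
    # No handoff: the coordinator answered directly (e.g. small talk), so the run ends
    return {"goto": END}
```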

2.2 Planner Node

The planner generates and manages the research plan:

from langchain_core.runnables import RunnableConfig
from langgraph.types import Command


def planner_node(state: State, config: RunnableConfig):
    """Planner node: generates the full research plan."""
    configurable = Configuration.from_runnable_config(config)
    plan_iterations = state.get("plan_iterations", 0)

    # Clarification mode: plan only from the clarified question
    if state.get("enable_clarification") and state.get("clarified_question"):
        clean_state = {
            "messages": [{"role": "user", "content": state["clarified_question"]}],
            "locale": state.get("locale", "en-US"),
            "research_topic": state["clarified_question"],
        }
        messages = apply_prompt_template("planner", clean_state, configurable)
    else:
        # Normal mode: use the full conversation history
        messages = apply_prompt_template("planner", state, configurable)

    # Inject background investigation results, if any
    if state.get("enable_background_investigation") and state.get("background_investigation_results"):
        messages += [{
            "role": "user",
            "content": f"Background investigation results:\n{state['background_investigation_results']}\n",
        }]

    # LLM selection strategy
    if configurable.enable_deep_thinking:
        # Reasoning model, called without structured output
        llm = get_llm_by_type("reasoning")
    elif AGENT_LLM_MAP["planner"] == "basic":
        # Basic model: request JSON output that matches the Plan schema
        llm = get_llm_by_type("basic").with_structured_output(Plan, method="json_mode")
    else:
        llm = get_llm_by_type(AGENT_LLM_MAP["planner"])

    # Iteration cap: stop re-planning and go straight to the report
    if plan_iterations >= configurable.max_plan_iterations:
        return Command(goto="reporter")

    # ... (remainder omitted in this excerpt: invoke llm on messages and parse the plan)
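
The LLM-selection branch and the iteration cap together form a small decision table. The sketch below restates that logic as a pure function so each case is easy to check; `PlannerChoice` and `choose_planner_step` are hypothetical names, and the cap is tested first here only for readability, since it does not depend on which model is chosen.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass(frozen=True)
class PlannerChoice:
    goto: Optional[str]        # set when the planner should short-circuit
    llm_type: Optional[str]    # model tier to call
    structured_output: bool    # whether JSON-mode structured output is requested


def choose_planner_step(plan_iterations: int, max_plan_iterations: int,
                        enable_deep_thinking: bool, planner_tier: str) -> PlannerChoice:
    # Iteration cap: stop re-planning and write the report with what exists
    if plan_iterations >= max_plan_iterations:
        return PlannerChoice(goto="reporter", llm_type=None, structured_output=False)
    if enable_deep_thinking:
        # Reasoning tier, no structured output
        return PlannerChoice(goto=None, llm_type="reasoning", structured_output=False)
    if planner_tier == "basic":
        return PlannerChoice(goto=None, llm_type="basic", structured_output=True)
    return PlannerChoice(goto=None, llm_type=planner_tier, structured_output=False)
```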

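2.3 Background Investigation Node

import json
import logging

from langchain_core.runnables import RunnableConfig

logger = logging.getLogger(__name__)


def background_investigation_node(state: State, config: RunnableConfig):
    """Background investigation node: gathers preliminary information before planning."""
    configurable = Configuration.from_runnable_config(config)
    query = state.get("research_topic")
    # Initialized up front so every path reaches the final return with a value
    background_investigation_results = []

    if SELECTED_SEARCH_ENGINE == SearchEngine.TAVILY.value:
        searched_content = LoggedTavilySearch(
            max_results=configurable.max_search_results
        ).invoke(query)

        # Expected shape: a list of dicts with "title" and "content" keys
        if isinstance(searched_content, list):
            background_investigation_results = [
                f"## {elem['title']}\n\n{elem['content']}"
                for elem in searched_content
            ]
            return {
                "background_investigation_results": "\n\n".join(background_investigation_results)
            }
        # Any other shape: log it and fall back to an empty result
        logger.error(f"Tavily search returned malformed response: {searched_content}")
    else:
        # Other engines: use the generic web search tool
        background_investigation_results = get_web_search_tool(
            configurable.max_search_results
        ).invoke(query)

    return {
        "background_investigation_results": json.dumps(
            background_investigation_results, ensure_ascii=False
        )
    }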
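
The formatting step is the interesting part: Tavily-style hits become Markdown sections, and anything else is serialized as JSON. A dependency-free sketch of that transformation, assuming each hit is a dict with `title` and `content` keys (the keys the code above indexes); `format_background_results` is a hypothetical name, and unlike the node above it also falls back to JSON when a list item lacks those keys.

```python
import json


def format_background_results(searched_content) -> str:
    """Render a list of {'title', 'content'} hits as Markdown; JSON-encode anything else."""
    if isinstance(searched_content, list) and all(
        isinstance(hit, dict) and "title" in hit and "content" in hit
        for hit in searched_content
    ):
        return "\n\n".join(f"## {hit['title']}\n\n{hit['content']}" for hit in searched_content)
    # ensure_ascii=False keeps non-ASCII text (e.g. Chinese) readable in the prompt
    return json.dumps(searched_content, ensure_ascii=False)
```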

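2.4 Human-in-the-Loop Node

import json
import logging

from langchain_core.messages import HumanMessage
from langgraph.types import Command, interrupt

logger = logging.getLogger(__name__)


def human_feedback_node(state):
    """Human-in-the-loop node: pauses for plan review and handles the feedback."""
    current_plan = state.get("current_plan", "")
    auto_accepted_plan = state.get("auto_accepted_plan", False)

    if not auto_accepted_plan:
        # Pause the graph; the value supplied on resume becomes `feedback`
        feedback = interrupt("Please review the plan.")

        # Edit request: send the feedback back to the planner
        if feedback and str(feedback).upper().startswith("[EDIT_PLAN]"):
            return Command(
                update={
                    "messages": [HumanMessage(content=str(feedback), name="feedback")]
                },
                goto="planner",
            )
        # Acceptance: fall through to plan parsing
        elif feedback and str(feedback).upper().startswith("[ACCEPTED]"):
            logger.info("Plan accepted by user.")
        else:
            raise TypeError(f"Unsupported interrupt value: {feedback}")

    # Parse and validate the plan; plan_iterations now counts this plan
    plan_iterations = state.get("plan_iterations", 0) + 1
    try:
        current_plan = repair_json_output(current_plan)
        new_plan = json.loads(current_plan)
    except json.JSONDecodeError:
        logger.warning("Planner response is not valid JSON")
        # Earlier iterations produced results worth reporting; otherwise stop
        if plan_iterations > 1:
            return Command(goto="reporter")
        else:
            return Command(goto="__end__")

    return Command(
        update={
            "current_plan": Plan.model_validate(new_plan),
            "plan_iterations": plan_iterations,
            "locale": new_plan["locale"],
        },
        goto="research_team",
    )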
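
The review protocol is a prefix convention on the resume value. A self-contained sketch of that parsing rule, plus the fallback taken when the plan is not valid JSON; `classify_feedback` and `next_node_on_bad_plan` are hypothetical helpers that mirror the branches above, and they raise `ValueError` where the node above raises `TypeError`.

```python
def classify_feedback(feedback) -> str:
    """Map an interrupt resume value to 'edit' or 'accept' by its case-insensitive prefix."""
    text = str(feedback or "").upper()
    if text.startswith("[EDIT_PLAN]"):
        return "edit"      # go back to the planner with the feedback as a new message
    if text.startswith("[ACCEPTED]"):
        return "accept"    # continue to plan parsing and the research team
    raise ValueError(f"unsupported interrupt value: {feedback!r}")


def next_node_on_bad_plan(plan_iterations: int) -> str:
    """Unparseable plan: report on earlier work if this is not the first plan, otherwise end."""
    # plan_iterations has already been incremented, so the first plan counts as 1
    return "reporter" if plan_iterations > 1 else "__end__"
```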

3. Research Team Node Architecture

3.1 Dynamic Tool Configuration

from langchain_core.runnables import RunnableConfig
from langchain_mcp_adapters.client import MultiServerMCPClient


async def _setup_and_execute_agent_step(
    state: State,
    config: RunnableConfig,
    agent_type: str,
    default_tools: list,
):
    """Helper: configure the agent's tools (including MCP tools) and run one step."""
    configurable = Configuration.from_runnable_config(config)
    mcp_servers = {}
    enabled_tools = {}

    # Collect the MCP servers enabled for this agent type
    if configurable.mcp_settings:
        for server_name, server_config in configurable.mcp_settings["servers"].items():
            if (server_config["enabled_tools"]
                    and agent_type in server_config["add_to_agents"]):
                # Keep only the connection parameters
                mcp_servers[server_name] = {
                    k: v for k, v in server_config.items()
                    if k in ("transport", "command", "args", "url", "env", "headers")
                }
                # Remember which server provides each enabled tool
                for tool_name in server_config["enabled_tools"]:
                    enabled_tools[tool_name] = server_name

    # Context-compression hook, run before every model call
    llm_token_limit = get_llm_token_limit_by_type(AGENT_LLM_MAP[agent_type])
    pre_model_hook = ContextManager(llm_token_limit, 3).compress_messages

    loaded_tools = default_tools[:]
    if mcp_servers:
        # Load MCP tools and keep only the enabled ones, tagged with their source server
        client = MultiServerMCPClient(mcp_servers)
        for mcp_tool in await client.get_tools():
            if mcp_tool.name in enabled_tools:
                mcp_tool.description = (
                    f"Powered by '{enabled_tools[mcp_tool.name]}'.\n{mcp_tool.description}"
                )
                loaded_tools.append(mcp_tool)

    # Without MCP servers the agent runs with its default tools only
    agent = create_agent(agent_type, agent_type, loaded_tools, agent_type, pre_model_hook)
    return await _execute_agent_step(state, agent, agent_type)
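
The MCP filtering logic can be exercised without a live MCP server. The sketch below factors it into a pure function and runs it on a sample `mcp_settings` dict; the layout follows the keys the code above reads (`servers`, `enabled_tools`, `add_to_agents`, `transport`, ...), while `select_mcp_servers` and the server names, commands and tool names are made up for illustration.

```python
CONNECTION_KEYS = ("transport", "command", "args", "url", "env", "headers")


def select_mcp_servers(mcp_settings, agent_type: str) -> tuple[dict, dict]:
    """Return (connection config per server, tool name -> server name) for one agent type."""
    servers, enabled_tools = {}, {}
    for server_name, cfg in (mcp_settings or {}).get("servers", {}).items():
        if not cfg.get("enabled_tools") or agent_type not in cfg.get("add_to_agents", []):
            continue
        # Keep only the keys needed to open the connection
        servers[server_name] = {k: v for k, v in cfg.items() if k in CONNECTION_KEYS}
        for tool_name in cfg["enabled_tools"]:
            enabled_tools[tool_name] = server_name
    return servers, enabled_tools


settings = {
    "servers": {
        "github": {  # hypothetical server for the researcher
            "transport": "stdio", "command": "npx", "args": ["some-mcp-server"],
            "enabled_tools": ["search_repos"], "add_to_agents": ["researcher"],
        },
        "sandbox": {  # hypothetical server for the coder only
            "transport": "sse", "url": "http://localhost:8000/sse",
            "enabled_tools": ["run_code"], "add_to_agents": ["coder"],
        },
    }
}
servers, tools = select_mcp_servers(settings, "researcher")
```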

3.2 Researcher Node

async def researcher_node(state: State, config: RunnableConfig):
    """Researcher node: searches for and analyzes information."""
    configurable = Configuration.from_runnable_config(config)

    # Default tools: web search plus a page crawler
    tools = [
        get_web_search_tool(configurable.max_search_results),
        crawl_tool,
    ]

    # If the user attached resources, put the retriever tool first
    retriever_tool = get_retriever_tool(state.get("resources", []))
    if retriever_tool:
        tools.insert(0, retriever_tool)

    return await _setup_and_execute_agent_step(state, config, "researcher", tools)

3.3 Coder Node

async def coder_node(state: State, config: RunnableConfig):
    """Coder node: analyzes and processes data with code."""
    return await _setup_and_execute_agent_step(
        state, config, "coder", [python_repl_tool]
    )

3.4 Step Execution Engine

import os

from langchain_core.messages import HumanMessage
from langgraph.types import Command


async def _execute_agent_step(state: State, agent, agent_name: str):
    """Helper: run the agent on the first unexecuted plan step."""
    current_plan = state.get("current_plan")
    observations = state.get("observations", [])

    # Find the first unexecuted step; the steps before it are complete
    current_step = None
    completed_steps = []
    for step in current_plan.steps:
        if not step.execution_res:
            current_step = step
            break
        else:
            completed_steps.append(step)

    if not current_step:
        return Command(goto="research_team")

    # Summarize completed steps so the agent can build on earlier findings
    completed_steps_info = ""
    if completed_steps:
        completed_steps_info = "# Completed Research Steps\n\n"
        for i, step in enumerate(completed_steps):
            completed_steps_info += f"## Completed Step {i + 1}: {step.title}\n\n"
            completed_steps_info += f"<finding>\n{step.execution_res}\n</finding>\n\n"

    # Build the agent input
    agent_input = {
        "messages": [HumanMessage(
            content=f"# Research Topic\n\n{current_plan.title}\n\n"
                    f"{completed_steps_info}"
                    f"# Current Step\n\n## Title\n\n{current_step.title}\n\n"
                    f"## Description\n\n{current_step.description}\n\n"
                    f"## Locale\n\n{state.get('locale', 'en-US')}"
        )]
    }

    # Recursion limit, configurable through an environment variable
    recursion_limit = int(os.getenv("AGENT_RECURSION_LIMIT", "25"))

    # Run the agent
    result = await agent.ainvoke(
        input=agent_input,
        config={"recursion_limit": recursion_limit},
    )

    # Store the agent's final message as this step's result
    response_content = result["messages"][-1].content
    current_step.execution_res = response_content

    return Command(
        update={
            "messages": [HumanMessage(content=response_content, name=agent_name)],
            "observations": observations + [response_content],
        },
        goto="research_team",
    )
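
The core of the step engine is "find the first step without a result, then feed all earlier findings into the prompt". A runnable sketch of that selection and prompt assembly using plain dataclasses; `Step`, `Plan` and `build_step_prompt` are simplified hypothetical stand-ins for DeerFlow's own plan models and code.

```python
from dataclasses import dataclass, field
from typing import Optional


@dataclass
class Step:
    title: str
    description: str
    execution_res: Optional[str] = None


@dataclass
class Plan:
    title: str
    steps: list[Step] = field(default_factory=list)


def build_step_prompt(plan: Plan, locale: str = "en-US") -> Optional[str]:
    """Return the prompt for the first unfinished step, or None if every step has a result."""
    completed = []
    for step in plan.steps:
        if not step.execution_res:
            break  # `step` is now the current step
        completed.append(step)
    else:
        return None  # loop ran to the end without break: nothing left to do

    parts = [f"# Research Topic\n\n{plan.title}\n\n"]
    if completed:
        parts.append("# Completed Research Steps\n\n")
        for i, done in enumerate(completed, start=1):
            parts.append(f"## Completed Step {i}: {done.title}\n\n"
                         f"<finding>\n{done.execution_res}\n</finding>\n\n")
    parts.append(f"# Current Step\n\n## Title\n\n{step.title}\n\n"
                 f"## Description\n\n{step.description}\n\n## Locale\n\n{locale}")
    return "".join(parts)
```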

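4. Reporter Node

from langchain_core.messages import HumanMessage
from langchain_core.runnables import RunnableConfig


def reporter_node(state: State, config: RunnableConfig):
    """Reporter node: writes the final research report."""
    configurable = Configuration.from_runnable_config(config)
    current_plan = state.get("current_plan")

    # Build the reporter input from the plan
    input_ = {
        "messages": [HumanMessage(
            f"# Research Requirements\n\n## Task\n\n{current_plan.title}\n\n"
            f"## Description\n\n{current_plan.thought}"
        )],
        "locale": state.get("locale", "en-US"),
    }

    invoke_messages = apply_prompt_template("reporter", input_, configurable)
    observations = state.get("observations", [])

    # Remind the model of the required structure and citations
    invoke_messages.append(HumanMessage(
        content="IMPORTANT: Structure your report according to the format in the prompt. Remember to include:\n\n"
                "1. Key Points - A bulleted list of the most important findings\n"
                "2. Overview - A brief introduction to the topic\n"
                "3. Detailed Analysis - Organized into logical sections\n"
                "4. Survey Note (optional) - For more comprehensive reports\n"
                "5. Key Citations - List all references at the end\n\n"
                "PRIORITIZE USING MARKDOWN TABLES for data presentation and comparison.",
        name="system",
    ))

    # Wrap each step result as a separate observation message
    observation_messages = [
        HumanMessage(
            content=f"Below are some observations for the research task:\n\n{observation}",
            name="observation",
        )
        for observation in observations
    ]

    # Compress observations to fit the reporter model's context window
    llm_token_limit = get_llm_token_limit_by_type(AGENT_LLM_MAP["reporter"])
    compressed_state = ContextManager(llm_token_limit).compress_messages(
        {"messages": observation_messages}
    )
    invoke_messages += compressed_state.get("messages", [])

    # Generate the report
    response = get_llm_by_type(AGENT_LLM_MAP["reporter"]).invoke(invoke_messages)
    return {"final_report": response.content}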
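
`ContextManager.compress_messages` itself is not shown in this article. To make the idea concrete, here is a hypothetical budget-based trimmer: it keeps the first `preserve_prefix` messages, then adds the most recent messages that still fit the budget, estimating tokens as roughly four characters each. Both the heuristic and the retention policy are assumptions for illustration, not DeerFlow's actual algorithm.

```python
def estimate_tokens(text: str) -> int:
    # Crude heuristic (assumption): about 4 characters per token, at least 1
    return max(1, len(text) // 4)


def compress_messages(messages: list[str], token_limit: int, preserve_prefix: int = 0) -> list[str]:
    """Keep the first `preserve_prefix` messages, then the newest messages that fit the budget."""
    prefix = messages[:preserve_prefix]
    budget = token_limit - sum(estimate_tokens(m) for m in prefix)
    kept_tail = []
    # Walk backwards so the most recent messages are kept first
    for msg in reversed(messages[preserve_prefix:]):
        cost = estimate_tokens(msg)
        if cost > budget:
            break
        kept_tail.append(msg)
        budget -= cost
    return prefix + list(reversed(kept_tail))
```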

5. Workflow Control

5.1 State-Driven Routing

def continue_to_running_research_team(state: State):
    """Decide the research team's next move."""
    current_plan = state.get("current_plan")

    # No plan or no steps: go (back) to the planner
    if not current_plan or not current_plan.steps:
        return "planner"

    # Every step has a result: return to the planner
    if all(step.execution_res for step in current_plan.steps):
        return "planner"

    # Dispatch the first unfinished step of a known type to its worker
    for step in current_plan.steps:
        if not step.execution_res:
            if step.step_type == StepType.RESEARCH:
                return "researcher"
            if step.step_type == StepType.PROCESSING:
                return "coder"

    return "planner"

5.2 Clarification Control

def needs_clarification(state: dict) -> bool:
    """Return True while a clarification dialogue is in progress and not yet complete."""
    if not state.get("enable_clarification", False):
        return False

    clarification_rounds = state.get("clarification_rounds", 0)
    is_clarification_complete = state.get("is_clarification_complete", False)
    max_clarification_rounds = state.get("max_clarification_rounds", 3)

    return (
        clarification_rounds > 0
        and not is_clarification_complete
        and clarification_rounds <= max_clarification_rounds
    )
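
The coordinator excerpt appends the user's reply to a history list but does not show how the round counter and completion flag advance. A hedged sketch of one clarification exchange as a state update, using the state keys from this section; `advance_clarification` is hypothetical, and joining the replies into `clarified_question` is an assumed policy, not DeerFlow's documented behavior.

```python
def advance_clarification(state: dict, user_reply: str, clarified: bool) -> dict:
    """Hypothetical: return the state update after one clarification exchange."""
    rounds = state.get("clarification_rounds", 0) + 1
    # Build a new list instead of mutating the list held in state
    history = state.get("clarification_history", []) + [user_reply]
    max_rounds = state.get("max_clarification_rounds", 3)
    # Stop when the model is satisfied or the round budget is used up
    complete = clarified or rounds >= max_rounds
    update = {
        "clarification_rounds": rounds,
        "clarification_history": history,
        "is_clarification_complete": complete,
    }
    if complete:
        # Assumption: the accumulated replies form the question handed to the planner
        update["clarified_question"] = " ".join(history)
    return update
```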

6. Graph Builder

from langgraph.graph import END, START, StateGraph


def _build_base_graph():
    """Build and return the base state graph with all nodes and edges."""
    builder = StateGraph(State)

    # Nodes
    builder.add_node("coordinator", coordinator_node)
    builder.add_node("background_investigator", background_investigation_node)
    builder.add_node("planner", planner_node)
    builder.add_node("reporter", reporter_node)
    builder.add_node("research_team", research_team_node)
    builder.add_node("researcher", researcher_node)
    builder.add_node("coder", coder_node)
    builder.add_node("human_feedback", human_feedback_node)

    # Static edges
    builder.add_edge(START, "coordinator")
    builder.add_edge("background_investigator", "planner")
    builder.add_edge("reporter", END)

    # Conditional edges
    builder.add_conditional_edges(
        "research_team",
        continue_to_running_research_team,
        ["planner", "researcher", "coder"],
    )
    # The coordinator's target is read from the "goto" key in state
    builder.add_conditional_edges(
        "coordinator",
        lambda state: state.get("goto", "planner"),
        ["planner", "background_investigator", "coordinator", END],
    )

    # planner, human_feedback, researcher and coder route themselves via Command(goto=...)
    return builder

def build_graph():
    """Build and return the agent workflow graph without memory (no checkpointer)."""
    builder = _build_base_graph()
    return builder.compile()
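
The graph mixes static edges, conditional edges and `Command(goto=...)` returns. To trace one run end to end, here is a toy driver in plain Python in which every node is a hypothetical stub that returns the name of the next node. It assumes a two-step plan that is auto-accepted and a planner that hands off to the reporter once every step has a result; no model is called and no LangGraph API is used.

```python
END = "__end__"


def run_workflow(nodes: dict, start: str, state: dict, max_steps: int = 50) -> list[str]:
    """Follow node -> next-node names until END; return the visited path."""
    path, current = [], start
    for _ in range(max_steps):  # guard against routing loops, like a recursion limit
        if current == END:
            return path
        path.append(current)
        current = nodes[current](state)
    raise RuntimeError("step limit reached")


def research_team(state):
    # Simplified routing rule on a list of (step_type, result) pairs
    for step_type, result in state["steps"]:
        if result is None:
            return "researcher" if step_type == "research" else "coder"
    return "planner"


def planner(state):
    # Stub: propose the plan first; once every step has a result, hand off to the reporter
    return "reporter" if all(r is not None for _, r in state["steps"]) else "human_feedback"


def worker(state):
    # Stub worker: fill in the first unfinished step, then report back to the team
    i = next(i for i, (_, r) in enumerate(state["steps"]) if r is None)
    state["steps"][i] = (state["steps"][i][0], "done")
    return "research_team"


nodes = {
    "coordinator": lambda s: "background_investigator",
    "background_investigator": lambda s: "planner",
    "planner": planner,
    "human_feedback": lambda s: "research_team",  # plan auto-accepted
    "research_team": research_team,
    "researcher": worker,
    "coder": worker,
    "reporter": lambda s: END,
}
path = run_workflow(nodes, "coordinator", {"steps": [("research", None), ("processing", None)]})
```

The resulting path visits `research_team` after every worker step, which is why that node acts as the hub of the research loop.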

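7. Architectural Strengths

7.1 Modular Design

  • Separation of concerns: each agent focuses on one kind of task
  • Loose coupling: nodes communicate only through the shared state, which keeps them easy to maintain
  • Extensibility: a new agent needs a node, an AGENT_LLM_MAP entry, and a route that reaches it

7.2 State-Driven Workflow

  • Automatic routing: the next node is chosen from the current state (for example, the first unfinished plan step)
  • Conditional branching: conditional edges and Command(goto=...) express non-linear control flow
  • Error recovery: malformed plan JSON is passed through repair_json_output, and if parsing still fails the workflow falls back to the reporter (or ends on the first plan); iteration and recursion limits stop runaway loops

7.3 Tool Integration

  • MCP support: tools from external MCP servers are attached per agent type through add_to_agents
  • Dynamic tool loading: each agent's tool set is assembled at run time from the configuration
  • Tool description tagging: each MCP tool's description is prefixed with the server that provides it

7.4 Human-in-the-Loop

  • Plan review: users can accept a plan ([ACCEPTED]) or send it back for changes ([EDIT_PLAN])
  • Clarification dialogue: several rounds of questions, up to max_clarification_rounds, resolve vague requests
  • Interrupt and resume: interrupt() pauses the workflow for user input; resuming requires a graph compiled with a checkpointer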

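8. Usage Example

import asyncio

from langchain_core.messages import HumanMessage

# Create a single agent with the factory
# (search_tool, crawl_tool and context_compressor are placeholders for real objects)
agent = create_agent(
    agent_name="researcher",
    agent_type="researcher",
    tools=[search_tool, crawl_tool],
    prompt_template="researcher",
    pre_model_hook=context_compressor,
)


async def main():
    # Build the workflow graph
    graph = build_graph()

    # Run the workflow. build_graph() compiles without a checkpointer, so the
    # plan-review interrupt could not be resumed; auto_accepted_plan skips it.
    result = await graph.ainvoke({
        "messages": [HumanMessage(content="Research trends in AI development")],
        "research_topic": "Trends in AI development",
        "locale": "en-US",
        "auto_accepted_plan": True,
    })
    print(result.get("final_report"))


asyncio.run(main())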

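Summary

Through this design, DeerFlow's Agent module provides:

  1. Specialized agents: each agent focuses on a specific domain
  2. Flexible workflow control: state-driven routing
  3. Tool integration: MCP support and dynamic tool loading
  4. Human-in-the-loop collaboration: clarification dialogue and plan review
  5. Extensibility: the modular design is straightforward to extend

This architecture lets DeerFlow handle complex research tasks while keeping the system flexible and maintainable.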
