在现代AI应用开发中,如何有效地整合大语言模型(LLM)与各种外部工具已成为关键挑战。Spring AI Alibaba提供了一套优雅的解决方案,其中MCP Node(Multi-Model Control Plane Node)作为核心组件,为我们构建复杂的AI应用提供了强大的支撑。

什么是MCP Node?

MCP Node是Spring AI Alibaba图计算框架中的一个关键节点,它充当AI大语言模型与外部工具之间的智能协调器。通过MCP Node,我们可以轻松地将自然语言查询转换为对多个工具的协调调用,并将结果整合成统一的响应。

核心架构解析

1. 配置驱动的设计

MCP Node的强大之处首先体现在其配置驱动的设计上。通过McpNodeProperties类,我们可以灵活地定义节点与服务器的映射关系:

spring:
  ai:
    graph:
      nodes:
        weather-node:
          - "beijing-server"
          - "shanghai-server"
        finance-node:
          - "stock-server"
          - "fund-server"

这种设计让我们可以通过简单的配置文件调整,而无需修改代码就能重新组织工具的调用关系。

2. 动态工具发现机制

MCP Node通过McpClientToolCallbackProvider实现了智能的工具发现机制:

public Set<ToolCallback> findToolCallbacks(String nodeName) {
    Set<String> mcpClients = mcpNodeProperties.getNode2servers().get(nodeName);
    // 通过前缀匹配动态筛选相关工具
    // weather_client_beijing_server_getWeather
    // weather_client_shanghai_server_getWeather
}

这种前缀匹配机制确保了每个节点只能访问其配置范围内的工具,实现了良好的隔离性和安全性。

3. 核心执行逻辑

MCP Node的核心执行逻辑封装在apply方法中:

@Override
public Map<String, Object> apply(OverAllState state) {
    // 1. 从状态中获取用户查询
    String query = state.value("query", "");
    
    // 2. 调用AI模型(集成相关工具)
    Flux<String> streamResult = chatClient.prompt(query).stream().content();
    
    // 3. 处理流式响应
    String result = streamResult.reduce("", (acc, item) -> acc + item).block();
    
    // 4. 返回结果状态
    HashMap<String, Object> resultMap = new HashMap<>();
    resultMap.put("mcp_content", result);
    return resultMap;
}

完整的工作流程

让我们通过一个具体的例子来理解MCP Node的完整工作流程:

场景:智能天气查询系统

用户发起请求:"查询北京和上海今天的天气"

1. 系统启动阶段

配置文件加载

# application.yml
spring:
  ai:
    mcp:
      client:
        name: "weather-client"
    graph:
      nodes:
        weather-node:           # 节点名称
          - "beijing-server"    # 逻辑服务器标识符
          - "shanghai-server"   # 逻辑服务器标识符

工具类注册

// 北京天气工具实现
@Component
public class BeijingWeatherTool {
    @Autowired
    private BeijingWeatherService service;
    
    @Tool(name = "weather_client_beijing_server_getCurrentWeather")
    public String getCurrentWeather(String city) {
        logger.info("调用北京天气工具,城市: {}", city);
        WeatherData data = service.getWeather(city);
        return String.format("北京天气:%s,温度:%s°C", 
                           data.getDescription(), data.getTemperature());
    }
}

// 上海天气工具实现  
@Component
public class ShanghaiWeatherTool {
    @Autowired
    private ShanghaiWeatherService service;
    
    @Tool(name = "weather_client_shanghai_server_getCurrentWeather")
    public String getCurrentWeather(String city) {
        logger.info("调用上海天气工具,城市: {}", city);
        WeatherData data = service.getWeather(city);
        return String.format("上海天气:%s,温度:%s°C", 
                           data.getDescription(), data.getTemperature());
    }
}

2. MCP Node初始化阶段

McpGraphConfiguration配置类执行

@Configuration
@EnableConfigurationProperties({ McpNodeProperties.class })
public class McpGraphConfiguration {
    
    @Bean
    public StateGraph mcpGraph(ChatClient.Builder chatClientBuilder, 
                              McpClientToolCallbackProvider callbackProvider) {
        
        // 定义状态更新策略
        KeyStrategyFactory keyStrategyFactory = () -> {
            HashMap<String, KeyStrategy> strategies = new HashMap<>();
            strategies.put("query", new ReplaceStrategy());      // 查询替换策略
            strategies.put("mcp_content", new ReplaceStrategy()); // 结果替换策略
            return strategies;
        };
        
        // 构建图流程 - 这里触发MCP Node初始化
        StateGraph stateGraph = new StateGraph(keyStrategyFactory)
                .addNode("mcp", node_async(new McpNode(chatClientBuilder, callbackProvider)))
                .addEdge(StateGraph.START, "mcp")
                .addEdge("mcp", StateGraph.END);
                
        return stateGraph;
    }
}

MCP Node构造函数执行

public class McpNode implements NodeAction {
    private static final String NODE_NAME = "mcp-node";
    private final ChatClient chatClient;
    
    public McpNode(ChatClient.Builder chatClientBuilder, 
                   McpClientToolCallbackProvider mcpClientToolCallbackProvider) {
        
        logger.info("开始初始化MCP Node: {}", NODE_NAME);
        
        // 关键步骤1:动态加载工具回调
        Set<ToolCallback> toolCallbacks = mcpClientToolCallbackProvider.findToolCallbacks(NODE_NAME);
        
        // 记录加载的工具
        for (ToolCallback toolCallback : toolCallbacks) {
            String toolName = toolCallback.getToolDefinition().name();
            logger.info("MCP Node加载工具回调: {}", toolName);
            // 输出:MCP Node加载工具回调: weather_client_beijing_server_getCurrentWeather
            // 输出:MCP Node加载工具回调: weather_client_shanghai_server_getCurrentWeather
        }
        
        // 关键步骤2:构建聊天客户端,集成工具
        this.chatClient = chatClientBuilder
                .defaultToolCallbacks(toolCallbacks.toArray(ToolCallback[]::new))
                .build();
                
        logger.info("MCP Node初始化完成,共加载 {} 个工具", toolCallbacks.size());
    }
}

工具回调查找过程

@Service
public class McpClientToolCallbackProvider {
    
    public Set<ToolCallback> findToolCallbacks(String nodeName) {
        logger.debug("开始查找节点 {} 的工具回调", nodeName);
        
        Set<ToolCallback> defineCallback = Sets.newHashSet();
        
        // 步骤1:根据节点名称获取配置的服务器列表
        Set<String> mcpClients = mcpNodeProperties.getNode2servers().get(nodeName);
        // 结果:["beijing-server", "shanghai-server"]
        
        if (mcpClients == null || mcpClients.isEmpty()) {
            logger.warn("节点 {} 未配置服务器", nodeName);
            return defineCallback;
        }
        
        logger.debug("节点 {} 配置的服务器: {}", nodeName, mcpClients);
        
        // 步骤2:构建期望的工具前缀列表
        List<String> exceptMcpClientNames = Lists.newArrayList();
        for (String mcpClient : mcpClients) {
            String name = commonProperties.getName(); // "weather-client"
            String prefixedMcpClientName = McpToolUtils.prefixedToolName(name, mcpClient);
            // 结果:["weather_client_beijing_server", "weather_client_shanghai_server"]
            exceptMcpClientNames.add(prefixedMcpClientName);
            logger.debug("生成工具前缀: {}", prefixedMcpClientName);
        }
        
        // 步骤3:过滤匹配的工具回调
        ToolCallback[] toolCallbacks = toolCallbackProvider.getToolCallbacks();
        logger.debug("系统中共有 {} 个工具回调", toolCallbacks.length);
        
        for (ToolCallback toolCallback : toolCallbacks) {
            ToolDefinition toolDefinition = toolCallback.getToolDefinition();
            String toolName = toolDefinition.name();
            logger.debug("检查工具: {}", toolName);
            
            for (String exceptMcpClientName : exceptMcpClientNames) {
                if (toolName.startsWith(exceptMcpClientName)) {
                    logger.debug("工具 {} 匹配前缀 {}", toolName, exceptMcpClientName);
                    defineCallback.add(toolCallback);
                    break;
                }
            }
        }
        
        logger.info("为节点 {} 找到 {} 个匹配的工具回调", nodeName, defineCallback.size());
        return defineCallback;
    }
}

3. HTTP请求处理阶段

Controller接收请求

@RestController
@RequestMapping("/graph/mcp")
public class McpController {
    
    @GetMapping("/call")
    public Map<String, Object> call(
            @RequestParam(value = "query", defaultValue = "北京时间现在几点钟") String query,
            @RequestParam(value = "thread_id", defaultValue = "default") String threadId) 
            throws GraphRunnerException {
        
        logger.info("接收到天气查询请求 - 查询: {}, 线程ID: {}", query, threadId);
        
        // 构建运行配置
        RunnableConfig runnableConfig = RunnableConfig.builder()
                .threadId(threadId)
                .build();
        
        // 构建输入状态
        Map<String, Object> inputState = new HashMap<>();
        inputState.put("query", query);  // "查询北京和上海今天的天气"
        
        logger.debug("准备调用图计算引擎,输入状态: {}", inputState);
        
        // 调用编译后的图执行
        Optional<OverAllState> result = this.compiledGraph.invoke(inputState, runnableConfig);
        
        // 返回结果
        Map<String, Object> response = result.map(OverAllState::data)
                                           .orElse(new HashMap<>());
        
        logger.info("天气查询完成,返回结果大小: {}", response.size());
        return response;
    }
}

4. 图计算执行阶段

StateGraph编译和执行

// CompiledGraph.invoke方法内部执行流程
public Optional<OverAllState> invoke(Map<String, Object> input, RunnableConfig config) {
    logger.debug("开始执行图计算,线程ID: {}", config.getThreadId());
    
    // 初始化状态
    OverAllState currentState = new OverAllState(input);
    logger.debug("初始状态: {}", currentState.data());
    
    // 按照边的顺序执行节点
    // START → mcp → END
    
    // 执行MCP节点
    NodeAction mcpNode = getNode("mcp");
    Map<String, Object> nodeResult = mcpNode.apply(currentState);
    
    // 更新状态
    currentState = currentState.merge(nodeResult);
    logger.debug("节点执行完成,更新后状态: {}", currentState.data());
    
    return Optional.of(currentState);
}

5. MCP Node核心执行阶段

apply方法执行

@Override
public Map<String, Object> apply(OverAllState state) {
    logger.info("MCP Node开始执行");
    
    // 步骤1:从状态中获取查询内容
    String query = state.value("query", "");
    logger.info("获取到查询内容: {}", query); // "查询北京和上海今天的天气"
    
    // 步骤2:调用AI模型(此时已集成相关工具)
    logger.debug("开始调用AI模型处理查询");
    Flux<String> streamResult = chatClient.prompt(query).stream().content();
    
    // AI模型处理过程(内部机制):
    // 1. 分析用户查询:"查询北京和上海今天的天气"
    // 2. 识别意图:需要获取两个城市的天气信息
    // 3. 确定需要调用的工具:
    //    - weather_client_beijing_server_getCurrentWeather
    //    - weather_client_shanghai_server_getCurrentWeather
    // 4. 并行调用这两个工具
    
    // 工具调用日志示例:
    // [BeijingWeatherTool] 调用北京天气工具,城市: 北京
    // [ShanghaiWeatherTool] 调用上海天气工具,城市: 上海
    
    // 工具返回结果:
    // 北京天气:晴朗,温度:25°C
    // 上海天气:多云,温度:28°C
    
    // 5. AI模型整合工具结果生成最终回答
    
    // 步骤3:处理流式响应
    logger.debug("处理AI模型的流式响应");
    String result = streamResult.reduce("", (acc, item) -> {
        logger.trace("接收到流片段: {}", item);
        return acc + item;
    }).block();
    
    logger.info("AI模型生成的最终结果: {}", result);
    // 示例结果:"根据查询,北京当前天气晴朗,温度25°C;上海当前天气多云,温度28°C"
    
    // 步骤4:封装结果状态
    HashMap<String, Object> resultMap = new HashMap<>();
    resultMap.put("mcp_content", result);
    
    logger.info("MCP Node执行完成,返回结果状态");
    return resultMap;
}

6. 工具调用详细过程

AI模型内部工具协调(简化示例)

// 这是Spring AI内部的工具调用机制
public class ToolCallingProcessor {
    
    public List<ToolResponse> processToolCalls(List<ToolCall> toolCalls) {
        List<ToolResponse> responses = new ArrayList<>();
        
        // 并行处理工具调用
        toolCalls.parallelStream().forEach(toolCall -> {
            String toolName = toolCall.getName();
            Map<String, Object> arguments = toolCall.getArguments();
            
            logger.info("调用工具: {}, 参数: {}", toolName, arguments);
            
            // 根据工具名称找到对应的工具回调
            ToolCallback callback = findToolCallback(toolName);
            if (callback != null) {
                try {
                    // 执行工具调用
                    Object result = callback.execute(arguments);
                    responses.add(new ToolResponse(toolCall.getId(), result.toString()));
                    logger.info("工具 {} 调用成功,结果长度: {}", toolName, result.toString().length());
                } catch (Exception e) {
                    logger.error("工具 {} 调用失败", toolName, e);
                    responses.add(new ToolResponse(toolCall.getId(), "工具调用失败: " + e.getMessage()));
                }
            }
        });
        
        return responses;
    }
}

7. 状态更新和返回阶段

状态合并过程

// OverAllState.merge方法
public OverAllState merge(Map<String, Object> updates) {
    logger.debug("合并状态更新,原状态大小: {}, 更新大小: {}", 
                this.data.size(), updates.size());
    
    HashMap<String, Object> newData = new HashMap<>(this.data);
    
    for (Map.Entry<String, Object> entry : updates.entrySet()) {
        String key = entry.getKey();
        Object value = entry.getValue();
        
        // 根据配置的策略更新状态
        KeyStrategy strategy = keyStrategyFactory.getStrategy(key);
        if (strategy != null) {
            Object mergedValue = strategy.merge(newData.get(key), value);
            newData.put(key, mergedValue);
            logger.debug("使用策略 {} 更新键 {}: {}", 
                        strategy.getClass().getSimpleName(), key, mergedValue);
        } else {
            newData.put(key, value);
            logger.debug("直接更新键 {}: {}", key, value);
        }
    }
    
    return new OverAllState(newData);
}

最终响应返回

// Controller最终返回结果
{
    "query": "查询北京和上海今天的天气",
    "mcp_content": "根据查询,北京当前天气晴朗,温度25°C;上海当前天气多云,温度28°C"
}

8. 完整的日志输出示例

2025-07-25 14:30:00 INFO  McpGraphConfiguration - 开始初始化MCP图配置
2025-07-25 14:30:01 INFO  McpNode - 开始初始化MCP Node: mcp-node
2025-07-25 14:30:01 INFO  McpClientToolCallbackProvider - 开始查找节点 mcp-node 的工具回调
2025-07-25 14:30:01 DEBUG McpClientToolCallbackProvider - 节点 mcp-node 配置的服务器: [beijing-server, shanghai-server]
2025-07-25 14:30:01 DEBUG McpClientToolCallbackProvider - 生成工具前缀: weather_client_beijing_server
2025-07-25 14:30:01 DEBUG McpClientToolCallbackProvider - 生成工具前缀: weather_client_shanghai_server
2025-07-25 14:30:01 INFO  McpClientToolCallbackProvider - 为节点 mcp-node 找到 2 个匹配的工具回调
2025-07-25 14:30:01 INFO  McpNode - MCP Node加载工具回调: weather_client_beijing_server_getCurrentWeather
2025-07-25 14:30:01 INFO  McpNode - MCP Node加载工具回调: weather_client_shanghai_server_getCurrentWeather
2025-07-25 14:30:01 INFO  McpNode - MCP Node初始化完成,共加载 2 个工具
2025-07-25 14:30:01 INFO  McpController - 接收到天气查询请求 - 查询: 查询北京和上海今天的天气, 线程ID: default
2025-07-25 14:30:02 INFO  McpNode - MCP Node开始执行
2025-07-25 14:30:02 INFO  McpNode - 获取到查询内容: 查询北京和上海今天的天气
2025-07-25 14:30:02 DEBUG McpNode - 开始调用AI模型处理查询
2025-07-25 14:30:03 INFO  BeijingWeatherTool - 调用北京天气工具,城市: 北京
2025-07-25 14:30:03 INFO  ShanghaiWeatherTool - 调用上海天气工具,城市: 上海
2025-07-25 14:30:04 INFO  McpNode - AI模型生成的最终结果: 根据查询,北京当前天气晴朗,温度25°C;上海当前天气多云,温度28°C
2025-07-25 14:30:04 INFO  McpNode - MCP Node执行完成,返回结果状态
2025-07-25 14:30:04 INFO  McpController - 天气查询完成,返回结果大小: 2

总结

整个MCP Node的工作流程体现了现代AI应用开发的核心理念:

  1. 配置驱动:通过YAML配置灵活定义节点和工具关系
  2. 动态发现:运行时自动发现和加载相关工具
  3. 智能协调:AI模型智能识别和协调多个工具调用
  4. 状态管理:统一的状态管理和更新机制
  5. 可观测性:完整的日志记录和监控支持

这种设计使得开发者可以专注于业务逻辑的实现,而框架负责处理复杂的工具协调和状态管理,真正实现了"让AI应用开发更简单"的目标。

Logo

火山引擎开发者社区是火山引擎打造的AI技术生态平台,聚焦Agent与大模型开发,提供豆包系列模型(图像/视频/视觉)、智能分析与会话工具,并配套评测集、动手实验室及行业案例库。社区通过技术沙龙、挑战赛等活动促进开发者成长,新用户可领50万Tokens权益,助力构建智能应用。

更多推荐