深入理解Spring AI Alibaba的MCP Node:构建智能AI应用的核心组件
MCP Node是Spring AI Alibaba图计算框架中的一个关键节点,它充当AI大语言模型与外部工具之间的智能协调器。通过MCP Node,我们可以轻松地将自然语言查询转换为对多个工具的协调调用,并将结果整合成统一的响应。配置驱动:通过YAML配置灵活定义节点和工具关系动态发现:运行时自动发现和加载相关工具智能协调:AI模型智能识别和协调多个工具调用状态管理:统一的状态管理和更新机制可观
在现代AI应用开发中,如何有效地整合大语言模型(LLM)与各种外部工具已成为关键挑战。Spring AI Alibaba提供了一套优雅的解决方案,其中MCP Node(Multi-Model Control Plane Node)作为核心组件,为我们构建复杂的AI应用提供了强大的支撑。
什么是MCP Node?
MCP Node是Spring AI Alibaba图计算框架中的一个关键节点,它充当AI大语言模型与外部工具之间的智能协调器。通过MCP Node,我们可以轻松地将自然语言查询转换为对多个工具的协调调用,并将结果整合成统一的响应。
核心架构解析
1. 配置驱动的设计
MCP Node的强大之处首先体现在其配置驱动的设计上。通过McpNodeProperties类,我们可以灵活地定义节点与服务器的映射关系:
spring:
ai:
graph:
nodes:
weather-node:
- "beijing-server"
- "shanghai-server"
finance-node:
- "stock-server"
- "fund-server"
这种设计让我们可以通过简单的配置文件调整,而无需修改代码就能重新组织工具的调用关系。
2. 动态工具发现机制
MCP Node通过McpClientToolCallbackProvider实现了智能的工具发现机制:
public Set<ToolCallback> findToolCallbacks(String nodeName) {
Set<String> mcpClients = mcpNodeProperties.getNode2servers().get(nodeName);
// 通过前缀匹配动态筛选相关工具
// weather_client_beijing_server_getWeather
// weather_client_shanghai_server_getWeather
}
这种前缀匹配机制确保了每个节点只能访问其配置范围内的工具,实现了良好的隔离性和安全性。
3. 核心执行逻辑
MCP Node的核心执行逻辑封装在apply方法中:
@Override
public Map<String, Object> apply(OverAllState state) {
// 1. 从状态中获取用户查询
String query = state.value("query", "");
// 2. 调用AI模型(集成相关工具)
Flux<String> streamResult = chatClient.prompt(query).stream().content();
// 3. 处理流式响应
String result = streamResult.reduce("", (acc, item) -> acc + item).block();
// 4. 返回结果状态
HashMap<String, Object> resultMap = new HashMap<>();
resultMap.put("mcp_content", result);
return resultMap;
}
完整的工作流程
让我们通过一个具体的例子来理解MCP Node的完整工作流程:
场景:智能天气查询系统
用户发起请求:"查询北京和上海今天的天气"
1. 系统启动阶段
配置文件加载
# application.yml
spring:
ai:
mcp:
client:
name: "weather-client"
graph:
nodes:
weather-node: # 节点名称
- "beijing-server" # 逻辑服务器标识符
- "shanghai-server" # 逻辑服务器标识符
工具类注册
// 北京天气工具实现
@Component
public class BeijingWeatherTool {
@Autowired
private BeijingWeatherService service;
@Tool(name = "weather_client_beijing_server_getCurrentWeather")
public String getCurrentWeather(String city) {
logger.info("调用北京天气工具,城市: {}", city);
WeatherData data = service.getWeather(city);
return String.format("北京天气:%s,温度:%s°C",
data.getDescription(), data.getTemperature());
}
}
// 上海天气工具实现
@Component
public class ShanghaiWeatherTool {
@Autowired
private ShanghaiWeatherService service;
@Tool(name = "weather_client_shanghai_server_getCurrentWeather")
public String getCurrentWeather(String city) {
logger.info("调用上海天气工具,城市: {}", city);
WeatherData data = service.getWeather(city);
return String.format("上海天气:%s,温度:%s°C",
data.getDescription(), data.getTemperature());
}
}
2. MCP Node初始化阶段
McpGraphConfiguration配置类执行
@Configuration
@EnableConfigurationProperties({ McpNodeProperties.class })
public class McpGraphConfiguration {
@Bean
public StateGraph mcpGraph(ChatClient.Builder chatClientBuilder,
McpClientToolCallbackProvider callbackProvider) {
// 定义状态更新策略
KeyStrategyFactory keyStrategyFactory = () -> {
HashMap<String, KeyStrategy> strategies = new HashMap<>();
strategies.put("query", new ReplaceStrategy()); // 查询替换策略
strategies.put("mcp_content", new ReplaceStrategy()); // 结果替换策略
return strategies;
};
// 构建图流程 - 这里触发MCP Node初始化
StateGraph stateGraph = new StateGraph(keyStrategyFactory)
.addNode("mcp", node_async(new McpNode(chatClientBuilder, callbackProvider)))
.addEdge(StateGraph.START, "mcp")
.addEdge("mcp", StateGraph.END);
return stateGraph;
}
}
MCP Node构造函数执行
public class McpNode implements NodeAction {
private static final String NODE_NAME = "mcp-node";
private final ChatClient chatClient;
public McpNode(ChatClient.Builder chatClientBuilder,
McpClientToolCallbackProvider mcpClientToolCallbackProvider) {
logger.info("开始初始化MCP Node: {}", NODE_NAME);
// 关键步骤1:动态加载工具回调
Set<ToolCallback> toolCallbacks = mcpClientToolCallbackProvider.findToolCallbacks(NODE_NAME);
// 记录加载的工具
for (ToolCallback toolCallback : toolCallbacks) {
String toolName = toolCallback.getToolDefinition().name();
logger.info("MCP Node加载工具回调: {}", toolName);
// 输出:MCP Node加载工具回调: weather_client_beijing_server_getCurrentWeather
// 输出:MCP Node加载工具回调: weather_client_shanghai_server_getCurrentWeather
}
// 关键步骤2:构建聊天客户端,集成工具
this.chatClient = chatClientBuilder
.defaultToolCallbacks(toolCallbacks.toArray(ToolCallback[]::new))
.build();
logger.info("MCP Node初始化完成,共加载 {} 个工具", toolCallbacks.size());
}
}
工具回调查找过程
@Service
public class McpClientToolCallbackProvider {
public Set<ToolCallback> findToolCallbacks(String nodeName) {
logger.debug("开始查找节点 {} 的工具回调", nodeName);
Set<ToolCallback> defineCallback = Sets.newHashSet();
// 步骤1:根据节点名称获取配置的服务器列表
Set<String> mcpClients = mcpNodeProperties.getNode2servers().get(nodeName);
// 结果:["beijing-server", "shanghai-server"]
if (mcpClients == null || mcpClients.isEmpty()) {
logger.warn("节点 {} 未配置服务器", nodeName);
return defineCallback;
}
logger.debug("节点 {} 配置的服务器: {}", nodeName, mcpClients);
// 步骤2:构建期望的工具前缀列表
List<String> exceptMcpClientNames = Lists.newArrayList();
for (String mcpClient : mcpClients) {
String name = commonProperties.getName(); // "weather-client"
String prefixedMcpClientName = McpToolUtils.prefixedToolName(name, mcpClient);
// 结果:["weather_client_beijing_server", "weather_client_shanghai_server"]
exceptMcpClientNames.add(prefixedMcpClientName);
logger.debug("生成工具前缀: {}", prefixedMcpClientName);
}
// 步骤3:过滤匹配的工具回调
ToolCallback[] toolCallbacks = toolCallbackProvider.getToolCallbacks();
logger.debug("系统中共有 {} 个工具回调", toolCallbacks.length);
for (ToolCallback toolCallback : toolCallbacks) {
ToolDefinition toolDefinition = toolCallback.getToolDefinition();
String toolName = toolDefinition.name();
logger.debug("检查工具: {}", toolName);
for (String exceptMcpClientName : exceptMcpClientNames) {
if (toolName.startsWith(exceptMcpClientName)) {
logger.debug("工具 {} 匹配前缀 {}", toolName, exceptMcpClientName);
defineCallback.add(toolCallback);
break;
}
}
}
logger.info("为节点 {} 找到 {} 个匹配的工具回调", nodeName, defineCallback.size());
return defineCallback;
}
}
3. HTTP请求处理阶段
Controller接收请求
@RestController
@RequestMapping("/graph/mcp")
public class McpController {
@GetMapping("/call")
public Map<String, Object> call(
@RequestParam(value = "query", defaultValue = "北京时间现在几点钟") String query,
@RequestParam(value = "thread_id", defaultValue = "default") String threadId)
throws GraphRunnerException {
logger.info("接收到天气查询请求 - 查询: {}, 线程ID: {}", query, threadId);
// 构建运行配置
RunnableConfig runnableConfig = RunnableConfig.builder()
.threadId(threadId)
.build();
// 构建输入状态
Map<String, Object> inputState = new HashMap<>();
inputState.put("query", query); // "查询北京和上海今天的天气"
logger.debug("准备调用图计算引擎,输入状态: {}", inputState);
// 调用编译后的图执行
Optional<OverAllState> result = this.compiledGraph.invoke(inputState, runnableConfig);
// 返回结果
Map<String, Object> response = result.map(OverAllState::data)
.orElse(new HashMap<>());
logger.info("天气查询完成,返回结果大小: {}", response.size());
return response;
}
}
4. 图计算执行阶段
StateGraph编译和执行
// CompiledGraph.invoke方法内部执行流程
public Optional<OverAllState> invoke(Map<String, Object> input, RunnableConfig config) {
logger.debug("开始执行图计算,线程ID: {}", config.getThreadId());
// 初始化状态
OverAllState currentState = new OverAllState(input);
logger.debug("初始状态: {}", currentState.data());
// 按照边的顺序执行节点
// START → mcp → END
// 执行MCP节点
NodeAction mcpNode = getNode("mcp");
Map<String, Object> nodeResult = mcpNode.apply(currentState);
// 更新状态
currentState = currentState.merge(nodeResult);
logger.debug("节点执行完成,更新后状态: {}", currentState.data());
return Optional.of(currentState);
}
5. MCP Node核心执行阶段
apply方法执行
@Override
public Map<String, Object> apply(OverAllState state) {
logger.info("MCP Node开始执行");
// 步骤1:从状态中获取查询内容
String query = state.value("query", "");
logger.info("获取到查询内容: {}", query); // "查询北京和上海今天的天气"
// 步骤2:调用AI模型(此时已集成相关工具)
logger.debug("开始调用AI模型处理查询");
Flux<String> streamResult = chatClient.prompt(query).stream().content();
// AI模型处理过程(内部机制):
// 1. 分析用户查询:"查询北京和上海今天的天气"
// 2. 识别意图:需要获取两个城市的天气信息
// 3. 确定需要调用的工具:
// - weather_client_beijing_server_getCurrentWeather
// - weather_client_shanghai_server_getCurrentWeather
// 4. 并行调用这两个工具
// 工具调用日志示例:
// [BeijingWeatherTool] 调用北京天气工具,城市: 北京
// [ShanghaiWeatherTool] 调用上海天气工具,城市: 上海
// 工具返回结果:
// 北京天气:晴朗,温度:25°C
// 上海天气:多云,温度:28°C
// 5. AI模型整合工具结果生成最终回答
// 步骤3:处理流式响应
logger.debug("处理AI模型的流式响应");
String result = streamResult.reduce("", (acc, item) -> {
logger.trace("接收到流片段: {}", item);
return acc + item;
}).block();
logger.info("AI模型生成的最终结果: {}", result);
// 示例结果:"根据查询,北京当前天气晴朗,温度25°C;上海当前天气多云,温度28°C"
// 步骤4:封装结果状态
HashMap<String, Object> resultMap = new HashMap<>();
resultMap.put("mcp_content", result);
logger.info("MCP Node执行完成,返回结果状态");
return resultMap;
}
6. 工具调用详细过程
AI模型内部工具协调(简化示例)
// 这是Spring AI内部的工具调用机制
public class ToolCallingProcessor {
public List<ToolResponse> processToolCalls(List<ToolCall> toolCalls) {
List<ToolResponse> responses = new ArrayList<>();
// 并行处理工具调用
toolCalls.parallelStream().forEach(toolCall -> {
String toolName = toolCall.getName();
Map<String, Object> arguments = toolCall.getArguments();
logger.info("调用工具: {}, 参数: {}", toolName, arguments);
// 根据工具名称找到对应的工具回调
ToolCallback callback = findToolCallback(toolName);
if (callback != null) {
try {
// 执行工具调用
Object result = callback.execute(arguments);
responses.add(new ToolResponse(toolCall.getId(), result.toString()));
logger.info("工具 {} 调用成功,结果长度: {}", toolName, result.toString().length());
} catch (Exception e) {
logger.error("工具 {} 调用失败", toolName, e);
responses.add(new ToolResponse(toolCall.getId(), "工具调用失败: " + e.getMessage()));
}
}
});
return responses;
}
}
7. 状态更新和返回阶段
状态合并过程
// OverAllState.merge方法
public OverAllState merge(Map<String, Object> updates) {
logger.debug("合并状态更新,原状态大小: {}, 更新大小: {}",
this.data.size(), updates.size());
HashMap<String, Object> newData = new HashMap<>(this.data);
for (Map.Entry<String, Object> entry : updates.entrySet()) {
String key = entry.getKey();
Object value = entry.getValue();
// 根据配置的策略更新状态
KeyStrategy strategy = keyStrategyFactory.getStrategy(key);
if (strategy != null) {
Object mergedValue = strategy.merge(newData.get(key), value);
newData.put(key, mergedValue);
logger.debug("使用策略 {} 更新键 {}: {}",
strategy.getClass().getSimpleName(), key, mergedValue);
} else {
newData.put(key, value);
logger.debug("直接更新键 {}: {}", key, value);
}
}
return new OverAllState(newData);
}
最终响应返回
// Controller最终返回结果
{
"query": "查询北京和上海今天的天气",
"mcp_content": "根据查询,北京当前天气晴朗,温度25°C;上海当前天气多云,温度28°C"
}
8. 完整的日志输出示例
2025-07-25 14:30:00 INFO McpGraphConfiguration - 开始初始化MCP图配置
2025-07-25 14:30:01 INFO McpNode - 开始初始化MCP Node: mcp-node
2025-07-25 14:30:01 INFO McpClientToolCallbackProvider - 开始查找节点 mcp-node 的工具回调
2025-07-25 14:30:01 DEBUG McpClientToolCallbackProvider - 节点 mcp-node 配置的服务器: [beijing-server, shanghai-server]
2025-07-25 14:30:01 DEBUG McpClientToolCallbackProvider - 生成工具前缀: weather_client_beijing_server
2025-07-25 14:30:01 DEBUG McpClientToolCallbackProvider - 生成工具前缀: weather_client_shanghai_server
2025-07-25 14:30:01 INFO McpClientToolCallbackProvider - 为节点 mcp-node 找到 2 个匹配的工具回调
2025-07-25 14:30:01 INFO McpNode - MCP Node加载工具回调: weather_client_beijing_server_getCurrentWeather
2025-07-25 14:30:01 INFO McpNode - MCP Node加载工具回调: weather_client_shanghai_server_getCurrentWeather
2025-07-25 14:30:01 INFO McpNode - MCP Node初始化完成,共加载 2 个工具
2025-07-25 14:30:01 INFO McpController - 接收到天气查询请求 - 查询: 查询北京和上海今天的天气, 线程ID: default
2025-07-25 14:30:02 INFO McpNode - MCP Node开始执行
2025-07-25 14:30:02 INFO McpNode - 获取到查询内容: 查询北京和上海今天的天气
2025-07-25 14:30:02 DEBUG McpNode - 开始调用AI模型处理查询
2025-07-25 14:30:03 INFO BeijingWeatherTool - 调用北京天气工具,城市: 北京
2025-07-25 14:30:03 INFO ShanghaiWeatherTool - 调用上海天气工具,城市: 上海
2025-07-25 14:30:04 INFO McpNode - AI模型生成的最终结果: 根据查询,北京当前天气晴朗,温度25°C;上海当前天气多云,温度28°C
2025-07-25 14:30:04 INFO McpNode - MCP Node执行完成,返回结果状态
2025-07-25 14:30:04 INFO McpController - 天气查询完成,返回结果大小: 2
总结
整个MCP Node的工作流程体现了现代AI应用开发的核心理念:
- 配置驱动:通过YAML配置灵活定义节点和工具关系
- 动态发现:运行时自动发现和加载相关工具
- 智能协调:AI模型智能识别和协调多个工具调用
- 状态管理:统一的状态管理和更新机制
- 可观测性:完整的日志记录和监控支持
这种设计使得开发者可以专注于业务逻辑的实现,而框架负责处理复杂的工具协调和状态管理,真正实现了"让AI应用开发更简单"的目标。
火山引擎开发者社区是火山引擎打造的AI技术生态平台,聚焦Agent与大模型开发,提供豆包系列模型(图像/视频/视觉)、智能分析与会话工具,并配套评测集、动手实验室及行业案例库。社区通过技术沙龙、挑战赛等活动促进开发者成长,新用户可领50万Tokens权益,助力构建智能应用。
更多推荐
所有评论(0)