写在开始

MCP原理与开发及与大模型交互流程这篇文章中我们已经介绍了MCP是什么,它与AI应用、大模型之间的关系。

并且,我们也借助Python的mcp库开发了一个MCP Server服务。

但有时候,我们要作为中间方这种特殊场景,需要我们自己了解协议的细节。

这篇文章,就主要介绍MCP的交互流程细节和传输协议。

主要是流程方法,这样如果你有需求,就可以按照方法去处理,不用反复去猜了,不会涉及长篇大论。

mcp服务器

我们还是使用Python的mcp库来作为我们的MCP服务器,我们去开发对应的客户端来理解协议。

uv init
uv add mcp[cli]
uv run main.py

不理解uv的可以参考:Python虚拟环境与包管理工具(uv、Conda)

main.py的内容:

from mcp.server.fastmcp import FastMCP

# Create an MCP server
mcp = FastMCP("Demo")


# Add an addition tool
@mcp.tool()
def add(a: int, b: int) -> int:
    """Add two numbers"""
    return a + b


# Add a dynamic greeting resource
@mcp.resource("greeting://{name}")
def get_greeting(name: str) -> str:
    """Get a personalized greeting"""
    return f"Hello, {name}!"


# Add a prompt
@mcp.prompt()
def greet_user(name: str, style: str = "friendly") -> str:
    """Generate a greeting prompt"""
    styles = {
        "friendly": "Please write a warm, friendly greeting",
        "formal": "Please write a formal, professional greeting",
        "casual": "Please write a casual, relaxed greeting",
    }

    return f"{styles.get(style, styles['friendly'])} for someone named {name}."
    

if __name__ == "__main__":
    # mcp.run(transport="stdio")
    # mcp.run(transport="sse")
    # streamable-http模式,默认port8000,path="/mcp"
    mcp.run(transport="streamable-http")

启动mcp服务端

参考客户端inspector

官方提供了一个调试服务器开发工具,inspector,我们可以用来请求服务器抓包看一下细节。

按照非常简单:

npx @modelcontextprotocol/inspector

要求:Node.js: ^22.7.5

mcp inspector后端服务

如果没有Node环境,或者版本过低,建议使用nvm下载来管理node非常方便。

inspector是TS开发的,它相当于是一个中间商,会启动一个前端服务,让我们可以通过浏览器去调用MCP服务。

mcp inspector前端服务

其实,它中间转了一道手,我们浏览器去请求inspector服务,inspector再去请求真正的MCP服务。

所以,抓包的时候需要注意区分。

因为inspector它是省略的,所以还得我们自己动手抓包来研究,抓包工具我们使用wireshark

Message协议

mcp初始化

Lifecycle

初始化

有些流程是必须的,比如客户端首先得发起一个initialize,告诉服务端协议版本、让服务端列举一下他的能力,json格式如下:

{
  "jsonrpc": "2.0",
  "id": 1,
  "method": "initialize",
  "params": {
    "protocolVersion": "2024-11-05",
    "capabilities": {
      "roots": {
        "listChanged": true
      },
      "sampling": {},
      "elicitation": {}
    },
    "clientInfo": {
      "name": "ExampleClient",
      "title": "Example Client Display Name",
      "version": "1.0.0"
    }
  }
}

服务端必须影响客户端,告诉客户端协议版本,和自己的能力。

{
  "jsonrpc": "2.0",
  "id": 1,
  "result": {
    "protocolVersion": "2024-11-05",
    "capabilities": {
      "logging": {},
      "prompts": {
        "listChanged": true
      },
      "resources": {
        "subscribe": true,
        "listChanged": true
      },
      "tools": {
        "listChanged": true
      }
    },
    "serverInfo": {
      "name": "ExampleServer",
      "title": "Example Server Display Name",
      "version": "1.0.0"
    },
    "instructions": "Optional instructions for the client"
  }
}

能力包括:

类别 功能 描述
MCP客户端 roots 提供文件系统根目录的能力
MCP客户端 sampling 支持大语言模型采样请求
MCP客户端 elicitation 支持服务器启发请求
MCP客户端 experimental 描述对非标准实验性功能的支持
MCP服务端 prompts 提供提示词模板
MCP服务端 resources 提供可读取的资源
MCP服务端 tools 暴露可调用的工具
MCP服务端 logging 发出结构化的日志消息
MCP服务端 completions 支持参数自动补全
MCP服务端 experimental 描述对非标准实验性功能的支持

客户端还必须通知服务端,初始化成功了,后面可以交互了:

{
  "jsonrpc": "2.0",
  "method": "notifications/initialized"
}

以tool为例说明交互流程

接下来就可以让服务端列举具体的能力,和调用这些能力了:

例如,客户端就可以发送请求,问服务端,你支持哪些tool啊:

{
  "jsonrpc": "2.0",
  "id": 1,
  "method": "tools/list",
  "params": {
    "cursor": "optional-cursor-value"
  }
}

然后服务端,就可以告诉客户端,我有哪些tool可以调用,名字是啥、参数是啥、参数类型是啥,它是干啥的…

{
  "jsonrpc": "2.0",
  "id": 1,
  "result": {
    "tools": [
      {
        "name": "get_weather",
        "title": "Weather Information Provider",
        "description": "Get current weather information for a location",
        "inputSchema": {
          "type": "object",
          "properties": {
            "location": {
              "type": "string",
              "description": "City name or zip code"
            }
          },
          "required": ["location"]
        }
      }
    ],
    "nextCursor": "next-page-cursor"
  }
}

然后客户端就可以根据大模型(LLM)的返回决定调用那个tool:

{
  "jsonrpc": "2.0",
  "id": 2,
  "method": "tools/call",
  "params": {
    "name": "get_weather",
    "arguments": {
      "location": "New York"
    }
  }
}

服务端调用对应工具,将结果返回给客户端:

{
  "jsonrpc": "2.0",
  "id": 2,
  "result": {
    "content": [
      {
        "type": "text",
        "text": "Current weather in New York:\nTemperature: 72°F\nConditions: Partly cloudy"
      }
    ],
    "isError": false
  }
}

客户端将服务端的结果再给大模型,大模型根据结果和上下文继续处理,直到最终完成。

tools协议

tools

传输协议

传输层协议

传输协议

stdio

这个是AI应用作为MCP的客户端,直接在本机启动服务,然后标准输入输出完成进场间通信,从而获取到结果。

本机必须先启动MCP Server服务。

sse

sse是基于HTTP的协议,它需要一个HTTP层的主连接,一直保持,用来接收MCP服务端的结果。

我们知道一个HTTP连接只能发送一次数据,客户端发送一次数据,服务端响应一次数据,服务端响应可以慢慢响应,只要不关闭流就可以一直发数据。

但是客户端发送了一次请求数据,这个HTTP连接客户端就没有办法再发送了数据了。

sse的办法就是客户端如果还需要发数据,就新开HTTP连接发送请求。

服务端接收到请求之后,返回accept这个连接就结束了,然后根据session_id把这次的请求需要响应的数据通过主HTTp连接发送给客户端(根据session_id找)。

是不是感觉很畸形?为什么要这么设计呢?

我猜是为了避免服务端挂大量HTTP连接,因为客户端的请求的响应资源可能需要传输很多数据,要等很久,使用主HTTP传输数据就只需要保持一个HTTP层的连接挂着就可以。

其他的请求HTTP,请求确认就关闭,避免占用大量服务器资源。

sse流程

我们来抓包看看客户端和服务端的交互

inspector sse抓包http数据

这个是inspector的,它还包含它自己浏览器页面和自己服务端的交互连接,不适很清楚。

我们用自己写的客户端来看看,代码参考后面。

sse http交互包

StreamableHTTP

sse显然不够优雅,所以MCP最新的协议已经弃用了sse,而是使用StreamableHTTP,那什么又是StreamableHTTP呢?

StreamableHTTP听名字就知道肯定也是基于HTTP,其实就是名字唬人,没有扩展什么功能,Streamable,HTTP本来就是Streamable。

当下,它最重要的是2点:

  1. header中通过mcp-session-id来标识客户端身份
  2. 必须支持application/json和text/event-stream这2中类型

当然,还有很多其他协议要求细节。

StreamHTTP流程

我们来看看StreamHTTP的抓包结果:

StreamHTTP http包

我们可以看到StreamHTTP就是直接在一个HTTP连接中返回了结果,注意结果类型是:text/event-stream。

文本流,需要我们自己解析,大致格式如下:

event: message
data: {"jsonrpc":"2.0","id":2,"result":{"content":[{"type":"text","text":"6"}],"structuredContent":{"result":6},"isError":false}}

event: message告诉客户端这是一个消息,data: 开头的这部分是响应结果数据,json格式,json部分就是我们前面的介绍的message协议部分。

sse与StreamHTTP客户端示例代码

import lombok.extern.slf4j.Slf4j;
import org.apache.hc.client5.http.classic.methods.HttpGet;
import org.apache.hc.client5.http.classic.methods.HttpPost;
import org.apache.hc.client5.http.config.RequestConfig;
import org.apache.hc.client5.http.impl.classic.CloseableHttpClient;
import org.apache.hc.client5.http.impl.classic.HttpClients;
import org.apache.hc.core5.http.ContentType;
import org.apache.hc.core5.http.Header;
import org.apache.hc.core5.http.HttpEntity;
import org.apache.hc.core5.http.ProtocolException;
import org.apache.hc.core5.http.io.HttpClientResponseHandler;
import org.apache.hc.core5.http.io.entity.EntityUtils;
import org.apache.hc.core5.http.io.entity.StringEntity;
import org.apache.hc.core5.util.TimeValue;
import org.apache.hc.core5.util.Timeout;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.CompletableFuture;

@Slf4j
public class MCPClientStudy {


    private static final String BASE_URL = "http://127.0.0.1:8000";

    private static final String SSE_URL = BASE_URL + "/sse";

    private static final String STREAM_HTTP_URL = BASE_URL + "/mcp";

    private static final RequestConfig REQUEST_CONFIG = RequestConfig.custom()
            .setProtocolUpgradeEnabled(false)
            .setConnectionRequestTimeout(Timeout.ofMinutes(1))
            .setConnectionKeepAlive(TimeValue.ofMinutes(10))
            .build();

    private static final CloseableHttpClient client = HttpClients.custom()
            .setDefaultRequestConfig(REQUEST_CONFIG).build();


    private static final String initialize = """
            {
              "jsonrpc": "2.0",
              "id": 1,
              "method": "initialize",
              "params": {
                "protocolVersion": "2024-11-05",
                "capabilities": {
                  "roots": {
                    "listChanged": true
                  },
                  "sampling": {}
                },
                "clientInfo": {
                  "name": "ExampleClient",
                  "version": "1.0.0"
                }
              }
            }
            """;

    private static final String initialized = """
            {
              "jsonrpc": "2.0",
              "method": "notifications/initialized"
            }
            """;

    private static final String tools_list = """
            {
              "jsonrpc": "2.0",
              "id": 1,
              "method": "tools/list",
              "params": {
                "cursor": "optional-cursor-value"
              }
            }
            """;

    private static final String tool_call = """
            {
               "jsonrpc": "2.0",
               "id": 2,
               "method": "tools/call",
               "params": {
                 "_meta": {
                   "progressToken": 2
                 },
                 "name": "add",
                 "arguments": {
                   "a": 2,
                   "b": 4
                 }
               }
             }
            """;


    public static void main(String[] args) throws Exception {
        streamHttp();
//        sse();
    }

    private static void streamHttp() throws ProtocolException, IOException {
        flow(client, STREAM_HTTP_URL);
    }


    private static void sse() throws Exception {
        // sse需要一个单独的http连接保持来接受数据交互流
        HttpGet request = new HttpGet(SSE_URL);
        request.setHeader("Accept", "text/event-stream");

        HttpClientResponseHandler<Void> responseHandler = response -> {
            int statusCode = response.getCode();
            if (statusCode != 200) {
                log.error("SSE连接失败: {},{}", statusCode, response);
                return null;
            }

            try (InputStream is = response.getEntity().getContent();
                 BufferedReader reader = new BufferedReader(
                         new InputStreamReader(is, StandardCharsets.UTF_8))) {
                String line;
                while ((line = reader.readLine()) != null) {
                    if (line.isEmpty()) {
                        continue;
                    }
//                    log.info("SSE数据流消息:{}", line);
                    // 服务端告诉的请求端点,就是请求连接信息
                    if ("event: endpoint".equals(line)) {
                        line = reader.readLine();
                        if (line == null) {
                            break; // 不应该,只是为了处理意外
                        }

                        // 数据不会以data:开头
                        String data = line.replaceFirst("data:\\s*", "");
                        // sse只需要session_id相同就可以
                        String trigger_url = BASE_URL + data;
                        log.info("endpoint 触发url:{}", trigger_url);
                        CompletableFuture.runAsync(() -> {
                            try {
                                flow(HttpClients.createDefault(), trigger_url);
//                                    flow(client, trigger_url);
                            } catch (ProtocolException | IOException e) {
                                log.error("流读取中断: {}", e.getMessage(), e);
                            }
                        }).whenComplete((_, e) -> {
                            if (e != null) {
                                log.error("执行请求出错:{}", e.getMessage(), e);
                            }
                        });
                    } else if ("event: message".equals(line)) {
                        line = reader.readLine();
                        if (line == null) {
                            break; // 不应该,只是为了处理意外
                        }
                        line = line.replaceFirst("data:\\s*", "");
                        log.info("MCP SSE 服务端返回到message信息:{}", line);
                    }
                }
            } catch (IOException e) {
                log.error("流读取中断: {}", e.getMessage(), e);
            }
            return null;
        };
        client.execute(request, responseHandler);
    }

    private static void flow(CloseableHttpClient client, String url) throws ProtocolException, IOException {
        String sid = post(client, url, initialize, null);
        post(client, url, initialized, sid);
        post(client, url, tools_list, sid);
        post(client, url, tool_call, sid);
    }

    public static String post(CloseableHttpClient httpClient, String url, String content, String sid) throws IOException {
        HttpPost httpPost = new HttpPost(url);
        httpPost.addHeader("Accept", "application/json, text/event-stream");
        httpPost.addHeader("content-type", "application/json");
        if (sid != null) {
            httpPost.addHeader("mcp-session-id", sid);
        }
        StringEntity entity = new StringEntity(content, ContentType.APPLICATION_JSON);
        httpPost.setEntity(entity);
        return httpClient.execute(httpPost, r -> {
            HttpEntity resultEntity = r.getEntity();
            String postResult = EntityUtils.toString(resultEntity);
            Header header = r.getHeader("mcp-session-id");
            String msid = null;
            if (header != null) {
                msid = header.getValue();
            }
            log.info("\n---------独立请求post---------\nurl:{}\nrBody:{}\nrSid:{}\nresponseBody:{}\n---------独立请求post---------",
                    url, content, msid, postResult);
            return msid;
        });
    }
}

如果你对Python比较熟悉可以看看mcp的实现:

mcp python实现

它这个就是完整实现,比较复杂,初学或者只是为了了解流程,可以自己上手试一下,不然容易陷入到跟代码的海洋之中。

参考

找文档的时候注意,官方是io这个,还有一个info这个不知道是啥,李鬼?

inspector

inspector-github

MCP初始化

Logo

火山引擎开发者社区是火山引擎打造的AI技术生态平台,聚焦Agent与大模型开发,提供豆包系列模型(图像/视频/视觉)、智能分析与会话工具,并配套评测集、动手实验室及行业案例库。社区通过技术沙龙、挑战赛等活动促进开发者成长,新用户可领50万Tokens权益,助力构建智能应用。

更多推荐