目录

  1. 引言
  2. SSE 传输协议核心实现
  3. 双向通信机制分析
  4. 会话关联与路由机制
  5. DNS 重绑定防护安全中间件
  6. Web 服务器环境部署配置
  7. 结论

引言

SSE(Server-Sent Events)传输协议在FastMCP框架中实现了高效的双向通信机制。通过SseServerTransport类提供的两个ASGI应用,系统能够建立持久化的服务器到客户端消息推送通道,并接收客户端的指令请求。本文档深入解析该实现的核心机制,包括会话关联、路由配置和安全防护等关键方面。

SSE 传输协议核心实现

SseServerTransport类是SSE传输协议的核心实现,提供了两个ASGI应用来支持双向通信。该类通过内存流(memory object stream)机制管理客户端会话,并利用UUID作为会话标识符。

本节来源

双向通信机制分析

连接建立与消息推送

connect_sse方法负责建立SSE连接并推送服务器消息。当客户端发起GET请求时,该方法会创建一个新的会话,并通过EventSourceResponse向客户端发送事件流。

"客户端" "SseServerTransport" "内存流" GET /sse 验证请求头 生成UUID会话ID 创建读写内存流 存储会话写入器 发送endpoint事件 发送message事件 loop [消息推送] "客户端" "SseServerTransport" "内存流"

图源

客户端指令接收

handle_post_message方法处理客户端通过POST请求发送的指令。该方法会验证会话ID,并将客户端消息传递给相应的处理流。

"客户端" "SseServerTransport" "会话写入器" POST /messages/?session_id=... 验证请求头 解析session_id参数 查找会话写入器 验证消息格式 发送会话消息 返回202 Accepted "客户端" "SseServerTransport" "会话写入器"

图源

本节来源

会话关联与路由机制

Session ID 机制

SseServerTransport使用UUID作为会话标识符,确保每个SSE连接都有唯一的会话ID。该ID通过查询参数session_id与客户端POST请求关联,实现双向通信的会话绑定。

session_id = uuid4()
self._read_stream_writers[session_id] = read_stream_writer
client_post_uri_data = f"{quote(full_message_path_for_client)}?session_id={session_id.hex}"

Mount Path 路由影响

mount_path参数影响SSE端点的最终路由。通过_normalize_path方法,系统将挂载路径与消息端点路径组合,生成客户端使用的完整路径。

开始
获取mount_path
获取message_path
normalize_path(mount_path, message_path)
组合路径
返回完整路径

图源

本节来源

DNS 重绑定防护安全中间件

TransportSecurityMiddleware实现了DNS重绑定攻击防护,通过验证请求头确保安全性。

安全验证流程

失败
失败
失败
请求进入
是否为POST请求?
验证Content-Type
返回400错误
DNS重绑定防护启用?
通过验证
验证Host头
返回421错误
验证Origin头
返回403错误
结束

图源

配置选项

安全中间件支持以下配置:

  • enable_dns_rebinding_protection: 启用DNS重绑定防护
  • allowed_hosts: 允许的Host头值列表
  • allowed_origins: 允许的Origin头值列表

本节来源

Web 服务器环境部署配置

Starlette 集成示例

在Starlette应用中集成SSE传输的典型配置如下:

# 创建SSE传输实例
sse = SseServerTransport("/messages/")

# 配置路由
routes = [
    Route("/sse", endpoint=handle_sse, methods=["GET"]),
    Mount("/messages/", app=sse.handle_post_message),
]

# 处理SSE连接
async def handle_sse(request):
    async with sse.connect_sse(
        request.scope, request.receive, request._send
    ) as streams:
        await app.run(
            streams[0], streams[1], app.create_initialization_options()
        )
    return Response()  # 必须返回Response避免TypeError

FastMCP 集成示例

使用FastMCP框架的简化集成:

from mcp.server.fastmcp import FastMCP

# 创建MCP服务器
mcp = FastMCP("My App")

# 添加工具
@mcp.tool()
def hello() -> str:
    return "Hello from MCP!"

# 挂载StreamableHTTP服务器
app = Starlette(
    routes=[
        Mount("/", app=mcp.streamable_http_app()),
    ]
)

本节来源

结论

SSE传输协议在FastMCP中的实现提供了一套完整的双向通信解决方案。通过SseServerTransport类的两个ASGI应用,系统实现了服务器消息推送和客户端指令接收的分离。会话ID机制确保了通信的关联性,而安全中间件则提供了必要的防护。在Web服务器环境中,通过简单的路由配置即可完成集成,为实时通信应用提供了高效可靠的基础设施。

Logo

中国智能体开发者社区,聚焦智能体与大模型开发,提供前沿资讯、实用工具链、开源项目及行业案例。通过技术沙龙、开发者大赛等活动,促进经验交流与协作,助力开发者快速构建创新智能应用。

更多推荐