目录

  1. 引言
  2. 项目结构
  3. 核心组件
  4. 架构概述
  5. 详细组件分析
  6. 依赖分析
  7. 性能考虑
  8. 故障排除指南
  9. 结论

引言

本文档全面解析Streamable HTTP协议的流式传输机制,涵盖SSE(Server-Sent Events)和JSON双模式响应。详细说明服务端如何根据Accept头(text/event-stream和application/json)动态选择响应模式。深入分析EventSourceResponse的集成实现,包括内存流(MemoryObjectStream)在请求-响应生命周期中的管理,以及如何通过data_sender_callable实现异步数据推送。解释客户端EventSource的连接建立、事件解析和错误处理机制。通过代码示例展示初始化请求、通知消息和响应消息在流式通道中的传输过程。提供流式性能调优建议,如缓存控制、连接保持和超时设置。

项目结构

本项目采用分层架构设计,主要包含客户端和服务器端实现。核心流式传输功能位于src/mcp/server/streamable_http.pysrc/mcp/client/streamable_http.py文件中。服务器端通过StreamableHTTPServerTransport类处理HTTP请求,支持SSE流式响应和JSON响应两种模式。客户端通过StreamableHTTPTransport类发起请求并处理响应。示例服务器位于examples/servers/simple-streamablehttp目录,展示了完整的流式HTTP服务实现,包括事件存储和会话管理功能。

服务器端
客户端
StreamableHTTPServerTransport
StreamableHTTPSessionManager
InMemoryEventStore
StreamableHTTPTransport
RequestContext

图示来源

本节来源

核心组件

核心组件包括StreamableHTTPServerTransportStreamableHTTPTransport两个主要类,分别负责服务器端和客户端的流式传输功能。服务器端通过检查Accept头来决定响应模式,同时支持SSE流式传输和JSON响应。客户端通过streamablehttp_client上下文管理器建立连接,处理SSE事件流。会话管理由StreamableHTTPSessionManager负责,支持有状态和无状态两种模式。事件存储接口EventStore为连接恢复功能提供支持,允许客户端断线重连后继续接收消息。

本节来源

架构概述

系统采用ASGI兼容的架构设计,支持Starlette等现代Python Web框架。服务器端通过StreamableHTTPSessionManager管理会话生命周期,每个会话对应一个StreamableHTTPServerTransport实例。客户端使用httpx库进行HTTP通信,通过anyio库的内存流实现异步数据传输。SSE流通过EventSourceResponse类实现,利用data_sender_callable参数进行异步数据推送。整个架构支持双向通信,服务器可以主动向客户端推送通知消息。

服务器端
网络
客户端
ASGI应用
会话管理器
传输层
MCP服务器
SSE流
JSON请求
客户端应用
httpx.AsyncClient

图示来源

详细组件分析

服务器端传输分析

StreamableHTTPServerTransport类是服务器端的核心组件,负责处理HTTP请求和响应。它通过_check_accept_headers方法检查客户端的Accept头,确定是否支持SSE和JSON响应。POST请求处理逻辑在_handle_post_request方法中实现,根据is_json_response_enabled配置决定返回JSON响应还是SSE流。GET请求用于建立SSE连接,允许服务器主动向客户端推送消息。DELETE请求用于显式终止会话。

"使用"
"实现"
StreamableHTTPServerTransport
+mcp_session_id : str
+is_json_response_enabled : bool
+_event_store : EventStore
+_security : TransportSecurityMiddleware
+handle_request(scope, receive, send)
+_handle_post_request(scope, request, receive, send)
+_handle_get_request(request, send)
+_handle_delete_request(request, send)
+terminate()
«interface»
EventStore
+store_event(stream_id, message)
+replay_events_after(last_event_id, send_callback)
InMemoryEventStore
+max_events_per_stream : int
+streams : dict[StreamId, deque[EventEntry]]
+event_index : dict[EventId, EventEntry]
+store_event(stream_id, message)
+replay_events_after(last_event_id, send_callback)

图示来源

本节来源

客户端传输分析

StreamableHTTPTransport类负责客户端的流式传输功能。它通过_handle_sse_event方法处理SSE事件,解析服务器推送的消息。handle_get_stream方法用于处理服务器发起的SSE连接,允许服务器主动向客户端发送消息。post_writer方法处理POST请求的写入操作,通过任务组并发处理多个请求。客户端支持连接恢复功能,通过Last-Event-ID头实现断线重连后的消息续传。

"客户端" "StreamableHTTPTransport" "服务器" 发起请求 POST请求 建立SSE连接 SSE事件 解析消息 loop [消息推送] 关闭连接 DELETE请求 "客户端" "StreamableHTTPTransport" "服务器"

图示来源

本节来源

会话管理分析

StreamableHTTPSessionManager类负责管理服务器端的会话生命周期。在有状态模式下,它为每个会话创建独立的StreamableHTTPServerTransport实例,并通过会话ID进行跟踪。在无状态模式下,为每个请求创建新的传输实例。会话管理器通过run上下文管理器确保正确的生命周期管理,只能调用一次。它与MCP服务器实例集成,处理初始化选项和状态管理。

开始
无状态模式?
创建新传输实例
会话存在?
处理现有会话
创建新会话
启动服务器任务
处理请求
终止传输
结束

图示来源

本节来源

依赖分析

系统依赖关系清晰,各组件职责分明。服务器端核心依赖anyio库的内存流进行异步数据传输,依赖httpx库进行HTTP通信,依赖sse_starlette库实现SSE响应。客户端同样依赖这些库,并通过pydantic进行数据验证。会话管理器依赖服务器核心组件和传输层组件。事件存储作为可选组件,通过接口抽象支持不同的存储实现。整个系统设计遵循依赖倒置原则,高层模块不依赖低层模块的具体实现。

StreamableHTTPTransport
httpx
anyio
sse_starlette
StreamableHTTPServerTransport
StreamableHTTPSessionManager
InMemoryEventStore
MCP服务器
客户端应用
服务器应用

图示来源

本节来源

性能考虑

流式传输系统的性能优化主要集中在连接管理和内存使用上。SSE连接应配置适当的超时时间,避免长时间空闲连接占用服务器资源。内存流的缓冲区大小需要合理设置,过小会导致频繁的上下文切换,过大会增加内存消耗。事件存储的实现应考虑持久化需求,生产环境不应使用内存存储。服务器端应限制并发连接数,防止资源耗尽。客户端应实现重连机制,处理网络不稳定情况。HTTP头的缓存控制应设置为no-cache, no-transform,确保实时性。

本节来源

故障排除指南

常见问题包括SSE连接失败、消息丢失和会话超时。SSE连接失败通常由CORS配置不当或代理服务器缓冲引起,应确保响应头正确设置。消息丢失可能由于客户端未正确处理Last-Event-ID头,应检查事件存储实现。会话超时问题可通过调整超时设置和实现心跳机制解决。服务器端应记录详细的日志,便于排查连接问题。客户端应实现错误重试机制,处理临时性网络故障。对于生产环境,建议使用外部事件存储替代内存存储,确保消息可靠性。

本节来源

结论

Streamable HTTP协议提供了一种灵活的流式传输机制,支持SSE和JSON双模式响应。通过合理的架构设计和组件分离,实现了高效、可靠的双向通信。会话管理和事件存储功能为连接恢复提供了支持,增强了系统的健壮性。性能优化和错误处理机制确保了系统在各种网络条件下的稳定性。该实现为构建实时应用提供了坚实的基础,适用于需要服务器主动推送消息的场景。

Logo

火山引擎开发者社区是火山引擎打造的AI技术生态平台,聚焦Agent与大模型开发,提供豆包系列模型(图像/视频/视觉)、智能分析与会话工具,并配套评测集、动手实验室及行业案例库。社区通过技术沙龙、挑战赛等活动促进开发者成长,新用户可领50万Tokens权益,助力构建智能应用。

更多推荐