在这里插入图片描述

大家好,我是Halcyon.平安

欢迎文末添加好友交流,共同进步!

在这里插入图片描述


一、本篇概述

上篇讲了 initModel()sendMessage()(全量请求),本篇聚焦 sendMessageStream()(流式请求)。这是整个 SDK 最复杂的函数,涉及 SSE 协议解析、数据缓冲、逐块回调。

sendMessageStream() 的工作方式:
  用户提问 → SDK 发起 HTTP 请求 → DeepSeek 边生成边返回
  → SDK 逐块解析 → 每解析出一段文字就通过 callback 通知上层
  → 上层(ChatServer)立即推送给前端 → 用户看到"打字机"效果

二、什么是 SSE?

在深入代码之前,需要先理解 SSE(Server-Sent Events,服务器推送事件)

2.1 SSE vs 普通请求

普通 HTTP 请求(sendMessage 全量):
  客户端 ──请求──→ 服务器
  客户端 ←──等待────────
  客户端 ←──────────── 完整响应(一次性)

SSE 流式请求(sendMessageStream):
  客户端 ──请求──→ 服务器
  客户端 ←──chunk1──  "你"
  客户端 ←──chunk2──  "好"
  客户端 ←──chunk3──  "!"
  客户端 ←──chunk4──  "我是AI助手"
  客户端 ←──[DONE]──  结束

2.2 SSE 数据格式

SSE 是一种文本协议,每条消息用 \n\n(两个换行)分隔。DeepSeek 返回的原始数据大概长这样:

data: {"choices":[{"delta":{"content":"你"}}]}\n\n
data: {"choices":[{"delta":{"content":"好"}}]}\n\n
data: {"choices":[{"delta":{"content":"!"}}]}\n\n
: comment\n\n
data: {"choices":[{"delta":{"content":"我是AI助手"}}]}\n\n
data: [DONE]\n\n

规则:

  • 每条消息以 data: 开头
  • : 开头的是注释行,需要忽略
  • data: [DONE]结束标记
  • 每条消息之间用 \n\n 分隔

三、与 sendMessage 的相同部分

sendMessageStream() 的前半部分(构造请求参数、历史消息、请求体)和 sendMessage() 几乎一样,只有两处区别:

3.1 请求体多了 “stream”: true

requestBody["stream"] = true;

这一行告诉 DeepSeek API:请用流式方式返回结果。如果设为 false 或不设,就返回完整结果。

3.2 读取超时更长

// sendMessage 全量
client.set_read_timeout(60, 0);    // 60秒

// sendMessageStream 流式
client.set_read_timeout(300, 0);   // 300秒

流式响应是边生成边传输的,模型生成一段文字就发一段,整个过程的持续时间远比全量请求长。如果设太短,长文本生成还没结束就超时了。

3.3 请求头多了 Accept

{"Accept", "text/event-stream"}

告诉服务器客户端期望接收 SSE 格式的流式数据。


四、流式处理变量

在发送请求之前,先声明一组用于流式处理的变量:

std::string buffer;          // 接受流式响应的数据块
bool gotError = false;       // 标记响应是否成功
std::string errorMsg;        // 错误描述符
int statusCode = 0;          // 响应状态码
bool streamFinish = false;   // 标记流式响应是否完成
std::string fullResponse;    // 累积完整的响应
变量 类型 作用
buffer string 数据缓冲区。网络传输中,一次 recv 可能收到不完整的数据,需要缓冲拼接
gotError bool 一旦 HTTP 状态码非 200,设为 true,后续接收器直接终止
errorMsg string 存储错误描述(如 "HTTP status code: 401"
statusCode int 存储响应状态码(本代码中声明了但未实际使用)
streamFinish bool 收到 [DONE]标记后设为 true,用于最后检查流是否正常结束
fullResponse string 累积所有增量文本,最终作为函数返回值返回给调用方

为什么需要 buffer?

网络传输的特点是:一次 recv 调用可能收到任意数量的数据。可能一次收到半个 SSE 事件,也可能一次收到三个完整事件。所以需要一个缓冲区来拼接数据,等凑够一个完整事件(找到 \n\n 分隔符) 再解析。

第 1 次 recv: "data: {\"choices\":[{\"delta\":{\"content\":\"你\"}}]}\n\nda"
                                                         ↑ 分隔符 ↑ 不完整

buffer 中: "data: {\"choices\":[{\"delta\":{\"content\":\"你\"}}]}\n\nda"
           ├────── 完整事件,可以解析 ──────┤└─ 留在 buffer 等下次 ─┘

第 2 次 recv: "ta: {\"choices\":[{\"delta\":{\"content\":\"好\"}}]}\n\n"
              与 buffer 中残留的 "da" 拼接成完整事件

五、构造 Request 对象

5.1 为什么不用 client.Post()?

上篇的全量请求直接用 client.Post(),它是一次性发送请求、等待完整响应。但流式请求需要边接收边处理,所以要用更底层的 httplib::Request 对象,手动设置两个回调:

httplib::Request req;
req.method = "POST";
req.path = "/v1/chat/completions";
req.headers = headers;
req.body = requestBodyStr;
字段 赋值 说明
method "POST" HTTP 方法
path "/v1/chat/completions" 请求路径
headers 之前构造的 headers 包含认证、内容类型、SSE 接受头
body 序列化后的 JSON 字符串 请求体

client.Post(path, headers, body, type) 的参数是对应的,只是拆成了结构体的字段。

5.2 response_handler — 响应头处理器

req.response_handler = [&](const httplib::Response& res) {
    if(res.status != 200){
        gotError = true;
        errorMsg = "HTTP status code: " + std::to_string(res.status);
        return false;    // 终止请求
    }
    return true;   // 继续接收后续数据
};

这个回调在 收到 HTTP 响应头时 被调用(注意:此时还没收到 body 数据):

  • [&] — Lambda 按引用捕获所有外部变量,这样可以修改 gotErrorerrorMsg
  • res.status != 200 — 如果状态码不是 200(比如 401 认证失败),说明请求出错了
  • gotError = true — 设置错误标记,后续 content_receiver 会检查这个标记
  • return false — 告诉 cpp-httplib 终止请求,不需要继续接收 body 数据了
  • return true — 状态码正常,继续接收后续的 body 数据

执行时序:

client.send(req)
  │
  ├── TCP 连接建立
  ├── 发送请求数据
  ├── 收到响应头 ──→ response_handler 被调用
  │     ├── status != 200 → gotError=true, return false → 终止
  │     └── status == 200 → return true → 继续
  │
  ├── 收到第 1 块 body ──→ content_receiver 被调用
  ├── 收到第 2 块 body ──→ content_receiver 被调用
  ├── ...
  └── 接收完毕 ──→ client.send() 返回

5.3 content_receiver — 数据接收处理器

这是流式处理的核心,每收到一块数据就会被 cpp-httplib 调用:

req.content_receiver = [&](const char* data, size_t len, size_t offset, size_t totalLength){

四个参数的含义:

参数 类型 说明
data const char* 指向本次收到的原始数据的指针
len size_t 本次收到数据的字节长度
offset size_t 当前数据在整个响应中的偏移量(本代码未使用)
totalLength size_t 响应体总长度(本代码未使用)

下面逐块解析 content_receiver 的内部逻辑。


六、content_receiver 逐行解析

6.1 错误检查

if(gotError){
    return false;
}

入口先检查 response_handler 是否标记了错误。如果 HTTP 状态码不是 200,gotError 已经是 true,直接返回 false 终止接收。

6.2 追加数据到 buffer

buffer.append(data, len);

std::string::append(const char* s, size_t n) — 将 data 指向的前 len 个字节追加到 buffer 末尾。

为什么不直接用 buffer += data?因为 data 是原始的字节指针,不一定以 \0 结尾。operator+= 遇到 \0 就停了,而 append(data, len) 精确地追加 len 个字节,即使中间有 \0

6.3 按 \n\n 分割并处理每个事件

size_t pos = 0;
while((pos = buffer.find("\n\n")) != std::string::npos){
    std::string chunk = buffer.substr(0, pos);
    buffer.erase(0, pos + 2);
  • buffer.find("\n\n") — 在缓冲区中查找 SSE 事件分隔符 \n\n
  • std::string::npos — 表示没找到,是 string::find 失败时的返回值(值为 -1,但类型是 size_t 即无符号最大值)
  • buffer.substr(0, pos) — 从位置 0 开始截取 pos 个字符,得到一个完整的 SSE 事件文本
  • buffer.erase(0, pos + 2) — 从 buffer 中删除已处理的部分,pos + 2 是数据长度加上 \n\n 的 2 个字符

用 while 而不是 if 的原因: 一次 recv 可能收到多个完整事件,需要循环处理直到 buffer 中没有完整的 \n\n 分隔的事件为止。

buffer 内容: "data: {...}\n\ndata: {...}\n\ndata: {...Incomplete"
              ↑ 第 1 个 \n\n  ↑ 第 2 个 \n\n
while 循环 3 次:
  第 1 次:提取 "data: {...}",删除已处理部分
  第 2 次:提取 "data: {...}",删除已处理部分
  第 3 次:find("\n\n") == npos → 退出循环
  buffer 中剩余 "data: {...Incomplete" 等待下次 recv 拼接

6.4 过滤无效事件

if(chunk.empty() || chunk[0] == ':'){
    continue;
}
  • chunk.empty() — 空事件,跳过
  • chunk[0] == ':' — SSE 协议规定以 : 开头的行是注释(comment),服务器可能用来维持连接(心跳),需要忽略
可能收到的数据:
  "\n\n"                 → chunk 为空,跳过
  ": keep-alive\n\n"     → chunk 以 ':' 开头,跳过
  "data: {...}\n\n"      → 有效数据,继续处理

6.5 提取 data 字段

if(chunk.compare(0, 6, "data: ") == 0){
    std::string modelData = chunk.substr(6);
  • chunk.compare(0, 6, "data: ") — 从 chunk 的第 0 个位置开始,取 6 个字符,与 "data: " 比较。等于 0 表示匹配
  • chunk.substr(6) — 去掉 "data: " 前缀(6 个字符),得到后面的 JSON 数据

等价于:

if(chunk.starts_with("data: ")){       // C++20 写法
    std::string modelData = chunk.substr(6);
}

compare 是 C++98 就有的方法,兼容性更好。

6.6 检查结束标记

if(modelData == "[DONE]"){
    callback("", true);
    streamFinish = true;
    return true;
}
  • modelData == "[DONE]" — DeepSeek 在流式数据最后会发送 data: [DONE],表示生成结束
  • callback("", true) — 调用回调通知上层:没有更多内容了(第一个参数为空),流式结束(第二个参数为 true
  • streamFinish = true — 标记流式正常结束
  • return true — 告诉 cpp-httplib 继续接收(虽然后面不会有什么有效数据了)

6.7 解析 JSON 增量数据

Json::Value modelDataJson;
Json::CharReaderBuilder reader;
std::string errors;
std::istringstream modelDataStream(modelData);
if(Json::parseFromStream(reader, modelDataStream, &modelDataJson, &errors)){

sendMessage() 中的 JSON 解析逻辑完全一样:

  • Json::CharReaderBuilder — JSON 解析器构建器
  • std::istringstream modelDataStream(modelData) — 将 data: 后面的 JSON 字符串包装成流
  • Json::parseFromStream() — 解析 JSON,成功返回 true

每个 SSE 事件中的 JSON 长这样:

{"choices":[{"delta":{"content":"你"}}]}

注意流式响应用的是 **delta** 而不是 message

对比 全量响应(sendMessage) 流式响应(sendMessageStream)
字段路径 choices[0].message.content choices[0].delta.content
含义 完整的回复内容 本次增量内容(一个字或几个字)
出现次数 一次 多次,逐段返回

6.8 多重检查提取增量内容

if(modelDataJson.isMember("choices") &&
    modelDataJson["choices"].isArray() &&
    !modelDataJson["choices"].empty() &&
    modelDataJson["choices"][0].isMember("delta") &&
    modelDataJson["choices"][0]["delta"].isMember("content")){
    std::string content = modelDataJson["choices"][0]["delta"]["content"].asString();

5 层检查,逐层深入:

  1. isMember("choices") — 有 choices 字段吗?
  2. isArray() — 是数组吗?
  3. !empty() — 数组不为空吗?
  4. choices[0].isMember("delta") — 第一个元素有 delta 字段吗?
  5. delta.isMember("content")delta 里有 content 字段吗?

这样做的原因:流式响应的某些事件可能没有 **content** 字段。比如模型生成结束前的最后一个事件可能只包含 finish_reason: "stop" 而没有 content。不做检查直接访问会崩溃。

6.9 累积并回调

fullResponse += content;
callback(content, false);
  • fullResponse += content — 将增量内容追加到完整回复字符串中,最终作为函数返回值
  • callback(content, false) — 调用回调通知上层:收到一段新内容(第一个参数),还没结束(第二个参数为 false

这两个操作是并行的:一边把碎片拼成完整回复(fullResponse),一边实时通知上层(callback)

6.10 JSON 解析失败的容错

}else{
    WARN("DeepSeekProvider sendMessageStream parse modelDataJson error: {}", errors);
}

如果某个 chunk 的 JSON 解析失败,只打一条 WARN 日志,不中断流式接收。这是合理的,因为个别事件格式异常不应影响整体流程,后续事件可能仍然正常。

6.11 content_receiver 返回

return true;

while 循环处理完 buffer 中所有完整事件后,返回 true 告诉 cpp-httplib 继续接收后续数据。如果 buffer 中还有不完整的数据(没找到 \n\n),会留在 buffer 中等下次 content_receiver 被调用时拼接。


七、发送请求与收尾

7.1 发送请求

auto result = client.send(req);
if(!result){
    ERR("Network error {}", to_string(result.error()));
    return "";
}
  • client.send(req) — 发送请求。与 client.Post() 不同,send() 接受一个 Request 对象,会使用我们设置的 response_handlercontent_receiver
  • resulthttplib::Result 类型,重载了 operator bool()
  • 网络失败时(DNS 解析失败、连接超时等)resultfalseresult.error() 返回具体的错误类型

7.2 确保流式正常结束

if(!streamFinish){
    WARN("stream ended without [DONE] marker");
    callback("", true);
}

如果整个请求结束了但 streamFinish 仍然是 false,说明没有收到 **[DONE]** 标记。可能的原因:

  • 网络中断,数据没传完
  • 服务器异常,提前关闭了连接

不管什么原因,都要调用 callback("", true) 通知上层流式已结束,避免上层一直在等。

7.3 返回完整回复

return fullResponse;

fullResponsecontent_receiver 中逐段累积,到这里包含了模型的完整回复文本,和 sendMessage() 返回的内容一样,只是获取方式不同(一个是直接从响应体提取,一个是逐步拼接)。


八、完整流程图

sendMessageStream(messages, requestParam, callback)
  │
  ├─ 1. 检查 isAvailable()
  │
  ├─ 2. 构造 JSON 请求体(stream: true)
  │
  ├─ 3. 创建 HTTP Client(超时 300 秒)
  │
  ├─ 4. 构造 Request,设置两个回调:
  │     ├── response_handler  → 检查 HTTP 状态码
  │     └── content_receiver  → 逐块处理 SSE 数据
  │
  ├─ 5. client.send(req) ──→ 请求发出
  │     │
  │     ├── 收到响应头 → response_handler
  │     │     └── status != 200 → gotError = true
  │     │
  │     └── 收到每块 body → content_receiver
  │           │
  │           ├── 检查 gotError → 有错则终止
  │           ├── 追加到 buffer
  │           ├── while (找到 \n\n)
  │           │     ├── 提取 chunk
  │           │     ├── 空行/注释 → 跳过
  │           │     ├── "data: [DONE]" → callback("", true), 结束
  │           │     └── "data: {json}" → 解析 JSON
  │           │           └── choices[0].delta.content
  │           │                 ├── fullResponse += content
  │           │                 └── callback(content, false)
  │           └── return true(继续接收)
  │
  ├─ 6. 检查 streamFinish
  │     └── 未收到 [DONE] → callback("", true) 兜底
  │
  └─ 7. return fullResponse(累积的完整回复)

九、全量 vs 流式对比总结

对比项 sendMessage(全量) sendMessageStream(流式)
请求体 stream 字段 "stream": true
读取超时 60 秒 300 秒
请求头 Authorization + Content-Type 多一个 Accept: text/event-stream
请求方式 client.Post() client.send(req) + 回调
响应格式 完整 JSON 多个 SSE 事件,\n\n 分隔
内容字段 choices[0].message.content choices[0].delta.content
结束标记 无(整个响应就是完整的) data: [DONE]
返回方式 一次性返回完整文本 callback 逐段回调 + 最终返回完整文本

核心区别就一句话:全量是等完再给,流式是边收边给


十、总结

sendMessageStream() 的难点在于:

  1. SSE 协议解析 — 需要理解 data: 前缀、\n\n 分隔符、[DONE] 结束标记
  2. Buffer 缓冲机制 — 网络传输的数据边界不确定,必须缓冲拼接后按 \n\n 分割
  3. 双重回调架构response_handler 检查状态码,content_receiver 逐块处理数据
  4. 容错处理 — 注释行过滤、JSON 解析失败不中断、缺少 [DONE] 的兜底回调

后续将实现 ChatGPT 和 Gemini 模型接入,它们的整体结构与 DeepSeek 类似,但在 API 端点、SSE 数据格式上有重要差异。

Logo

中国智能体开发者社区,聚焦智能体与大模型开发,提供前沿资讯、实用工具链、开源项目及行业案例。通过技术沙龙、开发者大赛等活动,促进经验交流与协作,助力开发者快速构建创新智能应用。

更多推荐