【C++ AI 大模型接入 SDK】 - DeepSeek 模型接入(下)

欢迎文末添加好友交流,共同进步!

一、本篇概述
上篇讲了 initModel() 和 sendMessage()(全量请求),本篇聚焦 sendMessageStream()(流式请求)。这是整个 SDK 最复杂的函数,涉及 SSE 协议解析、数据缓冲、逐块回调。
sendMessageStream() 的工作方式:
用户提问 → SDK 发起 HTTP 请求 → DeepSeek 边生成边返回
→ SDK 逐块解析 → 每解析出一段文字就通过 callback 通知上层
→ 上层(ChatServer)立即推送给前端 → 用户看到"打字机"效果
二、什么是 SSE?
在深入代码之前,需要先理解 SSE(Server-Sent Events,服务器推送事件)。
2.1 SSE vs 普通请求
普通 HTTP 请求(sendMessage 全量):
客户端 ──请求──→ 服务器
客户端 ←──等待────────
客户端 ←──────────── 完整响应(一次性)
SSE 流式请求(sendMessageStream):
客户端 ──请求──→ 服务器
客户端 ←──chunk1── "你"
客户端 ←──chunk2── "好"
客户端 ←──chunk3── "!"
客户端 ←──chunk4── "我是AI助手"
客户端 ←──[DONE]── 结束
2.2 SSE 数据格式
SSE 是一种文本协议,每条消息用 \n\n(两个换行)分隔。DeepSeek 返回的原始数据大概长这样:
data: {"choices":[{"delta":{"content":"你"}}]}\n\n
data: {"choices":[{"delta":{"content":"好"}}]}\n\n
data: {"choices":[{"delta":{"content":"!"}}]}\n\n
: comment\n\n
data: {"choices":[{"delta":{"content":"我是AI助手"}}]}\n\n
data: [DONE]\n\n
规则:
- 每条消息以
data:开头 - 以
:开头的是注释行,需要忽略 data: [DONE]是结束标记- 每条消息之间用
\n\n分隔
三、与 sendMessage 的相同部分
sendMessageStream() 的前半部分(构造请求参数、历史消息、请求体)和 sendMessage() 几乎一样,只有两处区别:
3.1 请求体多了 “stream”: true
requestBody["stream"] = true;
这一行告诉 DeepSeek API:请用流式方式返回结果。如果设为 false 或不设,就返回完整结果。
3.2 读取超时更长
// sendMessage 全量
client.set_read_timeout(60, 0); // 60秒
// sendMessageStream 流式
client.set_read_timeout(300, 0); // 300秒
流式响应是边生成边传输的,模型生成一段文字就发一段,整个过程的持续时间远比全量请求长。如果设太短,长文本生成还没结束就超时了。
3.3 请求头多了 Accept
{"Accept", "text/event-stream"}
告诉服务器客户端期望接收 SSE 格式的流式数据。
四、流式处理变量
在发送请求之前,先声明一组用于流式处理的变量:
std::string buffer; // 接受流式响应的数据块
bool gotError = false; // 标记响应是否成功
std::string errorMsg; // 错误描述符
int statusCode = 0; // 响应状态码
bool streamFinish = false; // 标记流式响应是否完成
std::string fullResponse; // 累积完整的响应
| 变量 | 类型 | 作用 |
|---|---|---|
buffer |
string |
数据缓冲区。网络传输中,一次 recv 可能收到不完整的数据,需要缓冲拼接 |
gotError |
bool |
一旦 HTTP 状态码非 200,设为 true,后续接收器直接终止 |
errorMsg |
string |
存储错误描述(如 "HTTP status code: 401") |
statusCode |
int |
存储响应状态码(本代码中声明了但未实际使用) |
streamFinish |
bool |
收到 [DONE]标记后设为 true,用于最后检查流是否正常结束 |
fullResponse |
string |
累积所有增量文本,最终作为函数返回值返回给调用方 |
为什么需要 buffer?
网络传输的特点是:一次 recv 调用可能收到任意数量的数据。可能一次收到半个 SSE 事件,也可能一次收到三个完整事件。所以需要一个缓冲区来拼接数据,等凑够一个完整事件(找到 \n\n 分隔符) 再解析。
第 1 次 recv: "data: {\"choices\":[{\"delta\":{\"content\":\"你\"}}]}\n\nda"
↑ 分隔符 ↑ 不完整
buffer 中: "data: {\"choices\":[{\"delta\":{\"content\":\"你\"}}]}\n\nda"
├────── 完整事件,可以解析 ──────┤└─ 留在 buffer 等下次 ─┘
第 2 次 recv: "ta: {\"choices\":[{\"delta\":{\"content\":\"好\"}}]}\n\n"
与 buffer 中残留的 "da" 拼接成完整事件
五、构造 Request 对象
5.1 为什么不用 client.Post()?
上篇的全量请求直接用 client.Post(),它是一次性发送请求、等待完整响应。但流式请求需要边接收边处理,所以要用更底层的 httplib::Request 对象,手动设置两个回调:
httplib::Request req;
req.method = "POST";
req.path = "/v1/chat/completions";
req.headers = headers;
req.body = requestBodyStr;
| 字段 | 赋值 | 说明 |
|---|---|---|
method |
"POST" |
HTTP 方法 |
path |
"/v1/chat/completions" |
请求路径 |
headers |
之前构造的 headers | 包含认证、内容类型、SSE 接受头 |
body |
序列化后的 JSON 字符串 | 请求体 |
和 client.Post(path, headers, body, type) 的参数是对应的,只是拆成了结构体的字段。
5.2 response_handler — 响应头处理器
req.response_handler = [&](const httplib::Response& res) {
if(res.status != 200){
gotError = true;
errorMsg = "HTTP status code: " + std::to_string(res.status);
return false; // 终止请求
}
return true; // 继续接收后续数据
};
这个回调在 收到 HTTP 响应头时 被调用(注意:此时还没收到 body 数据):
[&]— Lambda 按引用捕获所有外部变量,这样可以修改gotError和errorMsgres.status != 200— 如果状态码不是 200(比如 401 认证失败),说明请求出错了gotError = true— 设置错误标记,后续content_receiver会检查这个标记return false— 告诉 cpp-httplib 终止请求,不需要继续接收 body 数据了return true— 状态码正常,继续接收后续的 body 数据
执行时序:
client.send(req)
│
├── TCP 连接建立
├── 发送请求数据
├── 收到响应头 ──→ response_handler 被调用
│ ├── status != 200 → gotError=true, return false → 终止
│ └── status == 200 → return true → 继续
│
├── 收到第 1 块 body ──→ content_receiver 被调用
├── 收到第 2 块 body ──→ content_receiver 被调用
├── ...
└── 接收完毕 ──→ client.send() 返回
5.3 content_receiver — 数据接收处理器
这是流式处理的核心,每收到一块数据就会被 cpp-httplib 调用:
req.content_receiver = [&](const char* data, size_t len, size_t offset, size_t totalLength){
四个参数的含义:
| 参数 | 类型 | 说明 |
|---|---|---|
data |
const char* |
指向本次收到的原始数据的指针 |
len |
size_t |
本次收到数据的字节长度 |
offset |
size_t |
当前数据在整个响应中的偏移量(本代码未使用) |
totalLength |
size_t |
响应体总长度(本代码未使用) |
下面逐块解析 content_receiver 的内部逻辑。
六、content_receiver 逐行解析
6.1 错误检查
if(gotError){
return false;
}
入口先检查 response_handler 是否标记了错误。如果 HTTP 状态码不是 200,gotError 已经是 true,直接返回 false 终止接收。
6.2 追加数据到 buffer
buffer.append(data, len);
std::string::append(const char* s, size_t n) — 将 data 指向的前 len 个字节追加到 buffer 末尾。
为什么不直接用 buffer += data?因为 data 是原始的字节指针,不一定以 \0 结尾。operator+= 遇到 \0 就停了,而 append(data, len) 精确地追加 len 个字节,即使中间有 \0。
6.3 按 \n\n 分割并处理每个事件
size_t pos = 0;
while((pos = buffer.find("\n\n")) != std::string::npos){
std::string chunk = buffer.substr(0, pos);
buffer.erase(0, pos + 2);
buffer.find("\n\n")— 在缓冲区中查找 SSE 事件分隔符\n\nstd::string::npos— 表示没找到,是string::find失败时的返回值(值为-1,但类型是size_t即无符号最大值)buffer.substr(0, pos)— 从位置 0 开始截取pos个字符,得到一个完整的 SSE 事件文本buffer.erase(0, pos + 2)— 从 buffer 中删除已处理的部分,pos + 2是数据长度加上\n\n的 2 个字符
用 while 而不是 if 的原因: 一次 recv 可能收到多个完整事件,需要循环处理直到 buffer 中没有完整的 \n\n 分隔的事件为止。
buffer 内容: "data: {...}\n\ndata: {...}\n\ndata: {...Incomplete"
↑ 第 1 个 \n\n ↑ 第 2 个 \n\n
while 循环 3 次:
第 1 次:提取 "data: {...}",删除已处理部分
第 2 次:提取 "data: {...}",删除已处理部分
第 3 次:find("\n\n") == npos → 退出循环
buffer 中剩余 "data: {...Incomplete" 等待下次 recv 拼接
6.4 过滤无效事件
if(chunk.empty() || chunk[0] == ':'){
continue;
}
chunk.empty()— 空事件,跳过chunk[0] == ':'— SSE 协议规定以:开头的行是注释(comment),服务器可能用来维持连接(心跳),需要忽略
可能收到的数据:
"\n\n" → chunk 为空,跳过
": keep-alive\n\n" → chunk 以 ':' 开头,跳过
"data: {...}\n\n" → 有效数据,继续处理
6.5 提取 data 字段
if(chunk.compare(0, 6, "data: ") == 0){
std::string modelData = chunk.substr(6);
chunk.compare(0, 6, "data: ")— 从 chunk 的第 0 个位置开始,取 6 个字符,与"data: "比较。等于 0 表示匹配chunk.substr(6)— 去掉"data: "前缀(6 个字符),得到后面的 JSON 数据
等价于:
if(chunk.starts_with("data: ")){ // C++20 写法
std::string modelData = chunk.substr(6);
}
compare 是 C++98 就有的方法,兼容性更好。
6.6 检查结束标记
if(modelData == "[DONE]"){
callback("", true);
streamFinish = true;
return true;
}
modelData == "[DONE]"— DeepSeek 在流式数据最后会发送data: [DONE],表示生成结束callback("", true)— 调用回调通知上层:没有更多内容了(第一个参数为空),流式结束(第二个参数为true)streamFinish = true— 标记流式正常结束return true— 告诉 cpp-httplib 继续接收(虽然后面不会有什么有效数据了)
6.7 解析 JSON 增量数据
Json::Value modelDataJson;
Json::CharReaderBuilder reader;
std::string errors;
std::istringstream modelDataStream(modelData);
if(Json::parseFromStream(reader, modelDataStream, &modelDataJson, &errors)){
和 sendMessage() 中的 JSON 解析逻辑完全一样:
Json::CharReaderBuilder— JSON 解析器构建器std::istringstream modelDataStream(modelData)— 将data:后面的 JSON 字符串包装成流Json::parseFromStream()— 解析 JSON,成功返回true
每个 SSE 事件中的 JSON 长这样:
{"choices":[{"delta":{"content":"你"}}]}
注意流式响应用的是 **delta** 而不是 message:
| 对比 | 全量响应(sendMessage) | 流式响应(sendMessageStream) |
|---|---|---|
| 字段路径 | choices[0].message.content |
choices[0].delta.content |
| 含义 | 完整的回复内容 | 本次增量内容(一个字或几个字) |
| 出现次数 | 一次 | 多次,逐段返回 |
6.8 多重检查提取增量内容
if(modelDataJson.isMember("choices") &&
modelDataJson["choices"].isArray() &&
!modelDataJson["choices"].empty() &&
modelDataJson["choices"][0].isMember("delta") &&
modelDataJson["choices"][0]["delta"].isMember("content")){
std::string content = modelDataJson["choices"][0]["delta"]["content"].asString();
5 层检查,逐层深入:
isMember("choices")— 有choices字段吗?isArray()— 是数组吗?!empty()— 数组不为空吗?choices[0].isMember("delta")— 第一个元素有delta字段吗?delta.isMember("content")—delta里有content字段吗?
这样做的原因:流式响应的某些事件可能没有 **content** 字段。比如模型生成结束前的最后一个事件可能只包含 finish_reason: "stop" 而没有 content。不做检查直接访问会崩溃。
6.9 累积并回调
fullResponse += content;
callback(content, false);
fullResponse += content— 将增量内容追加到完整回复字符串中,最终作为函数返回值callback(content, false)— 调用回调通知上层:收到一段新内容(第一个参数),还没结束(第二个参数为false)
这两个操作是并行的:一边把碎片拼成完整回复(fullResponse),一边实时通知上层(callback)。
6.10 JSON 解析失败的容错
}else{
WARN("DeepSeekProvider sendMessageStream parse modelDataJson error: {}", errors);
}
如果某个 chunk 的 JSON 解析失败,只打一条 WARN 日志,不中断流式接收。这是合理的,因为个别事件格式异常不应影响整体流程,后续事件可能仍然正常。
6.11 content_receiver 返回
return true;
while 循环处理完 buffer 中所有完整事件后,返回 true 告诉 cpp-httplib 继续接收后续数据。如果 buffer 中还有不完整的数据(没找到 \n\n),会留在 buffer 中等下次 content_receiver 被调用时拼接。
七、发送请求与收尾
7.1 发送请求
auto result = client.send(req);
if(!result){
ERR("Network error {}", to_string(result.error()));
return "";
}
client.send(req)— 发送请求。与client.Post()不同,send()接受一个Request对象,会使用我们设置的response_handler和content_receiverresult—httplib::Result类型,重载了operator bool()- 网络失败时(DNS 解析失败、连接超时等)
result为false,result.error()返回具体的错误类型
7.2 确保流式正常结束
if(!streamFinish){
WARN("stream ended without [DONE] marker");
callback("", true);
}
如果整个请求结束了但 streamFinish 仍然是 false,说明没有收到 **[DONE]** 标记。可能的原因:
- 网络中断,数据没传完
- 服务器异常,提前关闭了连接
不管什么原因,都要调用 callback("", true) 通知上层流式已结束,避免上层一直在等。
7.3 返回完整回复
return fullResponse;
fullResponse 在 content_receiver 中逐段累积,到这里包含了模型的完整回复文本,和 sendMessage() 返回的内容一样,只是获取方式不同(一个是直接从响应体提取,一个是逐步拼接)。
八、完整流程图
sendMessageStream(messages, requestParam, callback)
│
├─ 1. 检查 isAvailable()
│
├─ 2. 构造 JSON 请求体(stream: true)
│
├─ 3. 创建 HTTP Client(超时 300 秒)
│
├─ 4. 构造 Request,设置两个回调:
│ ├── response_handler → 检查 HTTP 状态码
│ └── content_receiver → 逐块处理 SSE 数据
│
├─ 5. client.send(req) ──→ 请求发出
│ │
│ ├── 收到响应头 → response_handler
│ │ └── status != 200 → gotError = true
│ │
│ └── 收到每块 body → content_receiver
│ │
│ ├── 检查 gotError → 有错则终止
│ ├── 追加到 buffer
│ ├── while (找到 \n\n)
│ │ ├── 提取 chunk
│ │ ├── 空行/注释 → 跳过
│ │ ├── "data: [DONE]" → callback("", true), 结束
│ │ └── "data: {json}" → 解析 JSON
│ │ └── choices[0].delta.content
│ │ ├── fullResponse += content
│ │ └── callback(content, false)
│ └── return true(继续接收)
│
├─ 6. 检查 streamFinish
│ └── 未收到 [DONE] → callback("", true) 兜底
│
└─ 7. return fullResponse(累积的完整回复)
九、全量 vs 流式对比总结
| 对比项 | sendMessage(全量) | sendMessageStream(流式) |
|---|---|---|
| 请求体 | 无 stream 字段 |
"stream": true |
| 读取超时 | 60 秒 | 300 秒 |
| 请求头 | Authorization + Content-Type |
多一个 Accept: text/event-stream |
| 请求方式 | client.Post() |
client.send(req) + 回调 |
| 响应格式 | 完整 JSON | 多个 SSE 事件,\n\n 分隔 |
| 内容字段 | choices[0].message.content |
choices[0].delta.content |
| 结束标记 | 无(整个响应就是完整的) | data: [DONE] |
| 返回方式 | 一次性返回完整文本 | callback 逐段回调 + 最终返回完整文本 |
核心区别就一句话:全量是等完再给,流式是边收边给。
十、总结
sendMessageStream() 的难点在于:
- SSE 协议解析 — 需要理解
data:前缀、\n\n分隔符、[DONE]结束标记 - Buffer 缓冲机制 — 网络传输的数据边界不确定,必须缓冲拼接后按
\n\n分割 - 双重回调架构 —
response_handler检查状态码,content_receiver逐块处理数据 - 容错处理 — 注释行过滤、JSON 解析失败不中断、缺少
[DONE]的兜底回调
后续将实现 ChatGPT 和 Gemini 模型接入,它们的整体结构与 DeepSeek 类似,但在 API 端点、SSE 数据格式上有重要差异。
更多推荐


所有评论(0)