使用 SseEmitter 实现 Spring Boot 后端的流式传输和前端的数据接收
本文介绍了两种使用SSE(Server-Sent Events)实现流式数据传输的方案。第一种是简单的文本消息推送,后端通过SseEmitter分5次发送文本消息,前端使用fetch API接收并解码流数据。第二种方案针对大数据量场景,后端将数据分批处理为多个JSON文件流式传输,防止内存溢出,前端同样使用流式方式接收。两种方案都实现了服务器主动向客户端推送数据的功能,适用于实时消息推送和大文件分
·
1.普通文本消息的发送和接收
@GetMapping("/stream")
public SseEmitter streamResponse() {
SseEmitter emitter = new SseEmitter(0L); // 0L 表示永不超时
Executors.newSingleThreadExecutor().execute(() -> {
try {
for (int i = 1; i <= 5; i++) {
emitter.send("消息 " + i);
Thread.sleep(1000); // 模拟延迟
}
emitter.complete();
} catch (Exception e) {
emitter.completeWithError(e);
}
});
return emitter;
}
async function fetchStreamData() {
const response = await fetch("/api/chat/stream");
// 确保服务器支持流式数据
if (!response.ok) {
throw new Error(`HTTP 错误!状态码: ${response.status}`);
}
const reader = response.body.getReader();
const decoder = new TextDecoder("utf-8");
// 读取流式数据
while (true) {
const { done, value } = await reader.read();
if (done) break;
// 解码并输出数据
const text = decoder.decode(value, { stream: true });
console.log("收到数据:", text);
}
console.log("流式传输完成");
}
// 调用流式请求
fetchStreamData().catch(console.error);
2.使用流式消息发送多个文件流,实现多个文件的传输
//这里相当于每个drawCatalogue对象会创建一个文件流,然后发送过去,list中有几个对象就会发送几个文件
//之所以要每个属性都手动write一下,是因为我的每个ajaxResult数据量都特别大,容易内存溢出。如果没有我这种特殊情况的话,直接使用JSONObject.toJSONString(drawCatalogue)就可以,不需要去手动写入每个属性。
public SseEmitter getAllDrawDataThree(String cadCode) {
SseEmitter sseEmitter = new SseEmitter(Long.MAX_VALUE); // 设置超时时间为最大值,防止自动结束
try {
Long code = Long.parseLong(cadCode);
DrawExcelList drawExcelList = new DrawExcelList();
drawExcelList.setCadCode(code);
List<DrawCatalogue> drawCatalogueList = drawExcelListService.treeTableCatalogue(drawExcelList);
int splitSize = 20;
List<DrawCatalogue> newDrawCatalogueList = splitDrawCatalogueList(drawCatalogueList, splitSize);
for (int i = 0; i < newDrawCatalogueList.size(); i++) {
String filePath = "drawCatalogue" + i + ".json"; // 文件路径
DrawCatalogue drawCatalogue = newDrawCatalogueList.get(i);
try (BufferedWriter writer = new BufferedWriter(new FileWriter(filePath))) {
writer.write("["); // 开始写入最外层JSON数组
writer.write("{");
writer.write("\"id\": \"" + drawCatalogue.getId() + "\",");
writer.write("\"drawName\": \"" + drawCatalogue.getDrawName() + "\",");
writer.write("\"drawType\": \"" + drawCatalogue.getDrawType() + "\",");
writer.write("\"combineOutType\": \"" + drawCatalogue.getCombineOutType() + "\",");
writer.write("\"num\": \"" + drawCatalogue.getNum() + "\",");
writer.write("\"children\": ");
writer.write("["); // 开始写入childrenJSON数组
boolean first = true; // 用于判断是否是第一个元素
List<DrawCatalogue> children = drawCatalogue.getChildren();
for (DrawCatalogue child : children) {
DrawingMain drawingMain = new DrawingMain();
drawingMain.setCadCode(code);
drawingMain.setDrawName(child.getCombineOutType());
drawingMain.setDrawType(child.getDrawType());
AjaxResult ajaxResult = drawingMainService.imgListShow(drawingMain);
if (!first) {
writer.write(","); // 如果不是第一个元素,写入逗号分隔
}
String tabletJson = JSONObject.toJSONString(ajaxResult);
// 逐个属性写入文件流
writer.write("{");
writer.write("\"id\": \"" + child.getId() + "\",");
writer.write("\"drawName\": \"" + child.getDrawName() + "\",");
writer.write("\"combineOutType\": \"" + child.getCombineOutType() + "\",");
writer.write("\"drawType\": \"" + child.getDrawType() + "\",");
writer.write("\"tabletJson\": " + tabletJson);
writer.write("}");
first = false; // 标记已经写入了一个元素
}
writer.write("]"); // 结束children数组
writer.write("}");
writer.write("]"); // 结束最外层JSON数组
} catch (IOException e) {
sseEmitter.completeWithError(e);
}
// 读取并发送文件流
//byte[] fileData = Files.readAllBytes(Paths.get(filePath));
// 分块读取文件并发送(防止一次性读取的文件过大导致内存溢出)
Path path = Paths.get(filePath);
ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
byte[] buffer = new byte[8192]; // 8KB buffer
try (InputStream in = Files.newInputStream(path)) {
int bytesRead;
while ((bytesRead = in.read(buffer)) != -1) {
outputStream.write(buffer, 0, bytesRead);
}
}
byte[] fileData = outputStream.toByteArray();
sseEmitter.send(fileData, MediaType.APPLICATION_OCTET_STREAM);
}
sseEmitter.complete();
} catch (Exception e) {
sseEmitter.completeWithError(e);
} finally {
sseEmitter.complete();
}
return sseEmitter;
}
前端代码,在方法中调用,后端返回几个文件就会弹出几个下载窗口
const eventSource = new EventSource('http://127.0.0.1:1801/tablet/getAllDrawDataThree');
eventSource.onmessage = function(event) {
try {
const fileData = event.data;
const blob = new Blob([fileData], { type: 'application/octet-stream' });
const url = window.URL.createObjectURL(blob);
const a = document.createElement('a');
a.style.display = 'none';
a.href = url;
a.download = 'file.json'; // 设置下载文件名
document.body.appendChild(a);
a.click();
window.URL.revokeObjectURL(url);
document.body.removeChild(a);
} catch (error) {
console.error('Error processing event data:', error);
}
};
eventSource.onerror = function(event) {
console.error('SSE error:', event);
};
火山引擎开发者社区是火山引擎打造的AI技术生态平台,聚焦Agent与大模型开发,提供豆包系列模型(图像/视频/视觉)、智能分析与会话工具,并配套评测集、动手实验室及行业案例库。社区通过技术沙龙、挑战赛等活动促进开发者成长,新用户可领50万Tokens权益,助力构建智能应用。
更多推荐
所有评论(0)