Spring Boot实现SSE实时流式输出,数仓各层级设计总结。
Server-Sent Events (SSE) 是一种基于 HTTP 的轻量级协议,支持服务器向客户端单向推送数据。结合 Spring Boot 的异步处理能力,可以实现智能体的实时响应,适用于聊天机器人、实时监控等场景。这种实现方式在智能对话系统中表现出色,能够实现逐词输出的效果,同时保持良好的资源利用率。通过合理的线程控制和连接管理,可以支持高并发的实时数据推送需求。
Spring Boot SSE 流式输出实现实时响应
Server-Sent Events (SSE) 是一种基于 HTTP 的轻量级协议,支持服务器向客户端单向推送数据。结合 Spring Boot 的异步处理能力,可以实现智能体的实时响应,适用于聊天机器人、实时监控等场景。
依赖配置 在 pom.xml 中添加 Spring Web 依赖:
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
核心实现步骤
创建 SSE 控制器 通过 SseEmitter 对象管理客户端连接,设置超时时间并返回 emitter 实例:
@RestController
@RequestMapping("/sse")
public class SseController {
private final Map<String, SseEmitter> emitters = new ConcurrentHashMap<>();
@GetMapping("/connect/{clientId}")
public SseEmitter connect(@PathVariable String clientId) {
SseEmitter emitter = new SseEmitter(30_000L); // 30秒超时
emitters.put(clientId, emitter);
emitter.onCompletion(() -> emitters.remove(clientId));
emitter.onTimeout(() -> emitters.remove(clientId));
return emitter;
}
}
流式数据推送 使用异步方法向指定客户端发送分块数据:
@Async
public void sendData(String clientId, String data) {
SseEmitter emitter = emitters.get(clientId);
if (emitter != null) {
try {
emitter.send(SseEmitter.event()
.data(data)
.name("message"));
} catch (IOException e) {
emitter.completeWithError(e);
}
}
}
智能体集成方案
处理长耗时任务 结合 CompletableFuture 实现非阻塞的智能体响应:
public CompletableFuture<Void> processAIRequest(String query, String clientId) {
return CompletableFuture.runAsync(() -> {
// 模拟AI分块响应
for (int i = 1; i <= 5; i++) {
String chunk = "AI Response Part " + i + " for: " + query;
sendData(clientId, chunk);
Thread.sleep(1000); // 模拟处理延迟
}
});
}
前端对接示例
JavaScript 通过 EventSource 接收数据:
const eventSource = new EventSource('/sse/connect/user123');
eventSource.addEventListener('message', (e) => {
console.log('Received:', e.data);
});
eventSource.onerror = (e) => console.error('SSE error:', e);
性能优化建议
- 使用线程池管理异步任务,避免资源耗尽
@Bean
public Executor asyncExecutor() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setCorePoolSize(4);
executor.setMaxPoolSize(8);
executor.setQueueCapacity(50);
executor.initialize();
return executor;
}
- 添加心跳机制保持连接活跃
@Scheduled(fixedRate = 25000)
public void sendHeartbeat() {
emitters.forEach((id, emitter) -> {
try {
emitter.send(SseEmitter.event().comment("ping"));
} catch (IOException e) {
emitter.complete();
}
});
}
异常处理策略
实现全局异常处理器应对连接问题:
@ControllerAdvice
public class SseExceptionHandler {
@ExceptionHandler(IOException.class)
public void handleIOException(IOException ex) {
// 记录连接异常日志
}
}
这种实现方式在智能对话系统中表现出色,能够实现逐词输出的效果,同时保持良好的资源利用率。通过合理的线程控制和连接管理,可以支持高并发的实时数据推送需求。
火山引擎开发者社区是火山引擎打造的AI技术生态平台,聚焦Agent与大模型开发,提供豆包系列模型(图像/视频/视觉)、智能分析与会话工具,并配套评测集、动手实验室及行业案例库。社区通过技术沙龙、挑战赛等活动促进开发者成长,新用户可领50万Tokens权益,助力构建智能应用。
更多推荐
所有评论(0)