Spring Boot SSE 流式输出实现实时响应

Server-Sent Events (SSE) 是一种基于 HTTP 的轻量级协议,支持服务器向客户端单向推送数据。结合 Spring Boot 的异步处理能力,可以实现智能体的实时响应,适用于聊天机器人、实时监控等场景。

依赖配置pom.xml 中添加 Spring Web 依赖:

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-web</artifactId>
</dependency>

核心实现步骤

创建 SSE 控制器 通过 SseEmitter 对象管理客户端连接,设置超时时间并返回 emitter 实例:

@RestController
@RequestMapping("/sse")
public class SseController {
    private final Map<String, SseEmitter> emitters = new ConcurrentHashMap<>();

    @GetMapping("/connect/{clientId}")
    public SseEmitter connect(@PathVariable String clientId) {
        SseEmitter emitter = new SseEmitter(30_000L); // 30秒超时
        emitters.put(clientId, emitter);
        emitter.onCompletion(() -> emitters.remove(clientId));
        emitter.onTimeout(() -> emitters.remove(clientId));
        return emitter;
    }
}

流式数据推送 使用异步方法向指定客户端发送分块数据:

@Async
public void sendData(String clientId, String data) {
    SseEmitter emitter = emitters.get(clientId);
    if (emitter != null) {
        try {
            emitter.send(SseEmitter.event()
                .data(data)
                .name("message"));
        } catch (IOException e) {
            emitter.completeWithError(e);
        }
    }
}

智能体集成方案

处理长耗时任务 结合 CompletableFuture 实现非阻塞的智能体响应:

public CompletableFuture<Void> processAIRequest(String query, String clientId) {
    return CompletableFuture.runAsync(() -> {
        // 模拟AI分块响应
        for (int i = 1; i <= 5; i++) {
            String chunk = "AI Response Part " + i + " for: " + query;
            sendData(clientId, chunk);
            Thread.sleep(1000); // 模拟处理延迟
        }
    });
}

前端对接示例

JavaScript 通过 EventSource 接收数据:

const eventSource = new EventSource('/sse/connect/user123');
eventSource.addEventListener('message', (e) => {
    console.log('Received:', e.data);
});
eventSource.onerror = (e) => console.error('SSE error:', e);

性能优化建议

  • 使用线程池管理异步任务,避免资源耗尽
@Bean
public Executor asyncExecutor() {
    ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
    executor.setCorePoolSize(4);
    executor.setMaxPoolSize(8);
    executor.setQueueCapacity(50);
    executor.initialize();
    return executor;
}
  • 添加心跳机制保持连接活跃
@Scheduled(fixedRate = 25000)
public void sendHeartbeat() {
    emitters.forEach((id, emitter) -> {
        try {
            emitter.send(SseEmitter.event().comment("ping"));
        } catch (IOException e) {
            emitter.complete();
        }
    });
}

异常处理策略

实现全局异常处理器应对连接问题:

@ControllerAdvice
public class SseExceptionHandler {
    @ExceptionHandler(IOException.class)
    public void handleIOException(IOException ex) {
        // 记录连接异常日志
    }
}

这种实现方式在智能对话系统中表现出色,能够实现逐词输出的效果,同时保持良好的资源利用率。通过合理的线程控制和连接管理,可以支持高并发的实时数据推送需求。

Logo

火山引擎开发者社区是火山引擎打造的AI技术生态平台,聚焦Agent与大模型开发,提供豆包系列模型(图像/视频/视觉)、智能分析与会话工具,并配套评测集、动手实验室及行业案例库。社区通过技术沙龙、挑战赛等活动促进开发者成长,新用户可领50万Tokens权益,助力构建智能应用。

更多推荐