Event-Stream实时通信技术
WebSocket与Event-Stream(SSE)技术对比分析 WebSocket和Event-Stream(SSE)是两种常用的实时通信技术,各有特点。WebSocket支持全双工通信,适合复杂交互场景如聊天、游戏,但协议较复杂;SSE基于HTTP,仅支持服务器推送,适合简单场景如消息通知。在Java实现上,推荐采用Spring WebFlux的响应式方案,相比传统SseEmitter具有更
服务端
- websocket和event-stream的优缺点
WebSocket和Event-Stream(Server-Sent Events)都是实现实时通信的技术,但是它们各自有不同的优缺点。
⭐️ WebSocket
- 优点:
- 双向通信:WebSocket提供了一个全双工的通信通道,客户端和服务器可以同时发送和接收数据。
- 实时性:由于WebSocket是持久连接,所以它具有高实时性。
- 更少的数据传输量:WebSocket在建立连接后,数据传输时不需要包含HTTP头,因此数据传输量较小。
- websocket可传输较为复杂的数据结构,例如json、二进制字节等。
- websocket针对java、Python等语言支持较好
- websocket天然支持跨域问题(据说也有跨域问题,目前无法模拟出来)
- 缺点:
- 虽然大部分现代浏览器都支持WebSocket,但是一些老版本的浏览器可能不支持。
- 协议复杂:WebSocket的协议相对复杂,需要处理连接、断开连接、心跳等问题。
- 适用场景
- 适合较为复杂的业务场景,需要多次进行通讯,例如:聊天、游戏等
🌟 Event-Stream (Server-Sent Events):
- 优点:
- 简单易用:Event-Stream的API相对简单,易于使用和理解。
- 自动重连:如果连接断开,Event-Stream会自动尝试重连。
- 基于HTTP:Event-Stream基于HTTP协议,因此可以利用现有的HTTP基础设施,如负载均衡和缓存。
- 缺点:
- 单向通信:Event-Stream只支持服务器向客户端发送数据,不支持客户端向服务器发送数据。
- 实时性:由于Event-Stream是基于HTTP的,因此它的实时性可能不如WebSocket。
- 数据传输量:Event-Stream在每次发送数据时都需要包含HTTP头,因此数据传输量可能较大。
- 数据结构:Event-Stream传输仅仅支持字符形式,无法适应较复杂的场景
- java流式下发可能出现丢包、站包问题,需要自己实现编解码来对消息进一步处理
- 适用场景
- 适合较为简单的场景,例如:大模型客户端发一次消息后,服务端返回结果
event-stream形式
JAVA
java实现SSE的方式主要有2种,分别为
- 基于Spring mvc的Servlet协议下的的SseEmitter
- 基于Spring WebFlux的响应式web框架webFlux,推荐采用webFlux,目前JAVA大模型应用开发主流的实现方式。
主要区别
| 特性 | WebFlux | SseEmitter |
|---|---|---|
| 编程模型 | 响应式 (Reactive),推荐在spring-boot-starter-webflux下使用 | 传统 Servlet 异步,推荐在spring-boot-starter-web下使用 |
| 底层技术 | Netty/Undertow等非阻塞服务器 | Servlet容器(Tomcat/Jetty等) |
| 连接方式 | 支持多种(SSE, WebSocket等) | 专为SSE设计 |
| 背压支持 | 有 | 无 |
| 线程模型 | 事件循环,少量线程处理多请求 | 每个请求占用一个线程直到完成 |
| 适用场景 | 高并发、低延迟、流式数据处理 | 简单的服务器推送 |
| 优点 | 1. 高并发性能好,资源利用率高 2. 内置背压支持,防止消费者过载 3. 统一的响应式编程模型 4. 支持多种协议(HTTP, SSE, WebSocket等) |
1. 学习曲线陡峭(需要理解响应式编程) 2. 调试相对困难 3. 与传统阻塞库兼容性需要额外处理 |
| 缺点 | 1. 使用简单,API直观 2. 与传统Spring MVC集成无缝 3. 适合简单的服务器推送场景 4. 浏览器兼容性好(SSE是标准协议) |
1. 每个连接占用一个线程 2. 不支持背压 3. 功能相对单一(仅SSE) 4. 在极高并发下性能不如WebFlux |
WebFlux形式(推荐)
1. 引入依赖
- 需要说明的是,
spring-boot-starter-webflux和spring-boot-starter-web二者同时使用可能存在兼容性问题。
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-webflux</artifactId>
</dependency>
- 如果在引入了webFlux前提下,还要用web相关的依赖,例如:使用openfeign调用接口时,需要使用
HttpMessageConverters(即spring-web下的依赖),则可单独引入spring-web
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-web</artifactId>
<scope>compile</scope>
<optional>true</optional>
</dependency>
2. 具体使用
Spring WebFlux基于Reactor项目实现了Reactive Streams规范,其核心是两种响应式类型:
- Mono:表示包含0或1个元素的异步序列
- Flux:表示包含0到N个元素的异步序列
这两种类型都支持三种消息通知:
- onNext():正常包含元素的消息
- onComplete():序列结束的消息
- onError():序列出错的消息
@RestController
@RequestMapping("/api/user")
public class WebFluxController {
private Map<Long, User> map = new HashMap<>();
@GetMapping("/getAll")
public Flux<User> getAllUser() {
return Flux.fromIterable(map.values());
}
@GetMapping("/{id}")
public Mono<User> getUserById(@PathVariable Long id) {
return Mono.just(map.get(id));
}
@PostMapping("/save")
public Mono<ResponseEntity<String>> save(@RequestBody User user) {
map.put(user.getId(), user);
return Mono.just(ResponseEntity.ok("添加成功"));
}
}
数据处理和转换
WebFlux提供了丰富的操作符来处理数据流:
- map:转换元素而不改变序列结构
- flatMap:转换元素并返回新的Publisher
- filter:过滤元素
- zip:合并多个流
- reduce:聚合流中的元素
@GetMapping("/processed")
public Flux<String> getProcessedData() {
return userService.getAllUsers()
.map(User::getName)
.filter(name -> name.startsWith("A"))
.flatMap(name -> Mono.just(name.toUpperCase()))
.take(10);
}
SseEmitter
- SseEmitter种主要的方法
- send:发送消息
- complete():表示消息已结束
- completeWithError(): 发送错误并结束,多用于异常捕获中
@GetMapping(value = "sseEmitterTest")
public Object sseEmitterTest(@RequestParam(value = "name") String name) {
SseEmitter sseEmitter = new SseEmitter(0L);
BlockingQueue<Runnable> queue = new LinkedBlockingQueue<>();
ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(10, 50, 60L, TimeUnit.SECONDS, queue);
threadPoolExecutor.execute(() -> {
char[] charArray = name.toCharArray();
for (char charStr : charArray) {
try {
sseEmitter.send(new String(new char[]{charStr}));
Thread.sleep(100);
} catch (IOException | InterruptedException e) {
sseEmitter.completeWithError(e);
}
}
sseEmitter.complete();
});
return sseEmitter;
}
Python
安装依赖
pip install fastapi uvicorn
最简单的流式接口
from fastapi import FastAPI
from fastapi.responses import StreamingResponse
import asyncio
app = FastAPI()
@app.get("/stream")
async def stream_response():
async def generate():
for i in range(10):
await asyncio.sleep(0.5) # 模拟处理延迟
yield f"数据块 {i}\n"
return StreamingResponse(generate(), media_type="text/plain")
模拟大模型输出方式
逐词输出模拟
from fastapi import FastAPI
from fastapi.responses import StreamingResponse
import asyncio
import time
app = FastAPI()
def mock_llm_generate(prompt: str):
"""模拟大模型生成文本的过程"""
full_response = "这是一个模拟的大语言模型响应。它会逐词输出结果,创造流式体验。"
for word in full_response.split():
yield word + " "
time.sleep(0.1) # 模拟每个词生成的延迟
@app.get("/llm-stream")
async def llm_stream(prompt: str = "你好"):
async def generate():
for chunk in mock_llm_generate(prompt):
yield chunk
return StreamingResponse(
generate(),
media_type="text/event-stream",
headers={"Cache-Control": "no-cache"}
)
用Server-Sent Events (SSE)
@app.get("/llm-sse")
async def llm_sse(prompt: str = "你好"):
async def event_generator():
for chunk in mock_llm_generate(prompt):
# SSE格式要求
yield f"data: {chunk}\n\n"
await asyncio.sleep(0.1)
return StreamingResponse(
event_generator(),
media_type="text/event-stream",
headers={"Cache-Control": "no-cache"}
)
客户端
JAVA
httpclient形式
引入依赖
<dependency>
<groupId>org.asynchttpclient</groupId>
<artifactId>async-http-client</artifactId>
<version>2.12.3</version>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
</dependency>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>3.5.2</version>
</dependency>
调用接口
消息处理类
public class DefaultMessageHandler implements AsyncHandler<Response> {
@Override
public State onStatusReceived(HttpResponseStatus httpResponseStatus) {
return null;
}
@Override
public State onHeadersReceived(HttpHeaders httpHeaders) {
return null;
}
@Override
public State onBodyPartReceived(HttpResponseBodyPart httpResponseBodyPart) {
// 业务逻辑处理
return State.CONTINUE;
}
@Override
public void onThrowable(Throwable throwable) {
}
@Override
public Response onCompleted() {
return null;
}
}
主方法
@NoArgsConstructor
public class HttpStreamClient {
private AsyncHandler<Response> asyncHandler;
public HttpStreamClient(AsyncHandler<Response> asyncHandler) {
this.asyncHandler = asyncHandler;
}
public void sendMessage(String url, String message) {
// 组装okhttp请求
AsyncHttpClient client = Dsl.asyncHttpClient();
BoundRequestBuilder requestBuilder = client.preparePost(url);
requestBuilder.setHeader("Accept", "text/event-stream");
requestBuilder.setHeader("Content-Type", "application/json");
requestBuilder.setBody(message);
requestBuilder.execute(asyncHandler);
}
public static void main(String[] args) {
DefaultMessageHandler defaultMessageHandler = new DefaultMessageHandler();
HttpStreamClient httpStreamClient = new HttpStreamClient(defaultMessageHandler);
String message = "Hello WOrld!"
httpStreamClient.sendMessage("API", message);
}
}
weclient形式
WebClient client = WebClient.create();
Mono<String> response = client.get()
.uri("https://example.com/api/users")
.retrieve()
.bodyToMono(String.class);
response.subscribe(System.out::println);
带请求参数
WebClient client = WebClient.builder()
.baseUrl("https://api.example.com")
.build();
Mono<User> user = client.get()
.uri("/users/{id}", 123)
.header("Authorization", "Bearer token")
.retrieve()
.bodyToMono(User.class);
Python
- 引入依赖
pip install requests
- 编码部分
import requests
import json
def invokeStreamLlm():
requestBody = {
name: "Hello World!"
}
response = requests.post(
"API",
json=requestBody,
stream=True,
headers={"Content-Type": "application/json"},
)
if response.status_code != 200:
return
for line in response.iter_lines():
if line and line.strip():
content = json.loads(line[5:])
code = content['code']
if code == 200:
print(content['data'])
elif code == -200:
print('结束标识')
if __name__ == "__main__":
content = invokeStreamLlm()
vue
本地开发时,vue2开发环境下,可能会出现无法全部流式下发的情况,而是等消息完全发送完统一下发。在部署后,通过nginx代理后,不会有此问题。这可能是vue2的一些限制导致的。暂时未找到解决方法。
参数说明
1. reconnectInterval
-
作用:控制 SSE 连接断开后自动重连的时间间隔(毫秒)。
-
设置为
null:- 表示 禁用自动重连,连接断开后不会尝试重新连接。
- 适用于 一次性请求 或 需要手动控制重连逻辑 的场景。
-
默认行为:
- 如果不设置或设为
undefined,fetchEventSource会使用默认的重连策略(通常是 1-5 秒)。 - 如果设为
0,会立即重连(可能对服务器造成压力)。
- 如果不设置或设为
-
示例:
reconnectInterval: null, // 禁用自动重连 reconnectInterval: 5000, // 每 5 秒重试一次
2. openWhenHidden
-
作用:控制 当浏览器标签页处于后台(不可见)时是否保持 SSE 连接。
-
不配置可能出现的坑:在切片窗口后,请求自动断开了连接,导致请求无法正常响应;或者采用F12调试时,请求也会自动断开连接
-
设置为
true:- 表示 即使页面不可见(如切换到其他标签页或最小化浏览器),仍然保持连接。
- 适用于 需要实时更新的后台任务(如聊天应用、股票行情推送)。
-
设置为
false:- 表示 当页面不可见时,自动断开 SSE 连接,以减少资源消耗。
- 适用于 不关键的后台数据推送,或需要节省带宽/服务器资源的场景。
-
默认行为:
- 如果不设置,
fetchEventSource可能会默认false(不同实现可能不同)。
- 如果不设置,
-
示例:
openWhenHidden: true, // 即使页面不可见,仍然保持连接 openWhenHidden: false, // 页面不可见时自动断开
3. signal
- 作用:用于 手动控制 SSE 连接的终止(通过
AbortController)。 - 设置为
ctrl.signal:- 允许通过
ctrl.abort()主动关闭 SSE 连接。 - 适用于 用户手动取消请求 或 组件卸载时清理资源(如 React 的
useEffect清理函数)。
- 允许通过
- 设置为
null或undefined:- 表示 无法通过
AbortController手动终止连接,只能等待服务器关闭或网络错误。
- 表示 无法通过
- 示例:
const ctrl = new AbortController(); signal: ctrl.signal, // 允许通过 ctrl.abort() 关闭连接// 在需要时手动关闭: ctrl.abort(); // 触发 onclose 或 onerror
常见使用场景:
| 场景 | 推荐配置 |
|---|---|
| 实时聊天应用 | penWhenHidden: true, reconnectInterval: 1000 |
| 一次性数据流 | reconnectInterval: null |
| 节省资源的后台任务 | penWhenHidden: false |
| 需要手动取消的请求 | signal: ctrl.signal |
- 引入依赖
npm install @microsoft/fetch-event-source -D
- 编码部分
<script>
import { fetchEventSource } from '@microsoft/fetch-event-source';
export default {
data() {
return {
};
},
mounted() {
this.invokeSse();
},
methods: {
invokeSse() {
fetchEventSource('API', {
method: 'POST',
headers: {
'Content-Type': 'application/json',
Accept: ['text/event-stream', 'application/json'],
},
reconnectInterval: null,
openWhenHidden:true,
body: JSON.stringify({
data: ''
}),
onopen(event) {
console.log(event);
},
onmessage(msg) {
// 消息接收
console.log(msg);
},
onclose(e) {
console.log(e);
},
onerror(err) {
console.log(err);
},
});
}
},
};
</script>
火山引擎开发者社区是火山引擎打造的AI技术生态平台,聚焦Agent与大模型开发,提供豆包系列模型(图像/视频/视觉)、智能分析与会话工具,并配套评测集、动手实验室及行业案例库。社区通过技术沙龙、挑战赛等活动促进开发者成长,新用户可领50万Tokens权益,助力构建智能应用。
更多推荐
所有评论(0)