服务端

  1. websocket和event-stream的优缺点

WebSocket和Event-Stream(Server-Sent Events)都是实现实时通信的技术,但是它们各自有不同的优缺点。

⭐️ WebSocket

  • 优点:
  1. 双向通信:WebSocket提供了一个全双工的通信通道,客户端和服务器可以同时发送和接收数据。
  2. 实时性:由于WebSocket是持久连接,所以它具有高实时性。
  3. 更少的数据传输量:WebSocket在建立连接后,数据传输时不需要包含HTTP头,因此数据传输量较小。
  4. websocket可传输较为复杂的数据结构,例如json、二进制字节等。
  5. websocket针对java、Python等语言支持较好
  6. websocket天然支持跨域问题(据说也有跨域问题,目前无法模拟出来)
  • 缺点:
  1. 虽然大部分现代浏览器都支持WebSocket,但是一些老版本的浏览器可能不支持。
  2. 协议复杂:WebSocket的协议相对复杂,需要处理连接、断开连接、心跳等问题。
  • 适用场景
  1. 适合较为复杂的业务场景,需要多次进行通讯,例如:聊天、游戏等

🌟 Event-Stream (Server-Sent Events):

  • 优点:
  1. 简单易用:Event-Stream的API相对简单,易于使用和理解。
  2. 自动重连:如果连接断开,Event-Stream会自动尝试重连。
  3. 基于HTTP:Event-Stream基于HTTP协议,因此可以利用现有的HTTP基础设施,如负载均衡和缓存。
  • 缺点:
  1. 单向通信:Event-Stream只支持服务器向客户端发送数据,不支持客户端向服务器发送数据。
  2. 实时性:由于Event-Stream是基于HTTP的,因此它的实时性可能不如WebSocket。
  3. 数据传输量:Event-Stream在每次发送数据时都需要包含HTTP头,因此数据传输量可能较大。
  4. 数据结构:Event-Stream传输仅仅支持字符形式,无法适应较复杂的场景
  5. java流式下发可能出现丢包、站包问题,需要自己实现编解码来对消息进一步处理
  • 适用场景
  1. 适合较为简单的场景,例如:大模型客户端发一次消息后,服务端返回结果

event-stream形式

JAVA

java实现SSE的方式主要有2种,分别为

  1. 基于Spring mvc的Servlet协议下的的SseEmitter
  2. 基于Spring WebFlux的响应式web框架webFlux,推荐采用webFlux,目前JAVA大模型应用开发主流的实现方式。
主要区别
特性 WebFlux SseEmitter
编程模型 响应式 (Reactive),推荐在spring-boot-starter-webflux下使用 传统 Servlet 异步,推荐在spring-boot-starter-web下使用
底层技术 Netty/Undertow等非阻塞服务器 Servlet容器(Tomcat/Jetty等)
连接方式 支持多种(SSE, WebSocket等) 专为SSE设计
背压支持
线程模型 事件循环,少量线程处理多请求 每个请求占用一个线程直到完成
适用场景 高并发、低延迟、流式数据处理 简单的服务器推送
优点 1. 高并发性能好,资源利用率高
2. 内置背压支持,防止消费者过载
3. 统一的响应式编程模型
4. 支持多种协议(HTTP, SSE, WebSocket等)
1. 学习曲线陡峭(需要理解响应式编程)
2. 调试相对困难
3. 与传统阻塞库兼容性需要额外处理
缺点 1. 使用简单,API直观
2. 与传统Spring MVC集成无缝
3. 适合简单的服务器推送场景
4. 浏览器兼容性好(SSE是标准协议)
1. 每个连接占用一个线程
2. 不支持背压
3. 功能相对单一(仅SSE)
4. 在极高并发下性能不如WebFlux
WebFlux形式(推荐)
1. 引入依赖
  • 需要说明的是,spring-boot-starter-webfluxspring-boot-starter-web二者同时使用可能存在兼容性问题。
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-webflux</artifactId>
</dependency>
  • 如果在引入了webFlux前提下,还要用web相关的依赖,例如:使用openfeign调用接口时,需要使用HttpMessageConverters(即spring-web下的依赖),则可单独引入spring-web
<dependency>
    <groupId>org.springframework</groupId>
    <artifactId>spring-web</artifactId>
    <scope>compile</scope>
    <optional>true</optional>
</dependency>
2. 具体使用

Spring WebFlux基于Reactor项目实现了Reactive Streams规范,其核心是两种响应式类型:

  • Mono:表示包含0或1个元素的异步序列
  • Flux:表示包含0到N个元素的异步序列

这两种类型都支持三种消息通知:

  • onNext():正常包含元素的消息
  • onComplete():序列结束的消息
  • onError():序列出错的消息
@RestController
@RequestMapping("/api/user")
public class WebFluxController {
    
    private Map<Long, User> map = new HashMap<>();
    
    @GetMapping("/getAll")
    public Flux<User> getAllUser() {
        return Flux.fromIterable(map.values());
    }
    
    @GetMapping("/{id}")
    public Mono<User> getUserById(@PathVariable Long id) {
        return Mono.just(map.get(id));
    }
    
    @PostMapping("/save")
    public Mono<ResponseEntity<String>> save(@RequestBody User user) {
        map.put(user.getId(), user);
        return Mono.just(ResponseEntity.ok("添加成功"));
    }
}
数据处理和转换

WebFlux提供了丰富的操作符来处理数据流:

  • map:转换元素而不改变序列结构
  • flatMap:转换元素并返回新的Publisher
  • filter:过滤元素
  • zip:合并多个流
  • reduce:聚合流中的元素
@GetMapping("/processed")
public Flux<String> getProcessedData() {
    return userService.getAllUsers()
            .map(User::getName)
            .filter(name -> name.startsWith("A"))
            .flatMap(name -> Mono.just(name.toUpperCase()))
            .take(10);
}
SseEmitter
  • SseEmitter种主要的方法
  1. send:发送消息
  2. complete():表示消息已结束
  3. completeWithError(): 发送错误并结束,多用于异常捕获中
   @GetMapping(value = "sseEmitterTest")
    public Object sseEmitterTest(@RequestParam(value = "name") String name) {
        SseEmitter sseEmitter = new SseEmitter(0L);
        BlockingQueue<Runnable> queue = new LinkedBlockingQueue<>();
        ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(10, 50, 60L, TimeUnit.SECONDS, queue);
        threadPoolExecutor.execute(() -> {
            char[] charArray = name.toCharArray();
            for (char charStr : charArray) {
                try {
                    sseEmitter.send(new String(new char[]{charStr}));
                    Thread.sleep(100);
                } catch (IOException | InterruptedException e) {
                    sseEmitter.completeWithError(e);
                }
            }
            sseEmitter.complete();
        });

        return sseEmitter;
    }

Python

安装依赖

pip install fastapi uvicorn

最简单的流式接口

from fastapi import FastAPI
from fastapi.responses import StreamingResponse
import asyncio

app = FastAPI()

@app.get("/stream")
async def stream_response():
    async def generate():
        for i in range(10):
            await asyncio.sleep(0.5)  # 模拟处理延迟
            yield f"数据块 {i}\n"
    
    return StreamingResponse(generate(), media_type="text/plain")

模拟大模型输出方式

逐词输出模拟
from fastapi import FastAPI
from fastapi.responses import StreamingResponse
import asyncio
import time

app = FastAPI()

def mock_llm_generate(prompt: str):
    """模拟大模型生成文本的过程"""
    full_response = "这是一个模拟的大语言模型响应。它会逐词输出结果,创造流式体验。"
    for word in full_response.split():
        yield word + " "
        time.sleep(0.1)  # 模拟每个词生成的延迟

@app.get("/llm-stream")
async def llm_stream(prompt: str = "你好"):
    async def generate():
        for chunk in mock_llm_generate(prompt):
            yield chunk
    
    return StreamingResponse(
        generate(),
        media_type="text/event-stream",
        headers={"Cache-Control": "no-cache"}
    )
用Server-Sent Events (SSE)
@app.get("/llm-sse")
async def llm_sse(prompt: str = "你好"):
    async def event_generator():
        for chunk in mock_llm_generate(prompt):
            # SSE格式要求
            yield f"data: {chunk}\n\n"
            await asyncio.sleep(0.1)
    
    return StreamingResponse(
        event_generator(),
        media_type="text/event-stream",
        headers={"Cache-Control": "no-cache"}
    )

客户端

JAVA

httpclient形式

引入依赖
 <dependency>
    <groupId>org.asynchttpclient</groupId>
    <artifactId>async-http-client</artifactId>
 	<version>2.12.3</version>
 </dependency>

<dependency>
   <groupId>org.projectlombok</groupId>
   <artifactId>lombok</artifactId>
</dependency>

<dependency>
    <groupId>com.alibaba</groupId>
    <artifactId>fastjson</artifactId>
    <version>3.5.2</version>
</dependency>
调用接口
消息处理类
public class DefaultMessageHandler implements AsyncHandler<Response> {

    @Override
    public State onStatusReceived(HttpResponseStatus httpResponseStatus) {
        return null;
    }


    @Override
    public State onHeadersReceived(HttpHeaders httpHeaders) {
        return null;
    }

    @Override
    public State onBodyPartReceived(HttpResponseBodyPart httpResponseBodyPart) {
 		// 业务逻辑处理
        return State.CONTINUE;
    }

    @Override
    public void onThrowable(Throwable throwable) {

    }

    @Override
    public Response onCompleted() {
        return null;
    }

}
主方法
@NoArgsConstructor
public class HttpStreamClient {

    private AsyncHandler<Response> asyncHandler;

    public HttpStreamClient(AsyncHandler<Response> asyncHandler) {
        this.asyncHandler = asyncHandler;
    }


    public void sendMessage(String url, String message) {
        // 组装okhttp请求
        AsyncHttpClient client = Dsl.asyncHttpClient();
        BoundRequestBuilder requestBuilder = client.preparePost(url);
        requestBuilder.setHeader("Accept", "text/event-stream");
        requestBuilder.setHeader("Content-Type", "application/json");
        requestBuilder.setBody(message);
        requestBuilder.execute(asyncHandler);
    }

    public static void main(String[] args) {
        DefaultMessageHandler defaultMessageHandler = new DefaultMessageHandler();
        HttpStreamClient httpStreamClient = new HttpStreamClient(defaultMessageHandler);
        String message = "Hello WOrld!"
        httpStreamClient.sendMessage("API", message);
    }
}

weclient形式

WebClient client = WebClient.create();

Mono<String> response = client.get()
        .uri("https://example.com/api/users")
        .retrieve()
        .bodyToMono(String.class);

response.subscribe(System.out::println);

带请求参数

WebClient client = WebClient.builder()
        .baseUrl("https://api.example.com")
        .build();

Mono<User> user = client.get()
        .uri("/users/{id}", 123)
        .header("Authorization", "Bearer token")
        .retrieve()
        .bodyToMono(User.class);

Python

  1. 引入依赖
pip install requests
  1. 编码部分
import requests
import json

def invokeStreamLlm():
    requestBody = {
    	name: "Hello World!"
    }
    response = requests.post(
        "API",
        json=requestBody,
        stream=True,
        headers={"Content-Type": "application/json"},
    )
    if response.status_code != 200:
        return
    for line in response.iter_lines():
        if line and line.strip():
            content = json.loads(line[5:])
            code = content['code']
            if code == 200:
                print(content['data'])
            elif code == -200:
                print('结束标识')


if __name__ == "__main__":
    content = invokeStreamLlm()

vue

本地开发时,vue2开发环境下,可能会出现无法全部流式下发的情况,而是等消息完全发送完统一下发。在部署后,通过nginx代理后,不会有此问题。这可能是vue2的一些限制导致的。暂时未找到解决方法。

参数说明

1. reconnectInterval
  • 作用:控制 SSE 连接断开后自动重连的时间间隔(毫秒)。

  • 设置为 null

    • 表示 禁用自动重连,连接断开后不会尝试重新连接。
    • 适用于 一次性请求需要手动控制重连逻辑 的场景。
  • 默认行为

    • 如果不设置或设为 undefinedfetchEventSource 会使用默认的重连策略(通常是 1-5 秒)。
    • 如果设为 0,会立即重连(可能对服务器造成压力)。
  • 示例

    reconnectInterval: null, // 禁用自动重连
    reconnectInterval: 5000, // 每 5 秒重试一次
    

2. openWhenHidden
  • 作用:控制 当浏览器标签页处于后台(不可见)时是否保持 SSE 连接

  • 不配置可能出现的坑:在切片窗口后,请求自动断开了连接,导致请求无法正常响应;或者采用F12调试时,请求也会自动断开连接

  • 设置为 true

    • 表示 即使页面不可见(如切换到其他标签页或最小化浏览器),仍然保持连接
    • 适用于 需要实时更新的后台任务(如聊天应用、股票行情推送)。
  • 设置为 false

    • 表示 当页面不可见时,自动断开 SSE 连接,以减少资源消耗。
    • 适用于 不关键的后台数据推送,或需要节省带宽/服务器资源的场景。
  • 默认行为

    • 如果不设置,fetchEventSource 可能会默认 false(不同实现可能不同)。
  • 示例

    openWhenHidden: true, // 即使页面不可见,仍然保持连接
    openWhenHidden: false, // 页面不可见时自动断开
    

3. signal
  • 作用:用于 手动控制 SSE 连接的终止(通过 AbortController)。
  • 设置为 ctrl.signal
    • 允许通过 ctrl.abort() 主动关闭 SSE 连接
    • 适用于 用户手动取消请求组件卸载时清理资源(如 React 的 useEffect 清理函数)。
  • 设置为 nullundefined
    • 表示 无法通过 AbortController 手动终止连接,只能等待服务器关闭或网络错误。
  • 示例
    const ctrl = new AbortController();
    signal: ctrl.signal, // 允许通过 ctrl.abort() 关闭连接
    
    // 在需要时手动关闭:
    ctrl.abort(); // 触发 onclose 或 onerror
    

常见使用场景:

场景 推荐配置
实时聊天应用 penWhenHidden: true, reconnectInterval: 1000
一次性数据流 reconnectInterval: null
节省资源的后台任务 penWhenHidden: false
需要手动取消的请求 signal: ctrl.signal
  1. 引入依赖
npm install @microsoft/fetch-event-source -D
  1. 编码部分
<script>
import { fetchEventSource } from '@microsoft/fetch-event-source';
export default {
  data() {
    return {
     
    };
  },
  mounted() {
    this.invokeSse();
  },
  methods: {
    invokeSse() {
      fetchEventSource('API', {
        method: 'POST',
        headers: {
          'Content-Type': 'application/json',
          Accept: ['text/event-stream', 'application/json'],
        },
        reconnectInterval: null,
        openWhenHidden:true,
        body: JSON.stringify({
        data: ''
        }),
        onopen(event) {
          console.log(event);
        },
        onmessage(msg) {
          // 消息接收
          console.log(msg);
        },
        onclose(e) {
          console.log(e);
        },
        onerror(err) {
          console.log(err);
        },
      });
    }
  },
};
</script>
Logo

火山引擎开发者社区是火山引擎打造的AI技术生态平台,聚焦Agent与大模型开发,提供豆包系列模型(图像/视频/视觉)、智能分析与会话工具,并配套评测集、动手实验室及行业案例库。社区通过技术沙龙、挑战赛等活动促进开发者成长,新用户可领50万Tokens权益,助力构建智能应用。

更多推荐