java每日精进 7.31【WebSocket全流程】
握手时认证(
1.层级分类
1.1 传输层(Spring WebSocket基础层)
| 组件 | 对应类/接口 | 功能说明 |
|---|---|---|
| WebSocket协议处理 | org.springframework.web.socket 包下的标准类(如TextWebSocketHandler) |
Spring原生WebSocket处理 |
| 握手拦截器 | cn.iocoder.yudao.framework.websocket.core.interceptor.LoginUserHandshakeInterceptor |
WebSocket连接时的认证拦截 |
| Session装饰器 | cn.iocoder.yudao.framework.websocket.core.handler.WebSocketSessionHandlerDecorator |
增强Session生命周期管理 |
关键代码片段:
TextWebSocketHandler
public class TextWebSocketHandler extends AbstractWebSocketHandler {
public TextWebSocketHandler() {
}
protected void handleBinaryMessage(WebSocketSession session, BinaryMessage message) {
try {
session.close(CloseStatus.NOT_ACCEPTABLE.withReason("Binary messages not supported"));
} catch (IOException var4) {
}
}
}
主要作用:
- 处理客户端与服务器之间的文本类型 WebSocket 通信
- 拒绝接收二进制消息(通过关闭连接并返回原因)
- 作为 WebSocket 连接的核心处理器,管理消息收发逻辑
和端点配置一起使用:
YudaoWebSocketAutoConfiguration中的Bean一起使用:
@Bean
public WebSocketConfigurer webSocketConfigurer(HandshakeInterceptor[] handshakeInterceptors,
WebSocketHandler webSocketHandler,
WebSocketProperties webSocketProperties) {
return registry -> registry
// 添加 WebSocketHandler
.addHandler(webSocketHandler, webSocketProperties.getPath())
.addInterceptors(handshakeInterceptors)
// 允许跨域,否则前端连接会直接断开
.setAllowedOriginPatterns("*");
}
websocket:
enable: true # websocket的开关
path: /infra/ws # 指定 WebSocket 服务的连接端点路径,若服务器地址为 localhost:8080,则客户端连接地址为 ws://localhost:8080/infra/ws
sender-type: local # 消息发送的类型,可选值为 local、redis、rocketmq、kafka、rabbitmq
sender-rocketmq:
topic: ${spring.application.name}-websocket # 消息发送的 RocketMQ Topic
consumer-group: ${spring.application.name}-websocket-consumer # 消息发送的 RocketMQ Consumer Group
sender-rabbitmq:
exchange: ${spring.application.name}-websocket-exchange # 消息发送的 RabbitMQ Exchange
queue: ${spring.application.name}-websocket-queue # 消息发送的 RabbitMQ Queue
sender-kafka:
topic: ${spring.application.name}-websocket # 消息发送的 Kafka Topic
consumer-group: ${spring.application.name}-websocket-consumer # 消息发送的 Kafka Consumer Group
LoginUserHandshakeInterceptor
/**
* 登录用户的 {@link HandshakeInterceptor} 实现类
*
* 流程如下:
* 1. 前端连接 websocket 时,会通过拼接 ?token={token} 到 ws:// 连接后,这样它可以被 {@link TokenAuthenticationFilter} 所认证通过
* 2. {@link LoginUserHandshakeInterceptor} 负责把 {@link LoginUser} 添加到 {@link WebSocketSession} 中
*/
public class LoginUserHandshakeInterceptor implements HandshakeInterceptor {
@Override
public boolean beforeHandshake(ServerHttpRequest request, ServerHttpResponse response,
WebSocketHandler wsHandler, Map<String, Object> attributes) {
LoginUser loginUser = SecurityFrameworkUtils.getLoginUser();
if (loginUser != null) {
WebSocketFrameworkUtils.setLoginUser(loginUser, attributes);
}
return true;
}
@Override
public void afterHandshake(ServerHttpRequest request, ServerHttpResponse response,
WebSocketHandler wsHandler, Exception exception) {
// do nothing
}
}
WebSocketSessionHandlerDecorator
WebSocketSessionHandlerDecorator 类是一个基于装饰器模式的 WebSocketHandler 增强类,主要作用是对 WebSocket 连接的会话(WebSocketSession)进行管理和功能增强,确保连接的稳定性和并发安全性。
核心功能解析
该类通过重写 WebSocketHandler 的关键方法,实现了两大核心功能:
1. 管理 WebSocket 会话的生命周期
通过与 WebSocketSessionManager 配合,统一管理客户端连接的会话:
-
连接建立时(
afterConnectionEstablished):
当客户端与服务器成功建立 WebSocket 连接后,将新创建的WebSocketSession添加到sessionManager中。
这样,sessionManager就能维护所有在线客户端的会话列表,方便后续查找特定客户端、群发消息等操作(例如通过用户 ID 定位其会话并推送消息)。 -
连接关闭时(
afterConnectionClosed):
当客户端断开连接(主动关闭或异常断开),从sessionManager中移除对应的WebSocketSession,避免无效会话占用资源,确保会话列表的准确性。
2. 增强会话的并发安全性和消息发送稳定性
通过包装 WebSocketSession 为 ConcurrentWebSocketSessionDecorator,解决两个实际问题:
-
并发操作安全:
WebSocketSession本身并非线程安全的,若多个线程同时向同一个会话发送消息(例如服务器同时推送多条通知),可能导致消息错乱或连接异常。ConcurrentWebSocketSessionDecorator内部通过同步机制(如锁)保证并发发送的安全性。 -
消息发送限制:
避免因消息发送超时或数据量过大导致的连接问题:SEND_TIME_LIMIT(5 秒):限制单条消息的发送超时时间,超过则强制关闭连接,防止连接长期阻塞。BUFFER_SIZE_LIMIT(100KB):限制消息发送缓冲区的大小,避免超大消息占用过多内存。
装饰器模式的作用
该类继承自 WebSocketHandlerDecorator(Spring 提供的装饰器基类),通过 “装饰” 原始的 WebSocketHandler(即构造方法中的 delegate),在不修改原始处理器逻辑的前提下,新增了会话管理和并发安全功能。这种设计的优势是:
- 职责分离:原始
WebSocketHandler专注于业务消息处理(如解析消息、调用监听器),装饰器专注于会话管理和安全增强。 - 可扩展性:若未来需要新增会话相关功能(如会话心跳检测),只需再添加一个装饰器,无需修改已有代码。
使用场景举例
在分布式系统的在线聊天功能中:
- 当用户 A 连接到 WebSocket 服务器,
afterConnectionEstablished被调用,会话被包装为线程安全的实例并注册到sessionManager。 - 当用户 B 向 A 发送消息时,服务器从
sessionManager中找到 A 的会话,通过装饰后的线程安全会话发送消息,避免并发问题。 - 当 A 关闭页面,
afterConnectionClosed被调用,会话从sessionManager中移除,其他服务再查询时会知道 A 已离线。
1.2 服务层(核心业务逻辑层)
会话管理
| 组件 | 对应类/接口 | 功能说明 |
|---|---|---|
| 会话管理器接口 | cn.iocoder.yudao.framework.websocket.core.session.WebSocketSessionManager |
定义会话管理规范 |
| 会话管理器实现 | cn.iocoder.yudao.framework.websocket.core.session.WebSocketSessionManagerImpl |
基于内存的会话存储 |
| Session工具类 | cn.iocoder.yudao.framework.websocket.core.util.WebSocketFrameworkUtils |
提供用户信息获取方法 |
WebSocketSessionManagerImpl
/**
* 默认的 {@link WebSocketSessionManager} 实现类
* 增删改查连接
*/
public class WebSocketSessionManagerImpl implements WebSocketSessionManager {
/**
* id 与 WebSocketSession 映射
* 按 “会话 ID” 查连接(类似按手机号查联系人)
* key:Session 编号
*/
private final ConcurrentMap<String, WebSocketSession> idSessions = new ConcurrentHashMap<>();
/**
* user 与 WebSocketSession 映射
* 按 “用户类型→用户 ID” 查连接(类似按 “用户组→姓名” 查联系人列表)
* key1:用户类型
* key2:用户编号
*/
private final ConcurrentMap<Integer, ConcurrentMap<Long, CopyOnWriteArrayList<WebSocketSession>>> userSessions
= new ConcurrentHashMap<>();
@Override
public void addSession(WebSocketSession session) {
// 添加到 idSessions 中
idSessions.put(session.getId(), session);
// 添加到 userSessions 中
LoginUser user = WebSocketFrameworkUtils.getLoginUser(session);
if (user == null) {
return;
}
ConcurrentMap<Long, CopyOnWriteArrayList<WebSocketSession>> userSessionsMap = userSessions.get(user.getUserType());
//第一步:查用户类型的分组,发现没有,创建一个空 Map
if (userSessionsMap == null) {
userSessionsMap = new ConcurrentHashMap<>();
if (userSessions.putIfAbsent(user.getUserType(), userSessionsMap) != null) {
userSessionsMap = userSessions.get(user.getUserType());
}
}
//第二步:在同一个用户类型的分组下,查用户 ID的会话列表,发现没有,创建一个空列表:{用户类型:{用户ID: []}}。
CopyOnWriteArrayList<WebSocketSession> sessions = userSessionsMap.get(user.getId());
if (sessions == null) {
sessions = new CopyOnWriteArrayList<>();
if (userSessionsMap.putIfAbsent(user.getId(), sessions) != null) {
sessions = userSessionsMap.get(user.getId());
}
}
sessions.add(session);
}
@Override
public void removeSession(WebSocketSession session) {
// 移除从 idSessions 中
idSessions.remove(session.getId());
// 移除从 idSessions 中
LoginUser user = WebSocketFrameworkUtils.getLoginUser(session);
if (user == null) {
return;
}
ConcurrentMap<Long, CopyOnWriteArrayList<WebSocketSession>> userSessionsMap = userSessions.get(user.getUserType());
if (userSessionsMap == null) {
return;
}
CopyOnWriteArrayList<WebSocketSession> sessions = userSessionsMap.get(user.getId());
sessions.removeIf(session0 -> session0.getId().equals(session.getId()));
if (CollUtil.isEmpty(sessions)) {
userSessionsMap.remove(user.getId(), sessions);
}
}
@Override
public WebSocketSession getSession(String id) {
return idSessions.get(id);
}
@Override
public Collection<WebSocketSession> getSessionList(Integer userType) {
ConcurrentMap<Long, CopyOnWriteArrayList<WebSocketSession>> userSessionsMap = userSessions.get(userType);
if (CollUtil.isEmpty(userSessionsMap)) {
return new ArrayList<>();
}
LinkedList<WebSocketSession> result = new LinkedList<>(); // 避免扩容
Long contextTenantId = TenantContextHolder.getTenantId();
for (List<WebSocketSession> sessions : userSessionsMap.values()) {
if (CollUtil.isEmpty(sessions)) {
continue;
}
// 特殊:如果租户不匹配,则直接排除
if (contextTenantId != null) {
Long userTenantId = WebSocketFrameworkUtils.getTenantId(sessions.get(0));
if (!contextTenantId.equals(userTenantId)) {
continue;
}
}
result.addAll(sessions);
}
return result;
}
@Override
public Collection<WebSocketSession> getSessionList(Integer userType, Long userId) {
ConcurrentMap<Long, CopyOnWriteArrayList<WebSocketSession>> userSessionsMap = userSessions.get(userType);
if (CollUtil.isEmpty(userSessionsMap)) {
return new ArrayList<>();
}
CopyOnWriteArrayList<WebSocketSession> sessions = userSessionsMap.get(userId);
return CollUtil.isNotEmpty(sessions) ? new ArrayList<>(sessions) : new ArrayList<>();
}
}
所有方法都围绕 WebSocketSession 对象工作,这是 Spring WebSocket 对原生 WebSocket 连接的封装。每个 WebSocketSession 对应一个实际的 WebSocket 连接。
addSession(WebSocketSession session)
交互流程:
-
参数传入:当新的 WebSocket 连接建立时,Spring WebSocket 会创建
WebSocketSession对象并传入该方法 -
会话存储:
idSessions.put(session.getId(), session);
-
使用
session.getId()获取连接的唯一ID -
将 session 对象存储到并发映射中
-
-
用户关联:
LoginUser user = WebSocketFrameworkUtils.getLoginUser(session);
-
从 session 中提取认证用户信息(通常来自握手时的属性)
-
如果用户已认证,将 session 按用户维度存储
-
原生 WebSocket 关联:
-
session.getId()是 WebSocket 连接的唯一标识符 -
WebSocketFrameworkUtils.getLoginUser(session)通常从session.getAttributes()获取握手时存储的用户信息
websocketFrameWorkUtils
/**
* 专属于 web 包的工具类
*/
public class WebSocketFrameworkUtils {
public static final String ATTRIBUTE_LOGIN_USER = "LOGIN_USER";
/**
* 设置当前用户
*
* @param loginUser 登录用户
* @param attributes Session
*/
public static void setLoginUser(LoginUser loginUser, Map<String, Object> attributes) {
attributes.put(ATTRIBUTE_LOGIN_USER, loginUser);
}
/**
* 获取当前用户
*
* @return 当前用户
*/
public static LoginUser getLoginUser(WebSocketSession session) {
return (LoginUser) session.getAttributes().get(ATTRIBUTE_LOGIN_USER);
}
/**
* 获得当前用户的编号
*
* @return 用户编号
*/
public static Long getLoginUserId(WebSocketSession session) {
LoginUser loginUser = getLoginUser(session);
return loginUser != null ? loginUser.getId() : null;
}
/**
* 获得当前用户的类型
*
* @return 用户编号
*/
public static Integer getLoginUserType(WebSocketSession session) {
LoginUser loginUser = getLoginUser(session);
return loginUser != null ? loginUser.getUserType() : null;
}
/**
* 获得当前用户的租户编号
*
* @param session Session
* @return 租户编号
*/
public static Long getTenantId(WebSocketSession session) {
LoginUser loginUser = getLoginUser(session);
return loginUser != null ? loginUser.getTenantId() : null;
}
}
(1)直接交互的核心组件
-
WebSocketSession对象
这是 Spring WebSocket 提供的底层会话对象,用于表示一个 WebSocket 连接。工具类通过session.getAttributes()操作会话的属性 map,实现用户信息的存取。
例:getLoginUser(session)就是从WebSocketSession的属性中取出之前存入的LoginUser对象。 -
LoginUser实体类
这是业务层定义的用户信息类(包含用户 ID、用户类型、租户 ID 等)。工具类通过setLoginUser()把用户信息绑定到会话,通过getXxx()方法从会话中提取用户的具体信息(如getLoginUserId()获取用户 ID)。 -
WebSocketSessionManagerImpl会话管理器
之前分析的WebSocketSessionManagerImpl类会直接调用这个工具类:- 例如
addSession方法中,通过WebSocketFrameworkUtils.getLoginUser(session)获取用户信息,才能完成会话的分组存储。 - 又如
getSessionList方法中,通过WebSocketFrameworkUtils.getTenantId(...)获取租户 ID,用于过滤不同租户的会话。
- 例如
(2)间接关联的组件
-
WebSocket 处理器(
WebSocketHandler)
在建立 WebSocket 连接的初期(如afterConnectionEstablished方法),通常会调用setLoginUser()把登录用户信息存入会话属性,后续工具类才能从中获取信息。
例:用户登录后建立 WebSocket 连接时,业务代码会执行WebSocketFrameworkUtils.setLoginUser(loginUser, session.getAttributes())。 -
安全框架(如 Spring Security)
LoginUser通常来自安全框架的认证结果(如用户登录后,安全框架会生成LoginUser对象)。工具类相当于把安全框架的用户信息 “传递” 给 WebSocket 会话。 -
与 WebSocket 底层有直接交互,但仅限于操作
WebSocketSession的属性(session.getAttributes()),这是 Spring WebSocket 框架提供的标准 API,用于在会话中存储自定义数据(如用户信息)。
这种交互是 “表层” 的,不涉及底层网络通信(如 TCP 连接、数据帧解析等),而是利用框架提供的会话属性机制进行业务数据传递。 -
不直接与更底层的网络层(如操作系统、TCP 协议)交互,它的作用是在应用层内部(业务逻辑 ↔ WebSocket 会话)传递数据,是 “应用层工具类”。
WebSocketFrameworkUtils 就像一个 “快递员”:
- 当用户建立 WebSocket 连接时,它把用户信息(
LoginUser)“打包” 存入会话(WebSocketSession)的 “包裹”(attributes)中。 - 当其他组件(如
WebSocketSessionManagerImpl)需要用户信息时,它从会话的 “包裹” 中取出并 “派送” 给调用者。 - 它不负责 “运输”(底层网络通信),只负责 “打包和派送”(用户信息与会话的绑定和解绑),是 WebSocket 业务逻辑中不可或缺的信息传递工具。
消息发送服务
| 组件 | 对应类/接口 | 功能说明 |
|---|---|---|
| 消息发送接口 | cn.iocoder.yudao.framework.websocket.core.sender.WebSocketMessageSender |
定义消息发送规范 |
| 本地发送实现 | cn.iocoder.yudao.framework.websocket.core.sender.LocalWebSocketMessageSender |
单机版消息发送 |
| Redis发送实现 | cn.iocoder.yudao.framework.websocket.core.sender.redis.RedisWebSocketMessageSender |
集群版Redis广播 |
| RocketMQ发送实现 | cn.iocoder.yudao.framework.websocket.core.sender.rocketmq.RocketMQWebSocketMessageSender |
集群版RocketMQ广播 |
WebSocketMessageSender
/**
* WebSocket 消息的发送器接口
*/
public interface WebSocketMessageSender {
/**
* 发送消息给指定用户
*
* @param userType 用户类型
* @param userId 用户编号
* @param messageType 消息类型
* @param messageContent 消息内容,JSON 格式
*/
void send(Integer userType, Long userId, String messageType, String messageContent);
/**
* 发送消息给指定用户类型
*
* @param userType 用户类型
* @param messageType 消息类型
* @param messageContent 消息内容,JSON 格式
*/
void send(Integer userType, String messageType, String messageContent);
/**
* 发送消息给指定 Session
*
* @param sessionId Session 编号
* @param messageType 消息类型
* @param messageContent 消息内容,JSON 格式
*/
void send(String sessionId, String messageType, String messageContent);
default void sendObject(Integer userType, Long userId, String messageType, Object messageContent) {
send(userType, userId, messageType, JsonUtils.toJsonString(messageContent));
}
default void sendObject(Integer userType, String messageType, Object messageContent) {
send(userType, messageType, JsonUtils.toJsonString(messageContent));
}
default void sendObject(String sessionId, String messageType, Object messageContent) {
send(sessionId, messageType, JsonUtils.toJsonString(messageContent));
}
}
1. WebSocketMessageSender 接口
功能:定义了 WebSocket 消息发送的统一接口,提供三种发送模式:
-
发送给特定用户(userType + userId)
-
发送给某类用户(userType)
-
发送给特定会话(sessionId)
方法解析:
-
send方法系列:发送文本消息(JSON格式) -
sendObject默认方法:发送对象(自动序列化为JSON)
设计特点:
-
统一的发送接口
-
支持方法重载
-
提供对象自动序列化能力
2. AbstractWebSocketMessageSender 抽象类
功能:实现了消息发送的核心逻辑,是具体实现的基类。
核心方法解析
send(String sessionId, Integer userType, Long userId, String messageType, String messageContent)
逻辑流程:
-
根据参数组合确定目标会话:
-
sessionId 优先
-
然后 userType + userId
-
最后仅 userType
-
-
调用
doSend执行实际发送
与底层交互:
-
通过
sessionManager.getSession()和sessionManager.getSessionList()获取目标会话 -
这些方法最终会操作
WebSocketSession对象
doSend(Collection<WebSocketSession> sessions, String messageType, String messageContent)
逻辑流程:
-
构造标准消息格式(类型+内容)
-
序列化为JSON
-
遍历所有会话:
-
检查会话有效性(非空、已打开)
-
发送
TextMessage
-
与底层交互:
session.sendMessage(new TextMessage(payload));
-
直接调用 WebSocketSession 的 sendMessage 方法
-
使用 Spring 的
TextMessage封装消息 -
处理可能的 IOException
LocalWebSocketMessageSender
功能:单机版消息发送器,直接继承抽象类无额外逻辑。
RabbitMQWebSocketMessageSender
功能:基于 RabbitMQ 的分布式消息发送器。
核心方法:
private void sendRabbitMQMessage(String sessionId, Long userId, Integer userType,
String messageType, String messageContent) {
RabbitMQWebSocketMessage mqMessage = new RabbitMQWebSocketMessage()
.setSessionId(sessionId).setUserId(userId).setUserType(userType)
.setMessageType(messageType).setMessageContent(messageContent);
rabbitTemplate.convertAndSend(topicExchange.getName(), null, mqMessage);
}
与底层交互:
-
使用
RabbitTemplate发送消息 -
通过 Topic Exchange 广播
-
消息格式自定义(包含目标信息和内容)
RedisWebSocketMessageSender
功能:基于 Redis 的分布式消息发送器。
核心方法:
private void sendRedisMessage(String sessionId, Long userId, Integer userType,
String messageType, String messageContent) {
RedisWebSocketMessage mqMessage = new RedisWebSocketMessage()
.setSessionId(sessionId).setUserId(userId).setUserType(userType)
.setMessageType(messageType).setMessageContent(messageContent);
redisMQTemplate.send(mqMessage);
}
与底层交互:
-
使用
RedisMQTemplate发送消息 -
基于 Redis 的发布/订阅机制
RocketMQWebSocketMessageSender
功能:基于 RocketMQ 的分布式消息发送器。
核心方法:
private void sendRocketMQMessage(String sessionId, Long userId, Integer userType,
String messageType, String messageContent) {
RocketMQWebSocketMessage mqMessage = new RocketMQWebSocketMessage()
.setSessionId(sessionId).setUserId(userId).setUserType(userType)
.setMessageType(messageType).setMessageContent(messageContent);
rocketMQTemplate.syncSend(topic, mqMessage);
}
与底层交互:
-
使用
RocketMQTemplate同步发送 -
指定 topic 进行消息路由
1.3 应用层(业务使用层)
(1) 消息监听器
| 组件 | 对应类/接口 | 功能说明 |
|---|---|---|
| 监听器接口 | cn.iocoder.yudao.framework.websocket.core.listener.WebSocketMessageListener |
消息处理接口 |
| 示例监听器实现 | cn.iocoder.yudao.module.infra.websocket.listener.DemoWebSocketMessageListener |
演示消息处理 |
1. WebSocketMessageListener 接口
功能:定义了 WebSocket 消息监听器的标准接口,用于处理前端发送的特定类型消息。
方法解析
onMessage(WebSocketSession session, T message)
-
作用:处理收到的 WebSocket 消息
-
参数:
-
session:当前 WebSocket 会话,包含连接信息和用户上下文 -
message:反序列化后的消息对象
-
-
交互点:
-
通过
session可以获取发送方信息 -
通过泛型
T实现类型安全的消息处理
-
getType()
-
作用:返回监听器关注的消息类型
-
关联:与
JsonWebSocketMessage#getType()对应 -
设计意义:
-
实现消息类型到处理器的路由
-
支持多种消息类型的差异化处理
-
泛型设计:<T> 使每个监听器可以处理特定类型的消息对象
DemoWebSocketMessageListener 实现类
功能:处理示例消息("demo-message-send"类型),实现单发和群发逻辑。
核心逻辑解析
消息处理流程 (onMessage 方法)
-
获取发送方信息:
Long fromUserId = WebSocketFrameworkUtils.getLoginUserId(session);
-
从 WebSocketSession 中提取当前登录用户ID
-
-
单发消息处理:
if (message.getToUserId() != null) { DemoReceiveMessage toMessage = new DemoReceiveMessage().setFromUserId(fromUserId) .setText(message.getText()).setSingle(true); webSocketMessageSender.sendObject(UserTypeEnum.ADMIN.getValue(), message.getToUserId(), "demo-message-receive", toMessage); return; }-
构造响应消息对象
-
通过
WebSocketMessageSender发送给特定用户
-
-
群发消息处理:
DemoReceiveMessage toMessage = new DemoReceiveMessage().setFromUserId(fromUserId) .setText(message.getText()).setSingle(false); webSocketMessageSender.sendObject(UserTypeEnum.ADMIN.getValue(), "demo-message-receive", toMessage);-
向所有管理员用户广播消息
-
消息类型声明 (getType 方法)
@Override
public String getType() {
return "demo-message-send";
}
-
声明只处理类型为 "demo-message-send" 的消息
关键交互点
-
与 WebSocketSession 的交互:
-
通过
WebSocketFrameworkUtils.getLoginUserId(session)获取认证用户信息 -
会话对象包含所有 WebSocket 连接上下文
-
-
与消息发送器的交互:
@Resource private WebSocketMessageSender webSocketMessageSender; webSocketMessageSender.sendObject(...);
-
使用统一的发送接口响应消息
-
支持单发和群发模式
-
-
消息对象转换:
-
接收
DemoSendMessage类型消息 -
构造并发送
DemoReceiveMessage类型响应
-
(2) 消息发送API
| 组件 | 对应类/接口 | 功能说明 |
|---|---|---|
| WebSocket API | cn.iocoder.yudao.module.infra.api.websocket.WebSocketSenderApi |
供其他模块调用的发送接口 |
| API实现类 | cn.iocoder.yudao.module.infra.api.websocket.WebSocketSenderApiImpl |
委托给WebSocketMessageSender |
1.4 配置层
| 组件 | 对应类/文件 | 功能说明 |
|---|---|---|
| 自动配置类 | cn.iocoder.yudao.framework.websocket.config.YudaoWebSocketAutoConfiguration |
初始化WebSocket组件 |
| 配置属性类 | cn.iocoder.yudao.framework.websocket.config.YudaoWebSocketProperties |
读取yudao.websocket配置项 |
| 消息处理器配置 | cn.iocoder.yudao.framework.websocket.core.handler.JsonWebSocketMessageHandler |
消息路由分发 |
JsonWebSocketMessageHandler
JsonWebSocketMessageHandler 是 Spring WebSocket 的核心消息处理器,负责处理 JSON 格式的 WebSocket 消息,并根据消息类型路由到对应的监听器。
核心功能
-
消息协议处理:处理基于 JSON 的 WebSocket 消息协议
-
消息路由:根据消息类型分发到对应的监听器
-
租户隔离:支持多租户环境下的消息处理
-
心跳处理:内置 ping/pong 心跳机制
1. 成员变量
private final Map<String, WebSocketMessageListener<Object>> listeners = new HashMap<>();
-
作用:维护消息类型(
type)到监听器的映射关系 -
数据结构:使用
HashMap实现 O(1) 复杂度的查找 -
初始化:通过构造函数注入所有监听器并建立映射
2. 构造函数
@SuppressWarnings({"rawtypes", "unchecked"})
public JsonWebSocketMessageHandler(List<? extends WebSocketMessageListener> listenersList) {
listenersList.forEach((Consumer<WebSocketMessageListener>)
listener -> listeners.put(listener.getType(), listener));
}
-
参数:接收所有
WebSocketMessageListener实现类的列表 -
处理逻辑:
-
遍历所有监听器
-
以监听器的
getType()返回值作为 key -
监听器本身作为 value 存入 map
-
-
类型安全:使用泛型和类型擦除处理,需要
@SuppressWarnings
3. 核心方法 handleTextMessage
@Override
protected void handleTextMessage(WebSocketSession session, TextMessage message) throws Exception
方法处理流程:
-
空消息检查
if (message.getPayloadLength() == 0) { return; }-
直接忽略空消息
-
-
心跳处理
if (message.getPayloadLength() == 4 && Objects.equals(message.getPayload(), "ping")) { session.sendMessage(new TextMessage("pong")); return; }-
响应标准 ping 心跳消息
-
底层交互:直接调用
session.sendMessage()发送 pong 响应
-
-
消息解析
JsonWebSocketMessage jsonMessage = JsonUtils.parseObject(message.getPayload(), JsonWebSocketMessage.class);-
将 JSON 字符串解析为
JsonWebSocketMessage对象 -
底层交互:使用
JsonUtils工具类进行 JSON 反序列化
-
-
消息校验
if (jsonMessage == null) { log.error("[handleTextMessage][session({}) message({}) 解析为空]", session.getId(), message.getPayload()); return; } if (StrUtil.isEmpty(jsonMessage.getType())) { log.error("[handleTextMessage][session({}) message({}) 类型为空]", session.getId(), message.getPayload()); return; }-
确保消息体和消息类型有效
-
-
监听器查找
WebSocketMessageListener<Object> messageListener = listeners.get(jsonMessage.getType());-
根据消息类型从映射表中查找对应的监听器
-
-
消息内容反序列化
Type type = TypeUtil.getTypeArgument(messageListener.getClass(), 0); Object messageObj = JsonUtils.parseObject(jsonMessage.getContent(), type);-
获取监听器声明的泛型类型
-
将消息内容反序列化为目标类型
-
底层交互:
-
TypeUtil获取泛型参数类型 -
JsonUtils进行类型化反序列化
-
-
-
租户上下文处理
Long tenantId = WebSocketFrameworkUtils.getTenantId(session); TenantUtils.execute(tenantId, () -> messageListener.onMessage(session, messageObj));-
从 session 中获取租户 ID
-
在租户上下文中执行监听器的消息处理
-
底层交互:
-
WebSocketFrameworkUtils从 session 属性获取租户信息 -
TenantUtils管理租户上下文
-
-
| 交互点 | 使用的组件 | 目的 |
|---|---|---|
| 消息发送 | WebSocketSession |
响应心跳消息 |
| JSON解析 | JsonUtils |
消息反序列化 |
| 泛型处理 | TypeUtil |
获取监听器的消息类型 |
| 租户管理 | TenantUtils |
租户上下文切换 |
| 属性获取 | WebSocketFrameworkUtils |
从 session 获取租户/用户信息 |
YudaoWebSocketAutoConfiguration
负责注册所有 WebSocket 核心组件,包括:
-
WebSocket 消息处理器(
JsonWebSocketMessageHandler) -
认证拦截器(
LoginUserHandshakeInterceptor) -
会话管理器(
WebSocketSessionManager) -
消息发送器(
WebSocketMessageSender) -
集群支持(Redis/RocketMQ/Kafka 消息广播)
WebSocket 认证拦截器:LoginUserHandshakeInterceptor
-
在 WebSocket 握手阶段进行 Token 认证
-
认证成功后,用户信息存入
WebSocketSession的attributes
WebSocket 会话管理器:WebSocketSessionManager
-
默认使用内存存储会话(
ConcurrentHashMap) -
支持按
sessionId、userType、userId检索会话
WebSocket 消息发送器:类:WebSocketMessageSender(接口) + 多种实现(Local/Redis/RocketMQ)
-
根据
yudao.websocket.sender-type配置动态选择实现 -
支持单机(
local)和集群(redis/rocketmq/kafka)模式
WebSocket 集群消息广播(Redis/RocketMQ/Kafka)
2. WebSocket 连接建立
2.1 前端发起连接
请求示例:
ws://localhost:48080/infra/ws?token=abc123
🔹 流程:
-
浏览器发起 WebSocket 握手请求
-
请求先经过 HTTP 协议层(因为 WebSocket 基于 HTTP 升级)
💡 通俗解释:
就像你想和朋友(服务器)建立电话专线,得先拨号(HTTP握手),说暗号(Token认证)通过后才能通话。
2.2 认证拦截器处理
类:LoginUserHandshakeInterceptor
方法:beforeHandshake()
public boolean beforeHandshake(..., Map<String, Object> attributes) {
String token = request.getParameter("token"); // 从URL取token
LoginUser loginUser = authenticationService.verifyToken(token); // 认证
attributes.put(LOGIN_USER_ATTR, loginUser); // 用户信息存Session
return true; // 允许连接
}
🔹 关键参数:
-
token:从 URL 的 QueryString 获取(WebSocket 不能传 Header) -
LoginUser:认证后用户信息(含userType、userId)
💡 通俗解释:
接线员(拦截器)检查你的身份证(Token),确认合法后把你的档案(用户信息)存到电话系统(Session)里。
2.3 会话管理注册
类:WebSocketSessionHandlerDecorator
方法:afterConnectionEstablished()
public void afterConnectionEstablished(WebSocketSession session) {
sessionManager.addSession(session); // 会话存入管理器
super.afterConnectionEstablished(session);
}
🔹 数据流向:WebSocketSession → WebSocketSessionManagerImpl 的 Map 存储
💡 通俗解释:
电话接通后,客服系统(
SessionManager)把你的电话号码(SessionID)和档案关联起来,方便以后回拨。
3. 消息接收与处理
3.1 前端发送消息
消息格式(JSON):
{"type": "demo", "content": {"text": "Hello"}}
3.2 消息处理器路由
类:JsonWebSocketMessageHandler
方法:handleTextMessage()
public void handleTextMessage(WebSocketSession session, TextMessage message) {
JsonWebSocketMessage wrapper = parseMessage(message); // 解析JSON
WebSocketMessageListener listener = listeners.get(wrapper.getType()); // 找监听器
Object content = convertContent(wrapper.getContent(), listener.getMessageClass());
listener.handleMessage(session, content); // 交给业务监听器
}
🔹 关键参数:
-
type:决定由哪个WebSocketMessageListener处理 -
content:根据监听器定义的 Class 反序列化
💡 通俗解释:
你打电话说“我要订餐”(type=demo),总机(
JsonWebSocketMessageHandler)转接给餐饮部(DemoWebSocketMessageListener)。
3.3 业务监听器处理
类:DemoWebSocketMessageListener
方法:handleMessage()
public void handleMessage(WebSocketSession session, DemoSendMessage message) {
String reply = "响应:" + message.getText();
webSocketMessageSender.sendObject(session.getId(), "demo-reply", reply);
}
💡 通俗解释:
餐饮部接到订单后,做好饭菜(业务处理),叫快递员(
WebSocketMessageSender)送餐。
4. RabbitMQ 消息广播
4.1 发送消息到 RabbitMQ
类:RabbitMQWebSocketMessageSender
方法:send()
public void send(Integer userType, Long userId, String messageType, String messageContent) {
RabbitWebSocketMessage message = new RabbitWebSocketMessage()
.setUserType(userType).setUserId(userId)
.setMessageType(messageType).setMessageContent(messageContent);
rabbitTemplate.convertAndSend(exchange, "", message); // 广播到Exchange
}
🔹 关键参数:
-
exchange:yudao-websocket-exchange(配置项) -
RoutingKey:空字符串(Fanout 广播)
💡 通俗解释:
快递员把包裹(消息)送到物流中心(Exchange),由它分发给所有分店(集群节点)。
4.2 消费消息并推送
类:RabbitMQWebSocketMessageConsumer
方法:onMessage()
@RabbitListener(queues = "${yudao.websocket.sender-rabbitmq.queue}")
public void onMessage(RabbitWebSocketMessage message) {
// 1. 查本地Session
Collection<WebSocketSession> sessions = sessionManager.getSessionList(
message.getUserType(),
message.getUserId()
);
// 2. 推送
for (WebSocketSession session : sessions) {
session.sendMessage(new TextMessage(message.getMessageContent()));
}
}
🔹 关键逻辑:
-
只有目标 Session 在当前节点时才推送
-
其他节点的消息会被忽略
💡 通俗解释:
分店收到包裹后,检查收件人(userId)是否在本店,是则送货上门(WebSocket推送),否则丢弃。
5. 完整流程图示

6. 关键设计总结
-
认证与会话分离:
-
握手时认证(
LoginUserHandshakeInterceptor) -
会话生命周期独立管理(
WebSocketSessionManager)
-
-
消息路由解耦:
-
type字段决定处理逻辑(类似 HTTP 的 URL) -
监听器可插拔(新增业务只需加
WebSocketMessageListener实现)
-
-
集群广播透明化:
-
业务代码无感知(统一调用
WebSocketMessageSender接口) -
RabbitMQ 保证消息可达所有节点
-
-
配置驱动:
-
切换本地/RabbitMQ 只需改
yudao.websocket.sender-type
-
💡 终极比喻:
整个系统像一家连锁餐厅:
前台(WebSocketHandler):接待顾客(消息)并分配包厢(Listener)
厨房(Listener):做菜(业务逻辑)
外卖平台(RabbitMQ):协调各分店(集群节点)同步订单
配送员(Consumer):确保菜品送到正确的餐桌(Session)

火山引擎开发者社区是火山引擎打造的AI技术生态平台,聚焦Agent与大模型开发,提供豆包系列模型(图像/视频/视觉)、智能分析与会话工具,并配套评测集、动手实验室及行业案例库。社区通过技术沙龙、挑战赛等活动促进开发者成长,新用户可领50万Tokens权益,助力构建智能应用。
更多推荐
所有评论(0)