1.层级分类

1.1 传输层(Spring WebSocket基础层)

组件 对应类/接口 功能说明
WebSocket协议处理 org.springframework.web.socket 包下的标准类(如TextWebSocketHandler Spring原生WebSocket处理
握手拦截器 cn.iocoder.yudao.framework.websocket.core.interceptor.LoginUserHandshakeInterceptor WebSocket连接时的认证拦截
Session装饰器 cn.iocoder.yudao.framework.websocket.core.handler.WebSocketSessionHandlerDecorator 增强Session生命周期管理

关键代码片段

TextWebSocketHandler

public class TextWebSocketHandler extends AbstractWebSocketHandler {
    public TextWebSocketHandler() {
    }

    protected void handleBinaryMessage(WebSocketSession session, BinaryMessage message) {
        try {
            session.close(CloseStatus.NOT_ACCEPTABLE.withReason("Binary messages not supported"));
        } catch (IOException var4) {
        }

    }
}

主要作用:

  1. 处理客户端与服务器之间的文本类型 WebSocket 通信
  2. 拒绝接收二进制消息(通过关闭连接并返回原因)
  3. 作为 WebSocket 连接的核心处理器,管理消息收发逻辑

和端点配置一起使用:
YudaoWebSocketAutoConfiguration中的Bean一起使用:

    @Bean
    public WebSocketConfigurer webSocketConfigurer(HandshakeInterceptor[] handshakeInterceptors,
                                                   WebSocketHandler webSocketHandler,
                                                   WebSocketProperties webSocketProperties) {
        return registry -> registry
                // 添加 WebSocketHandler
                .addHandler(webSocketHandler, webSocketProperties.getPath())
                .addInterceptors(handshakeInterceptors)
                // 允许跨域,否则前端连接会直接断开
                .setAllowedOriginPatterns("*");
    }
websocket:
    enable: true # websocket的开关
    path: /infra/ws # 指定 WebSocket 服务的连接端点路径,若服务器地址为 localhost:8080,则客户端连接地址为 ws://localhost:8080/infra/ws
    sender-type: local # 消息发送的类型,可选值为 local、redis、rocketmq、kafka、rabbitmq
    sender-rocketmq:
      topic: ${spring.application.name}-websocket # 消息发送的 RocketMQ Topic
      consumer-group: ${spring.application.name}-websocket-consumer # 消息发送的 RocketMQ Consumer Group
    sender-rabbitmq:
      exchange: ${spring.application.name}-websocket-exchange # 消息发送的 RabbitMQ Exchange
      queue: ${spring.application.name}-websocket-queue # 消息发送的 RabbitMQ Queue
    sender-kafka:
      topic: ${spring.application.name}-websocket # 消息发送的 Kafka Topic
      consumer-group: ${spring.application.name}-websocket-consumer # 消息发送的 Kafka Consumer Group

LoginUserHandshakeInterceptor

/**
 * 登录用户的 {@link HandshakeInterceptor} 实现类
 *
 * 流程如下:
 * 1. 前端连接 websocket 时,会通过拼接 ?token={token} 到 ws:// 连接后,这样它可以被 {@link TokenAuthenticationFilter} 所认证通过
 * 2. {@link LoginUserHandshakeInterceptor} 负责把 {@link LoginUser} 添加到 {@link WebSocketSession} 中
 */
public class LoginUserHandshakeInterceptor implements HandshakeInterceptor {

    @Override
    public boolean beforeHandshake(ServerHttpRequest request, ServerHttpResponse response,
                                   WebSocketHandler wsHandler, Map<String, Object> attributes) {
        LoginUser loginUser = SecurityFrameworkUtils.getLoginUser();
        if (loginUser != null) {
            WebSocketFrameworkUtils.setLoginUser(loginUser, attributes);
        }
        return true;
    }

    @Override
    public void afterHandshake(ServerHttpRequest request, ServerHttpResponse response,
                               WebSocketHandler wsHandler, Exception exception) {
        // do nothing
    }

}

WebSocketSessionHandlerDecorator

WebSocketSessionHandlerDecorator 类是一个基于装饰器模式的 WebSocketHandler 增强类,主要作用是对 WebSocket 连接的会话(WebSocketSession)进行管理和功能增强,确保连接的稳定性和并发安全性。

核心功能解析

该类通过重写 WebSocketHandler 的关键方法,实现了两大核心功能:

1. 管理 WebSocket 会话的生命周期

通过与 WebSocketSessionManager 配合,统一管理客户端连接的会话:

  • 连接建立时(afterConnectionEstablished
    当客户端与服务器成功建立 WebSocket 连接后,将新创建的 WebSocketSession 添加到 sessionManager 中。
    这样,sessionManager 就能维护所有在线客户端的会话列表,方便后续查找特定客户端、群发消息等操作(例如通过用户 ID 定位其会话并推送消息)。

  • 连接关闭时(afterConnectionClosed
    当客户端断开连接(主动关闭或异常断开),从 sessionManager 中移除对应的 WebSocketSession,避免无效会话占用资源,确保会话列表的准确性。

2. 增强会话的并发安全性和消息发送稳定性

通过包装 WebSocketSession 为 ConcurrentWebSocketSessionDecorator,解决两个实际问题:

  • 并发操作安全
    WebSocketSession 本身并非线程安全的,若多个线程同时向同一个会话发送消息(例如服务器同时推送多条通知),可能导致消息错乱或连接异常。
    ConcurrentWebSocketSessionDecorator 内部通过同步机制(如锁)保证并发发送的安全性。

  • 消息发送限制
    避免因消息发送超时或数据量过大导致的连接问题:

    • SEND_TIME_LIMIT(5 秒):限制单条消息的发送超时时间,超过则强制关闭连接,防止连接长期阻塞。
    • BUFFER_SIZE_LIMIT(100KB):限制消息发送缓冲区的大小,避免超大消息占用过多内存。
装饰器模式的作用

该类继承自 WebSocketHandlerDecorator(Spring 提供的装饰器基类),通过 “装饰” 原始的 WebSocketHandler(即构造方法中的 delegate),在不修改原始处理器逻辑的前提下,新增了会话管理和并发安全功能。这种设计的优势是:

  • 职责分离:原始 WebSocketHandler 专注于业务消息处理(如解析消息、调用监听器),装饰器专注于会话管理和安全增强。
  • 可扩展性:若未来需要新增会话相关功能(如会话心跳检测),只需再添加一个装饰器,无需修改已有代码。
使用场景举例

在分布式系统的在线聊天功能中:

  1. 当用户 A 连接到 WebSocket 服务器,afterConnectionEstablished 被调用,会话被包装为线程安全的实例并注册到 sessionManager
  2. 当用户 B 向 A 发送消息时,服务器从 sessionManager 中找到 A 的会话,通过装饰后的线程安全会话发送消息,避免并发问题。
  3. 当 A 关闭页面,afterConnectionClosed 被调用,会话从 sessionManager 中移除,其他服务再查询时会知道 A 已离线。

1.2 服务层(核心业务逻辑层)

会话管理
组件 对应类/接口 功能说明
会话管理器接口 cn.iocoder.yudao.framework.websocket.core.session.WebSocketSessionManager 定义会话管理规范
会话管理器实现 cn.iocoder.yudao.framework.websocket.core.session.WebSocketSessionManagerImpl 基于内存的会话存储
Session工具类 cn.iocoder.yudao.framework.websocket.core.util.WebSocketFrameworkUtils 提供用户信息获取方法

WebSocketSessionManagerImpl

/**
 * 默认的 {@link WebSocketSessionManager} 实现类
 * 增删改查连接
 */
public class WebSocketSessionManagerImpl implements WebSocketSessionManager {

    /**
     * id 与 WebSocketSession 映射
     * 按 “会话 ID” 查连接(类似按手机号查联系人)
     * key:Session 编号
     */
    private final ConcurrentMap<String, WebSocketSession> idSessions = new ConcurrentHashMap<>();

    /**
     * user 与 WebSocketSession 映射
     * 按 “用户类型→用户 ID” 查连接(类似按 “用户组→姓名” 查联系人列表)
     * key1:用户类型
     * key2:用户编号
     */
    private final ConcurrentMap<Integer, ConcurrentMap<Long, CopyOnWriteArrayList<WebSocketSession>>> userSessions
            = new ConcurrentHashMap<>();

    @Override
    public void addSession(WebSocketSession session) {
        // 添加到 idSessions 中
        idSessions.put(session.getId(), session);
        // 添加到 userSessions 中
        LoginUser user = WebSocketFrameworkUtils.getLoginUser(session);
        if (user == null) {
            return;
        }
        ConcurrentMap<Long, CopyOnWriteArrayList<WebSocketSession>> userSessionsMap = userSessions.get(user.getUserType());
        //第一步:查用户类型的分组,发现没有,创建一个空 Map
        if (userSessionsMap == null) {
            userSessionsMap = new ConcurrentHashMap<>();
            if (userSessions.putIfAbsent(user.getUserType(), userSessionsMap) != null) {
                userSessionsMap = userSessions.get(user.getUserType());
            }
        }
        //第二步:在同一个用户类型的分组下,查用户 ID的会话列表,发现没有,创建一个空列表:{用户类型:{用户ID: []}}。
        CopyOnWriteArrayList<WebSocketSession> sessions = userSessionsMap.get(user.getId());
        if (sessions == null) {
            sessions = new CopyOnWriteArrayList<>();
            if (userSessionsMap.putIfAbsent(user.getId(), sessions) != null) {
                sessions = userSessionsMap.get(user.getId());
            }
        }
        sessions.add(session);
    }

    @Override
    public void removeSession(WebSocketSession session) {
        // 移除从 idSessions 中
        idSessions.remove(session.getId());
        // 移除从 idSessions 中
        LoginUser user = WebSocketFrameworkUtils.getLoginUser(session);
        if (user == null) {
            return;
        }
        ConcurrentMap<Long, CopyOnWriteArrayList<WebSocketSession>> userSessionsMap = userSessions.get(user.getUserType());
        if (userSessionsMap == null) {
            return;
        }
        CopyOnWriteArrayList<WebSocketSession> sessions = userSessionsMap.get(user.getId());
        sessions.removeIf(session0 -> session0.getId().equals(session.getId()));
        if (CollUtil.isEmpty(sessions)) {
            userSessionsMap.remove(user.getId(), sessions);
        }
    }

    @Override
    public WebSocketSession getSession(String id) {
        return idSessions.get(id);
    }

    @Override
    public Collection<WebSocketSession> getSessionList(Integer userType) {
        ConcurrentMap<Long, CopyOnWriteArrayList<WebSocketSession>> userSessionsMap = userSessions.get(userType);
        if (CollUtil.isEmpty(userSessionsMap)) {
            return new ArrayList<>();
        }
        LinkedList<WebSocketSession> result = new LinkedList<>(); // 避免扩容
        Long contextTenantId = TenantContextHolder.getTenantId();
        for (List<WebSocketSession> sessions : userSessionsMap.values()) {
            if (CollUtil.isEmpty(sessions)) {
                continue;
            }
            // 特殊:如果租户不匹配,则直接排除
            if (contextTenantId != null) {
                Long userTenantId = WebSocketFrameworkUtils.getTenantId(sessions.get(0));
                if (!contextTenantId.equals(userTenantId)) {
                    continue;
                }
            }
            result.addAll(sessions);
        }
        return result;
    }

    @Override
    public Collection<WebSocketSession> getSessionList(Integer userType, Long userId) {
        ConcurrentMap<Long, CopyOnWriteArrayList<WebSocketSession>> userSessionsMap = userSessions.get(userType);
        if (CollUtil.isEmpty(userSessionsMap)) {
            return new ArrayList<>();
        }
        CopyOnWriteArrayList<WebSocketSession> sessions = userSessionsMap.get(userId);
        return CollUtil.isNotEmpty(sessions) ? new ArrayList<>(sessions) : new ArrayList<>();
    }

}

所有方法都围绕 WebSocketSession 对象工作,这是 Spring WebSocket 对原生 WebSocket 连接的封装。每个 WebSocketSession 对应一个实际的 WebSocket 连接。

addSession(WebSocketSession session)

交互流程:

  1. 参数传入:当新的 WebSocket 连接建立时,Spring WebSocket 会创建 WebSocketSession 对象并传入该方法

  2. 会话存储

    idSessions.put(session.getId(), session);
    • 使用 session.getId() 获取连接的唯一ID

    • 将 session 对象存储到并发映射中

  3. 用户关联

    LoginUser user = WebSocketFrameworkUtils.getLoginUser(session);
    • 从 session 中提取认证用户信息(通常来自握手时的属性)

    • 如果用户已认证,将 session 按用户维度存储

原生 WebSocket 关联

  • session.getId() 是 WebSocket 连接的唯一标识符

  • WebSocketFrameworkUtils.getLoginUser(session) 通常从 session.getAttributes() 获取握手时存储的用户信息

websocketFrameWorkUtils

/**
 * 专属于 web 包的工具类
 */
public class WebSocketFrameworkUtils {

    public static final String ATTRIBUTE_LOGIN_USER = "LOGIN_USER";

    /**
     * 设置当前用户
     *
     * @param loginUser 登录用户
     * @param attributes Session
     */
    public static void setLoginUser(LoginUser loginUser, Map<String, Object> attributes) {
        attributes.put(ATTRIBUTE_LOGIN_USER, loginUser);
    }

    /**
     * 获取当前用户
     *
     * @return 当前用户
     */
    public static LoginUser getLoginUser(WebSocketSession session) {
        return (LoginUser) session.getAttributes().get(ATTRIBUTE_LOGIN_USER);
    }

    /**
     * 获得当前用户的编号
     *
     * @return 用户编号
     */
    public static Long getLoginUserId(WebSocketSession session) {
        LoginUser loginUser = getLoginUser(session);
        return loginUser != null ? loginUser.getId() : null;
    }

    /**
     * 获得当前用户的类型
     *
     * @return 用户编号
     */
    public static Integer getLoginUserType(WebSocketSession session) {
        LoginUser loginUser = getLoginUser(session);
        return loginUser != null ? loginUser.getUserType() : null;
    }

    /**
     * 获得当前用户的租户编号
     *
     * @param session Session
     * @return 租户编号
     */
    public static Long getTenantId(WebSocketSession session) {
        LoginUser loginUser = getLoginUser(session);
        return loginUser != null ? loginUser.getTenantId() : null;
    }

}

(1)直接交互的核心组件

  • WebSocketSession 对象
    这是 Spring WebSocket 提供的底层会话对象,用于表示一个 WebSocket 连接。工具类通过 session.getAttributes() 操作会话的属性 map,实现用户信息的存取。
    例:getLoginUser(session) 就是从 WebSocketSession 的属性中取出之前存入的 LoginUser 对象。

  • LoginUser 实体类
    这是业务层定义的用户信息类(包含用户 ID、用户类型、租户 ID 等)。工具类通过 setLoginUser() 把用户信息绑定到会话,通过 getXxx() 方法从会话中提取用户的具体信息(如 getLoginUserId() 获取用户 ID)。

  • WebSocketSessionManagerImpl 会话管理器
    之前分析的 WebSocketSessionManagerImpl 类会直接调用这个工具类:

    • 例如 addSession 方法中,通过 WebSocketFrameworkUtils.getLoginUser(session) 获取用户信息,才能完成会话的分组存储。
    • 又如 getSessionList 方法中,通过 WebSocketFrameworkUtils.getTenantId(...) 获取租户 ID,用于过滤不同租户的会话。

(2)间接关联的组件

  • WebSocket 处理器(WebSocketHandler
    在建立 WebSocket 连接的初期(如 afterConnectionEstablished 方法),通常会调用 setLoginUser() 把登录用户信息存入会话属性,后续工具类才能从中获取信息。
    例:用户登录后建立 WebSocket 连接时,业务代码会执行 WebSocketFrameworkUtils.setLoginUser(loginUser, session.getAttributes())

  • 安全框架(如 Spring Security)
    LoginUser 通常来自安全框架的认证结果(如用户登录后,安全框架会生成 LoginUser 对象)。工具类相当于把安全框架的用户信息 “传递” 给 WebSocket 会话。

  • 与 WebSocket 底层有直接交互,但仅限于操作 WebSocketSession 的属性(session.getAttributes()),这是 Spring WebSocket 框架提供的标准 API,用于在会话中存储自定义数据(如用户信息)。
    这种交互是 “表层” 的,不涉及底层网络通信(如 TCP 连接、数据帧解析等),而是利用框架提供的会话属性机制进行业务数据传递。

  • 不直接与更底层的网络层(如操作系统、TCP 协议)交互,它的作用是在应用层内部(业务逻辑 ↔ WebSocket 会话)传递数据,是 “应用层工具类”。

WebSocketFrameworkUtils 就像一个 “快递员”:

  • 当用户建立 WebSocket 连接时,它把用户信息(LoginUser)“打包” 存入会话(WebSocketSession)的 “包裹”(attributes)中。
  • 当其他组件(如 WebSocketSessionManagerImpl)需要用户信息时,它从会话的 “包裹” 中取出并 “派送” 给调用者。
  • 它不负责 “运输”(底层网络通信),只负责 “打包和派送”(用户信息与会话的绑定和解绑),是 WebSocket 业务逻辑中不可或缺的信息传递工具。
消息发送服务
组件 对应类/接口 功能说明
消息发送接口 cn.iocoder.yudao.framework.websocket.core.sender.WebSocketMessageSender 定义消息发送规范
本地发送实现 cn.iocoder.yudao.framework.websocket.core.sender.LocalWebSocketMessageSender 单机版消息发送
Redis发送实现 cn.iocoder.yudao.framework.websocket.core.sender.redis.RedisWebSocketMessageSender 集群版Redis广播
RocketMQ发送实现 cn.iocoder.yudao.framework.websocket.core.sender.rocketmq.RocketMQWebSocketMessageSender 集群版RocketMQ广播
WebSocketMessageSender
/**
 * WebSocket 消息的发送器接口
 */
public interface WebSocketMessageSender {

    /**
     * 发送消息给指定用户
     *
     * @param userType 用户类型
     * @param userId 用户编号
     * @param messageType 消息类型
     * @param messageContent 消息内容,JSON 格式
     */
    void send(Integer userType, Long userId, String messageType, String messageContent);

    /**
     * 发送消息给指定用户类型
     *
     * @param userType 用户类型
     * @param messageType 消息类型
     * @param messageContent 消息内容,JSON 格式
     */
    void send(Integer userType, String messageType, String messageContent);

    /**
     * 发送消息给指定 Session
     *
     * @param sessionId Session 编号
     * @param messageType 消息类型
     * @param messageContent 消息内容,JSON 格式
     */
    void send(String sessionId, String messageType, String messageContent);

    default void sendObject(Integer userType, Long userId, String messageType, Object messageContent) {
        send(userType, userId, messageType, JsonUtils.toJsonString(messageContent));
    }

    default void sendObject(Integer userType, String messageType, Object messageContent) {
        send(userType, messageType, JsonUtils.toJsonString(messageContent));
    }

    default void sendObject(String sessionId, String messageType, Object messageContent) {
        send(sessionId, messageType, JsonUtils.toJsonString(messageContent));
    }

}

1. WebSocketMessageSender 接口

功能:定义了 WebSocket 消息发送的统一接口,提供三种发送模式:

  • 发送给特定用户(userType + userId)

  • 发送给某类用户(userType)

  • 发送给特定会话(sessionId)

方法解析

  1. send 方法系列:发送文本消息(JSON格式)

  2. sendObject 默认方法:发送对象(自动序列化为JSON)

设计特点

  • 统一的发送接口

  • 支持方法重载

  • 提供对象自动序列化能力

2. AbstractWebSocketMessageSender 抽象类

功能:实现了消息发送的核心逻辑,是具体实现的基类。

核心方法解析

send(String sessionId, Integer userType, Long userId, String messageType, String messageContent)

逻辑流程

  1. 根据参数组合确定目标会话:

    • sessionId 优先

    • 然后 userType + userId

    • 最后仅 userType

  2. 调用 doSend 执行实际发送

与底层交互

  • 通过 sessionManager.getSession() 和 sessionManager.getSessionList() 获取目标会话

  • 这些方法最终会操作 WebSocketSession 对象

doSend(Collection<WebSocketSession> sessions, String messageType, String messageContent)

逻辑流程

  1. 构造标准消息格式(类型+内容)

  2. 序列化为JSON

  3. 遍历所有会话:

    • 检查会话有效性(非空、已打开)

    • 发送 TextMessage

与底层交互

session.sendMessage(new TextMessage(payload));
  • 直接调用 WebSocketSession 的 sendMessage 方法

  • 使用 Spring 的 TextMessage 封装消息

  • 处理可能的 IOException

LocalWebSocketMessageSender

功能:单机版消息发送器,直接继承抽象类无额外逻辑。

RabbitMQWebSocketMessageSender

功能:基于 RabbitMQ 的分布式消息发送器。

核心方法

private void sendRabbitMQMessage(String sessionId, Long userId, Integer userType,
                               String messageType, String messageContent) {
    RabbitMQWebSocketMessage mqMessage = new RabbitMQWebSocketMessage()
            .setSessionId(sessionId).setUserId(userId).setUserType(userType)
            .setMessageType(messageType).setMessageContent(messageContent);
    rabbitTemplate.convertAndSend(topicExchange.getName(), null, mqMessage);
}

与底层交互

  1. 使用 RabbitTemplate 发送消息

  2. 通过 Topic Exchange 广播

  3. 消息格式自定义(包含目标信息和内容)

RedisWebSocketMessageSender

功能:基于 Redis 的分布式消息发送器。

核心方法

private void sendRedisMessage(String sessionId, Long userId, Integer userType,
                            String messageType, String messageContent) {
    RedisWebSocketMessage mqMessage = new RedisWebSocketMessage()
            .setSessionId(sessionId).setUserId(userId).setUserType(userType)
            .setMessageType(messageType).setMessageContent(messageContent);
    redisMQTemplate.send(mqMessage);
}

与底层交互

  1. 使用 RedisMQTemplate 发送消息

  2. 基于 Redis 的发布/订阅机制

RocketMQWebSocketMessageSender

功能:基于 RocketMQ 的分布式消息发送器。

核心方法

private void sendRocketMQMessage(String sessionId, Long userId, Integer userType,
                               String messageType, String messageContent) {
    RocketMQWebSocketMessage mqMessage = new RocketMQWebSocketMessage()
            .setSessionId(sessionId).setUserId(userId).setUserType(userType)
            .setMessageType(messageType).setMessageContent(messageContent);
    rocketMQTemplate.syncSend(topic, mqMessage);
}

与底层交互

  1. 使用 RocketMQTemplate 同步发送

  2. 指定 topic 进行消息路由

1.3 应用层(业务使用层)

(1) 消息监听器
组件 对应类/接口 功能说明
监听器接口 cn.iocoder.yudao.framework.websocket.core.listener.WebSocketMessageListener 消息处理接口
示例监听器实现 cn.iocoder.yudao.module.infra.websocket.listener.DemoWebSocketMessageListener 演示消息处理

1. WebSocketMessageListener 接口

功能:定义了 WebSocket 消息监听器的标准接口,用于处理前端发送的特定类型消息。

方法解析

onMessage(WebSocketSession session, T message)

  • 作用:处理收到的 WebSocket 消息

  • 参数

    • session:当前 WebSocket 会话,包含连接信息和用户上下文

    • message:反序列化后的消息对象

  • 交互点

    • 通过 session 可以获取发送方信息

    • 通过泛型 T 实现类型安全的消息处理

getType()

  • 作用:返回监听器关注的消息类型

  • 关联:与 JsonWebSocketMessage#getType() 对应

  • 设计意义

    • 实现消息类型到处理器的路由

    • 支持多种消息类型的差异化处理

泛型设计<T> 使每个监听器可以处理特定类型的消息对象

DemoWebSocketMessageListener 实现类

功能:处理示例消息("demo-message-send"类型),实现单发和群发逻辑。

核心逻辑解析

消息处理流程 (onMessage 方法)

  1. 获取发送方信息

    Long fromUserId = WebSocketFrameworkUtils.getLoginUserId(session);
    • 从 WebSocketSession 中提取当前登录用户ID

  2. 单发消息处理

    if (message.getToUserId() != null) {
        DemoReceiveMessage toMessage = new DemoReceiveMessage().setFromUserId(fromUserId)
                .setText(message.getText()).setSingle(true);
        webSocketMessageSender.sendObject(UserTypeEnum.ADMIN.getValue(), message.getToUserId(),
                "demo-message-receive", toMessage);
        return;
    }
    • 构造响应消息对象

    • 通过 WebSocketMessageSender 发送给特定用户

  3. 群发消息处理

    DemoReceiveMessage toMessage = new DemoReceiveMessage().setFromUserId(fromUserId)
            .setText(message.getText()).setSingle(false);
    webSocketMessageSender.sendObject(UserTypeEnum.ADMIN.getValue(),
            "demo-message-receive", toMessage);
    • 向所有管理员用户广播消息

消息类型声明 (getType 方法)

@Override
public String getType() {
    return "demo-message-send";
}
  • 声明只处理类型为 "demo-message-send" 的消息

关键交互点

  1. 与 WebSocketSession 的交互

    • 通过 WebSocketFrameworkUtils.getLoginUserId(session) 获取认证用户信息

    • 会话对象包含所有 WebSocket 连接上下文

  2. 与消息发送器的交互

    @Resource
    private WebSocketMessageSender webSocketMessageSender;
    
    webSocketMessageSender.sendObject(...);
    • 使用统一的发送接口响应消息

    • 支持单发和群发模式

  3. 消息对象转换

    • 接收 DemoSendMessage 类型消息

    • 构造并发送 DemoReceiveMessage 类型响应

(2) 消息发送API
组件 对应类/接口 功能说明
WebSocket API cn.iocoder.yudao.module.infra.api.websocket.WebSocketSenderApi 供其他模块调用的发送接口
API实现类 cn.iocoder.yudao.module.infra.api.websocket.WebSocketSenderApiImpl 委托给WebSocketMessageSender

1.4 配置层

组件 对应类/文件 功能说明
自动配置类 cn.iocoder.yudao.framework.websocket.config.YudaoWebSocketAutoConfiguration 初始化WebSocket组件
配置属性类 cn.iocoder.yudao.framework.websocket.config.YudaoWebSocketProperties 读取yudao.websocket配置项
消息处理器配置 cn.iocoder.yudao.framework.websocket.core.handler.JsonWebSocketMessageHandler 消息路由分发

JsonWebSocketMessageHandler

JsonWebSocketMessageHandler 是 Spring WebSocket 的核心消息处理器,负责处理 JSON 格式的 WebSocket 消息,并根据消息类型路由到对应的监听器。

核心功能

  1. 消息协议处理:处理基于 JSON 的 WebSocket 消息协议

  2. 消息路由:根据消息类型分发到对应的监听器

  3. 租户隔离:支持多租户环境下的消息处理

  4. 心跳处理:内置 ping/pong 心跳机制

1. 成员变量

private final Map<String, WebSocketMessageListener<Object>> listeners = new HashMap<>();
  • 作用:维护消息类型(type)到监听器的映射关系

  • 数据结构:使用 HashMap 实现 O(1) 复杂度的查找

  • 初始化:通过构造函数注入所有监听器并建立映射

2. 构造函数

@SuppressWarnings({"rawtypes", "unchecked"})
public JsonWebSocketMessageHandler(List<? extends WebSocketMessageListener> listenersList) {
    listenersList.forEach((Consumer<WebSocketMessageListener>)
            listener -> listeners.put(listener.getType(), listener));
}
  • 参数:接收所有 WebSocketMessageListener 实现类的列表

  • 处理逻辑

    • 遍历所有监听器

    • 以监听器的 getType() 返回值作为 key

    • 监听器本身作为 value 存入 map

  • 类型安全:使用泛型和类型擦除处理,需要 @SuppressWarnings

3. 核心方法 handleTextMessage

@Override
protected void handleTextMessage(WebSocketSession session, TextMessage message) throws Exception

方法处理流程:

  1. 空消息检查

    if (message.getPayloadLength() == 0) {
        return;
    }
    • 直接忽略空消息

  2. 心跳处理

    if (message.getPayloadLength() == 4 && Objects.equals(message.getPayload(), "ping")) {
        session.sendMessage(new TextMessage("pong"));
        return;
    }
    • 响应标准 ping 心跳消息

    • 底层交互:直接调用 session.sendMessage() 发送 pong 响应

  3. 消息解析

    JsonWebSocketMessage jsonMessage = JsonUtils.parseObject(message.getPayload(), JsonWebSocketMessage.class);
    • 将 JSON 字符串解析为 JsonWebSocketMessage 对象

    • 底层交互:使用 JsonUtils 工具类进行 JSON 反序列化

  4. 消息校验

    if (jsonMessage == null) {
        log.error("[handleTextMessage][session({}) message({}) 解析为空]", session.getId(), message.getPayload());
        return;
    }
    if (StrUtil.isEmpty(jsonMessage.getType())) {
        log.error("[handleTextMessage][session({}) message({}) 类型为空]", session.getId(), message.getPayload());
        return;
    }
    • 确保消息体和消息类型有效

  5. 监听器查找

    WebSocketMessageListener<Object> messageListener = listeners.get(jsonMessage.getType());
    • 根据消息类型从映射表中查找对应的监听器

  6. 消息内容反序列化

    Type type = TypeUtil.getTypeArgument(messageListener.getClass(), 0);
    Object messageObj = JsonUtils.parseObject(jsonMessage.getContent(), type);
    • 获取监听器声明的泛型类型

    • 将消息内容反序列化为目标类型

    • 底层交互

      • TypeUtil 获取泛型参数类型

      • JsonUtils 进行类型化反序列化

  7. 租户上下文处理

    Long tenantId = WebSocketFrameworkUtils.getTenantId(session);
    TenantUtils.execute(tenantId, () -> messageListener.onMessage(session, messageObj));
    • 从 session 中获取租户 ID

    • 在租户上下文中执行监听器的消息处理

    • 底层交互

      • WebSocketFrameworkUtils 从 session 属性获取租户信息

      • TenantUtils 管理租户上下文

交互点 使用的组件 目的
消息发送 WebSocketSession 响应心跳消息
JSON解析 JsonUtils 消息反序列化
泛型处理 TypeUtil 获取监听器的消息类型
租户管理 TenantUtils 租户上下文切换
属性获取 WebSocketFrameworkUtils 从 session 获取租户/用户信息

YudaoWebSocketAutoConfiguration

负责注册所有 WebSocket 核心组件,包括:

  1. WebSocket 消息处理器JsonWebSocketMessageHandler

  2. 认证拦截器LoginUserHandshakeInterceptor

  3. 会话管理器WebSocketSessionManager

  4. 消息发送器WebSocketMessageSender

  5. 集群支持(Redis/RocketMQ/Kafka 消息广播)

WebSocket 认证拦截器:LoginUserHandshakeInterceptor
  • 在 WebSocket 握手阶段进行 Token 认证

  • 认证成功后,用户信息存入 WebSocketSession 的 attributes

WebSocket 会话管理器:WebSocketSessionManager
  • 默认使用内存存储会话(ConcurrentHashMap

  • 支持按 sessionIduserTypeuserId 检索会话

WebSocket 消息发送器:类WebSocketMessageSender(接口) + 多种实现(Local/Redis/RocketMQ
  • 根据 yudao.websocket.sender-type 配置动态选择实现

  • 支持单机(local)和集群(redis/rocketmq/kafka)模式

WebSocket 集群消息广播(Redis/RocketMQ/Kafka)

2. WebSocket 连接建立

2.1 前端发起连接

请求示例

ws://localhost:48080/infra/ws?token=abc123

🔹 流程

  1. 浏览器发起 WebSocket 握手请求

  2. 请求先经过 HTTP 协议层(因为 WebSocket 基于 HTTP 升级)

💡 通俗解释

就像你想和朋友(服务器)建立电话专线,得先拨号(HTTP握手),说暗号(Token认证)通过后才能通话。


2.2 认证拦截器处理

LoginUserHandshakeInterceptor
方法beforeHandshake()

public boolean beforeHandshake(..., Map<String, Object> attributes) {
    String token = request.getParameter("token"); // 从URL取token
    LoginUser loginUser = authenticationService.verifyToken(token); // 认证
    attributes.put(LOGIN_USER_ATTR, loginUser); // 用户信息存Session
    return true; // 允许连接
}

🔹 关键参数

  • token:从 URL 的 QueryString 获取(WebSocket 不能传 Header)

  • LoginUser:认证后用户信息(含 userTypeuserId

💡 通俗解释

接线员(拦截器)检查你的身份证(Token),确认合法后把你的档案(用户信息)存到电话系统(Session)里。


2.3 会话管理注册

WebSocketSessionHandlerDecorator
方法afterConnectionEstablished()

public void afterConnectionEstablished(WebSocketSession session) {
    sessionManager.addSession(session); // 会话存入管理器
    super.afterConnectionEstablished(session);
}

🔹 数据流向
WebSocketSession → WebSocketSessionManagerImpl 的 Map 存储

💡 通俗解释

电话接通后,客服系统(SessionManager)把你的电话号码(SessionID)和档案关联起来,方便以后回拨。


3. 消息接收与处理

3.1 前端发送消息

消息格式(JSON):

{"type": "demo", "content": {"text": "Hello"}}

3.2 消息处理器路由

JsonWebSocketMessageHandler
方法handleTextMessage()

public void handleTextMessage(WebSocketSession session, TextMessage message) {
    JsonWebSocketMessage wrapper = parseMessage(message); // 解析JSON
    WebSocketMessageListener listener = listeners.get(wrapper.getType()); // 找监听器
    Object content = convertContent(wrapper.getContent(), listener.getMessageClass());
    listener.handleMessage(session, content); // 交给业务监听器
}

🔹 关键参数

  • type:决定由哪个 WebSocketMessageListener 处理

  • content:根据监听器定义的 Class 反序列化

💡 通俗解释

你打电话说“我要订餐”(type=demo),总机(JsonWebSocketMessageHandler)转接给餐饮部(DemoWebSocketMessageListener)。


3.3 业务监听器处理

DemoWebSocketMessageListener
方法handleMessage()

public void handleMessage(WebSocketSession session, DemoSendMessage message) {
    String reply = "响应:" + message.getText();
    webSocketMessageSender.sendObject(session.getId(), "demo-reply", reply);
}

💡 通俗解释

餐饮部接到订单后,做好饭菜(业务处理),叫快递员(WebSocketMessageSender)送餐。


4. RabbitMQ 消息广播

4.1 发送消息到 RabbitMQ

RabbitMQWebSocketMessageSender
方法send()

public void send(Integer userType, Long userId, String messageType, String messageContent) {
    RabbitWebSocketMessage message = new RabbitWebSocketMessage()
            .setUserType(userType).setUserId(userId)
            .setMessageType(messageType).setMessageContent(messageContent);
    rabbitTemplate.convertAndSend(exchange, "", message); // 广播到Exchange
}

🔹 关键参数

  • exchangeyudao-websocket-exchange(配置项)

  • RoutingKey:空字符串(Fanout 广播)

💡 通俗解释

快递员把包裹(消息)送到物流中心(Exchange),由它分发给所有分店(集群节点)。


4.2 消费消息并推送

RabbitMQWebSocketMessageConsumer
方法onMessage()

@RabbitListener(queues = "${yudao.websocket.sender-rabbitmq.queue}")
public void onMessage(RabbitWebSocketMessage message) {
    // 1. 查本地Session
    Collection<WebSocketSession> sessions = sessionManager.getSessionList(
        message.getUserType(), 
        message.getUserId()
    );
    // 2. 推送
    for (WebSocketSession session : sessions) {
        session.sendMessage(new TextMessage(message.getMessageContent()));
    }
}

🔹 关键逻辑

  • 只有目标 Session 在当前节点时才推送

  • 其他节点的消息会被忽略

💡 通俗解释

分店收到包裹后,检查收件人(userId)是否在本店,是则送货上门(WebSocket推送),否则丢弃。


5. 完整流程图示


6. 关键设计总结

  1. 认证与会话分离

    • 握手时认证(LoginUserHandshakeInterceptor

    • 会话生命周期独立管理(WebSocketSessionManager

  2. 消息路由解耦

    • type 字段决定处理逻辑(类似 HTTP 的 URL)

    • 监听器可插拔(新增业务只需加 WebSocketMessageListener 实现)

  3. 集群广播透明化

    • 业务代码无感知(统一调用 WebSocketMessageSender 接口)

    • RabbitMQ 保证消息可达所有节点

  4. 配置驱动

    • 切换本地/RabbitMQ 只需改 yudao.websocket.sender-type

💡 终极比喻

整个系统像一家连锁餐厅:

  • 前台(WebSocketHandler):接待顾客(消息)并分配包厢(Listener)

  • 厨房(Listener):做菜(业务逻辑)

  • 外卖平台(RabbitMQ):协调各分店(集群节点)同步订单

  • 配送员(Consumer):确保菜品送到正确的餐桌(Session)

Logo

火山引擎开发者社区是火山引擎打造的AI技术生态平台,聚焦Agent与大模型开发,提供豆包系列模型(图像/视频/视觉)、智能分析与会话工具,并配套评测集、动手实验室及行业案例库。社区通过技术沙龙、挑战赛等活动促进开发者成长,新用户可领50万Tokens权益,助力构建智能应用。

更多推荐