AQChat:从服务启动到消息处理全流程

AQChat 是一个基于 Netty + WebSocket 的高性能即时通讯系统,采用 Protobuf 二进制协议实现消息编解码,支持文本、图片、文
件、音频、视频等多种消息类型,具备消息提醒、消息撤回重新编辑、艾特成员/AI助手等功能。集成阿里百炼、Gitee AI 等多个 AI
平台,支持多轮对话、文本转语音、文本转图片等功能。

1. 服务启动流程

启动入口

  • AQChatApplication.java 是 SpringBoot 应用的主类,通过 SpringApplication.run() 方法启动应用

Netty 服务器初始化

AQChatNettyStarter.java 负责初始化 Netty 服务器:

  • 创建 ServerBootstrap 实例
  • 配置 bossGroupworkGroup 线程池
  • 设置 childHandler 处理连接
  • 绑定 WebSocket 端口

消息处理器注册

  • AQChatCommandHandler.java 注册为全局指令处理器,负责处理所有客户端请求
  • MessageRecognizer.java 在启动时初始化,建立消息命令与消息体的映射关系

2. 消息发送流程

客户端发送消息

  • 客户端通过 WebSocket 发送消息,消息格式为 Protobuf 序列化数据

Netty 通道处理

  • MessageDecoder.java 解码消息,将二进制数据转换为 `SendMsgCmd对象
  • AQChatCommandHandler.java 接收到消息后,通过 commandHandler 获取对应的处理器

指令处理器调用

SendMsgCmdHandler.java 处理消息发送请求:

  • 验证用户登录状态和房间状态
  • 生成消息ID并保存到 Redis
  • 通过 MQ 发送消息

消息存储

  • MqSendingAgent.javastoreMessage() 方法将消息发送到 MQ 的 STORE_MESSAGE_TOPIC
  • StoreMessageReceiver.java 接收存储消息,调用 messageService.saveMessage() 保存到数据库

3. 消息广播流程

MQ 消费者接收

  • MessageBroadcastReceiver.java 接收广播消息,使用 DefaultMQPushConsumer 订阅 SEND_MESSAGE_TOPIC 主题
  • 处理消息并广播给所有在线用户

全局通道管理

  • GlobalChannelHolder.java 管理所有用户的连接通道,维护 CHANNELS 映射表
  • 提供 sendBroadcastMessage() 方法向房间内所有用户发送消息

消息广播实现

  • MessageBroadcaster.java 实现消息广播,使用 ChannelGroup 管理房间内的所有通道
  • 提供 broadcast() 方法向所有在线用户发送消息

4. AI 消息处理流程

AI 服务调用

  • GiteeAiService.java 调用 Gitee AI 服务,使用 giteeAiClient 发送请求
  • 处理流式响应,通过 globalChannelHolder.sendBroadcastAiMessage() 广播 AI 回复

AI 消息处理

  • AiHelperReceiver.java 接收 AI 助手消息,使用 aiService.streamCallWithMessage() 处理流式输出
  • 通过 globalChannelHolder.sendBroadcastAiMessage() 广播 AI 回复

5. 数据存储流程

消息持久化

  • StoreMessageReceiver.java 接收存储消息,调用 messageService.saveMessage() 保存消息
  • IAQMessageMapper.java 提供数据库接口,messageMapper.insert() 插入数据库

数据库操作

  • AqMessage.java 是消息实体类,映射到 aq_message
  • IAQMessageService.java 提供业务逻辑接口,saveMessage() 方法调用 Mapper 插入数据

6. 用户状态管理

用户登录

  • UserLoginCmdHandler.java 处理用户登录,生成用户ID并保存到 Redis
  • 将用户通道加入全局通道持有者

用户退出

  • UserLogoutCmdHandler.java 处理用户退出,清除用户信息,移除用户通道

心跳检测

  • HeartBeatHandler.java 实现心跳检测,使用 IdleStateHandler 监控连接状态,超时自动断开连接
Logo

火山引擎开发者社区是火山引擎打造的AI技术生态平台,聚焦Agent与大模型开发,提供豆包系列模型(图像/视频/视觉)、智能分析与会话工具,并配套评测集、动手实验室及行业案例库。社区通过技术沙龙、挑战赛等活动促进开发者成长,新用户可领50万Tokens权益,助力构建智能应用。

更多推荐