基于RabbitMQ的消息提醒系统设计与实现(spring boot + vue2案例)
本文介绍了基于RabbitMQ实现的消息提醒系统设计方案,采用Spring Boot+Vue2技术栈。系统包含三种消息类型:系统通知、站内信和实时弹窗提醒。架构上使用RabbitMQ进行消息路由,通过FanoutExchange实现广播消息,TopicExchange处理定向用户消息。后端实现了RabbitMQ配置、消息模型定义和生产者逻辑,支持消息持久化和动态队列创建。前端通过WebSocket
·
以下是基于RabbitMQ实现消息提醒功能的完整案例,涵盖Java后端与Vue2前端实现,包含架构设计、代码实现及运维配置。
基于RabbitMQ的消息提醒系统设计与实现
一、系统架构设计
1. 业务场景
实现用户级消息提醒功能,支持:
- 系统通知(如登录提醒、订单状态变更)
- 站内信(用户间消息)
- 实时弹窗提醒+历史消息列表
2. 技术栈
- 后端:Spring Boot 2.7.x、Spring AMQP(RabbitMQ客户端)、MySQL、Redis(在线状态缓存)
- 前端:Vue2、Element UI、WebSocket(实时推送)
- 中间件:RabbitMQ 3.12.x(消息路由)、Nginx(WebSocket反向代理)
3. 架构图
┌─────────────┐ ┌─────────────┐ ┌─────────────┐
│ 业务系统 │────>│ RabbitMQ │────>│ 消息服务 │
└─────────────┘ └──────┬──────┘ └──────┬──────┘
│ │
▼ ▼
┌─────────────┐ ┌─────────────┐
│ WebSocket │<────│ MySQL │
│ 推送服务 │ │(消息存储) │
└──────┬──────┘ └─────────────┘
│
▼
┌─────────────┐
│ Vue2前端 │
└─────────────┘
4. RabbitMQ队列设计
| 交换机类型 | 名称 | 作用 | 路由键规则 |
|---|---|---|---|
| FanoutExchange | notify.fanout | 系统广播通知(如全平台公告) | 无(广播) |
| TopicExchange | notify.topic | 定向用户通知 | user.{userId}(如user.1001) |
| 队列 | notify.user.{userId} | 存储用户专属消息 | 绑定到notify.topic |
| 队列 | notify.broadcast | 存储广播消息 | 绑定到notify.fanout |
二、后端实现(Java)
1. 依赖配置(pom.xml)
<dependencies>
<!-- Spring AMQP -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<!-- WebSocket -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-websocket</artifactId>
</dependency>
<!-- 数据库 -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-jpa</artifactId>
</dependency>
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
</dependency>
</dependencies>
2. RabbitMQ配置
@Configuration
public class RabbitConfig {
// 1. 交换机定义
@Bean
public FanoutExchange broadcastExchange() {
return new FanoutExchange("notify.fanout", true, false);
}
@Bean
public TopicExchange userExchange() {
return new TopicExchange("notify.topic", true, false);
}
// 2. 广播队列(持久化,避免消息丢失)
@Bean
public Queue broadcastQueue() {
return QueueBuilder.durable("notify.broadcast")
.withArgument("x-dead-letter-exchange", "notify.dlx") // 死信交换机
.build();
}
// 3. 绑定关系(广播队列绑定到扇形交换机)
@Bean
public Binding broadcastBinding() {
return BindingBuilder.bind(broadcastQueue()).to(broadcastExchange());
}
// 4. 动态创建用户队列(由消息消费者触发)
public Queue userQueue(Long userId) {
return QueueBuilder.durable("notify.user." + userId)
.withArgument("x-expires", 86400000) // 24小时无消息自动删除
.build();
}
// 5. 用户队列绑定到主题交换机
public Binding userBinding(Long userId) {
return BindingBuilder.bind(userQueue(userId))
.to(userExchange())
.with("user." + userId); // 路由键:user.1001
}
}
3. 消息模型定义
@Data
@Entity
@Table(name = "t_notify")
public class NotifyMessage {
@Id
@GeneratedValue(strategy = GenerationType.IDENTITY)
private Long id;
private Long userId; // 接收用户ID(0表示广播)
private String title;
private String content;
private String type; // SYSTEM/USER
private Boolean read = false;
private LocalDateTime createTime = LocalDateTime.now();
}
// 消息DTO(用于MQ传输)
@Data
public class NotifyDTO implements Serializable {
private Long id;
private Long userId;
private String title;
private String content;
private String type;
}
4. 消息生产者(发送提醒)
@Service
public class NotifyProducer {
@Autowired
private RabbitTemplate rabbitTemplate;
@Autowired
private NotifyRepository notifyRepository;
// 发送用户定向消息
public void sendToUser(NotifyDTO dto) {
// 1. 保存到数据库
NotifyMessage message = new NotifyMessage();
BeanUtils.copyProperties(dto, message);
notifyRepository.save(message);
// 2. 发送到RabbitMQ(主题交换机)
rabbitTemplate.convertAndSend(
"notify.topic",
"user." + dto.getUserId(), // 路由键匹配用户队列
dto
);
}
// 发送广播消息
public void sendBroadcast(NotifyDTO dto) {
NotifyMessage message = new NotifyMessage();
BeanUtils.copyProperties(dto, message);
message.setUserId(0L); // 0表示广播
notifyRepository.save(message);
// 发送到扇形交换机(广播)
rabbitTemplate.convertAndSend("notify.fanout", "", dto);
}
}
5. 消息消费者(处理并推送前端)
@Service
public class NotifyConsumer {
@Autowired
private WebSocketServer webSocketServer;
@Autowired
private RabbitAdmin rabbitAdmin; // 用于动态创建队列
@Autowired
private RabbitConfig rabbitConfig;
// 1. 处理用户定向消息(动态队列)
@RabbitListener(queuesToDeclare = @Queue(name = "notify.user.template", autoDelete = "true"))
public void handleUserMessage(NotifyDTO dto, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) long tag) throws IOException {
// 确保用户队列存在(首次接收时创建)
Queue userQueue = rabbitConfig.userQueue(dto.getUserId());
Binding userBinding = rabbitConfig.userBinding(dto.getUserId());
rabbitAdmin.declareQueue(userQueue);
rabbitAdmin.declareBinding(userBinding);
// 通过WebSocket推送给在线用户
webSocketServer.sendToUser(dto.getUserId(), dto);
// 手动确认消息已处理
channel.basicAck(tag, false);
}
// 2. 处理广播消息
@RabbitListener(queues = "notify.broadcast")
public void handleBroadcastMessage(NotifyDTO dto) {
webSocketServer.broadcast(dto); // 推送给所有在线用户
}
}
6. WebSocket实时推送
@ServerEndpoint("/ws/notify/{userId}")
@Component
public class WebSocketServer {
private static ConcurrentHashMap<Long, Session> userSessions = new ConcurrentHashMap<>();
// 连接建立时注册用户
@OnOpen
public void onOpen(Session session, @PathParam("userId") Long userId) {
userSessions.put(userId, session);
}
// 发送给指定用户
public void sendToUser(Long userId, NotifyDTO dto) {
Session session = userSessions.get(userId);
if (session != null && session.isOpen()) {
try {
session.getBasicRemote().sendText(new ObjectMapper().writeValueAsString(dto));
} catch (IOException e) {
log.error("推送失败", e);
}
}
}
// 广播消息
public void broadcast(NotifyDTO dto) {
userSessions.values().forEach(session -> {
try {
if (session.isOpen()) {
session.getBasicRemote().sendText(new ObjectMapper().writeValueAsString(dto));
}
} catch (IOException e) {
log.error("广播失败", e);
}
});
}
}
7. 消息查询接口(供前端拉取历史)
@RestController
@RequestMapping("/api/notify")
public class NotifyController {
@Autowired
private NotifyRepository notifyRepository;
// 分页查询用户消息
@GetMapping("/user/{userId}")
public Page<NotifyMessage> getUserMessages(
@PathVariable Long userId,
@RequestParam(defaultValue = "0") int page,
@RequestParam(defaultValue = "20") int size) {
return notifyRepository.findByUserIdOrderByCreateTimeDesc(
userId,
PageRequest.of(page, size)
);
}
// 标记消息为已读
@PutMapping("/read/{id}")
public void markAsRead(@PathVariable Long id) {
notifyRepository.markAsRead(id);
}
}
三、前端实现(Vue2)
1. WebSocket连接管理(src/utils/websocket.js)
import store from '@/store'
let websocket = null
export function initWebSocket() {
const userId = store.getters.userId
if (!userId) return
// 连接WebSocket(通过Nginx反向代理)
const wsUri = process.env.VUE_APP_WS_URI + `/ws/notify/${userId}`
websocket = new WebSocket(wsUri)
websocket.onopen = () => {
console.log('WebSocket连接成功')
}
websocket.onmessage = (event) => {
const message = JSON.parse(event.data)
// 触发消息提醒
store.dispatch('notify/addUnreadMessage', message)
// 显示桌面通知(需用户授权)
showDesktopNotify(message)
}
websocket.onerror = (error) => {
console.error('WebSocket错误:', error)
}
websocket.onclose = () => {
console.log('连接关闭,3秒后重连')
setTimeout(initWebSocket, 3000) // 断线重连
}
}
// 桌面通知
function showDesktopNotify(message) {
if (Notification.permission === 'granted') {
new Notification(message.title, { body: message.content })
} else if (Notification.permission !== 'denied') {
Notification.requestPermission().then(permission => {
if (permission === 'granted') {
new Notification(message.title, { body: message.content })
}
})
}
}
export function closeWebSocket() {
if (websocket) {
websocket.close()
}
}
2. 消息存储与状态管理(src/store/modules/notify.js)
import axios from 'axios'
const state = {
unreadCount: 0,
unreadMessages: [],
historyMessages: []
}
const mutations = {
SET_UNREAD_COUNT(state, count) {
state.unreadCount = count
},
ADD_UNREAD_MESSAGE(state, message) {
state.unreadMessages.unshift(message)
state.unreadCount++
},
SET_HISTORY_MESSAGES(state, messages) {
state.historyMessages = messages
},
MARK_AS_READ(state, id) {
const index = state.unreadMessages.findIndex(m => m.id === id)
if (index !== -1) {
state.unreadMessages.splice(index, 1)
state.unreadCount--
}
}
}
const actions = {
// 获取历史消息
async fetchHistoryMessages({ commit }, { userId, page = 0 }) {
const res = await axios.get(`/api/notify/user/${userId}?page=${page}`)
commit('SET_HISTORY_MESSAGES', res.data.content)
},
// 标记已读
async markAsRead({ commit }, id) {
await axios.put(`/api/notify/read/${id}`)
commit('MARK_AS_READ', id)
}
}
export default {
namespaced: true,
state,
mutations,
actions
}
3. 消息提醒组件(src/components/NotifyPopup.vue)
<template>
<div class="notify-popup" v-if="showPopup">
<div class="notify-header">
<h3>{{ currentMessage.title }}</h3>
<button @click="closePopup">×</button>
</div>
<div class="notify-content">{{ currentMessage.content }}</div>
<div class="notify-footer">
<button @click="markAsRead">已读</button>
</div>
</div>
</template>
<script>
import { mapState, mapActions } from 'vuex'
export default {
data() {
return {
showPopup: false,
currentMessage: null
}
},
computed: {
...mapState('notify', ['unreadMessages'])
},
watch: {
unreadMessages(newVal) {
if (newVal.length > 0 && !this.showPopup) {
this.currentMessage = newVal[0]
this.showPopup = true
}
}
},
methods: {
...mapActions('notify', ['markAsRead']),
closePopup() {
this.showPopup = false
},
async markAsRead() {
await this.markAsRead(this.currentMessage.id)
this.showPopup = false
}
}
}
</script>
<style scoped>
.notify-popup {
position: fixed;
bottom: 20px;
right: 20px;
width: 300px;
border: 1px solid #eee;
border-radius: 4px;
box-shadow: 0 2px 10px rgba(0,0,0,0.1);
}
.notify-header {
padding: 10px;
background: #f5f5f5;
display: flex;
justify-content: space-between;
}
.notify-content {
padding: 15px;
}
.notify-footer {
padding: 10px;
text-align: right;
}
</style>
4. 消息列表页面(src/views/MessageCenter.vue)
<template>
<div class="message-center">
<el-tabs v-model="activeTab">
<el-tab-pane label="未读消息 ({{ unreadCount }})" name="unread">
<el-list v-for="msg in unreadMessages" :key="msg.id">
<el-list-item>
<el-card>
<div slot="header">{{ msg.title }}</div>
<div>{{ msg.content }}</div>
<el-button @click="markAsRead(msg.id)">标记已读</el-button>
</el-card>
</el-list-item>
</el-list>
</el-tab-pane>
<el-tab-pane label="历史消息" name="history">
<el-pagination @current-change="handlePageChange" :current-page="page" :page-size="20" />
<el-list v-for="msg in historyMessages" :key="msg.id">
<!-- 历史消息展示 -->
</el-list>
</el-tab-pane>
</el-tabs>
</div>
</template>
<script>
// 实现与NotifyPopup类似,调用actions获取历史消息和标记已读
</script>
四、RabbitMQ运维配置
1. 安装与启动(Docker)
# 启动RabbitMQ(带管理界面)
docker run -d --name rabbitmq \
-p 5672:5672 \
-p 15672:15672 \
-e RABBITMQ_DEFAULT_USER=admin \
-e RABBITMQ_DEFAULT_PASS=admin \
rabbitmq:3.12-management
2. 关键配置(rabbitmq.conf)
# 持久化配置
disk_free_limit.relative = 1.0
# 连接限制(避免恶意连接)
connections.max = 10000
# 内存阈值(超过时换页到磁盘)
vm_memory_high_watermark.relative = 0.7
# 开启死信交换机(处理未消费消息)
# 已在队列定义中配置x-dead-letter-exchange
3. 监控与告警
- 通过管理界面(http://ip:15672)监控队列长度、消息速率
- 配置Prometheus + Grafana监控关键指标(如
rabbitmq_queue_messages_ready) - 当队列长度超过阈值(如10000条)时触发告警(通过Zabbix/AlertManager)
五、功能测试与扩展
1. 测试场景
- 单用户消息:调用
sendToUser发送消息,验证前端实时弹窗+数据库存储 - 广播消息:调用
sendBroadcast,验证所有在线用户接收 - 离线消息:用户离线时发送消息,上线后通过WebSocket重连接收
- 压力测试:使用JMeter模拟1000用户并发发送消息,观察RabbitMQ吞吐量(建议QoS=1)
2. 扩展建议
- 增加消息撤回功能(通过RabbitMQ的
basic.cancel) - 实现消息已读回执(前端确认后更新数据库状态)
- 对高频消息(如通知)做合并展示(前端防抖处理)
总结
本方案基于RabbitMQ的主题交换机和扇形交换机实现了灵活的消息路由,结合WebSocket实现实时推送,同时通过数据库存储保证消息不丢失。后端采用Java Spring Boot实现高可靠性,前端Vue2提供良好的用户体验,适合中小型应用的消息提醒场景。运维层面通过Docker部署和监控配置确保系统稳定性。
火山引擎开发者社区是火山引擎打造的AI技术生态平台,聚焦Agent与大模型开发,提供豆包系列模型(图像/视频/视觉)、智能分析与会话工具,并配套评测集、动手实验室及行业案例库。社区通过技术沙龙、挑战赛等活动促进开发者成长,新用户可领50万Tokens权益,助力构建智能应用。
更多推荐
所有评论(0)