以下是基于RabbitMQ实现消息提醒功能的完整案例,涵盖Java后端与Vue2前端实现,包含架构设计、代码实现及运维配置。

基于RabbitMQ的消息提醒系统设计与实现

一、系统架构设计

1. 业务场景

实现用户级消息提醒功能,支持:

  • 系统通知(如登录提醒、订单状态变更)
  • 站内信(用户间消息)
  • 实时弹窗提醒+历史消息列表

2. 技术栈

  • 后端:Spring Boot 2.7.x、Spring AMQP(RabbitMQ客户端)、MySQL、Redis(在线状态缓存)
  • 前端:Vue2、Element UI、WebSocket(实时推送)
  • 中间件:RabbitMQ 3.12.x(消息路由)、Nginx(WebSocket反向代理)

3. 架构图

┌─────────────┐     ┌─────────────┐     ┌─────────────┐
│   业务系统   │────>│  RabbitMQ   │────>│  消息服务    │
└─────────────┘     └──────┬──────┘     └──────┬──────┘
                           │                   │
                           ▼                   ▼
                    ┌─────────────┐     ┌─────────────┐
                    │ WebSocket   │<────│   MySQL     │
                    │ 推送服务     │     │(消息存储)  │
                    └──────┬──────┘     └─────────────┘
                           │
                           ▼
                    ┌─────────────┐
                    │  Vue2前端   │
                    └─────────────┘

4. RabbitMQ队列设计

交换机类型 名称 作用 路由键规则
FanoutExchange notify.fanout 系统广播通知(如全平台公告) 无(广播)
TopicExchange notify.topic 定向用户通知 user.{userId}(如user.1001)
队列 notify.user.{userId} 存储用户专属消息 绑定到notify.topic
队列 notify.broadcast 存储广播消息 绑定到notify.fanout

二、后端实现(Java)

1. 依赖配置(pom.xml)

<dependencies>
    <!-- Spring AMQP -->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-amqp</artifactId>
    </dependency>
    <!-- WebSocket -->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-websocket</artifactId>
    </dependency>
    <!-- 数据库 -->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-data-jpa</artifactId>
    </dependency>
    <dependency>
        <groupId>mysql</groupId>
        <artifactId>mysql-connector-java</artifactId>
    </dependency>
</dependencies>

2. RabbitMQ配置

@Configuration
public class RabbitConfig {
    // 1. 交换机定义
    @Bean
    public FanoutExchange broadcastExchange() {
        return new FanoutExchange("notify.fanout", true, false);
    }

    @Bean
    public TopicExchange userExchange() {
        return new TopicExchange("notify.topic", true, false);
    }

    // 2. 广播队列(持久化,避免消息丢失)
    @Bean
    public Queue broadcastQueue() {
        return QueueBuilder.durable("notify.broadcast")
                .withArgument("x-dead-letter-exchange", "notify.dlx") // 死信交换机
                .build();
    }

    // 3. 绑定关系(广播队列绑定到扇形交换机)
    @Bean
    public Binding broadcastBinding() {
        return BindingBuilder.bind(broadcastQueue()).to(broadcastExchange());
    }

    // 4. 动态创建用户队列(由消息消费者触发)
    public Queue userQueue(Long userId) {
        return QueueBuilder.durable("notify.user." + userId)
                .withArgument("x-expires", 86400000) // 24小时无消息自动删除
                .build();
    }

    // 5. 用户队列绑定到主题交换机
    public Binding userBinding(Long userId) {
        return BindingBuilder.bind(userQueue(userId))
                .to(userExchange())
                .with("user." + userId); // 路由键:user.1001
    }
}

3. 消息模型定义

@Data
@Entity
@Table(name = "t_notify")
public class NotifyMessage {
    @Id
    @GeneratedValue(strategy = GenerationType.IDENTITY)
    private Long id;
    private Long userId; // 接收用户ID(0表示广播)
    private String title;
    private String content;
    private String type; // SYSTEM/USER
    private Boolean read = false;
    private LocalDateTime createTime = LocalDateTime.now();
}

// 消息DTO(用于MQ传输)
@Data
public class NotifyDTO implements Serializable {
    private Long id;
    private Long userId;
    private String title;
    private String content;
    private String type;
}

4. 消息生产者(发送提醒)

@Service
public class NotifyProducer {
    @Autowired
    private RabbitTemplate rabbitTemplate;
    @Autowired
    private NotifyRepository notifyRepository;

    // 发送用户定向消息
    public void sendToUser(NotifyDTO dto) {
        // 1. 保存到数据库
        NotifyMessage message = new NotifyMessage();
        BeanUtils.copyProperties(dto, message);
        notifyRepository.save(message);
        // 2. 发送到RabbitMQ(主题交换机)
        rabbitTemplate.convertAndSend(
                "notify.topic", 
                "user." + dto.getUserId(), // 路由键匹配用户队列
                dto
        );
    }

    // 发送广播消息
    public void sendBroadcast(NotifyDTO dto) {
        NotifyMessage message = new NotifyMessage();
        BeanUtils.copyProperties(dto, message);
        message.setUserId(0L); // 0表示广播
        notifyRepository.save(message);
        // 发送到扇形交换机(广播)
        rabbitTemplate.convertAndSend("notify.fanout", "", dto);
    }
}

5. 消息消费者(处理并推送前端)

@Service
public class NotifyConsumer {
    @Autowired
    private WebSocketServer webSocketServer;
    @Autowired
    private RabbitAdmin rabbitAdmin; // 用于动态创建队列
    @Autowired
    private RabbitConfig rabbitConfig;

    // 1. 处理用户定向消息(动态队列)
    @RabbitListener(queuesToDeclare = @Queue(name = "notify.user.template", autoDelete = "true"))
    public void handleUserMessage(NotifyDTO dto, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) long tag) throws IOException {
        // 确保用户队列存在(首次接收时创建)
        Queue userQueue = rabbitConfig.userQueue(dto.getUserId());
        Binding userBinding = rabbitConfig.userBinding(dto.getUserId());
        rabbitAdmin.declareQueue(userQueue);
        rabbitAdmin.declareBinding(userBinding);

        // 通过WebSocket推送给在线用户
        webSocketServer.sendToUser(dto.getUserId(), dto);
        // 手动确认消息已处理
        channel.basicAck(tag, false);
    }

    // 2. 处理广播消息
    @RabbitListener(queues = "notify.broadcast")
    public void handleBroadcastMessage(NotifyDTO dto) {
        webSocketServer.broadcast(dto); // 推送给所有在线用户
    }
}

6. WebSocket实时推送

@ServerEndpoint("/ws/notify/{userId}")
@Component
public class WebSocketServer {
    private static ConcurrentHashMap<Long, Session> userSessions = new ConcurrentHashMap<>();

    // 连接建立时注册用户
    @OnOpen
    public void onOpen(Session session, @PathParam("userId") Long userId) {
        userSessions.put(userId, session);
    }

    // 发送给指定用户
    public void sendToUser(Long userId, NotifyDTO dto) {
        Session session = userSessions.get(userId);
        if (session != null && session.isOpen()) {
            try {
                session.getBasicRemote().sendText(new ObjectMapper().writeValueAsString(dto));
            } catch (IOException e) {
                log.error("推送失败", e);
            }
        }
    }

    // 广播消息
    public void broadcast(NotifyDTO dto) {
        userSessions.values().forEach(session -> {
            try {
                if (session.isOpen()) {
                    session.getBasicRemote().sendText(new ObjectMapper().writeValueAsString(dto));
                }
            } catch (IOException e) {
                log.error("广播失败", e);
            }
        });
    }
}

7. 消息查询接口(供前端拉取历史)

@RestController
@RequestMapping("/api/notify")
public class NotifyController {
    @Autowired
    private NotifyRepository notifyRepository;

    // 分页查询用户消息
    @GetMapping("/user/{userId}")
    public Page<NotifyMessage> getUserMessages(
            @PathVariable Long userId,
            @RequestParam(defaultValue = "0") int page,
            @RequestParam(defaultValue = "20") int size) {
        return notifyRepository.findByUserIdOrderByCreateTimeDesc(
                userId, 
                PageRequest.of(page, size)
        );
    }

    // 标记消息为已读
    @PutMapping("/read/{id}")
    public void markAsRead(@PathVariable Long id) {
        notifyRepository.markAsRead(id);
    }
}

三、前端实现(Vue2)

1. WebSocket连接管理(src/utils/websocket.js)

import store from '@/store'

let websocket = null

export function initWebSocket() {
  const userId = store.getters.userId
  if (!userId) return

  // 连接WebSocket(通过Nginx反向代理)
  const wsUri = process.env.VUE_APP_WS_URI + `/ws/notify/${userId}`
  websocket = new WebSocket(wsUri)

  websocket.onopen = () => {
    console.log('WebSocket连接成功')
  }

  websocket.onmessage = (event) => {
    const message = JSON.parse(event.data)
    // 触发消息提醒
    store.dispatch('notify/addUnreadMessage', message)
    // 显示桌面通知(需用户授权)
    showDesktopNotify(message)
  }

  websocket.onerror = (error) => {
    console.error('WebSocket错误:', error)
  }

  websocket.onclose = () => {
    console.log('连接关闭,3秒后重连')
    setTimeout(initWebSocket, 3000) // 断线重连
  }
}

// 桌面通知
function showDesktopNotify(message) {
  if (Notification.permission === 'granted') {
    new Notification(message.title, { body: message.content })
  } else if (Notification.permission !== 'denied') {
    Notification.requestPermission().then(permission => {
      if (permission === 'granted') {
        new Notification(message.title, { body: message.content })
      }
    })
  }
}

export function closeWebSocket() {
  if (websocket) {
    websocket.close()
  }
}

2. 消息存储与状态管理(src/store/modules/notify.js)

import axios from 'axios'

const state = {
  unreadCount: 0,
  unreadMessages: [],
  historyMessages: []
}

const mutations = {
  SET_UNREAD_COUNT(state, count) {
    state.unreadCount = count
  },
  ADD_UNREAD_MESSAGE(state, message) {
    state.unreadMessages.unshift(message)
    state.unreadCount++
  },
  SET_HISTORY_MESSAGES(state, messages) {
    state.historyMessages = messages
  },
  MARK_AS_READ(state, id) {
    const index = state.unreadMessages.findIndex(m => m.id === id)
    if (index !== -1) {
      state.unreadMessages.splice(index, 1)
      state.unreadCount--
    }
  }
}

const actions = {
  // 获取历史消息
  async fetchHistoryMessages({ commit }, { userId, page = 0 }) {
    const res = await axios.get(`/api/notify/user/${userId}?page=${page}`)
    commit('SET_HISTORY_MESSAGES', res.data.content)
  },
  // 标记已读
  async markAsRead({ commit }, id) {
    await axios.put(`/api/notify/read/${id}`)
    commit('MARK_AS_READ', id)
  }
}

export default {
  namespaced: true,
  state,
  mutations,
  actions
}

3. 消息提醒组件(src/components/NotifyPopup.vue)

<template>
  <div class="notify-popup" v-if="showPopup">
    <div class="notify-header">
      <h3>{{ currentMessage.title }}</h3>
      <button @click="closePopup">×</button>
    </div>
    <div class="notify-content">{{ currentMessage.content }}</div>
    <div class="notify-footer">
      <button @click="markAsRead">已读</button>
    </div>
  </div>
</template>

<script>
import { mapState, mapActions } from 'vuex'

export default {
  data() {
    return {
      showPopup: false,
      currentMessage: null
    }
  },
  computed: {
    ...mapState('notify', ['unreadMessages'])
  },
  watch: {
    unreadMessages(newVal) {
      if (newVal.length > 0 && !this.showPopup) {
        this.currentMessage = newVal[0]
        this.showPopup = true
      }
    }
  },
  methods: {
    ...mapActions('notify', ['markAsRead']),
    closePopup() {
      this.showPopup = false
    },
    async markAsRead() {
      await this.markAsRead(this.currentMessage.id)
      this.showPopup = false
    }
  }
}
</script>

<style scoped>
.notify-popup {
  position: fixed;
  bottom: 20px;
  right: 20px;
  width: 300px;
  border: 1px solid #eee;
  border-radius: 4px;
  box-shadow: 0 2px 10px rgba(0,0,0,0.1);
}
.notify-header {
  padding: 10px;
  background: #f5f5f5;
  display: flex;
  justify-content: space-between;
}
.notify-content {
  padding: 15px;
}
.notify-footer {
  padding: 10px;
  text-align: right;
}
</style>

4. 消息列表页面(src/views/MessageCenter.vue)

<template>
  <div class="message-center">
    <el-tabs v-model="activeTab">
      <el-tab-pane label="未读消息 ({{ unreadCount }})" name="unread">
        <el-list v-for="msg in unreadMessages" :key="msg.id">
          <el-list-item>
            <el-card>
              <div slot="header">{{ msg.title }}</div>
              <div>{{ msg.content }}</div>
              <el-button @click="markAsRead(msg.id)">标记已读</el-button>
            </el-card>
          </el-list-item>
        </el-list>
      </el-tab-pane>
      <el-tab-pane label="历史消息" name="history">
        <el-pagination @current-change="handlePageChange" :current-page="page" :page-size="20" />
        <el-list v-for="msg in historyMessages" :key="msg.id">
          <!-- 历史消息展示 -->
        </el-list>
      </el-tab-pane>
    </el-tabs>
  </div>
</template>

<script>
// 实现与NotifyPopup类似,调用actions获取历史消息和标记已读
</script>

四、RabbitMQ运维配置

1. 安装与启动(Docker)

# 启动RabbitMQ(带管理界面)
docker run -d --name rabbitmq \
  -p 5672:5672 \
  -p 15672:15672 \
  -e RABBITMQ_DEFAULT_USER=admin \
  -e RABBITMQ_DEFAULT_PASS=admin \
  rabbitmq:3.12-management

2. 关键配置(rabbitmq.conf)

# 持久化配置
disk_free_limit.relative = 1.0

# 连接限制(避免恶意连接)
connections.max = 10000

# 内存阈值(超过时换页到磁盘)
vm_memory_high_watermark.relative = 0.7

# 开启死信交换机(处理未消费消息)
# 已在队列定义中配置x-dead-letter-exchange

3. 监控与告警

  • 通过管理界面(http://ip:15672)监控队列长度、消息速率
  • 配置Prometheus + Grafana监控关键指标(如rabbitmq_queue_messages_ready
  • 当队列长度超过阈值(如10000条)时触发告警(通过Zabbix/AlertManager)

五、功能测试与扩展

1. 测试场景

  • 单用户消息:调用sendToUser发送消息,验证前端实时弹窗+数据库存储
  • 广播消息:调用sendBroadcast,验证所有在线用户接收
  • 离线消息:用户离线时发送消息,上线后通过WebSocket重连接收
  • 压力测试:使用JMeter模拟1000用户并发发送消息,观察RabbitMQ吞吐量(建议QoS=1)

2. 扩展建议

  • 增加消息撤回功能(通过RabbitMQ的basic.cancel
  • 实现消息已读回执(前端确认后更新数据库状态)
  • 对高频消息(如通知)做合并展示(前端防抖处理)

总结

本方案基于RabbitMQ的主题交换机和扇形交换机实现了灵活的消息路由,结合WebSocket实现实时推送,同时通过数据库存储保证消息不丢失。后端采用Java Spring Boot实现高可靠性,前端Vue2提供良好的用户体验,适合中小型应用的消息提醒场景。运维层面通过Docker部署和监控配置确保系统稳定性。

Logo

火山引擎开发者社区是火山引擎打造的AI技术生态平台,聚焦Agent与大模型开发,提供豆包系列模型(图像/视频/视觉)、智能分析与会话工具,并配套评测集、动手实验室及行业案例库。社区通过技术沙龙、挑战赛等活动促进开发者成长,新用户可领50万Tokens权益,助力构建智能应用。

更多推荐