用idea进行数据同步
本文介绍了RabbitMQ消息队列的配置和使用方法。首先需要在YAML文件中配置RabbitMQ连接参数(主机、端口、用户名密码等),然后通过MqConstants类定义交换机和队列名称。文章详细说明了两种实现方式:基于@Bean的配置类方式集中管理队列和交换机绑定关系,以及基于@RabbitListener注解的灵活监听方式。在业务场景中,通过rabbitTemplate发送新增/修改/删除消息
-
声明对列和交换机
你需要先在yaml文件当中进行rabbitmq的相关配置
rabbitmq:
host:192.168.150.101 //消息件的地址
port:5672 //端口数据
username:itcast //用户名
password:123321 //密码
virtual-host:/ //虚拟机主机名
声明队列交换机,创建新的工具类,定义不同功能的交换机
public class MqConstants {
/**
* 交换机
*/
public final static String HOTEL_EXCHANGE = "hotel.topic";
/**
* 监听新增和修改的队列
*/
public final static String HOTEL_INSERT_QUEUE = "hotel.insert.queue";
/**
* 监听删除的队列
*/
public final static String HOTEL_DELETE_QUEUE = "hotel.delete.queue";
/**
* 新增或修改的RoutingKey
*/
public final static String HOTEL_INSERT_KEY = "hotel.insert";
/**
* 删除的RoutingKey
*/
public final static String HOTEL_DELETE_KEY = "hotel.delete";
}
-
生产者发送消息时,指定交换机为
HOTEL_EXCHANGE、路由键为HOTEL_INSERT_KEY等。 -
消费者声明队列并绑定到
HOTEL_EXCHANGE交换机,通过对应的路由键(HOTEL_INSERT_KEY/HOTEL_DELETE_KEY)来订阅特定业务(新增修改 / 删除 )的消息 。
定义队列交换机的对象和绑定关系:
-
基于@Bean的方式
这种方式适合在配置类当中集中管理队列,交换机,绑定关系的 Bean 定义,结构清晰,便于统一维护。
import org.springframework.amqp.core.*; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; // 配置类注解,让 Spring 扫描并加载这些 Bean 定义 @Configuration public class RabbitMqConfig { // 1. 定义交换机(对应 MqConstants 里的交换机) @Bean public TopicExchange hotelExchange() { // 创建一个 Topic 类型的交换机,名称用 MqConstants 里的 return new TopicExchange(MqConstants.HOTEL_EXCHANGE); } // 2. 定义“新增和修改”队列 @Bean public Queue hotelInsertQueue() { return new Queue(MqConstants.HOTEL_INSERT_QUEUE); } // 3. 定义“删除”队列 @Bean public Queue hotelDeleteQueue() { return new Queue(MqConstants.HOTEL_DELETE_QUEUE); } // 4. 绑定“新增和修改”队列到交换机(设置路由键) @Bean public Binding bindHotelInsertQueue(TopicExchange hotelExchange, Queue hotelInsertQueue) { // 用 MqConstants 里的新增路由键,将队列绑定到交换机 return BindingBuilder.bind(hotelInsertQueue) .to(hotelExchange) .with(MqConstants.HOTEL_INSERT_KEY); } // 5. 绑定“删除”队列到交换机(设置路由键) @Bean public Binding bindHotelDeleteQueue(TopicExchange hotelExchange, Queue hotelDeleteQueue) { // 用 MqConstants 里的删除路由键,将队列绑定到交换机 return BindingBuilder.bind(hotelDeleteQueue) .to(hotelExchange) .with(MqConstants.HOTEL_DELETE_KEY); } }-
交换机类型:这里用了
TopicExchange(主题交换机 ),和MqConstants里hotel.topic对应,也可以根据实际需求换成DirectExchange(直连 )、FanoutExchange(扇形 / 广播 )等。 -
Bean 依赖注入:绑定方法(如
bindHotelInsertQueue)的参数,会由 Spring 自动注入对应的交换机、队列 Bean,要保证方法参数名或类型能匹配上容器里的 Bean 。 -
路由键作用:通过
with方法指定路由键,这样交换机就会根据路由键,把不同业务(新增修改、删除 )的消息路由到对应的队列。
-
基于注解的方式
这种方式更灵活,通常在消费者监听方法上直接声明队列、交换机和绑定关系,适合快速开发简单场景,或者临时新增队列绑定的情况。
import org.springframework.amqp.rabbit.annotation.Exchange; import org.springframework.amqp.rabbit.annotation.Queue; import org.springframework.amqp.rabbit.annotation.QueueBinding; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Component; @Component // 让 Spring 扫描到这个组件 public class HotelMqListener { // 监听“新增和修改”队列,同时声明队列、交换机、绑定关系 @RabbitListener(bindings = @QueueBinding( value = @Queue(name = MqConstants.HOTEL_INSERT_QUEUE, durable = "true"), // 声明队列,durable = true 表示持久化 exchange = @Exchange(name = MqConstants.HOTEL_EXCHANGE, type = "topic"), // 声明交换机,类型 topic key = MqConstants.HOTEL_INSERT_KEY // 路由键 )) public void handleHotelInsert(String message) { // 这里写接收到“新增或修改”消息后的业务逻辑,比如更新 Elasticsearch 酒店数据 System.out.println("收到酒店新增/修改消息:" + message); } // 监听“删除”队列,同时声明队列、交换机、绑定关系 @RabbitListener(bindings = @QueueBinding( value = @Queue(name = MqConstants.HOTEL_DELETE_QUEUE, durable = "true"), exchange = @Exchange(name = MqConstants.HOTEL_EXCHANGE, type = "topic"), key = MqConstants.HOTEL_DELETE_KEY )) public void handleHotelDelete(String message) { // 这里写接收到“删除”消息后的业务逻辑,比如从 Elasticsearch 移除酒店数据 System.out.println("收到酒店删除消息:" + message); } }-
注解参数说明:
-
@Queue:声明队列,name指定队列名称,durable设置是否持久化(建议生产环境设为true,防止 RabbitMQ 重启队列丢失 )。 -
@Exchange:声明交换机,name是交换机名称,type指定类型(如topic、direct、fanout等 )。 -
@QueueBinding:把队列、交换机、路由键绑定起来,让交换机知道如何把消息路由到这个队列。
-
-
消费者方法:
handleHotelInsert和handleHotelDelete方法就是实际处理消息的逻辑,当对应的队列有消息进来时,方法会被触发执行。
以下为Rabbitmq的消息发送的代码,当我们在进行增删改等操作的时候,我们只需在其方法内进行消息的发送
// 新增酒店 @PostMapping public void saveHotel(@RequestBody Hotel hotel) { hotelService.save(hotel); // 发送新增消息,用新增路由键 rabbitTemplate.convertAndSend( MqConstants.HOTEL_EXCHANGE, MqConstants.HOTEL_INSERT_KEY, hotel.getId() ); } // 修改酒店 @PutMapping public void updateById(@RequestBody Hotel hotel) { if (Objects.isNull(hotel.getId())) { throw new InvalidParameterException("id不能为空"); } hotelService.updateById(hotel); // 发送修改消息,用修改路由键 rabbitTemplate.convertAndSend( MqConstants.HOTEL_EXCHANGE, MqConstants.HOTEL_UPDATE_KEY, hotel.getId() ); } // 删除酒店 @DeleteMapping("/{id}") public void deleteById(@PathVariable("id") Long id) { hotelService.removeById(id); // 发送删除消息,用删除路由键 rabbitTemplate.convertAndSend( MqConstants.HOTEL_EXCHANGE, MqConstants.HOTEL_DELETE_KEY, id ); } }当我们进行消息的发送个时候,我们所需要的参数有。交换机的名字。交换机的路由。需要发送的消息。
-
-
消息发送之后,再进行消息的监听。
消息的监听的时候,需要先进行接口的定义,分别定义新增修改和删除的业务接口,可以使用RabbitListener注解进行消息的监听。
/**
* 监听酒店新增或修改队列:从数据库查询最新数据,同步到 Elasticsearch
*
*/
@RabbitListener(queues = MqConstants.HOTEL_INSERT_QUEUE)
@Transactional
public void listenHotelInsertOrUpdate(Long id) {
hotelRepository.save(hotel);
}
/**
* 监听酒店删除队列:从 Elasticsearch 删除对应酒店数据
* @param id 酒店ID
*/
@RabbitListener(queues = MqConstants.HOTEL_DELETE_QUEUE)
@Transactional
public void listenHotelDelete(Long id) {
// 从 Elasticsearch 删除酒店
hotelRepository.deleteById(id);
System.out.println("酒店删除,同步 Elasticsearch 成功,酒店ID:" + id);
}
定义过接口之后,我们就可以进行方法的实现
@Override
public void deleteById(Long id) {
// 1. 准备Request:创建删除请求,指定索引和文档ID
DeleteRequest request = new DeleteRequest(INDEX_NAME, id.toString());
// 2. 准备发送请求:执行删除操作(这里直接执行,也可做一些请求参数的额外设置,比如超时等)
try {
DeleteResponse response = client.delete(request, RequestOptions.DEFAULT);
if (response.getResult() != null) {
System.out.println("Elasticsearch 文档删除成功,文档ID:" + id + ",结果:" + response.getResult());
} else {
System.err.println("Elasticsearch 文档删除失败,文档ID:" + id);
}
} catch (Exception e) {
e.printStackTrace();
System.err.println("删除 Elasticsearch 文档时发生异常,文档ID:" + id + ",异常信息:" + e.getMessage());
}
}
@Override
public void insertById(Long id) {
// 0. 根据id查询酒店数据:从数据库查询
Hotel hotel = hotelDbService.getHotelById(id); // 需实现该方法,返回 Hotel 实体
if (hotel == null) {
System.err.println("根据ID查询酒店数据为空,ID:" + id);
return;
}
// 1. 准备Request:创建索引请求,指定索引
IndexRequest request = new IndexRequest(INDEX_NAME);
request.id(id.toString()); // 设置文档ID,与酒店ID对应
try {
// 将 Hotel 对象转为 JSON 字符串,作为文档内容
String hotelJson = objectMapper.writeValueAsString(hotel);
request.source(hotelJson, XContentType.JSON);
// 2. 准备DSL:这里 DSL 已经通过 request.source 等方式设置好了,
// 若是复杂场景,可继续添加路由、超时等参数,比如 request.routing("...")
// 3. 发送请求:执行索引(新增/更新)操作
IndexResponse response = client.index(request, RequestOptions.DEFAULT);
System.out.println("Elasticsearch 文档插入/更新成功,文档ID:" + id + ",结果:" + response.getResult());
} catch (Exception e) {
e.printStackTrace();
System.err.println("插入/更新 Elasticsearch 文档时发生异常,文档ID:" + id + ",异常信息:" + e.getMessage());
}
}
火山引擎开发者社区是火山引擎打造的AI技术生态平台,聚焦Agent与大模型开发,提供豆包系列模型(图像/视频/视觉)、智能分析与会话工具,并配套评测集、动手实验室及行业案例库。社区通过技术沙龙、挑战赛等活动促进开发者成长,新用户可领50万Tokens权益,助力构建智能应用。
更多推荐
所有评论(0)