使用canal+rocketMQ将Mysql增量数据同步到ElasticSearch
saveToEs做了异常处理,是因为高版本的es会出现返回结果无法解析的问题,虽然更新成功了但一直报错导致消息无法成功消费(实际上已经消费了),这里取巧不让它报错。如果出现jvm问题,是因为rocketmq和java版本冲突,可以把java降至Java8。去logs/example/example.log查看是否报错,启动成功会有日志。rocketmq配置和es配置,canal没有配置。流程较长,
·
版本:
canal:1.1.7
rocketMQ:4.9.4
Mysql:8.0
ElasticSearch:9.0.3
springboot:2.7.5
首先启动rocketmq
环境变量

cd /d D:\rocketmq-all-4.9.4-bin-release\bin
start mqnamesrv.cmd
start mqbroker.cmd -n 127.0.0.1:9876 autoCreateTopicEnable=true
如果出现jvm问题,是因为rocketmq和java版本冲突,可以把java降至Java8
可以配置新环境变量
然后在runbroker、runserver中修改java_home

启动mysql
mysql -u 用户名 -p;
启动canal
配置canal
conf/canal.properties
canal.destinations = example
rocketmq.producer.group = 消费者组名
conf/example/instance.properties
#如果没有就加入
canal.instance.mysql.slaveId=1234
canal.instance.dbUsername=你的用户名
canal.instance.dbPassword=你的密码
#可以去mysql注册一个专门供canal使用的用户
#设置监听表,也可以默认监听所有表,不做修改
canal.instance.filter.regex=表名\\..*
#设置topic
canal.mq.topic=你的topic
启动bin/startup.bat
去logs/example/example.log查看是否报错,启动成功会有日志

启动ElasticSearch和Kibana
自行启动
springboot集成
pom文件
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
</dependency>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>1.2.70</version>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
</dependency>
<!--canal-->
<dependency>
<groupId>com.alibaba.otter</groupId>
<artifactId>canal.client</artifactId>
<version>1.1.4</version>
</dependency>
<!--rocketmq-springboot启动类-->
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-spring-boot-starter</artifactId>
<version>2.2.3</version>
</dependency>
<!--rocketmq客户端-->
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-client</artifactId>
<version>4.9.4</version>
</dependency>
<!--远程调用依赖,自己调试时报错加上的-->
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-remoting</artifactId>
<version>4.9.4</version>
</dependency>
<!--es-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-elasticsearch</artifactId>
</dependency>
配置文件(properties)
用的yml文件,自行更替即可
rocketmq配置和es配置,canal没有配置
rocketmq.name-server=127.0.0.1:9876
rocketmq.producer.group=community_producer
rocketmq.producer.topic=example
rocketmq.producer.send-message-timeout=2000
rocketmq.producer.retry-times-when-send-failed=1
rocketmq.producer.retry-times-when-send-async-failed=1
rocketmq.consumer.group=community_consumer
spring.elasticsearch.uris=http://localhost:9200
spring.data.elasticsearch.repositories.enabled=true
model
mysqlModel
@Data
@RequiredArgsConstructor
public class Question{
private Long id;
private String title;
private String description;
private String tag;
private Long createTime;
private Long updateTime;
private Long creatorId;
private Integer readingNum;
private Integer commentNum;
private Long likeNum;
}
esModel,修改索引为自己的索引名
@Data
@RequiredArgsConstructor
@Document(indexName = "questions")
public class QuestionEs implements Serializable {
@Id
private Long id;
private String title;
private String description;
private String tag;
@Field(type = FieldType.Date)
private Long createTime;
@Field(type = FieldType.Date)
private Long updateTime;
private Long creatorId;
private Integer readingNum;
private Integer commentNum;
private Long likeNum;
}
canalModel
@Data
@RequiredArgsConstructor
public class CanalDTO implements Serializable {
private List<Question> data;
private String database;
private Long es;
private JSONArray old;
private String table;
private Long ts;
private String type;
}
repository
@Repository
public interface EsRepository extends ElasticsearchRepository<QuestionEs, String> {
}
监听并同步方法
saveToEs做了异常处理,是因为高版本的es会出现返回结果无法解析的问题,虽然更新成功了但一直报错导致消息无法成功消费(实际上已经消费了),这里取巧不让它报错。
@Component
@RocketMQMessageListener(
consumerGroup = "${rocketmq.consumer.group}",
topic = "${rocketmq.producer.topic}"
)
public class MqSyncMsgConsumer implements RocketMQListener<String> {
private final Logger logger = LoggerFactory.getLogger("es_sync_logger");
@Autowired
private EsRepository esRepository;
@Override
public void onMessage(String msg) {
System.out.println("接收到消息 -> " + msg);
CanalDTO canalDTO = JSON.parseObject(msg, CanalDTO.class);
String table = canalDTO.getTable();
String type = canalDTO.getType();
System.out.println(type);
List<Question> data = canalDTO.getData();
if(data.isEmpty()){
throw new RuntimeException("canal监听到消息,但为空");
}
for (Question question : data) {
QuestionEs questionEs = new QuestionEs();
if ("UPDATE".equals(type) && "question".equals(table)){
Optional<QuestionEs> esOptional = esRepository.findById(question.getId().toString());
if (esOptional.isPresent()) {
QuestionEs esTmp = esOptional.get();
saveToEs(question,esTmp);
} else {
saveToEs(question, questionEs);
}
logger.info("id = {} 更新es成功", questionEs.getId());
}else if("INSERT".equals(type) && "question".equals(table)){
saveToEs(question, questionEs);
}
}
}
private void saveToEs(Question question, QuestionEs questionEs) {
BeanUtils.copyProperties(question, questionEs);
try {
esRepository.save(questionEs);
}catch (Exception e){
String err = e.getMessage();
if (!err.contains("200 OK")){
logger.error("es操作文档失败,异常信息:", e);
throw new RuntimeException(e);
}
}
}
}
结果
测试一下update

控制台
kibana控制台

流程较长,如有纰漏敬请指出。
火山引擎开发者社区是火山引擎打造的AI技术生态平台,聚焦Agent与大模型开发,提供豆包系列模型(图像/视频/视觉)、智能分析与会话工具,并配套评测集、动手实验室及行业案例库。社区通过技术沙龙、挑战赛等活动促进开发者成长,新用户可领50万Tokens权益,助力构建智能应用。
更多推荐
所有评论(0)