【分布式】[数据同步]----- 采用Canal 中间件 把 MySQL 数据 同步到 Elasticsearch入门到实战实例详细教程
SpringBoot 3实现MySQL同步Elasticsearch指南 本文介绍了使用SpringBoot 3实现MySQL与Elasticsearch同步的完整方案。主要内容包括: 同步方案选型:对比了Log-Based(Canal)、Poll-Based和数据双写三种方案,推荐使用Canal+Elasticsearch方案,具有准实时、无侵入等优势。 环境配置:详细说明了MySQL开启bin
前言
以下是基于 SpringBoot 3 实现“MySQL 同步 Elasticsearch(ES)”的完整指南,涵盖入门配置、实战场景、底层原理,关键步骤附操作说明与逻辑图,确保细节清晰可落地。
MySQL 同步 Elasticsearch 全指南(SpringBoot 3 版)
一、核心同步方案选型(入门必知)
首先明确主流同步方案的差异,根据业务场景选择,SpringBoot 3 项目中最常用以下两种:
| 方案 | 原理 | 优点 | 缺点 | 适用场景 |
|---|---|---|---|---|
| Log-Based(日志同步) | 解析 MySQL binlog(如 Canal),触发 ES 写入 | 低延迟(准实时)、无侵入 | 需开启 binlog、需维护中间件 | 实时查询(如订单、商品搜索) |
| Poll-Based(轮询同步) | 定时查询 MySQL 变更(如 Quartz + MyBatis) | 实现简单、无需额外中间件 | 高延迟(分钟级)、对 MySQL 有压力 | 非实时场景(如报表统计) |
| 数据双写 | 每次写mysql的时候都需要往ES里插入数据 | 不依赖中间件 | 代码入侵,复杂度高 | 简单单一项目 |
推荐首选:Canal + Elasticsearch 方案(准实时、无业务侵入,SpringBoot 3 生态适配完善),下文重点讲解此方案。
二、环境准备(入门配置)
1. 基础环境版本
- JDK:17+(SpringBoot 3 最低要求)
- SpringBoot:3.0+(示例用 3.2.5)
- MySQL:8.0+(需开启 binlog)
- Elasticsearch:8.0+(需与 Spring Data ES 版本兼容,示例用 8.11.0)
- Canal:1.1.7+(阿里开源的 MySQL binlog 解析工具)
2. 关键配置步骤
(1)MySQL 开启 binlog
修改 MySQL 配置文件 my.cnf(Linux)或 my.ini(Windows),重启 MySQL 生效:
[mysqld]
# 开启 binlog
log-bin=mysql-bin
# 选择 ROW 模式(仅记录行变更,Canal 需此模式)
binlog-format=ROW
# 服务 ID(唯一,不能与 Canal 重复)
server-id=1
# 指定同步的数据库(可选,不写则同步所有库)
binlog-do-db=test_db
(2)Canal 配置(监听 MySQL binlog)
- 下载 Canal Server(官网地址),解压后进入
conf目录,复制example为test_db(与 MySQL 库名对应)。 - 修改
conf/test_db/instance.properties核心配置:# MySQL 地址和端口 canal.instance.master.address=127.0.0.1:3306 # MySQL 用户名密码(需有 SELECT、REPLICATION SLAVE 权限) canal.instance.dbUsername=canal canal.instance.dbPassword=canal # 监听的数据库(与 MySQL binlog-do-db 一致) canal.instance.defaultDatabaseName=test_db # 订阅的表(格式:库名.表名,多个用逗号分隔,* 表示所有表) canal.instance.filter.regex=test_db.user,test_db.order - 启动 Canal Server:进入
bin目录,执行startup.bat(Windows)或startup.sh(Linux)。
三、SpringBoot 3 实战开发(核心代码)
1. 依赖引入(pom.xml)
需引入 Spring Data Elasticsearch(操作 ES)、Canal Client(接收 Canal 解析的 binlog 数据):
<<dependencies>
<!-- SpringBoot 父依赖 -->
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>3.2.5</version>
<relativePath/>
</parent>
<!-- Spring Web(可选,用于接口测试) -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<!-- Spring Data Elasticsearch(操作 ES) -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-elasticsearch</artifactId>
</dependency>
<!-- Canal Client(接收 binlog 数据) -->
<dependency>
<groupId>com.alibaba.otter</groupId>
<artifactId>canal.client</artifactId>
<version>1.1.7</version>
</dependency>
<!-- MySQL 驱动(可选,轮询方案需用) -->
<dependency>
<groupId>com.mysql</groupId>
<artifactId>mysql-connector-j</artifactId>
<scope>runtime</scope>
</dependency>
</</dependencies>
2. 配置文件(application.yml)
配置 ES 连接和 Canal 客户端参数:
spring:
# Elasticsearch 配置
elasticsearch:
uris: http://127.0.0.1:9200 # ES 地址(集群用逗号分隔)
username: elastic # ES 用户名(8.0+ 默认有密码)
password: 123456 # ES 密码
# Canal 客户端配置(自定义)
canal:
server: 127.0.0.1:11111 # Canal Server 地址(默认端口 11111)
destination: test_db # 与 Canal 配置的 instance 名一致(test_db)
username: "" # Canal 用户名(默认空)
password: "" # Canal 密码(默认空)
3. 核心代码实现
(1)定义 ES 实体类(与 MySQL 表映射)
以 user 表为例,通过注解指定 ES 索引、字段映射:
import org.springframework.data.annotation.Id;
import org.springframework.data.elasticsearch.annotations.Document;
import org.springframework.data.elasticsearch.annotations.Field;
import org.springframework.data.elasticsearch.annotations.FieldType;
// indexName:ES 索引名,shards:分片数,replicas:副本数
@Document(indexName = "user_index", shards = 1, replicas = 0)
public class UserEs {
@Id // ES 文档的唯一 ID(对应 MySQL 的主键)
private Long id;
// 字段类型:文本,analyzer:分词器(ik_max_word 适合中文)
@Field(type = FieldType.Text, analyzer = "ik_max_word")
private String username;
// 字段类型:关键词(不分词,用于精确查询)
@Field(type = FieldType.Keyword)
private String phone;
// 字段类型:整数
@Field(type = FieldType.Integer)
private Integer age;
// Getter、Setter 省略
}
(2)ES 操作 Repository(Spring Data 风格)
继承 ElasticsearchRepository,无需写 SQL,直接调用内置方法:
import org.springframework.data.elasticsearch.repository.ElasticsearchRepository;
// 泛型:<ES实体类, 主键类型>
public interface UserEsRepository extends ElasticsearchRepository<UserEs, Long> {
}
(3)Canal 客户端监听(核心:接收 binlog 并同步 ES)
通过 Canal Client 连接 Canal Server,监听 MySQL 表的 INSERT/UPDATE/DELETE 事件,触发 ES 同步:
import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.client.CanalConnectors;
import com.alibaba.otter.canal.protocol.CanalEntry;
import com.alibaba.otter.canal.protocol.Message;
import jakarta.annotation.PostConstruct;
import jakarta.annotation.PreDestroy;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;
import java.net.InetSocketAddress;
import java.util.List;
@Component
public class CanalSyncService {
// 注入 ES Repository
@Autowired
private UserEsRepository userEsRepository;
// 从配置文件读取 Canal 参数
@Value("${canal.server}")
private String canalServer;
@Value("${canal.destination}")
private String destination;
@Value("${canal.username}")
private String canalUsername;
@Value("${canal.password}")
private String canalPassword;
// Canal 连接器
private CanalConnector canalConnector;
// 项目启动时初始化 Canal 连接(@PostConstruct 注解)
@PostConstruct
public void initCanal() {
// 1. 创建 Canal 连接器
String[] serverArr = canalServer.split(":");
canalConnector = CanalConnectors.newSingleConnector(
new InetSocketAddress(serverArr[0], Integer.parseInt(serverArr[1])),
destination, canalUsername, canalPassword
);
// 2. 启动监听线程(异步处理,避免阻塞主线程)
new Thread(this::startListen, "canal-sync-thread").start();
}
// 监听 Canal 消息(核心逻辑)
private void startListen() {
try {
// 连接 Canal Server
canalConnector.connect();
// 订阅表(与 Canal 配置一致,* 表示所有表)
canalConnector.subscribe("test_db.*");
// 回滚到最新位置(避免重复消费历史数据)
canalConnector.rollback();
// 循环监听消息
while (true) {
// 一次拉取 100 条消息(可根据业务调整)
Message message = canalConnector.getWithoutAck(100);
long batchId = message.getId();
// 有消息则处理
if (batchId != -1 && message.getEntries().size() > 0) {
handleCanalMessage(message.getEntries());
// 确认消费(避免重复消费)
canalConnector.ack(batchId);
} else {
// 无消息时休眠 1 秒,减少 CPU 占用
Thread.sleep(1000);
}
}
} catch (Exception e) {
e.printStackTrace();
}
}
// 处理 Canal 消息(解析 binlog 事件,同步到 ES)
private void handleCanalMessage(List<CanalEntry.Entry> entries) {
for (CanalEntry.Entry entry : entries) {
// 过滤非行数据变更的事件(如事务开始/结束)
if (entry.getEntryType() != CanalEntry.EntryType.ROWDATA) {
continue;
}
try {
// 解析事件内容(获取表名、操作类型、数据)
CanalEntry.RowChange rowChange = CanalEntry.RowChange.parseFrom(entry.getStoreValue());
String tableName = entry.getHeader().getTableName(); // MySQL 表名
CanalEntry.EventType eventType = rowChange.getEventType(); // 操作类型:INSERT/UPDATE/DELETE
// 只处理 user 表的变更(可扩展到其他表)
if ("user".equals(tableName)) {
handleUserTable(eventType, rowChange.getRowDatasList());
}
} catch (Exception e) {
e.printStackTrace();
}
}
}
// 处理 user 表的变更,同步到 ES
private void handleUserTable(CanalEntry.EventType eventType, List<CanalEntry.RowData> rowDatas) {
for (CanalEntry.RowData rowData : rowDatas) {
// 1. 解析 MySQL 行数据为 UserEs 对象
UserEs userEs = parseRowDataToUserEs(rowData, eventType);
if (userEs == null) {
continue;
}
// 2. 根据操作类型同步 ES
switch (eventType) {
case INSERT: // 新增:ES 新增文档
userEsRepository.save(userEs);
System.out.println("ES 新增 user:" + userEs.getId());
break;
case UPDATE: // 更新:ES 更新文档(根据 ID)
userEsRepository.save(userEs);
System.out.println("ES 更新 user:" + userEs.getId());
break;
case DELETE: // 删除:ES 删除文档(根据 ID)
userEsRepository.deleteById(userEs.getId());
System.out.println("ES 删除 user:" + userEs.getId());
break;
default:
break;
}
}
}
// 将 Canal RowData 解析为 UserEs 对象
private UserEs parseRowDataToUserEs(CanalEntry.RowData rowData, CanalEntry.EventType eventType) {
UserEs userEs = new UserEs();
// INSERT/UPDATE 取新数据,DELETE 取旧数据
List<CanalEntry.Column> columns = (eventType == CanalEntry.EventType.DELETE)
? rowData.getBeforeColumnsList()
: rowData.getAfterColumnsList();
// 遍历字段,赋值给 UserEs
for (CanalEntry.Column column : columns) {
String columnName = column.getName();
String columnValue = column.getValue();
switch (columnName) {
case "id":
userEs.setId(Long.parseLong(columnValue));
break;
case "username":
userEs.setUsername(columnValue);
break;
case "phone":
userEs.setPhone(columnValue);
break;
case "age":
userEs.setAge(Integer.parseInt(columnValue));
break;
default:
break;
}
}
return userEs;
}
// 项目关闭时关闭 Canal 连接(@PreDestroy 注解)
@PreDestroy
public void closeCanal() {
if (canalConnector != null) {
canalConnector.disconnect();
}
}
}
(4)测试同步效果
- 启动 SpringBoot 项目、Canal Server、ES。
- 在 MySQL 的
test_db.user表执行 SQL:-- 新增一条数据 INSERT INTO user (id, username, phone, age) VALUES (1, "张三", "13800138000", 25); - 验证 ES 是否同步:访问
http://127.0.0.1:9200/user_index/_doc/1,应返回如下数据:{ "_index": "user_index", "_id": "1", "_version": 1, "found": true, "_source": { "id": 1, "username": "张三", "phone": "13800138000", "age": 25 } }
四、使用场景与优化(实战进阶)
1. 典型使用场景
- 电商商品搜索:MySQL 存储商品基础信息,ES 存储商品标题、详情等文本字段,支持关键词搜索、过滤(如价格区间、分类)。
- 日志检索:业务日志写入 MySQL 后同步到 ES,支持按时间、日志级别、关键词快速查询。
- 用户画像查询:用户基本信息、行为数据同步到 ES,支持多维度组合查询(如“20-30岁+北京+近7天登录”)。
2. 关键优化点
(1)避免 ES 重复同步
- 依赖 Canal 的
ack机制:消费成功后再确认ack(batchId),失败则rollback(batchId),下次重新拉取。 - 给 ES 文档加版本号:在
UserEs中加@Version注解,ES 会自动处理版本冲突,避免旧数据覆盖新数据。
(2)降低 MySQL 压力
- Canal 过滤无用表/字段:在
instance.properties中通过canal.instance.filter.regex只监听需要同步的表,减少 binlog 解析量。 - 避免 ES 同步影响业务:Canal 客户端用异步线程处理同步逻辑,不阻塞 MySQL 业务线程。
(3)高可用部署
- Canal 集群:部署多个 Canal Server,通过 ZooKeeper 实现 HA,避免单点故障。
- ES 集群:部署 ES 集群(至少 3 节点),开启副本,确保数据不丢失。
五、底层原理(从 MySQL 到 ES 的数据流向)
1. 整体数据流向图
MySQL → binlog → Canal Server → Canal Client(SpringBoot) → Elasticsearch
- MySQL 生成 binlog:MySQL 执行
INSERT/UPDATE/DELETE后,将行变更记录到 binlog(ROW 模式下记录完整行数据)。 - Canal Server 模拟从库:Canal Server 伪装成 MySQL 的从库,向 MySQL 发送
dump命令,获取 binlog 并解析为结构化数据(如RowData)。 - Canal Client 接收数据:SpringBoot 中的 Canal Client 连接 Canal Server,拉取解析后的 binlog 数据(按
batchId批量拉取)。 - 同步到 ES:Canal Client 解析
RowData为 ES 实体类,调用ElasticsearchRepository的save/delete方法,将数据写入 ES。
2. Canal 解析 binlog 核心原理
- MySQL binlog 结构:每个 binlog 文件包含多个
Event,ROW模式下的Write_rows_event/Update_rows_event/Delete_rows_event对应表的增删改操作。 - Canal 解析过程:
- 读取 binlog 文件头,获取 binlog 格式、版本等信息。
- 解析每个
Event,提取表名、操作类型、行数据(before旧数据、after新数据)。 - 将解析结果封装为
Message对象,推送给 Canal Client。
六、问题排查(常见坑与解决)
-
Canal 连接 MySQL 失败:
- 检查 MySQL 用户名密码是否有
REPLICATION SLAVE权限:执行GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';。 - 检查 MySQL binlog 是否开启:执行
show variables like 'log_bin';,确保值为ON。
- 检查 MySQL 用户名密码是否有
-
ES 同步后查不到数据:
- 检查 ES 索引是否存在:执行
GET /user_index,不存在则需手动创建(或开启 Spring Data ES 的自动创建索引,需配置spring.data.elasticsearch.repositories.enabled=true)。 - 检查分词器:若用
ik分词器,需确保 ES 已安装 ik 插件(安装教程)。
- 检查 ES 索引是否存在:执行
-
Canal 重复消费数据:
- 确保消费成功后调用
ack(batchId),失败调用rollback(batchId)。 - 避免 Canal Client 频繁重启:重启时会从上次
ack的位置继续消费,若未ack则重复拉取。
- 确保消费成功后调用
通过以上步骤,可基于 SpringBoot 3 实现 MySQL 到 ES 的稳定同步,从入门配置到实战优化、底层原理全覆盖,可根据实际业务调整表映射、同步策略和部署架构。
火山引擎开发者社区是火山引擎打造的AI技术生态平台,聚焦Agent与大模型开发,提供豆包系列模型(图像/视频/视觉)、智能分析与会话工具,并配套评测集、动手实验室及行业案例库。社区通过技术沙龙、挑战赛等活动促进开发者成长,新用户可领50万Tokens权益,助力构建智能应用。
更多推荐
所有评论(0)