前言

以下是基于 SpringBoot 3 实现“MySQL 同步 Elasticsearch(ES)”的完整指南,涵盖入门配置、实战场景、底层原理,关键步骤附操作说明与逻辑图,确保细节清晰可落地。

MySQL 同步 Elasticsearch 全指南(SpringBoot 3 版)

一、核心同步方案选型(入门必知)

首先明确主流同步方案的差异,根据业务场景选择,SpringBoot 3 项目中最常用以下两种:

方案 原理 优点 缺点 适用场景
Log-Based(日志同步) 解析 MySQL binlog(如 Canal),触发 ES 写入 低延迟(准实时)、无侵入 需开启 binlog、需维护中间件 实时查询(如订单、商品搜索)
Poll-Based(轮询同步) 定时查询 MySQL 变更(如 Quartz + MyBatis) 实现简单、无需额外中间件 高延迟(分钟级)、对 MySQL 有压力 非实时场景(如报表统计)
数据双写 每次写mysql的时候都需要往ES里插入数据 不依赖中间件 代码入侵,复杂度高 简单单一项目

推荐首选:Canal + Elasticsearch 方案(准实时、无业务侵入,SpringBoot 3 生态适配完善),下文重点讲解此方案。

二、环境准备(入门配置)

1. 基础环境版本

  • JDK:17+(SpringBoot 3 最低要求)
  • SpringBoot:3.0+(示例用 3.2.5)
  • MySQL:8.0+(需开启 binlog)
  • Elasticsearch:8.0+(需与 Spring Data ES 版本兼容,示例用 8.11.0)
  • Canal:1.1.7+(阿里开源的 MySQL binlog 解析工具)

2. 关键配置步骤

(1)MySQL 开启 binlog

修改 MySQL 配置文件 my.cnf(Linux)或 my.ini(Windows),重启 MySQL 生效:

[mysqld]
# 开启 binlog
log-bin=mysql-bin
# 选择 ROW 模式(仅记录行变更,Canal 需此模式)
binlog-format=ROW
# 服务 ID(唯一,不能与 Canal 重复)
server-id=1
# 指定同步的数据库(可选,不写则同步所有库)
binlog-do-db=test_db
(2)Canal 配置(监听 MySQL binlog)
  1. 下载 Canal Server(官网地址),解压后进入 conf 目录,复制 exampletest_db(与 MySQL 库名对应)。
  2. 修改 conf/test_db/instance.properties 核心配置:
    # MySQL 地址和端口
    canal.instance.master.address=127.0.0.1:3306
    # MySQL 用户名密码(需有 SELECT、REPLICATION SLAVE 权限)
    canal.instance.dbUsername=canal
    canal.instance.dbPassword=canal
    # 监听的数据库(与 MySQL binlog-do-db 一致)
    canal.instance.defaultDatabaseName=test_db
    # 订阅的表(格式:库名.表名,多个用逗号分隔,* 表示所有表)
    canal.instance.filter.regex=test_db.user,test_db.order
    
  3. 启动 Canal Server:进入 bin 目录,执行 startup.bat(Windows)或 startup.sh(Linux)。

三、SpringBoot 3 实战开发(核心代码)

1. 依赖引入(pom.xml)

需引入 Spring Data Elasticsearch(操作 ES)、Canal Client(接收 Canal 解析的 binlog 数据):

<<dependencies>
    <!-- SpringBoot 父依赖 -->
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>3.2.5</version>
        <relativePath/>
    </parent>

    <!-- Spring Web(可选,用于接口测试) -->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web</artifactId>
    </dependency>

    <!-- Spring Data Elasticsearch(操作 ES) -->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-data-elasticsearch</artifactId>
    </dependency>

    <!-- Canal Client(接收 binlog 数据) -->
    <dependency>
        <groupId>com.alibaba.otter</groupId>
        <artifactId>canal.client</artifactId>
        <version>1.1.7</version>
    </dependency>

    <!-- MySQL 驱动(可选,轮询方案需用) -->
    <dependency>
        <groupId>com.mysql</groupId>
        <artifactId>mysql-connector-j</artifactId>
        <scope>runtime</scope>
    </dependency>
</</dependencies>

2. 配置文件(application.yml)

配置 ES 连接和 Canal 客户端参数:

spring:
  # Elasticsearch 配置
  elasticsearch:
    uris: http://127.0.0.1:9200  # ES 地址(集群用逗号分隔)
    username: elastic             # ES 用户名(8.0+ 默认有密码)
    password: 123456              # ES 密码

# Canal 客户端配置(自定义)
canal:
  server: 127.0.0.1:11111  # Canal Server 地址(默认端口 11111)
  destination: test_db     # 与 Canal 配置的 instance 名一致(test_db)
  username: ""             # Canal 用户名(默认空)
  password: ""             # Canal 密码(默认空)

3. 核心代码实现

(1)定义 ES 实体类(与 MySQL 表映射)

user 表为例,通过注解指定 ES 索引、字段映射:

import org.springframework.data.annotation.Id;
import org.springframework.data.elasticsearch.annotations.Document;
import org.springframework.data.elasticsearch.annotations.Field;
import org.springframework.data.elasticsearch.annotations.FieldType;

// indexName:ES 索引名,shards:分片数,replicas:副本数
@Document(indexName = "user_index", shards = 1, replicas = 0)
public class UserEs {
    @Id  // ES 文档的唯一 ID(对应 MySQL 的主键)
    private Long id;

    // 字段类型:文本,analyzer:分词器(ik_max_word 适合中文)
    @Field(type = FieldType.Text, analyzer = "ik_max_word")
    private String username;

    // 字段类型:关键词(不分词,用于精确查询)
    @Field(type = FieldType.Keyword)
    private String phone;

    // 字段类型:整数
    @Field(type = FieldType.Integer)
    private Integer age;

    // Getter、Setter 省略
}
(2)ES 操作 Repository(Spring Data 风格)

继承 ElasticsearchRepository,无需写 SQL,直接调用内置方法:

import org.springframework.data.elasticsearch.repository.ElasticsearchRepository;

// 泛型:<ES实体类, 主键类型>
public interface UserEsRepository extends ElasticsearchRepository<UserEs, Long> {
}
(3)Canal 客户端监听(核心:接收 binlog 并同步 ES)

通过 Canal Client 连接 Canal Server,监听 MySQL 表的 INSERT/UPDATE/DELETE 事件,触发 ES 同步:

import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.client.CanalConnectors;
import com.alibaba.otter.canal.protocol.CanalEntry;
import com.alibaba.otter.canal.protocol.Message;
import jakarta.annotation.PostConstruct;
import jakarta.annotation.PreDestroy;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;
import java.net.InetSocketAddress;
import java.util.List;

@Component
public class CanalSyncService {
    // 注入 ES Repository
    @Autowired
    private UserEsRepository userEsRepository;

    // 从配置文件读取 Canal 参数
    @Value("${canal.server}")
    private String canalServer;
    @Value("${canal.destination}")
    private String destination;
    @Value("${canal.username}")
    private String canalUsername;
    @Value("${canal.password}")
    private String canalPassword;

    // Canal 连接器
    private CanalConnector canalConnector;

    // 项目启动时初始化 Canal 连接(@PostConstruct 注解)
    @PostConstruct
    public void initCanal() {
        // 1. 创建 Canal 连接器
        String[] serverArr = canalServer.split(":");
        canalConnector = CanalConnectors.newSingleConnector(
                new InetSocketAddress(serverArr[0], Integer.parseInt(serverArr[1])),
                destination, canalUsername, canalPassword
        );

        // 2. 启动监听线程(异步处理,避免阻塞主线程)
        new Thread(this::startListen, "canal-sync-thread").start();
    }

    // 监听 Canal 消息(核心逻辑)
    private void startListen() {
        try {
            // 连接 Canal Server
            canalConnector.connect();
            // 订阅表(与 Canal 配置一致,* 表示所有表)
            canalConnector.subscribe("test_db.*");
            // 回滚到最新位置(避免重复消费历史数据)
            canalConnector.rollback();

            // 循环监听消息
            while (true) {
                // 一次拉取 100 条消息(可根据业务调整)
                Message message = canalConnector.getWithoutAck(100);
                long batchId = message.getId();

                // 有消息则处理
                if (batchId != -1 && message.getEntries().size() > 0) {
                    handleCanalMessage(message.getEntries());
                    // 确认消费(避免重复消费)
                    canalConnector.ack(batchId);
                } else {
                    // 无消息时休眠 1 秒,减少 CPU 占用
                    Thread.sleep(1000);
                }
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    // 处理 Canal 消息(解析 binlog 事件,同步到 ES)
    private void handleCanalMessage(List<CanalEntry.Entry> entries) {
        for (CanalEntry.Entry entry : entries) {
            // 过滤非行数据变更的事件(如事务开始/结束)
            if (entry.getEntryType() != CanalEntry.EntryType.ROWDATA) {
                continue;
            }

            try {
                // 解析事件内容(获取表名、操作类型、数据)
                CanalEntry.RowChange rowChange = CanalEntry.RowChange.parseFrom(entry.getStoreValue());
                String tableName = entry.getHeader().getTableName(); // MySQL 表名
                CanalEntry.EventType eventType = rowChange.getEventType(); // 操作类型:INSERT/UPDATE/DELETE

                // 只处理 user 表的变更(可扩展到其他表)
                if ("user".equals(tableName)) {
                    handleUserTable(eventType, rowChange.getRowDatasList());
                }
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }

    // 处理 user 表的变更,同步到 ES
    private void handleUserTable(CanalEntry.EventType eventType, List<CanalEntry.RowData> rowDatas) {
        for (CanalEntry.RowData rowData : rowDatas) {
            // 1. 解析 MySQL 行数据为 UserEs 对象
            UserEs userEs = parseRowDataToUserEs(rowData, eventType);
            if (userEs == null) {
                continue;
            }

            // 2. 根据操作类型同步 ES
            switch (eventType) {
                case INSERT: // 新增:ES 新增文档
                    userEsRepository.save(userEs);
                    System.out.println("ES 新增 user:" + userEs.getId());
                    break;
                case UPDATE: // 更新:ES 更新文档(根据 ID)
                    userEsRepository.save(userEs);
                    System.out.println("ES 更新 user:" + userEs.getId());
                    break;
                case DELETE: // 删除:ES 删除文档(根据 ID)
                    userEsRepository.deleteById(userEs.getId());
                    System.out.println("ES 删除 user:" + userEs.getId());
                    break;
                default:
                    break;
            }
        }
    }

    // 将 Canal RowData 解析为 UserEs 对象
    private UserEs parseRowDataToUserEs(CanalEntry.RowData rowData, CanalEntry.EventType eventType) {
        UserEs userEs = new UserEs();
        // INSERT/UPDATE 取新数据,DELETE 取旧数据
        List<CanalEntry.Column> columns = (eventType == CanalEntry.EventType.DELETE) 
                ? rowData.getBeforeColumnsList() 
                : rowData.getAfterColumnsList();

        // 遍历字段,赋值给 UserEs
        for (CanalEntry.Column column : columns) {
            String columnName = column.getName();
            String columnValue = column.getValue();

            switch (columnName) {
                case "id":
                    userEs.setId(Long.parseLong(columnValue));
                    break;
                case "username":
                    userEs.setUsername(columnValue);
                    break;
                case "phone":
                    userEs.setPhone(columnValue);
                    break;
                case "age":
                    userEs.setAge(Integer.parseInt(columnValue));
                    break;
                default:
                    break;
            }
        }
        return userEs;
    }

    // 项目关闭时关闭 Canal 连接(@PreDestroy 注解)
    @PreDestroy
    public void closeCanal() {
        if (canalConnector != null) {
            canalConnector.disconnect();
        }
    }
}
(4)测试同步效果
  1. 启动 SpringBoot 项目、Canal Server、ES。
  2. 在 MySQL 的 test_db.user 表执行 SQL:
    -- 新增一条数据
    INSERT INTO user (id, username, phone, age) VALUES (1, "张三", "13800138000", 25);
    
  3. 验证 ES 是否同步:访问 http://127.0.0.1:9200/user_index/_doc/1,应返回如下数据:
    {
      "_index": "user_index",
      "_id": "1",
      "_version": 1,
      "found": true,
      "_source": {
        "id": 1,
        "username": "张三",
        "phone": "13800138000",
        "age": 25
      }
    }
    

四、使用场景与优化(实战进阶)

1. 典型使用场景

  • 电商商品搜索:MySQL 存储商品基础信息,ES 存储商品标题、详情等文本字段,支持关键词搜索、过滤(如价格区间、分类)。
  • 日志检索:业务日志写入 MySQL 后同步到 ES,支持按时间、日志级别、关键词快速查询。
  • 用户画像查询:用户基本信息、行为数据同步到 ES,支持多维度组合查询(如“20-30岁+北京+近7天登录”)。

2. 关键优化点

(1)避免 ES 重复同步
  • 依赖 Canal 的 ack 机制:消费成功后再确认 ack(batchId),失败则 rollback(batchId),下次重新拉取。
  • 给 ES 文档加版本号:在 UserEs 中加 @Version 注解,ES 会自动处理版本冲突,避免旧数据覆盖新数据。
(2)降低 MySQL 压力
  • Canal 过滤无用表/字段:在 instance.properties 中通过 canal.instance.filter.regex 只监听需要同步的表,减少 binlog 解析量。
  • 避免 ES 同步影响业务:Canal 客户端用异步线程处理同步逻辑,不阻塞 MySQL 业务线程。
(3)高可用部署
  • Canal 集群:部署多个 Canal Server,通过 ZooKeeper 实现 HA,避免单点故障。
  • ES 集群:部署 ES 集群(至少 3 节点),开启副本,确保数据不丢失。

五、底层原理(从 MySQL 到 ES 的数据流向)

1. 整体数据流向图

MySQL → binlog → Canal Server → Canal Client(SpringBoot) → Elasticsearch
  1. MySQL 生成 binlog:MySQL 执行 INSERT/UPDATE/DELETE 后,将行变更记录到 binlog(ROW 模式下记录完整行数据)。
  2. Canal Server 模拟从库:Canal Server 伪装成 MySQL 的从库,向 MySQL 发送 dump 命令,获取 binlog 并解析为结构化数据(如 RowData)。
  3. Canal Client 接收数据:SpringBoot 中的 Canal Client 连接 Canal Server,拉取解析后的 binlog 数据(按 batchId 批量拉取)。
  4. 同步到 ES:Canal Client 解析 RowData 为 ES 实体类,调用 ElasticsearchRepositorysave/delete 方法,将数据写入 ES。

2. Canal 解析 binlog 核心原理

  • MySQL binlog 结构:每个 binlog 文件包含多个 EventROW 模式下的 Write_rows_event/Update_rows_event/Delete_rows_event 对应表的增删改操作。
  • Canal 解析过程:
    1. 读取 binlog 文件头,获取 binlog 格式、版本等信息。
    2. 解析每个 Event,提取表名、操作类型、行数据(before 旧数据、after 新数据)。
    3. 将解析结果封装为 Message 对象,推送给 Canal Client。

六、问题排查(常见坑与解决)

  1. Canal 连接 MySQL 失败

    • 检查 MySQL 用户名密码是否有 REPLICATION SLAVE 权限:执行 GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';
    • 检查 MySQL binlog 是否开启:执行 show variables like 'log_bin';,确保值为 ON
  2. ES 同步后查不到数据

    • 检查 ES 索引是否存在:执行 GET /user_index,不存在则需手动创建(或开启 Spring Data ES 的自动创建索引,需配置 spring.data.elasticsearch.repositories.enabled=true)。
    • 检查分词器:若用 ik 分词器,需确保 ES 已安装 ik 插件(安装教程)。
  3. Canal 重复消费数据

    • 确保消费成功后调用 ack(batchId),失败调用 rollback(batchId)
    • 避免 Canal Client 频繁重启:重启时会从上次 ack 的位置继续消费,若未 ack 则重复拉取。

通过以上步骤,可基于 SpringBoot 3 实现 MySQL 到 ES 的稳定同步,从入门配置到实战优化、底层原理全覆盖,可根据实际业务调整表映射、同步策略和部署架构。

Logo

火山引擎开发者社区是火山引擎打造的AI技术生态平台,聚焦Agent与大模型开发,提供豆包系列模型(图像/视频/视觉)、智能分析与会话工具,并配套评测集、动手实验室及行业案例库。社区通过技术沙龙、挑战赛等活动促进开发者成长,新用户可领50万Tokens权益,助力构建智能应用。

更多推荐