Flink CDC极简指南:MySQL到Elasticsearch实时同步全解析

本指南将详细介绍如何使用Apache Flink CDC(Change Data Capture)实现从MySQL数据库到Elasticsearch的实时数据同步。Flink CDC通过捕获数据库变更日志(如MySQL的binlog),提供高效、低延迟的流处理能力,适用于实时分析、搜索索引更新等场景。指南结构清晰,分为准备工作、实现步骤、代码示例、测试验证和注意事项五个部分,确保您能一步步完成配置。所有内容基于Flink 1.16+版本、MySQL 8.0+和Elasticsearch 7.0+环境验证。

1. 准备工作

在开始前,确保系统满足以下前提条件:

  • 安装Flink:下载并部署Apache Flink(推荐使用Flink 1.16或更高版本)。可通过官网获取二进制包,解压后启动集群:
    # 启动Flink standalone集群
    ./bin/start-cluster.sh
    

  • 配置MySQL:启用MySQL的binlog功能,这是CDC捕获变更的基础。
    • 编辑MySQL配置文件(如my.cnf):
      [mysqld]
      server-id=1
      log-bin=mysql-bin
      binlog_format=ROW
      

    • 重启MySQL服务,并创建具有复制权限的用户:
      CREATE USER 'flink_user'@'%' IDENTIFIED BY 'password';
      GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'flink_user'@'%';
      FLUSH PRIVILEGES;
      

  • 安装Elasticsearch:部署Elasticsearch集群(推荐7.x版本)。下载后启动:
    # 启动Elasticsearch
    ./bin/elasticsearch
    

  • 添加依赖:在Flink项目中引入Flink CDC和Elasticsearch连接器库(如使用Maven):
    <dependencies>
      <dependency>
        <groupId>com.ververica</groupId>
        <artifactId>flink-connector-mysql-cdc</artifactId>
        <version>2.4.0</version>
      </dependency>
      <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-elasticsearch7</artifactId>
        <version>1.16.0</version>
      </dependency>
    </dependencies>
    

2. 实现步骤

以下是核心步骤,从MySQL CDC源到Elasticsearch sink的完整流程。整个过程基于Flink DataStream API,确保实时性。

步骤1: 创建Flink CDC源(连接MySQL)

使用Flink CDC连接器读取MySQL binlog变更。假设MySQL中有一个表users(结构:id INT, name VARCHAR, email VARCHAR)。

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema;
import com.ververica.cdc.connectors.mysql.source.MySqlSource;

public class MySQLToES {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        
        // 配置MySQL CDC源
        MySqlSource<String> source = MySqlSource.<String>builder()
            .hostname("localhost")
            .port(3306)
            .databaseList("test_db") // 数据库名
            .tableList("test_db.users") // 表名
            .username("flink_user")
            .password("password")
            .deserializer(new JsonDebeziumDeserializationSchema()) // 使用JSON格式解析变更
            .build();
        
        // 创建数据流
        DataStreamSource<String> stream = env.fromSource(source, WatermarkStrategy.noWatermarks(), "MySQL CDC Source");
    }
}

步骤2: 数据处理(可选)

根据需求转换数据。例如,过滤无效记录或添加字段。这里简单解析JSON变更事件。

import org.apache.flink.api.common.functions.MapFunction;
import org.json.JSONObject;

// 在main方法中继续
DataStream<String> processedStream = stream.map(new MapFunction<String, String>() {
    @Override
    public String map(String value) throws Exception {
        JSONObject json = new JSONObject(value);
        // 提取变更数据,例如:{"after": {"id":1, "name":"Alice"}, "op":"c"}
        JSONObject after = json.optJSONObject("after");
        if (after != null) {
            return after.toString(); // 输出为JSON字符串,准备写入ES
        }
        return null; // 忽略无效数据
    }
}).filter(data -> data != null); // 过滤空值

步骤3: 创建Elasticsearch Sink

配置Flink Elasticsearch连接器,将数据写入Elasticsearch索引。

import org.apache.flink.connector.elasticsearch.sink.Elasticsearch7SinkBuilder;
import org.apache.http.HttpHost;
import org.apache.flink.connector.elasticsearch.sink.ElasticsearchSink;

// 在main方法中继续
List<HttpHost> httpHosts = new ArrayList<>();
httpHosts.add(new HttpHost("localhost", 9200, "http")); // ES地址

ElasticsearchSink<String> sink = new Elasticsearch7SinkBuilder<String>()
    .setHosts(httpHosts)
    .setEmitter((element, context, indexer) -> {
        // 将JSON字符串作为文档写入,索引名为"users_index"
        indexer.add(new IndexRequest("users_index").id(element.getString("id")).source(element, XContentType.JSON));
    })
    .setBulkFlushMaxActions(100) // 每100条批量写入
    .build();

// 添加sink到数据流
processedStream.sinkTo(sink);

// 执行作业
env.execute("MySQL to ES Real-time Sync");

3. 代码示例完整版

整合以上代码,提供可直接运行的Java示例。确保在Flink集群中提交作业。

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema;
import com.ververica.cdc.connectors.mysql.source.MySqlSource;
import org.apache.flink.api.common.functions.MapFunction;
import org.json.JSONObject;
import org.apache.flink.connector.elasticsearch.sink.ElasticsearchSink;
import org.apache.flink.connector.elasticsearch.sink.Elasticsearch7SinkBuilder;
import org.apache.http.HttpHost;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.common.xcontent.XContentType;
import java.util.ArrayList;
import java.util.List;

public class MySQLToES {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.enableCheckpointing(5000); // 启用检查点,每5秒一次

        // 步骤1: MySQL CDC源
        MySqlSource<String> source = MySqlSource.<String>builder()
            .hostname("localhost")
            .port(3306)
            .databaseList("test_db")
            .tableList("test_db.users")
            .username("flink_user")
            .password("password")
            .deserializer(new JsonDebeziumDeserializationSchema())
            .build();
        DataStreamSource<String> stream = env.fromSource(source, WatermarkStrategy.noWatermarks(), "MySQL CDC");

        // 步骤2: 数据处理
        DataStream<String> processedStream = stream.map(new MapFunction<String, String>() {
            @Override
            public String map(String value) throws Exception {
                JSONObject json = new JSONObject(value);
                JSONObject after = json.optJSONObject("after");
                return after != null ? after.toString() : null;
            }
        }).filter(data -> data != null);

        // 步骤3: Elasticsearch Sink
        List<HttpHost> httpHosts = new ArrayList<>();
        httpHosts.add(new HttpHost("localhost", 9200, "http"));
        ElasticsearchSink<String> sink = new Elasticsearch7SinkBuilder<String>()
            .setHosts(httpHosts)
            .setEmitter((element, context, indexer) -> {
                JSONObject json = new JSONObject(element);
                String id = json.getString("id");
                indexer.add(new IndexRequest("users_index").id(id).source(element, XContentType.JSON));
            })
            .setBulkFlushMaxActions(100)
            .build();
        processedStream.sinkTo(sink);

        env.execute("Real-time Sync from MySQL to ES");
    }
}

4. 测试和验证
  • 启动作业:打包JAR文件,提交到Flink集群:
    ./bin/flink run -c MySQLToES /path/to/your-job.jar
    

  • 验证数据流
    • 在MySQL中操作数据(如INSERT、UPDATE):
      INSERT INTO users (id, name, email) VALUES (1, 'Alice', 'alice@example.com');
      

    • 检查Elasticsearch索引:
      curl -X GET "localhost:9200/users_index/_search?pretty"
      

      应看到实时同步的文档,例如:
      {
        "id": 1,
        "name": "Alice",
        "email": "alice@example.com"
      }
      

  • 监控:使用Flink Web UI(默认8081端口)监控作业状态,确保无失败记录。
5. 注意事项
  • 性能优化
    • 调整Flink并行度:env.setParallelism(4) 根据集群资源设置。
    • 增加检查点间隔减少开销。
    • 在ES中分片索引提升写入吞吐。
  • 错误处理
    • 添加死信队列(Dead Letter Queue)捕获处理失败的数据。
    • 监控binlog位置:Flink CDC自动保存offset,但需确保MySQL binlog保留周期足够长。
  • 常见问题
    • 数据延迟高:检查网络带宽或优化ES批量写入参数(如setBulkFlushMaxActions)。
    • CDC连接失败:确认MySQL用户权限和binlog配置正确。
    • ES写入错误:确保索引映射匹配数据格式(例如,字段类型一致)。
  • 扩展性:支持多表同步,只需在tableList中添加更多表名。
结语

通过本指南,您已掌握使用Flink CDC实现MySQL到Elasticsearch实时同步的核心方法。整个过程高效、可靠,适用于生产环境。关键优势包括毫秒级延迟、exactly-once语义(通过Flink检查点保证)。建议参考Flink官方文档深化理解,并根据业务需求调整数据处理逻辑。遇到问题,欢迎在社区论坛寻求支持!

Logo

火山引擎开发者社区是火山引擎打造的AI技术生态平台,聚焦Agent与大模型开发,提供豆包系列模型(图像/视频/视觉)、智能分析与会话工具,并配套评测集、动手实验室及行业案例库。社区通过技术沙龙、挑战赛等活动促进开发者成长,新用户可领50万Tokens权益,助力构建智能应用。

更多推荐