kafka+debezium+Elasticsearch

  1. Kafka + Debezium setup


Preface

The current project needs Elasticsearch for search, so this is a brief record of the MySQL + Elasticsearch sync setup.


1. Install Kafka and Zookeeper

1.1 Download Kafka

# Download Kafka (the distribution bundles Zookeeper)
wget https://downloads.apache.org/kafka/3.6.1/kafka_2.13-3.6.1.tgz
tar -xzf kafka_2.13-3.6.1.tgz
cd kafka_2.13-3.6.1

1.2 Start Zookeeper

# Start Zookeeper in the background (omit -daemon to run in the foreground)
./bin/zookeeper-server-start.sh -daemon config/zookeeper.properties

1.3 Start Kafka

# Start Kafka in the background
./bin/kafka-server-start.sh -daemon config/server.properties

2. Configure MySQL

2.1 Enable the binlog

Edit the MySQL configuration file (/etc/mysql/my.cnf or /etc/my.cnf):

[mysqld]
server-id=11111
log-bin=mysql-bin
binlog_format=ROW          # must be ROW mode
expire_logs_days=7         # binlog retention in days (MySQL 8.0 uses binlog_expire_logs_seconds instead)

Restart MySQL:

sudo systemctl restart mysql

2.2 Create a Debezium user

Log in to MySQL and run the following SQL:

CREATE USER 'debezium'@'%' IDENTIFIED BY 'Debezium@123';
GRANT SELECT, RELOAD, SHOW DATABASES, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'debezium'@'%';
FLUSH PRIVILEGES;

3. Install the Debezium MySQL Connector

3.1 Download the connector plugin

# Create the plugin directory
mkdir -p kafka/plugins/debezium-mysql

# Download the Debezium MySQL connector
wget https://repo1.maven.org/maven2/io/debezium/debezium-connector-mysql/2.5.0.Final/debezium-connector-mysql-2.5.0.Final-plugin.tar.gz
# Extract into kafka/plugins/debezium-mysql
tar -xzf debezium-connector-mysql-2.5.0.Final-plugin.tar.gz -C kafka/plugins/debezium-mysql

3.2 Configure Kafka Connect

Edit config/connect-distributed.properties:

# Connector plugin path
plugin.path=/path/to/kafka/plugins

# Internal topics for Connect configuration, offsets, and status
config.storage.topic=connect-configs
offset.storage.topic=connect-offsets
status.storage.topic=connect-statuses

# Network settings
rest.port=8083
rest.advertised.host.name=localhost

Edit config/server.properties:

# Advertised listener address and automatic topic creation
advertised.listeners=PLAINTEXT://localhost:9092
auto.create.topics.enable=true

3.3 Start Kafka Connect

./bin/connect-distributed.sh -daemon config/connect-distributed.properties

4. Create the Debezium connector

4.1 Submit the connector configuration

Create mysql-connector.json. Note that database.server.id must be unique among the MySQL server and all its replication clients, so it must differ from the server-id in my.cnf; in Debezium 2.x, topic.prefix replaces the old database.server.name setting:

{
  "name": "mysql-connector",
  "config": {
    "connector.class": "io.debezium.connector.mysql.MySqlConnector",
    "database.hostname": "127.0.0.1",
    "database.port": "3306",
    "database.user": "debezium",
    "database.password": "Debezium@123",
    "database.server.id": "11112",
    "database.include.list": "xx1,xx2",
    "table.include.list": "xx1.tt1,xx2.tt2",
    "snapshot.mode": "initial",
    "snapshot.locking.mode": "none",
    "schema.history.internal": "io.debezium.storage.kafka.history.KafkaSchemaHistory",
    "schema.history.internal.kafka.topic": "schema-changes-mysql",
    "schema.history.internal.kafka.bootstrap.servers": "localhost:9092",
    "schema.history.internal.store.only.captured.tables.ddl": "true",
    "inconsistent.schema.handling.mode": "warn",
    "topic.prefix": "dbserver1"
  }
}
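Key names in this JSON are easy to get wrong (a comma where a dot belongs, or a misspelled key, is typically ignored rather than rejected). A small sanity-check sketch, with an illustrative set of required keys, can catch such slips before the config is submitted:

```python
import json

# Illustrative subset of keys a Debezium MySQL connector config needs.
REQUIRED_KEYS = {
    "connector.class",
    "database.hostname",
    "database.user",
    "database.password",
    "topic.prefix",
}

def check_connector_config(text: str) -> list[str]:
    """Return a list of problems found in a connector JSON document."""
    cfg = json.loads(text).get("config", {})
    problems = [f"missing key: {k}" for k in sorted(REQUIRED_KEYS - cfg.keys())]
    # A comma where a dot belongs makes the setting silently inert.
    problems += [f"comma in key: {k}" for k in cfg if "," in k]
    return problems

sample = '{"name": "c1", "config": {"schema.history,internal.store.only.captured.tables.ddl": "true"}}'
for p in check_connector_config(sample):
    print(p)
```

This is a local sketch, not part of the Debezium or Connect toolchain; extend REQUIRED_KEYS to match your own configuration.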

4.2 Register the connector

curl -X POST -H "Content-Type: application/json" --data @mysql-connector.json http://localhost:8083/connectors

4.3 Check connector status

curl http://localhost:8083/connectors/mysql-connector/status

# Managing connectors through the REST API
# Delete the connector
curl -X DELETE http://localhost:8083/connectors/mysql-connector
# Restart it
curl -X POST http://localhost:8083/connectors/mysql-connector/restart
# List all connectors
curl http://localhost:8083/connectors
# Show a connector's configuration
curl http://localhost:8083/connectors/mysql-connector/config
# Pause / resume a connector
curl -X PUT http://localhost:8083/connectors/mysql-connector/pause
curl -X PUT http://localhost:8083/connectors/mysql-connector/resume

5. Test the data sync

5.1 Consume the Kafka topic

# List the auto-created topics (name format: <topic.prefix>.<database>.<table>), e.g. dbserver1.xx1.tt1
./bin/kafka-topics.sh --list --bootstrap-server localhost:9092

# Consume change events for table xx1.tt1
./bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic dbserver1.xx1.tt1
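The topic-name convention above is mechanical enough to capture in a couple of lines; a small sketch (illustrative helper names, not Debezium APIs):

```python
# Debezium names each change topic <topic.prefix>.<database>.<table>.
def change_topic(prefix: str, database: str, table: str) -> str:
    """Topic that carries change events for one captured table."""
    return f"{prefix}.{database}.{table}"

def parse_change_topic(topic: str) -> tuple[str, str, str]:
    """Split a change-topic name back into (prefix, database, table)."""
    prefix, database, table = topic.split(".", 2)
    return prefix, database, table

print(change_topic("dbserver1", "xx1", "tt1"))  # dbserver1.xx1.tt1
```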

5.2 Trigger a MySQL data change

Run an insert or update in MySQL (against a table listed in table.include.list):

INSERT INTO products (name, price) VALUES ('Test Product', 99.99);

5.3 Observe the Kafka output

On success, the console prints a message similar to:

{
  "before": null,
  "after": { "id": 1, "name": "Test Product", "price": 99.99 },
  "op": "c"  # 表示创建操作
}
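Looking ahead to the Elasticsearch side, the envelope above already carries everything needed to decide what to do downstream. A minimal sketch (not an actual sink connector) that maps Debezium's op codes ("c" create, "u" update, "d" delete, "r" snapshot read) to an Elasticsearch-style action:

```python
def es_action(event: dict):
    """Map a Debezium change event to ("index", doc) or ("delete", old_doc)."""
    op = event["op"]
    if op in ("c", "u", "r"):
        # Creates, updates, and snapshot reads all upsert the "after" image.
        return ("index", event["after"])
    if op == "d":
        # Deletes carry the final row state in "before".
        return ("delete", event["before"])
    raise ValueError(f"unexpected op: {op}")

evt = {"before": None,
       "after": {"id": 1, "name": "Test Product", "price": 99.99},
       "op": "c"}
print(es_action(evt))
```

In practice the Confluent Elasticsearch sink connector does this mapping for you; the sketch only illustrates the envelope's semantics.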

Summary

That completes the initial Kafka + Debezium wiring; the Kafka + Elasticsearch part will be recorded next.
