kafka+debezium+Elasticsearch
目前项目需要采用elasticsearch进行搜索,简单记录一下mysql+Elasticsearch的同步过程。以上已初步配置了kafka和debezium连接,接下来记录kafka+elasticsearch。
kafka+debezium+Elasticsearch
- kafka+debezium配置
文章目录
前言
目前项目需要采用elasticsearch进行搜索,简单记录一下mysql+Elasticsearch的同步过程。
1. 安装 Kafka 和 Zookeeper
1.1 下载 Kafka
# 下载 Kafka(包含 Zookeeper)
wget https://downloads.apache.org/kafka/3.6.1/kafka_2.13-3.6.1.tgz
tar -xzf kafka_2.13-3.6.1.tgz
cd kafka_2.13-3.6.1
1.2 启动 Zookeeper
# 后台启动 Zookeeper 不加-daemon就非后台启动
./bin/zookeeper-server-start.sh -daemon config/zookeeper.properties
1.3 启动 Kafka
# 后台启动 Kafka
./bin/kafka-server-start.sh -daemon config/server.properties
2. 配置 MySQL
2.1 启用 Binlog
编辑 MySQL 配置文件(/etc/mysql/my.cnf 或 /etc/my.cnf):
[mysqld]
server-id=11111
log-bin=mysql-bin
binlog_format=ROW # 必须为 ROW 模式
expire_logs_days=7 # 日志保留时间
重启 MySQL:
sudo systemctl restart mysql
2.2 创建 Debezium 用户
登录 MySQL,执行以下 SQL:
CREATE USER 'debezium'@'%' IDENTIFIED BY 'Debezium@123';
GRANT SELECT, RELOAD, SHOW DATABASES, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'debezium'@'%';
FLUSH PRIVILEGES;
3. 安装 Debezium MySQL Connector
3.1 下载 Connector 插件
# 创建插件目录
mkdir -p kafka/plugins/debezium-mysql
# 下载 Debezium MySQL Connector
wget https://repo1.maven.org/maven2/io/debezium/debezium-connector-mysql/2.5.0.Final/debezium-connector-mysql-2.5.0.Final-plugin.tar.gz
# 解压到当前目录 /kafka/plugins/debezium-mysql
tar -xzf debezium-connector-mysql-2.4.0.Final-plugin.tar.gz -C plugins/debezium-mysql
3.2 配置 Kafka Connect
编辑 config/connect-distributed.properties:
# 指定插件路径
plugin.path=/path/to/kafka/plugins
# 允许自动创建 Topic
config.storage.topic=connect-configs
offset.storage.topic=connect-offsets
status.storage.topic=connect-statuses
# 网络配置
rest.port=8083
rest.advertised.host.name=localhost
编辑 config/server.properties:
# 开启自动创建主题
advertised.listeners=PLAINTEXT://localhost:9092
auto.create.topics.enable=true
3.3 启动 Kafka Connect
./bin/connect-distributed.sh -daemon config/connect-distributed.properties
4. 创建 Debezium Connector
4.1 提交 Connector 配置
创建 mysql-connector.json:
{
"name": "mysql-connector",
"config": {
"connector.class": "io.debezium.connector.mysql.MySqlConnector",
"database.hostname": "127.0.0.1",
"database.port": "3306",
"database.user": "debezium",
"database.password": "password",
"database.server.id": "11111",
"database.server.name": "mysql-server",
"database.include.list": "xx1,xx2",
"table.include.list": "xx1.tt1,xx2.tt2",
"snapshot.mode":"initial",
"snapshot.locking.mode": "none",
"schema.history.internal":"io.debezium.storage.kafka.history.KafkaSchemaHistory",
"schema.history.internal.kafka.topic": "schema-changes-mysql",
"schema.history.internal.kafka.bootstrap.servers": "localhost:9092",
"schema.history,internal.store.only.captured.tables.ddl":"true",
"inconsistent.schema.handing.mode":"warn",
"topic.prefix": "dbserver1"
}
}
4.2 注册 Connector
curl -X POST -H "Content-Type: application/json" --data @mysql-connector.json http://localhost:8083/connectors
4.3 验证 Connector 状态
curl http://localhost:8083/connectors/mysql-connector/status
#通过api查看编辑connector
# 删除连接
curl -X DELETE http://localhost:8083/connectors/mysql-connector
# 重新启动
curl -X POST http://localhost:8083/connectors/mysql-connector/restart
# 查看所有连接器
curl http://localhost:8083/connectors
# 查看连接器配置
curl http://localhost:8083/connectors/mysql-connector/config
# 暂停/恢复连接
curl -X PUT http://localhost:8083/connectors/mysql-connector/pause /resume
5. 测试数据同步
5.1 监听 Kafka Topic
# 查看自动创建的 Topic(格式:<server-name>.<database>.<table>)例 dbserver1.xx1.tt1
./bin/kafka-topics.sh --list --bootstrap-server localhost:9092
# 监听 products 表变更
./bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic dbserver1.xx1.tt1
5.2 触发 MySQL 数据变更
在 MySQL 中执行插入或更新操作:
INSERT INTO products (name, price) VALUES ('Test Product', 99.99);
5.3 观察 Kafka 输出
如果成功,控制台会显示类似以下消息:
{
"before": null,
"after": { "id": 1, "name": "Test Product", "price": 99.99 },
"op": "c" # 表示创建操作
}
总结
以上已初步配置了kafka和debezium连接,接下来记录kafka+elasticsearch。
火山引擎开发者社区是火山引擎打造的AI技术生态平台,聚焦Agent与大模型开发,提供豆包系列模型(图像/视频/视觉)、智能分析与会话工具,并配套评测集、动手实验室及行业案例库。社区通过技术沙龙、挑战赛等活动促进开发者成长,新用户可领50万Tokens权益,助力构建智能应用。
更多推荐
所有评论(0)