版本:

canal:1.1.7

rocketMQ:4.9.4

Mysql:8.0

ElasticSearch:9.0.3

springboot:2.7.5

首先启动rocketmq

环境变量

 

cd /d D:\rocketmq-all-4.9.4-bin-release\bin

start mqnamesrv.cmd

start mqbroker.cmd -n 127.0.0.1:9876 autoCreateTopicEnable=true

如果出现jvm问题,是因为rocketmq和java版本冲突,可以把java降至Java8

可以配置新环境变量

然后在runbroker、runserver中修改java_home

启动mysql
mysql -u 用户名 -p;

启动canal
    配置canal
        conf/canal.properties
canal.destinations = example

rocketmq.producer.group = 消费者组名
        conf/example/instance.properties
#如果没有就加入
canal.instance.mysql.slaveId=1234

canal.instance.dbUsername=你的用户名
canal.instance.dbPassword=你的密码
#可以去mysql注册一个专门供canal使用的用户

#设置监听表,也可以默认监听所有表,不做修改
canal.instance.filter.regex=表名\\..*

#设置topic
canal.mq.topic=你的topic
启动bin/startup.bat

去logs/example/example.log查看是否报错,启动成功会有日志

启动ElasticSearch和Kibana

自行启动

springboot集成
        pom文件
        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
        </dependency>
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.2.70</version>
        </dependency>

        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
        </dependency>

        <!--canal-->
        <dependency>
            <groupId>com.alibaba.otter</groupId>
            <artifactId>canal.client</artifactId>
            <version>1.1.4</version>
        </dependency>

         <!--rocketmq-springboot启动类-->
        <dependency>
            <groupId>org.apache.rocketmq</groupId>
            <artifactId>rocketmq-spring-boot-starter</artifactId>
            <version>2.2.3</version>
        </dependency>

        <!--rocketmq客户端-->
        <dependency>
            <groupId>org.apache.rocketmq</groupId>
            <artifactId>rocketmq-client</artifactId>
            <version>4.9.4</version>
        </dependency>

        <!--远程调用依赖,自己调试时报错加上的-->
        <dependency>
            <groupId>org.apache.rocketmq</groupId>
            <artifactId>rocketmq-remoting</artifactId>
            <version>4.9.4</version>
        </dependency>

        <!--es-->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-data-elasticsearch</artifactId>
        </dependency>
        配置文件(properties)

用的yml文件,自行更替即可

rocketmq配置和es配置,canal没有配置

rocketmq.name-server=127.0.0.1:9876
rocketmq.producer.group=community_producer
rocketmq.producer.topic=example
rocketmq.producer.send-message-timeout=2000
rocketmq.producer.retry-times-when-send-failed=1
rocketmq.producer.retry-times-when-send-async-failed=1
rocketmq.consumer.group=community_consumer

spring.elasticsearch.uris=http://localhost:9200
spring.data.elasticsearch.repositories.enabled=true
model
mysqlModel
@Data
@RequiredArgsConstructor
public class Question{
    private Long id;

    private String title;

    private String description;

    private String tag;

    private Long createTime;

    private Long updateTime;

    private Long creatorId;

    private Integer readingNum;

    private Integer commentNum;

    private Long likeNum;

}
esModel,修改索引为自己的索引名 
@Data
@RequiredArgsConstructor
@Document(indexName = "questions")
public class QuestionEs implements Serializable {
    @Id
    private Long id;

    private String title;

    private String description;

    private String tag;

    @Field(type = FieldType.Date)
    private Long createTime;

    @Field(type = FieldType.Date)
    private Long updateTime;

    private Long creatorId;

    private Integer readingNum;

    private Integer commentNum;

    private Long likeNum;

}
canalModel
@Data
@RequiredArgsConstructor
public class CanalDTO implements Serializable {
    private List<Question> data;
    private String database;
    private Long es;

    private JSONArray old;
    private String table;
    private Long ts;

    private String type;

}
repository
@Repository
public interface EsRepository extends ElasticsearchRepository<QuestionEs, String> {
}
监听并同步方法

saveToEs做了异常处理,是因为高版本的es会出现返回结果无法解析的问题,虽然更新成功了但一直报错导致消息无法成功消费(实际上已经消费了),这里取巧不让它报错。

@Component
@RocketMQMessageListener(
        consumerGroup = "${rocketmq.consumer.group}",
        topic = "${rocketmq.producer.topic}"
        )
public class MqSyncMsgConsumer implements RocketMQListener<String> {
    private final Logger logger = LoggerFactory.getLogger("es_sync_logger");

    @Autowired
    private EsRepository esRepository;

    @Override
    public void onMessage(String msg) {
        System.out.println("接收到消息 -> " + msg);
        CanalDTO canalDTO = JSON.parseObject(msg, CanalDTO.class);

        String table = canalDTO.getTable();

        String type = canalDTO.getType();
        System.out.println(type);

        List<Question> data = canalDTO.getData();
        if(data.isEmpty()){
            throw new RuntimeException("canal监听到消息,但为空");
        }
        for (Question question : data) {
            QuestionEs questionEs = new QuestionEs();
            if ("UPDATE".equals(type) && "question".equals(table)){
                Optional<QuestionEs> esOptional = esRepository.findById(question.getId().toString());
                if (esOptional.isPresent()) {
                    QuestionEs esTmp = esOptional.get();
                    saveToEs(question,esTmp);
                } else {
                    saveToEs(question, questionEs);
                }
                logger.info("id = {} 更新es成功", questionEs.getId());
            }else if("INSERT".equals(type) && "question".equals(table)){
                saveToEs(question, questionEs);
            }
        }
    }

    private void saveToEs(Question question, QuestionEs questionEs) {
        BeanUtils.copyProperties(question, questionEs);
        try {
            esRepository.save(questionEs);
        }catch (Exception e){
            String err = e.getMessage();
            if (!err.contains("200 OK")){
                logger.error("es操作文档失败,异常信息:", e);
                throw new RuntimeException(e);
            }
        }
    }
}
结果

测试一下update

控制台 

 

kibana控制台

 流程较长,如有纰漏敬请指出。

Logo

火山引擎开发者社区是火山引擎打造的AI技术生态平台,聚焦Agent与大模型开发,提供豆包系列模型(图像/视频/视觉)、智能分析与会话工具,并配套评测集、动手实验室及行业案例库。社区通过技术沙龙、挑战赛等活动促进开发者成长,新用户可领50万Tokens权益,助力构建智能应用。

更多推荐