logback–Advanced–06–Sending logback logs to Kafka


1、Requirement

Send logback logs to Kafka; from Kafka the logs are loaded into Elasticsearch and viewed in Kibana. (The Kafka-to-Elasticsearch leg is typically handled by Logstash or Kafka Connect and is not covered here.)

2、Steps

2.1、Add the dependencies

        <!-- Sending logs to Kafka: begin -->
        <!-- Core dependency: implements a Logback Appender that ships log events to Kafka. -->
        <dependency>
            <groupId>com.github.danielwegener</groupId>
            <artifactId>logback-kafka-appender</artifactId>
            <version>0.2.0-RC2</version>
        </dependency>

        <!-- Logback itself, the core of the logging framework. In a Spring Boot application this is usually already on the classpath. -->
        <dependency>
            <groupId>ch.qos.logback</groupId>
            <artifactId>logback-classic</artifactId>
        </dependency>

        <!-- JSON serialization support: formats log events as JSON, which suits an ELK stack (Elasticsearch, Logstash, Kibana). -->
        <dependency>
            <groupId>net.logstash.logback</groupId>
            <artifactId>logstash-logback-encoder</artifactId>
            <version>7.4</version>
        </dependency>

        <!-- Kafka client: the API for talking to the Kafka cluster. logback-kafka-appender uses it internally to send messages; without it the appender cannot work. -->
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
        </dependency>
        <!-- Sending logs to Kafka: end -->
        

2.2、Add the Kafka Appender file

Create kafka-appender.xml (under src/main/resources/logback/, so that the include path in section 2.4 resolves):

<!-- kafka-appender.xml -->
<included>
    <!-- springProperty values are defined in this included file, resolved from the Spring Environment -->
    <springProperty scope="context" name="service" source="spring.application.name" defaultValue="UnknownService"/>
    <springProperty scope="context" name="topic" source="kafka.logback.topic" defaultValue="microsvclog-UnknownService"/>
    <springProperty scope="context" name="env" source="kafka.logback.env" defaultValue="test"/>
    <springProperty scope="context" name="bootstrapServers" source="kafka.logback.serverUrl" defaultValue="localhost:9092"/>

    <!-- This line emits the Kafka appender's own internal logs, useful for troubleshooting -->
    <logger name="com.github.danielwegener.logback.kafka" level="DEBUG" />
    <!-- Kafka appender: sends log events to the Kafka cluster -->
    <appender name="kafkaAppender" class="com.github.danielwegener.logback.kafka.KafkaAppender">
        <!-- Format each log event as JSON using LoggingEventCompositeJsonEncoder -->
        <encoder class="net.logstash.logback.encoder.LoggingEventCompositeJsonEncoder">
            <providers class="net.logstash.logback.composite.loggingevent.LoggingEventJsonProviders">
                <!-- Custom JSON pattern -->
                <pattern>
                    <pattern>
                        {
                        "env": "${env}",                                                <!-- environment, e.g. dev, test, prod -->
                        "service":"${service}",                                         <!-- service name -->
                        "level":"%level",                                               <!-- log level -->
                        "thread": "%thread",                                            <!-- thread name -->
                        "logger": "%logger{36}",                                        <!-- logger name, abbreviated to at most 36 characters -->
                        "exception":"%exception",                                       <!-- exception stack trace, if present -->
                        "timestamp": "%date{yyyy-MM-dd'T'HH:mm:ss.SSSSSS'Z'}",          <!-- timestamp format -->
                        "message":"%msg"                                                <!-- the log message -->
                        }
                    </pattern>
                </pattern>
            </providers>
        </encoder>

        <!-- Kafka topic name; the service name is used as the topic suffix -->
        <topic>${topic}</topic>

        <!-- Keying strategy: no key, so messages are spread across partitions -->
        <keyingStrategy class="com.github.danielwegener.logback.kafka.keying.NoKeyKeyingStrategy"/>

        <!-- Delivery strategy: send to Kafka asynchronously, without blocking the application -->
        <deliveryStrategy class="com.github.danielwegener.logback.kafka.delivery.AsynchronousDeliveryStrategy"/>


        <!-- Kafka producer configuration -->
        <producerConfig>acks=0</producerConfig>                       <!-- ack level: 0 = do not wait for acknowledgement (fastest, least reliable) -->
        <producerConfig>linger.ms=1000</producerConfig>               <!-- batching delay: buffer up to 1 s to improve throughput -->
        <producerConfig>max.block.ms=0</producerConfig>               <!-- blocking time: 0 = never block, return immediately -->
        <producerConfig>bootstrap.servers=${bootstrapServers}</producerConfig> <!-- Kafka broker address list -->
    </appender>
</included>
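
With this pattern, every log event arrives on the topic as a single JSON document. An illustrative message (all values are made up for this sketch) looks roughly like:

{
  "env": "dev",
  "service": "test",
  "level": "INFO",
  "thread": "main",
  "logger": "f.z.c.business.SomeService",
  "exception": "",
  "timestamp": "2025-08-14T06:33:03.667000Z",
  "message": "order created, id=1"
}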

2.3、Configuration file changes

2.3.1、Add application-kafka.yml

The kafka.logback.* keys in this file feed the springProperty declarations defined in kafka-appender.xml.

kafka:
  # logback settings
  logback:
    # Kafka topic name; the service name is used as the topic suffix
    topic: microsvclog-${spring.application.name}
    # environment
    env: dev
    # application name
    applicationName: ${spring.application.name}
    # Kafka broker address
    serverUrl: 1.1.1.1:9092

2.3.2、Activate the profile

spring:
  application:
    name: test
  # activate the kafka profile
  profiles:
    active: kafka

2.4、Include the Kafka Appender configuration in logback-spring.xml

<?xml version="1.0" encoding="UTF-8" ?>
<configuration scan="true" scanPeriod="60 seconds" debug="true">

    <!-- Include the Kafka Appender configuration -->
    <include resource="logback/kafka-appender.xml"/>

    <!-- Root log level -->
    <root level="INFO">
        <appender-ref ref="kafkaAppender"/>
    </root>

    <!-- Dedicated logger for the business package -->
    <logger name="fei.zhou.common_function.business" level="info" additivity="false">
        <appender-ref ref="kafkaAppender"/>
    </logger>

</configuration>
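
To generate traffic for the tests below, a throwaway smoke-test bean can emit a few events at startup. A minimal sketch (the class name is hypothetical; it sits in the fei.zhou.common_function.business package so that the dedicated logger above picks it up):

package fei.zhou.common_function.business;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.boot.CommandLineRunner;
import org.springframework.stereotype.Component;

// Hypothetical smoke-test bean: emits one event per level so the
// kafkaAppender has traffic to ship during testing.
@Component
public class KafkaLogSmokeTest implements CommandLineRunner {

    private static final Logger log = LoggerFactory.getLogger(KafkaLogSmokeTest.class);

    @Override
    public void run(String... args) {
        log.info("kafka logback smoke test: info");
        log.warn("kafka logback smoke test: warn");
        log.error("kafka logback smoke test: error", new IllegalStateException("demo exception"));
    }
}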


3、Test scope

  • The log platform should receive the logs
  • A Kafka outage should have no impact on the application; with acks=0, max.block.ms=0, and the asynchronous delivery strategy configured above, logging never blocks on an unreachable broker

4、Testing

Output like the following shows that the connection to Kafka succeeded:

2025-08-14 06:33:03.667 [main-] INFO -347 o.a.k.c.producer.ProducerConfig - ProducerConfig values:  
	acks = 0
	batch.size = 16384
	bootstrap.servers = [1.1.1.1:9092]
	buffer.memory = 33554432
	client.dns.lookup = default
	client.id = producer-2
	compression.type = none
	connections.max.idle.ms = 540000
	delivery.timeout.ms = 120000
	enable.idempotence = false
	interceptor.classes = []
	key.serializer = class org.apache.kafka.common.serialization.ByteArraySerializer
	linger.ms = 1000
	max.block.ms = 0
	max.in.flight.requests.per.connection = 5
	max.request.size = 1048576
	metadata.max.age.ms = 300000
	metadata.max.idle.ms = 300000
	metric.reporters = []
	metrics.num.samples = 2
	metrics.recording.level = INFO
	metrics.sample.window.ms = 30000
	partitioner.class = class org.apache.kafka.clients.producer.internals.DefaultPartitioner
	receive.buffer.bytes = 32768
	reconnect.backoff.max.ms = 1000
	reconnect.backoff.ms = 50
	request.timeout.ms = 30000
	retries = 2147483647
	retry.backoff.ms = 100
	sasl.client.callback.handler.class = null
	sasl.jaas.config = null
	sasl.kerberos.kinit.cmd = /usr/bin/kinit
	sasl.kerberos.min.time.before.relogin = 60000
	sasl.kerberos.service.name = null
	sasl.kerberos.ticket.renew.jitter = 0.05
	sasl.kerberos.ticket.renew.window.factor = 0.8
	sasl.login.callback.handler.class = null
	sasl.login.class = null
	sasl.login.refresh.buffer.seconds = 300
	sasl.login.refresh.min.period.seconds = 60
	sasl.login.refresh.window.factor = 0.8
	sasl.login.refresh.window.jitter = 0.05
	sasl.mechanism = GSSAPI
	security.protocol = PLAINTEXT
	security.providers = null
	send.buffer.bytes = 131072
	ssl.cipher.suites = null
	ssl.enabled.protocols = [TLSv1.2]
	ssl.endpoint.identification.algorithm = https
	ssl.key.password = null
	ssl.keymanager.algorithm = SunX509
	ssl.keystore.location = null
	ssl.keystore.password = null
	ssl.keystore.type = JKS
	ssl.protocol = TLSv1.2
	ssl.provider = null
	ssl.secure.random.implementation = null
	ssl.trustmanager.algorithm = PKIX
	ssl.truststore.location = null
	ssl.truststore.password = null
	ssl.truststore.type = JKS
	transaction.timeout.ms = 60000
	transactional.id = null
	value.serializer = class org.apache.kafka.common.serialization.ByteArraySerializer

2025-08-14 06:33:03.853 [main-] INFO -117 o.a.kafka.common.utils.AppInfoParser - Kafka version: 2.5.0
2025-08-14 06:33:03.854 [main-] INFO -118 o.a.kafka.common.utils.AppInfoParser - Kafka commitId: 111
2025-08-14 06:33:03.854 [main-] INFO -119 o.a.kafka.common.utils.AppInfoParser - Kafka startTimeMs: 1
2025-08-14 06:33:05.251 [kafka-producer-network-thread | producer-2-] INFO -280 org.apache.kafka.clients.Metadata - [Producer clientId=producer-2] Cluster ID: J62353er740irw
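
Beyond the startup logs, it is worth reading the topic back to confirm that events actually arrive. A minimal verification sketch using the kafka-clients consumer API (the topic microsvclog-test and the broker address 1.1.1.1:9092 follow from the configuration in section 2.3; adjust both to your environment):

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

// Throwaway verifier: reads the log topic from the beginning and prints
// each message value, which should be one JSON log event per record.
public class LogTopicVerifier {

    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "1.1.1.1:9092");   // kafka.logback.serverUrl
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "logback-verify");          // ad-hoc consumer group
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");       // read from the start of the topic
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList("microsvclog-test")); // microsvclog-${spring.application.name}
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(10));
            for (ConsumerRecord<String, String> record : records) {
                System.out.println(record.value());
            }
        }
    }
}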

5、Handling special issues

5.1、logback-spring.xml is loaded before the configuration center (Nacos) is read

If, at startup, logback-spring.xml is loaded before the configuration has been fetched from the configuration center (Nacos), the springProperty values fall back to their defaults, the appender targets localhost:9092, and errors like the following appear:

25-08-15.07:33:33.096 [kafka-producer-network-thread | producer-2] INFO  NetworkClient          - [Producer clientId=producer-2] Node -1 disconnected.
25-08-15.07:33:33.096 [kafka-producer-network-thread | producer-2] WARN  NetworkClient          - [Producer clientId=producer-2] Connection to node -1 (localhost/127.0.0.1:9092) could not be established. Node may not be available.
25-08-15.07:33:33.097 [main            ] INFO  ParamUtil              - PER_TASK_CONFIG_SIZE: 3000.0
25-08-15.07:33:33.097 [kafka-producer-network-thread | producer-2] WARN  NetworkClient          - [Producer clientId=producer-2] Bootstrap broker localhost:9092 (id: -1 rack: null) disconnected

In that case, add a class like the one below. After the application is fully started (ApplicationReadyEvent, by which time the Nacos configuration has been read), it stops the kafkaAppender, overrides bootstrap.servers with the refreshed value, and starts the appender again:

import ch.qos.logback.classic.LoggerContext;
import com.github.danielwegener.logback.kafka.KafkaAppender;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.context.event.ApplicationReadyEvent;
import org.springframework.cloud.context.config.annotation.RefreshScope;
import org.springframework.context.ApplicationListener;
import org.springframework.stereotype.Component;

/**
 * Works around logback starting before the Nacos configuration has been read.
 * Once the application has started successfully (and the Nacos configuration
 * is available), this class re-applies the Kafka settings from Nacos.
 *
 * @author <a href="920786312@qq.com">周飞</a>
 * @since 2025/8/15 07:35
 */
@Component
@RefreshScope
public class KafkaLogbackConfig implements ApplicationListener<ApplicationReadyEvent> {

    protected Logger logger = LoggerFactory.getLogger(getClass());

    @Value("${kafka.logback.serverUrl}")
    private String kafkaServerUrl;

    @Override
    public void onApplicationEvent(ApplicationReadyEvent event) {
        updateKafkaConfig();
    }

    public void updateKafkaConfig() {
        try {
            LoggerContext loggerContext = (LoggerContext) LoggerFactory.getILoggerFactory();
            KafkaAppender<?> kafkaAppender = (KafkaAppender<?>) loggerContext.getLogger("ROOT").getAppender("kafkaAppender");

            if (kafkaAppender != null) {
                // Restart the appender so its producer picks up the refreshed broker address
                kafkaAppender.stop();
                kafkaAppender.addProducerConfig("bootstrap.servers=" + kafkaServerUrl);
                kafkaAppender.start();
                logger.info("Kafka config updated: " + kafkaServerUrl);
            }
        } catch (Exception e) {
            logger.error("Failed to update Kafka config: " + e.getMessage(), e);
        }
    }
}