logback--进阶--06--logback日志输出到kafka
将logback日志输出到kafka,kafka将日志放到elasticSearch,通过kibana查看日志。
·
logback–进阶–06–logback日志输出到kafka
1、需求
将logback日志输出到kafka,kafka将日志放到elasticSearch,通过kibana查看日志
2、操作步骤
2.1、引入依赖
<!-- 日志输出到kafka begin-->
<!-- 这是核心依赖,提供了将 Logback 日志发送到 Kafka 的功能。它实现了 Logback 的 Appender 接口,使日志系统能够与 Kafka 集成。-->
<dependency>
<groupId>com.github.danielwegener</groupId>
<artifactId>logback-kafka-appender</artifactId>
<version>0.2.0-RC2</version>
</dependency>
<!-- JSON 序列化支持 -->
<!-- 这是 Logback 日志框架的核心组件,提供了日志系统的基础功能。在 Spring Boot 应用中,通常已经包含这个依赖。-->
<dependency>
<groupId>ch.qos.logback</groupId>
<artifactId>logback-classic</artifactId>
</dependency>
<!-- 这个依赖提供了将日志格式化为 JSON 格式的功能,特别适合与 ELK 栈(Elasticsearch, Logstash, Kibana)集成。-->
<dependency>
<groupId>net.logstash.logback</groupId>
<artifactId>logstash-logback-encoder</artifactId>
<version>7.4</version>
</dependency>
<!-- Kafka 客户端 提供了与 Kafka 集群通信的 API。logback-kafka-appender 内部使用这个客户端库来发送消息到 Kafka。如果没有这个依赖,logback-kafka-appender 将无法正常工作。-->
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
</dependency>
<!-- 日志输出到kafka end-->
2.2、新增Kafka Appender 文件
kafka-appender.xml
<!-- kafka-appender.xml -->
<included>
<!-- 在included文件中定义springProperty -->
<springProperty scope="context" name="service" source="spring.application.name" defaultValue="UnknownService"/>
<springProperty scope="context" name="topic" source="kafka.logback.topic" defaultValue="microsvclog-UnknownService"/>
<springProperty scope="context" name="env" source="kafka.logback.env" defaultValue="test"/>
<springProperty scope="context" name="bootstrapServers" source="kafka.logback.serverUrl" defaultValue="localhost:9092"/>
<!-- 添加这行来输出Kafka Appender的内部日志 -->
<logger name="com.github.danielwegener.logback.kafka" level="DEBUG" />
<!-- Kafka日志追加器配置 - 将日志事件发送到Kafka集群 -->
<appender name="kafkaAppender" class="com.github.danielwegener.logback.kafka.KafkaAppender">
<!-- 使用LogstashEncoder将日志格式化为JSON格式 -->
<encoder class="net.logstash.logback.encoder.LoggingEventCompositeJsonEncoder">
<providers class="net.logstash.logback.composite.loggingevent.LoggingEventJsonProviders">
<!-- 自定义JSON格式模式 -->
<pattern>
<pattern>
{
"env": "${env}", <!-- 环境变量,如dev、test、prod -->
"service":"${service}", <!-- 服务名称 -->
"level":"%level", <!-- 日志级别 -->
"thread": "%thread", <!-- 线程名称 -->
"logger": "%logger{36}", <!-- 日志记录器名称,最多36个字符 -->
"exception":"%exception", <!-- 异常堆栈信息(如果存在) -->
"timestamp": "%date{yyyy-MM-dd'T'HH:mm:ss.SSSSSS'Z'}", <!-- 日期时间格式 -->
"message":"%msg" <!-- 日志消息内容 -->
}
</pattern>
</pattern>
</providers>
</encoder>
<!-- Kafka主题名称,使用服务名称作为主题后缀 -->
<topic>${topic}</topic>
<!-- 键策略:不使用键(所有消息随机分布到分区) -->
<keyingStrategy class="com.github.danielwegener.logback.kafka.keying.NoKeyKeyingStrategy"/>
<!-- 传递策略:异步发送日志到Kafka(不阻塞应用程序) -->
<deliveryStrategy class="com.github.danielwegener.logback.kafka.delivery.AsynchronousDeliveryStrategy"/>
<!-- Kafka生产者配置 -->
<producerConfig>acks=0</producerConfig> <!-- 确认级别:0表示不等待确认(最快但最不可靠) -->
<producerConfig>linger.ms=1000</producerConfig> <!-- 延迟发送时间:批量发送以提高吞吐量(1秒) -->
<producerConfig>max.block.ms=0</producerConfig> <!-- 阻塞时间:0表示不阻塞,立即返回 -->
<producerConfig>bootstrap.servers=${bootstrapServers}</producerConfig> <!-- Kafka服务器地址列表 -->
</appender>
</included>
2.3、配置文件修改
2.3.1、新增application-kafka.yml
kafka:
# logback 配置
logback:
# Kafka主题名称,使用服务名称作为主题后缀
topic: microsvclog-${spring.application.name}
# 环境
env: dev
# 应用名称
applicationName: ${spring.application.name}
# kafka 服务地址
serverUrl: 1.1.1.1:9092
2.3.2、配置激活文件
spring:
application:
name: test
# 激活配置
profiles:
active: kafka
2.4、logback-spring.xml引入Kafka Appender配置
<?xml version="1.0" encoding="UTF-8" ?>
<configuration scan="true" scanPeriod="60 seconds" debug="true">
<!-- 引入Kafka Appender配置 -->
<include resource="logback/kafka-appender.xml"/>
<!-- 日志输出级别 -->
<root level="INFO">
<appender-ref ref="kafkaAppender"/>
</root>
<logger name="fei.zhou.common_function.business" level="info" additivity="false">
<appender-ref ref="kafkaAppender"/>
</logger>
</configuration>
3、测试范围
- 日志平台要有日志
- kafka挂了对应用没有影响
4、测试
有以下日志说明连接kafka是成功
2025-08-14 06:33:03.667 [main-] INFO -347 o.a.k.c.producer.ProducerConfig - ProducerConfig values:
acks = 0
batch.size = 16384
bootstrap.servers = [1.1.1.1:9092]
buffer.memory = 33554432
client.dns.lookup = default
client.id = producer-2
compression.type = none
connections.max.idle.ms = 540000
delivery.timeout.ms = 120000
enable.idempotence = false
interceptor.classes = []
key.serializer = class org.apache.kafka.common.serialization.ByteArraySerializer
linger.ms = 1000
max.block.ms = 0
max.in.flight.requests.per.connection = 5
max.request.size = 1048576
metadata.max.age.ms = 300000
metadata.max.idle.ms = 300000
metric.reporters = []
metrics.num.samples = 2
metrics.recording.level = INFO
metrics.sample.window.ms = 30000
partitioner.class = class org.apache.kafka.clients.producer.internals.DefaultPartitioner
receive.buffer.bytes = 32768
reconnect.backoff.max.ms = 1000
reconnect.backoff.ms = 50
request.timeout.ms = 30000
retries = 2147483647
retry.backoff.ms = 100
sasl.client.callback.handler.class = null
sasl.jaas.config = null
sasl.kerberos.kinit.cmd = /usr/bin/kinit
sasl.kerberos.min.time.before.relogin = 60000
sasl.kerberos.service.name = null
sasl.kerberos.ticket.renew.jitter = 0.05
sasl.kerberos.ticket.renew.window.factor = 0.8
sasl.login.callback.handler.class = null
sasl.login.class = null
sasl.login.refresh.buffer.seconds = 300
sasl.login.refresh.min.period.seconds = 60
sasl.login.refresh.window.factor = 0.8
sasl.login.refresh.window.jitter = 0.05
sasl.mechanism = GSSAPI
security.protocol = PLAINTEXT
security.providers = null
send.buffer.bytes = 131072
ssl.cipher.suites = null
ssl.enabled.protocols = [TLSv1.2]
ssl.endpoint.identification.algorithm = https
ssl.key.password = null
ssl.keymanager.algorithm = SunX509
ssl.keystore.location = null
ssl.keystore.password = null
ssl.keystore.type = JKS
ssl.protocol = TLSv1.2
ssl.provider = null
ssl.secure.random.implementation = null
ssl.trustmanager.algorithm = PKIX
ssl.truststore.location = null
ssl.truststore.password = null
ssl.truststore.type = JKS
transaction.timeout.ms = 60000
transactional.id = null
value.serializer = class org.apache.kafka.common.serialization.ByteArraySerializer
2025-08-14 06:33:03.853 [main-] INFO -117 o.a.kafka.common.utils.AppInfoParser - Kafka version: 2.5.0
2025-08-14 06:33:03.854 [main-] INFO -118 o.a.kafka.common.utils.AppInfoParser - Kafka commitId: 111
2025-08-14 06:33:03.854 [main-] INFO -119 o.a.kafka.common.utils.AppInfoParser - Kafka startTimeMs: 1
2025-08-14 06:33:05.251 [kafka-producer-network-thread | producer-2-] INFO -280 org.apache.kafka.clients.Metadata - [Producer clientId=producer-2] Cluster ID: J62353er740irw
5、特殊问题的处理
5.1、加载logback-spring.xml动作在读取配置中心(nacos)动作之前
如果项目启动后,加载logback-spring.xml动作在读取配置中心(nacos)动作之前,会出现以下错误
25-08-15.07:33:33.096 [kafka-producer-network-thread | producer-2] INFO NetworkClient - [Producer clientId=producer-2] Node -1 disconnected.
25-08-15.07:33:33.096 [kafka-producer-network-thread | producer-2] WARN NetworkClient - [Producer clientId=producer-2] Connection to node -1 (localhost/127.0.0.1:9092) could not be established. Node may not be available.
25-08-15.07:33:33.097 [main ] INFO ParamUtil - PER_TASK_CONFIG_SIZE: 3000.0
25-08-15.07:33:33.097 [kafka-producer-network-thread | producer-2] WARN NetworkClient - [Producer clientId=producer-2] Bootstrap broker localhost:9092 (id: -1 rack: null) disconnected
那么需要配置一个文件,内容如下
import ch.qos.logback.classic.LoggerContext;
import com.github.danielwegener.logback.kafka.KafkaAppender;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.context.event.ApplicationReadyEvent;
import org.springframework.cloud.context.config.annotation.RefreshScope;
import org.springframework.context.ApplicationListener;
import org.springframework.stereotype.Component;
/**
* 解决logback 先启动,没有读取到nacos配置的情况
* 这个类,会在项目成功启动后(项目已经读取nacos配置),重新读取nacos的kafka配置
*
* @author <a href="920786312@qq.com">周飞</a>
* @since 2025/8/15 07:35
*/
@Component
@RefreshScope
public class KafkaLogbackConfig implements ApplicationListener<ApplicationReadyEvent> {
protected Logger logger = LoggerFactory.getLogger(getClass());
@Value("${kafka.logback.serverUrl}")
private String kafkaServerUrl;
@Override
public void onApplicationEvent(ApplicationReadyEvent event) {
updateKafkaConfig();
}
public void updateKafkaConfig() {
try {
LoggerContext loggerContext = (LoggerContext) LoggerFactory.getILoggerFactory();
KafkaAppender kafkaAppender = (KafkaAppender) loggerContext.getLogger("ROOT").getAppender("kafkaAppender");
if (kafkaAppender != null) {
kafkaAppender.stop();
kafkaAppender.addProducerConfig("bootstrap.servers=" + kafkaServerUrl);
kafkaAppender.start();
logger.info("Kafka配置已更新: " + kafkaServerUrl);
}
} catch (Exception e) {
logger.info("更新Kafka配置失败: " + e.getMessage());
}
}
}
火山引擎开发者社区是火山引擎打造的AI技术生态平台,聚焦Agent与大模型开发,提供豆包系列模型(图像/视频/视觉)、智能分析与会话工具,并配套评测集、动手实验室及行业案例库。社区通过技术沙龙、挑战赛等活动促进开发者成长,新用户可领50万Tokens权益,助力构建智能应用。
更多推荐
所有评论(0)