一个完整的日志收集方案:Elasticsearch + Logstash + Kibana+Filebeat (三)
·
现在我们主要完成AI-RAG服务的扩展,利用ES的向量检索能力完成历史聊天记录的存储和向量检索,让ai聊天有记忆。
通过网盘分享的文件:EC0601.7z
链接: https://pan.baidu.com/s/102TnnSIMUep0kboZXCdR1Q 提取码: sfya
主要做法是在首次聊天完成后将对话内容写出日志到D:\dev\dev2025\EC0601\logs\chat-his.log

写出日志同时嵌入向量

向量可以从ollama的端点:/api/embeddings获取,不同的模型获取到的维度不一样,我的是4096,这个维度很重要,向量检索必须要使用相同的维度去相互匹配,所以要记录这个值,后续创建es模板需要用到。

然后在第二次以及后续的对话中,就可以从es中基于相似度获取聊天记录从而让聊天模型有记忆


这里解释一下:Elasticsearch 的 k-NN
Elasticsearch 的 k-NN(k-Nearest Neighbors,k 最近邻)查询是一种用于执行**向量相似度搜索**的功能。在您的场景中,它被用来根据查询的嵌入向量找到最相似的历史聊天记录。
### 什么是 Elasticsearch k-NN 查询?
Elasticsearch 的 k-NN 查询允许您在 `dense_vector` 字段上执行相似度搜索。它会找到在向量空间中与给定查询向量距离最近的 `k` 个文档。这对于实现语义搜索、推荐系统、重复检测等场景非常有用。
### k-NN 查询的目的
在您的聊天历史记录场景中,k-NN 查询的主要目的是:
* **语义搜索**:通过比较查询的文本嵌入向量与存储在 Elasticsearch 中的聊天记录的 `prompt_embedding` 或 `response_embedding` 向量,找到语义上最相关的历史对话,即使它们不包含完全相同的关键词。
* **上下文检索**:为 AI 模型的响应提供相关的历史上下文,从而生成更准确和连贯的回复(RAG - Retrieval Augmented Generation)。
### Elasticsearch k-NN 查询的关键组件
一个基本的 k-NN 查询通常包含以下几个部分:
1. **`knn` 对象**:这是 k-NN 查询的主体。
2. **`field`**:
* 指定要执行相似度搜索的 `dense_vector` 字段。在您的案例中,这通常是 `prompt_embedding` 或 `response_embedding`。
3. **`query_vector`**:
* 这是您要用来查找相似文档的查询向量。它是一个浮点数数组,其维度必须与目标 `dense_vector` 字段的 `dims`(维度)相匹配。在您的场景中,这就是 `getEmbedding(queryPrompt)` 生成的向量。
4. **`k`**:
* 指定您希望返回的最相似结果的数量。例如,`k: 5` 会返回 5 个最相似的文档。
5. **`num_candidates`**:
* 这是一个性能参数。它指定了 Elasticsearch 在内部需要检查的候选文档的数量,以找到 `k` 个最近邻。通常,`num_candidates` 应该大于 `k`(例如,`k` 的几倍),因为它会影响搜索的准确性和性能。更大的 `num_candidates` 可能会提高准确性,但会增加计算开销。
### 示例 (Kibana Dev Tools)
假设您的 `chat-history-*` 索引中存储了聊天记录的 `response_embedding` 字段,并且它们的维度是 `4096`。如果您想查询与某个特定向量最相似的聊天记录,可以在 Kibana Dev Tools 中这样构建查询:
```
GET chat-history-*/_search
{
"knn": {
"field": "response_embedding",
"query_vector": [
0.1, 0.2, -0.3, ..., 0.05, 0.99, -0.7 // 这里需要是实际的4096维查询向量
],
"k": 5,
"num_candidates": 100
}
}
```
**注意事项:**
* `query_vector` 必须是一个完整的向量,其长度要与 Elasticsearch 映射中 `dense_vector` 字段的 `dims` 参数(在您的情况下是 `4096`)严格一致。
* `num_candidates` 是一个调优参数。根据您的数据量和对准确性/性能的需求进行调整。
您的 Spring Boot 应用程序中的 `AIController.java` 代码正是构建了这样一个 k-NN 查询体,并通过 `restTemplate.postForEntity` 发送给 Elasticsearch。现在的关键是确保应用程序生成的 `query_vector` (由 `getEmbedding(queryPrompt)` 返回)的维度是正确的 `4096`。
这是调试效果:可以看到检索是成功了,本轮对话问题:"你知道我是谁吗"
它已经拿到了上一轮对话的内容:“你好我是伊丽萨白 ”。
当然1.7b它智商不在线,没能把我想要的回答组织起来,这通常需要调整提示语:"基于以下聊天历史记录" 让它更好理解。


接下来就是通过配置实现log写入es
- es:elasticsearch需要手动创建处理管道和模板用用于存储chat-his.log
PUT _ingest/pipeline/chat-history
{
"description": "处理聊天历史数据的管道",
"processors": [
{
"date": {
"field": "@timestamp",
"target_field": "timestamp",
"formats": ["ISO8601"]
}
},
{
"remove": {
"field": "@timestamp"
}
},
{
"set": {
"field": "metadata.processed",
"value": true
}
},
{
"set": {
"field": "metadata.processed_at",
"value": "{{_ingest.timestamp}}"
}
}
]
}
PUT _index_template/chat_history_template
{
"index_patterns": ["chat-history-*"],
"template": {
"mappings": {
"properties": {
"timestamp": {
"type": "date",
"format": "strict_date_optional_time_nanos"
},
"model": {
"type": "text",
"fields": {
"keyword": {
"type": "keyword",
"ignore_above": 256
}
}
},
"prompt": {
"type": "text",
"fields": {
"keyword": {
"type": "keyword",
"ignore_above": 256
}
}
},
"response": {
"type": "text",
"fields": {
"keyword": {
"type": "keyword",
"ignore_above": 256
}
}
},
"prompt_embedding": {
"type": "dense_vector",
"dims": 4096 //注意这个4096,ollama对应模型的向量维度
},
"response_embedding": {
"type": "dense_vector",
"dims": 4096 //注意这个4096,ollama对应模型的向量维度
}
}
}
}
}
- logstash:在原来的基础上添加了if [log_type] == "chat-history" 的输入和输出,这个解析过程一定程度决定于filebeat的输出格式
# Sample Logstash configuration for creating a complete
# Beats -> Logstash -> Elasticsearch pipeline with log parsing
input {
beats {
port => 5044
}
}
# Filter to parse logs using grok and date plugins
filter {
if [log_type] == "springboot-app" {
grok {
match => { "message" => "%{TIMESTAMP_ISO8601:timestamp} \[%{DATA:thread}\] %{LOGLEVEL:level} %{DATA:logger} - %{GREEDYDATA:message}" }
}
}
if [log_type] == "chat-history" {
# Step 1: 使用 grok 去除三引号(""") 包裹
grok {
match => {
"[event][original]" => "^\"\"\"%{GREEDYDATA:json_content}\"\"\""
}
tag_on_failure => ["_grok_chat_history_failure"]
}
# Step 2: 如果提取成功,则尝试解析 json_content
if [json_content] and [json_content] != "" {
json {
source => "json_content"
target => "parsed_chat_history"
remove_field => ["json_content"]
}
}
else {
# 如果没有匹配到三引号,尝试直接解析 event.original 为 JSON
json {
source => "[event][original]"
target => "parsed_chat_history"
remove_field => ["[event][original]"]
}
}
# Step 3: 提升 chat_log_data 中的字段到顶层
if [parsed_chat_history][chat_log_data] {
mutate {
rename => {
"[parsed_chat_history][chat_log_data][timestamp]" => "timestamp"
"[parsed_chat_history][chat_log_data][model]" => "model"
"[parsed_chat_history][chat_log_data][prompt]" => "prompt"
"[parsed_chat_history][chat_log_data][response]" => "response"
"[parsed_chat_history][chat_log_data][prompt_embedding]" => "prompt_embedding"
"[parsed_chat_history][chat_log_data][response_embedding]" => "response_embedding"
}
}
# 删除中间字段
mutate {
remove_field => ["parsed_chat_history"]
}
}
}
}
output {
if [log_type] == "springboot-app" {
elasticsearch {
hosts => ["https://localhost:9200"]
index => "springboot-logs-%{+YYYY.MM.dd}"
ssl_enabled => true
ssl_verification_mode => "full"
ssl_certificate_authorities => ["D:/dev/dev2025/EC0601/elasticsearch-9.0.1/config/certs/http_ca.crt"]
api_key => "V6VUSpcBUPesLBBNVAlH:O7l1zeyOwQFfy9w5Af_JTA"
}
}
if [log_type] == "chat-history" {
elasticsearch {
hosts => ["https://localhost:9200"]
index => "chat-history-%{+YYYY.MM.dd}" //匹配es模板自动创建按天分片的索引
pipeline => "chat-history" //es管道,对接收的字段做进一步处理,比如日期格式转换等
ssl_enabled => true
ssl_verification_mode => "full"
ssl_certificate_authorities => ["D:/dev/dev2025/EC0601/elasticsearch-9.0.1/config/certs/http_ca.crt"]
api_key => "V6VUSpcBUPesLBBNVAlH:O7l1zeyOwQFfy9w5Af_JTA"
}
}
stdout {
codec => rubydebug
}
}
具体做法在解析前可以要看这里(event字段内容的原始格式)样式进行解析

- filebeat:这里加入log_type: chat-history的日志收集
filebeat.inputs:
- type: filestream
id: springboot-logs
paths:
- D:/dev/dev2025/EC0601/logs/springboot-ai-rag-demo.log
enabled: true
fields:
log_type: springboot-app
app_name: springboot-ai-rag-demo
fields_under_root: true
multiline:
pattern: '^[0-9]{4}-[0-9]{2}-[0-9]{2}'
negate: true
match: after
- type: filestream
id: chat-history
paths:
- D:/dev/dev2025/EC0601/logs/chat-his.log
enabled: true
fields:
log_type: chat-history
app_name: springboot-ai-rag-demo
fields_under_root: true
output.logstash:
hosts: ["localhost:5044"]
enabled: true
# 启用 HTTP 状态接口
http.enabled: true
http.port: 5066
logging.level: debug
logging.selectors: ["*"]
- JDK:最后是JDK,应为es需要安全证书(https://localhost:9200/),所以java程序在访问es时,需要把es证书导入证书到jdk目录 :/lib/security/cacerts
- 提示输入密码时,默认密码通常是 changeit。
- 当它问“是否信任此证书? [否]:”时,输入 y 并按回车。
keytool -import -trustcacerts -alias elasticsearch_ca -file "D:/dev/dev2025/EC0601/elasticsearch-9.0.1/config/certs/http_ca.crt" -keystore "C:/Program Files/Java/jdk-17/lib/security/cacerts"
- springboot项目:需要修改之前log配置,现在通过logback配置将控制台日志和聊天日志分析写入到指定目录
<?xml version="1.0" encoding="UTF-8"?>
<configuration>
<property name="LOG_PATH" value="D:/dev/dev2025/EC0601/logs"/>
<!-- 控制台输出 -->
<appender name="CONSOLE" class="ch.qos.logback.core.ConsoleAppender">
<encoder>
<pattern>%d{yyyy-MM-dd HH:mm:ss.SSS} [%thread] %-5level %logger{36} - %msg%n</pattern>
</encoder>
</appender>
<!-- 主应用日志文件 -->
<appender name="SPRINGBOOT_APP_LOG" class="ch.qos.logback.core.rolling.RollingFileAppender">
<file>${LOG_PATH}/springboot-ai-rag-demo.log</file>
<rollingPolicy class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy">
<fileNamePattern>${LOG_PATH}/springboot-ai-rag-demo.%d{yyyy-MM-dd}.log</fileNamePattern>
<maxHistory>30</maxHistory>
</rollingPolicy>
<encoder>
<pattern>%d{yyyy-MM-dd HH:mm:ss.SSS} [%thread] %-5level %logger{36} - %msg%n</pattern>
</encoder>
</appender>
<!-- 聊天历史记录文件 -->
<appender name="CHAT_HISTORY" class="ch.qos.logback.core.rolling.RollingFileAppender">
<file>${LOG_PATH}/chat-his.log</file>
<rollingPolicy class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy">
<fileNamePattern>${LOG_PATH}/chat-his.%d{yyyy-MM-dd}.log</fileNamePattern>
<maxHistory>30</maxHistory>
</rollingPolicy>
<encoder>
<pattern>%msg%n</pattern>
</encoder>
</appender>
<!-- 聊天历史记录日志 -->
<logger name="com.example.demo.logging.ChatHistoryLogger" level="INFO" additivity="false">
<appender-ref ref="CHAT_HISTORY"/>
</logger>
<root level="INFO">
<appender-ref ref="CONSOLE"/>
<appender-ref ref="SPRINGBOOT_APP_LOG"/>
</root>
</configuration>
最后贴一下AIController,和ChatHistoryLogger
package com.example.demo.controller;
import com.example.demo.logging.ChatHistoryLogger;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.http.*;
import org.springframework.web.bind.annotation.*;
import org.springframework.web.client.RestTemplate;
import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@RestController
@RequestMapping("/api/ai")
public class AIController {
@Value("${ollama.base-url:http://localhost:11434}")
private String ollamaBaseUrl;
@Value("${spring.elasticsearch.rest.uris:https://localhost:9200}")
private String elasticsearchBaseUrl;
private final RestTemplate restTemplate = new RestTemplate();
private final ObjectMapper objectMapper = new ObjectMapper();
@Autowired
private ChatHistoryLogger chatHistoryLogger;
@PostMapping("/chat")
public String chat(@RequestBody Map<String, String> request) {
String prompt = request.get("prompt");
String model = request.getOrDefault("model", "qwen3:1.7b");
// 1. 从 Elasticsearch 获取历史记录 (使用向量检索)
List<String> chatHistory = getChatHistoryFromElasticsearch(prompt);
String historyContext = chatHistory.isEmpty() ? "" : "基于以下聊天历史记录:\n" + String.join("\n", chatHistory) + "\n";
// 2. 构建发送给 Ollama 的完整 prompt
String fullPrompt = historyContext + prompt;
// 调用 Ollama API
Map<String, Object> body = new HashMap<>();
body.put("model", model);
body.put("prompt", fullPrompt);
HttpHeaders headers = new HttpHeaders();
headers.setContentType(MediaType.APPLICATION_JSON);
HttpEntity<Map<String, Object>> entity = new HttpEntity<>(body, headers);
StringBuilder aiResponseBuilder = new StringBuilder();
try {
restTemplate.execute(
ollamaBaseUrl + "/api/generate",
HttpMethod.POST,
req -> {
new ObjectMapper().writeValue(req.getBody(), body);
req.getHeaders().setContentType(MediaType.APPLICATION_JSON);
},
resp -> {
try (BufferedReader reader = new BufferedReader(new InputStreamReader(resp.getBody()))) {
String line;
while ((line = reader.readLine()) != null) {
if (!line.trim().isEmpty()) {
Map<String, Object> json = objectMapper.readValue(line, Map.class);
Object part = json.get("response");
if (part != null) {
aiResponseBuilder.append(part.toString());
}
}
}
}
return null;
}
);
} catch (Exception e) {
aiResponseBuilder.append("与Ollama交互失败: ").append(e.getMessage());
// Optionally log the full stack trace for debugging
// logger.error("Error interacting with Ollama", e);
}
String aiResponse = aiResponseBuilder.length() > 0 ? aiResponseBuilder.toString() : "无响应";
// 记录聊天历史(包含向量嵌入)
chatHistoryLogger.logChat(model, prompt, aiResponse);
return aiResponse;
}
private List<String> getChatHistoryFromElasticsearch(String queryPrompt) {
List<String> history = new ArrayList<>();
try {
// 1. 生成查询向量
List<Float> queryEmbedding = chatHistoryLogger.getEmbedding(queryPrompt);
if (queryEmbedding.isEmpty()) {
System.err.println("无法为查询生成嵌入,跳过向量检索。");
return history;
}
String searchUrl = elasticsearchBaseUrl + "/chat-history-*/_search";
// 2. 构建 Elasticsearch k-NN 查询体
Map<String, Object> esQueryBody = new HashMap<>();
Map<String, Object> knnMap = new HashMap<>();
knnMap.put("field", "response_embedding"); // 用于向量嵌入的字段
knnMap.put("query_vector", queryEmbedding);
knnMap.put("k", 5); // 返回最相似的 5 个结果
knnMap.put("num_candidates", 10); // 搜索候选数量,通常 k 的几倍
esQueryBody.put("knn", knnMap);
HttpHeaders headers = new HttpHeaders();
headers.setContentType(MediaType.APPLICATION_JSON);
headers.setBasicAuth("elastic", "elastic"); // 请替换为您的实际用户名和密码
HttpEntity<Map<String, Object>> entity = new HttpEntity<>(esQueryBody, headers);
ResponseEntity<String> response = restTemplate.postForEntity(searchUrl, entity, String.class);
if (response.getBody() != null) {
Map<String, Object> responseMap = objectMapper.readValue(response.getBody(), Map.class);
List<Map<String, Object>> hits = (List<Map<String, Object>>)((Map<String, Object>) responseMap.get("hits")).get("hits");
for (Map<String, Object> hit : hits) {
Map<String, Object> source = (Map<String, Object>) hit.get("_source");
String historicalPrompt = (String) source.get("prompt");
String historicalResponse = (String) source.get("response");
String role = (String) source.get("role");
String content = (String) source.get("content");
// Use prompt and response directly from the source
if (historicalPrompt != null && historicalResponse != null) {
history.add("用户: " + historicalPrompt + "\nAI: " + historicalResponse);
} else if (role != null && content != null) {
history.add(role + ": " + content);
}
}
}
} catch (Exception e) {
System.err.println("从Elasticsearch获取历史记录失败: " + e.getMessage());
}
return history;
}
}
package com.example.demo.logging;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.http.*;
import org.springframework.stereotype.Component;
import org.springframework.web.client.RestTemplate;
import java.time.LocalDateTime;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@Component
public class ChatHistoryLogger {
private static final Logger logger = LoggerFactory.getLogger(ChatHistoryLogger.class);
private final ObjectMapper objectMapper = new ObjectMapper();
private final RestTemplate restTemplate = new RestTemplate();
@Value("${ollama.base-url:http://localhost:11434}")
private String ollamaBaseUrl;
public void logChat(String model, String prompt, String response) {
try {
// 获取向量嵌入
List<Float> promptEmbedding = getEmbedding(prompt);
List<Float> responseEmbedding = getEmbedding(response);
// 构建内部日志条目
Map<String, Object> innerLogEntry = new HashMap<>();
innerLogEntry.put("timestamp", LocalDateTime.now().toString());
innerLogEntry.put("model", model);
innerLogEntry.put("prompt", prompt);
innerLogEntry.put("response", response);
innerLogEntry.put("prompt_embedding", promptEmbedding);
innerLogEntry.put("response_embedding", responseEmbedding);
// 将内部日志条目包装在新的顶级字段中
Map<String, Object> logEntry = new HashMap<>();
logEntry.put("chat_log_data", innerLogEntry);
String jsonLog = objectMapper.writeValueAsString(logEntry);
logger.info(jsonLog);
} catch (Exception e) {
logger.error("Failed to log chat history", e);
}
}
/**使用 Ollama 的 embeddings API 生成向量 */
public List<Float> getEmbedding(String text) {
try {
Map<String, Object> body = new HashMap<>();
body.put("model", "qwen:7b");
body.put("prompt", text);
HttpHeaders headers = new HttpHeaders();
headers.setContentType(MediaType.APPLICATION_JSON);
HttpEntity<Map<String, Object>> entity = new HttpEntity<>(body, headers);
ResponseEntity<Map> response = restTemplate.postForEntity(
ollamaBaseUrl + "/api/embeddings",
entity,
Map.class
);
if (response.getBody() != null && response.getBody().containsKey("embedding")) {
@SuppressWarnings("unchecked")
List<Float> embedding = (List<Float>) response.getBody().get("embedding");
return embedding;
}
} catch (Exception e) {
logger.error("Failed to get embedding", e);
}
return new ArrayList<>();
}
}
下面时工作空间结构,由于文件太大,这里就不贴源码了,后面会上传到csdn,从csdn下载,谢谢大家的支持。

更多推荐


所有评论(0)