现在我们主要完成AI-RAG服务的扩展,利用ES的向量检索能力完成历史聊天记录的存储和向量检索,让ai聊天有记忆。

通过网盘分享的文件:EC0601.7z
链接: https://pan.baidu.com/s/102TnnSIMUep0kboZXCdR1Q 提取码: sfya 

主要做法是在首次聊天完成后将对话内容写出日志到D:\dev\dev2025\EC0601\logs\chat-his.log

写出日志同时嵌入向量

向量可以从ollama的端点:/api/embeddings获取,不同的模型获取到的维度不一样,我的是4096,这个维度很重要,向量检索必须要使用相同的维度去相互匹配,所以要记录这个值,后续创建es模板需要用到。

然后在第二次以及后续的对话中,就可以从es中基于相似度获取聊天记录从而让聊天模型有记忆

这里解释一下:Elasticsearch 的 k-NN

Elasticsearch 的 k-NN(k-Nearest Neighbors,k 最近邻)查询是一种用于执行**向量相似度搜索**的功能。在您的场景中,它被用来根据查询的嵌入向量找到最相似的历史聊天记录。

### 什么是 Elasticsearch k-NN 查询?

Elasticsearch 的 k-NN 查询允许您在 `dense_vector` 字段上执行相似度搜索。它会找到在向量空间中与给定查询向量距离最近的 `k` 个文档。这对于实现语义搜索、推荐系统、重复检测等场景非常有用。

### k-NN 查询的目的

在您的聊天历史记录场景中,k-NN 查询的主要目的是:

*   **语义搜索**:通过比较查询的文本嵌入向量与存储在 Elasticsearch 中的聊天记录的 `prompt_embedding` 或 `response_embedding` 向量,找到语义上最相关的历史对话,即使它们不包含完全相同的关键词。
*   **上下文检索**:为 AI 模型的响应提供相关的历史上下文,从而生成更准确和连贯的回复(RAG - Retrieval Augmented Generation)。

### Elasticsearch k-NN 查询的关键组件

一个基本的 k-NN 查询通常包含以下几个部分:

1.  **`knn` 对象**:这是 k-NN 查询的主体。
2.  **`field`**:
    *   指定要执行相似度搜索的 `dense_vector` 字段。在您的案例中,这通常是 `prompt_embedding` 或 `response_embedding`。
3.  **`query_vector`**:
    *   这是您要用来查找相似文档的查询向量。它是一个浮点数数组,其维度必须与目标 `dense_vector` 字段的 `dims`(维度)相匹配。在您的场景中,这就是 `getEmbedding(queryPrompt)` 生成的向量。
4.  **`k`**:
    *   指定您希望返回的最相似结果的数量。例如,`k: 5` 会返回 5 个最相似的文档。
5.  **`num_candidates`**:
    *   这是一个性能参数。它指定了 Elasticsearch 在内部需要检查的候选文档的数量,以找到 `k` 个最近邻。通常,`num_candidates` 应该大于 `k`(例如,`k` 的几倍),因为它会影响搜索的准确性和性能。更大的 `num_candidates` 可能会提高准确性,但会增加计算开销。

### 示例 (Kibana Dev Tools)

假设您的 `chat-history-*` 索引中存储了聊天记录的 `response_embedding` 字段,并且它们的维度是 `4096`。如果您想查询与某个特定向量最相似的聊天记录,可以在 Kibana Dev Tools 中这样构建查询:

```
GET chat-history-*/_search
{
  "knn": {
    "field": "response_embedding",
    "query_vector": [
      0.1, 0.2, -0.3, ..., 0.05, 0.99, -0.7 // 这里需要是实际的4096维查询向量
    ],
    "k": 5,
    "num_candidates": 100
  }
}
```

**注意事项:**

*   `query_vector` 必须是一个完整的向量,其长度要与 Elasticsearch 映射中 `dense_vector` 字段的 `dims` 参数(在您的情况下是 `4096`)严格一致。
*   `num_candidates` 是一个调优参数。根据您的数据量和对准确性/性能的需求进行调整。

您的 Spring Boot 应用程序中的 `AIController.java` 代码正是构建了这样一个 k-NN 查询体,并通过 `restTemplate.postForEntity` 发送给 Elasticsearch。现在的关键是确保应用程序生成的 `query_vector` (由 `getEmbedding(queryPrompt)` 返回)的维度是正确的 `4096`。

这是调试效果:可以看到检索是成功了,本轮对话问题:"你知道我是谁吗"
它已经拿到了上一轮对话的内容:“你好我是伊丽萨白 ”。

当然1.7b它智商不在线,没能把我想要的回答组织起来,这通常需要调整提示语:"基于以下聊天历史记录" 让它更好理解。

接下来就是通过配置实现log写入es

  • es:elasticsearch需要手动创建处理管道和模板用用于存储chat-his.log
PUT _ingest/pipeline/chat-history
{
  "description": "处理聊天历史数据的管道",
  "processors": [
    {
      "date": {
        "field": "@timestamp",
        "target_field": "timestamp",
        "formats": ["ISO8601"]
      }
    },
    {
      "remove": {
        "field": "@timestamp"
      }
    },
    {
      "set": {
        "field": "metadata.processed",
        "value": true
      }
    },
    {
      "set": {
        "field": "metadata.processed_at",
        "value": "{{_ingest.timestamp}}"
      }
    }
  ]
}
PUT _index_template/chat_history_template
{
  "index_patterns": ["chat-history-*"],
  "template": {
    "mappings": {
      "properties": {
        "timestamp": {
          "type": "date",
          "format": "strict_date_optional_time_nanos"
        },
        "model": {
          "type": "text",
          "fields": {
            "keyword": {
              "type": "keyword",
              "ignore_above": 256
            }
          }
        },
        "prompt": {
          "type": "text",
          "fields": {
            "keyword": {
              "type": "keyword",
              "ignore_above": 256
            }
          }
        },
        "response": {
          "type": "text",
          "fields": {
            "keyword": {
              "type": "keyword",
              "ignore_above": 256
            }
          }
        },
        "prompt_embedding": {
          "type": "dense_vector",
          "dims": 4096 //注意这个4096,ollama对应模型的向量维度
        },
        "response_embedding": {
          "type": "dense_vector",
          "dims": 4096  //注意这个4096,ollama对应模型的向量维度
        }
      }
    }
  }
}
  • logstash:在原来的基础上添加了if [log_type] == "chat-history" 的输入和输出,这个解析过程一定程度决定于filebeat的输出格式
# Sample Logstash configuration for creating a complete
# Beats -> Logstash -> Elasticsearch pipeline with log parsing

input {
  beats {
    port => 5044
  }
}

# Filter to parse logs using grok and date plugins
filter {
  if [log_type] == "springboot-app" {
    grok {
      match => { "message" => "%{TIMESTAMP_ISO8601:timestamp} \[%{DATA:thread}\] %{LOGLEVEL:level} %{DATA:logger} - %{GREEDYDATA:message}" }
    }
  }

  if [log_type] == "chat-history" {
    # Step 1: 使用 grok 去除三引号(""") 包裹
    grok {
      match => {
        "[event][original]" => "^\"\"\"%{GREEDYDATA:json_content}\"\"\""
      }
      tag_on_failure => ["_grok_chat_history_failure"]
    }

    # Step 2: 如果提取成功,则尝试解析 json_content
    if [json_content] and [json_content] != "" {
      json {
        source => "json_content"
        target => "parsed_chat_history"
        remove_field => ["json_content"]
      }
    }
    else {
      # 如果没有匹配到三引号,尝试直接解析 event.original 为 JSON
      json {
        source => "[event][original]"
        target => "parsed_chat_history"
        remove_field => ["[event][original]"]
      }
    }

    # Step 3: 提升 chat_log_data 中的字段到顶层
    if [parsed_chat_history][chat_log_data] {
      mutate {
        rename => {
          "[parsed_chat_history][chat_log_data][timestamp]" => "timestamp"
          "[parsed_chat_history][chat_log_data][model]" => "model"
          "[parsed_chat_history][chat_log_data][prompt]" => "prompt"
          "[parsed_chat_history][chat_log_data][response]" => "response"
          "[parsed_chat_history][chat_log_data][prompt_embedding]" => "prompt_embedding"
          "[parsed_chat_history][chat_log_data][response_embedding]" => "response_embedding"
        }
      }

      # 删除中间字段
      mutate {
        remove_field => ["parsed_chat_history"]
      }
    }

  }


}

output {
  if [log_type] == "springboot-app" {
    elasticsearch {
      hosts => ["https://localhost:9200"]
      index => "springboot-logs-%{+YYYY.MM.dd}"
      ssl_enabled => true
      ssl_verification_mode => "full"
      ssl_certificate_authorities => ["D:/dev/dev2025/EC0601/elasticsearch-9.0.1/config/certs/http_ca.crt"]
      api_key => "V6VUSpcBUPesLBBNVAlH:O7l1zeyOwQFfy9w5Af_JTA"
    }
  }

  if [log_type] == "chat-history" {
    elasticsearch {
      hosts => ["https://localhost:9200"]
      index => "chat-history-%{+YYYY.MM.dd}" //匹配es模板自动创建按天分片的索引
      pipeline => "chat-history" //es管道,对接收的字段做进一步处理,比如日期格式转换等
      ssl_enabled => true
      ssl_verification_mode => "full"
      ssl_certificate_authorities => ["D:/dev/dev2025/EC0601/elasticsearch-9.0.1/config/certs/http_ca.crt"]
      api_key => "V6VUSpcBUPesLBBNVAlH:O7l1zeyOwQFfy9w5Af_JTA"
    }
  }

  stdout {
    codec => rubydebug
  }
}

具体做法在解析前可以要看这里(event字段内容的原始格式)样式进行解析

  • filebeat:这里加入log_type: chat-history的日志收集
filebeat.inputs:
  - type: filestream
    id: springboot-logs
    paths:
      - D:/dev/dev2025/EC0601/logs/springboot-ai-rag-demo.log
    enabled: true
    fields:
      log_type: springboot-app
      app_name: springboot-ai-rag-demo
    fields_under_root: true
    multiline:
      pattern: '^[0-9]{4}-[0-9]{2}-[0-9]{2}'
      negate: true
      match: after

  - type: filestream
    id: chat-history
    paths:
      - D:/dev/dev2025/EC0601/logs/chat-his.log
    enabled: true
    fields:
      log_type: chat-history
      app_name: springboot-ai-rag-demo
    fields_under_root: true

output.logstash:
  hosts: ["localhost:5044"]
  enabled: true

# 启用 HTTP 状态接口
http.enabled: true
http.port: 5066
logging.level: debug
logging.selectors: ["*"]
  • JDK:最后是JDK,应为es需要安全证书(https://localhost:9200/),所以java程序在访问es时,需要把es证书导入证书到jdk目录  :/lib/security/cacerts
     
  • 提示输入密码时,默认密码通常是 changeit。
  • 当它问“是否信任此证书? [否]:”时,输入 y 并按回车。
keytool -import -trustcacerts -alias elasticsearch_ca -file "D:/dev/dev2025/EC0601/elasticsearch-9.0.1/config/certs/http_ca.crt" -keystore "C:/Program Files/Java/jdk-17/lib/security/cacerts"
  • springboot项目:需要修改之前log配置,现在通过logback配置将控制台日志和聊天日志分析写入到指定目录
<?xml version="1.0" encoding="UTF-8"?>
<configuration>
    <property name="LOG_PATH" value="D:/dev/dev2025/EC0601/logs"/>
    
    <!-- 控制台输出 -->
    <appender name="CONSOLE" class="ch.qos.logback.core.ConsoleAppender">
        <encoder>
            <pattern>%d{yyyy-MM-dd HH:mm:ss.SSS} [%thread] %-5level %logger{36} - %msg%n</pattern>
        </encoder>
    </appender>

    <!-- 主应用日志文件 -->
    <appender name="SPRINGBOOT_APP_LOG" class="ch.qos.logback.core.rolling.RollingFileAppender">
        <file>${LOG_PATH}/springboot-ai-rag-demo.log</file>
        <rollingPolicy class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy">
            <fileNamePattern>${LOG_PATH}/springboot-ai-rag-demo.%d{yyyy-MM-dd}.log</fileNamePattern>
            <maxHistory>30</maxHistory>
        </rollingPolicy>
        <encoder>
            <pattern>%d{yyyy-MM-dd HH:mm:ss.SSS} [%thread] %-5level %logger{36} - %msg%n</pattern>
        </encoder>
    </appender>

    <!-- 聊天历史记录文件 -->
    <appender name="CHAT_HISTORY" class="ch.qos.logback.core.rolling.RollingFileAppender">
        <file>${LOG_PATH}/chat-his.log</file>
        <rollingPolicy class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy">
            <fileNamePattern>${LOG_PATH}/chat-his.%d{yyyy-MM-dd}.log</fileNamePattern>
            <maxHistory>30</maxHistory>
        </rollingPolicy>
        <encoder>
            <pattern>%msg%n</pattern>
        </encoder>
    </appender>

    <!-- 聊天历史记录日志 -->
    <logger name="com.example.demo.logging.ChatHistoryLogger" level="INFO" additivity="false">
        <appender-ref ref="CHAT_HISTORY"/>
    </logger>

    <root level="INFO">
        <appender-ref ref="CONSOLE"/>
        <appender-ref ref="SPRINGBOOT_APP_LOG"/>
    </root>
</configuration> 

最后贴一下AIController,和ChatHistoryLogger

package com.example.demo.controller;

import com.example.demo.logging.ChatHistoryLogger;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.http.*;
import org.springframework.web.bind.annotation.*;
import org.springframework.web.client.RestTemplate;

import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

@RestController
@RequestMapping("/api/ai")
public class AIController {

    @Value("${ollama.base-url:http://localhost:11434}")
    private String ollamaBaseUrl;

    @Value("${spring.elasticsearch.rest.uris:https://localhost:9200}")
    private String elasticsearchBaseUrl;

    private final RestTemplate restTemplate = new RestTemplate();
    private final ObjectMapper objectMapper = new ObjectMapper();
    
    @Autowired
    private ChatHistoryLogger chatHistoryLogger;

    @PostMapping("/chat")
    public String chat(@RequestBody Map<String, String> request) {
        String prompt = request.get("prompt");
        String model = request.getOrDefault("model", "qwen3:1.7b");
        
        // 1. 从 Elasticsearch 获取历史记录 (使用向量检索)
        List<String> chatHistory = getChatHistoryFromElasticsearch(prompt);
        String historyContext = chatHistory.isEmpty() ? "" : "基于以下聊天历史记录:\n" + String.join("\n", chatHistory) + "\n";

        // 2. 构建发送给 Ollama 的完整 prompt
        String fullPrompt = historyContext + prompt;

        // 调用 Ollama API
        Map<String, Object> body = new HashMap<>();
        body.put("model", model);
        body.put("prompt", fullPrompt);
        
        HttpHeaders headers = new HttpHeaders();
        headers.setContentType(MediaType.APPLICATION_JSON);
        HttpEntity<Map<String, Object>> entity = new HttpEntity<>(body, headers);
        
        StringBuilder aiResponseBuilder = new StringBuilder();
        try {
            restTemplate.execute(
                ollamaBaseUrl + "/api/generate",
                HttpMethod.POST,
                req -> {
                    new ObjectMapper().writeValue(req.getBody(), body);
                    req.getHeaders().setContentType(MediaType.APPLICATION_JSON);
                },
                resp -> {
                    try (BufferedReader reader = new BufferedReader(new InputStreamReader(resp.getBody()))) {
                        String line;
                        while ((line = reader.readLine()) != null) {
                            if (!line.trim().isEmpty()) {
                                Map<String, Object> json = objectMapper.readValue(line, Map.class);
                                Object part = json.get("response");
                                if (part != null) {
                                    aiResponseBuilder.append(part.toString());
                                }
                            }
                        }
                    }
                    return null;
                }
            );
        } catch (Exception e) {
            aiResponseBuilder.append("与Ollama交互失败: ").append(e.getMessage());
            // Optionally log the full stack trace for debugging
            // logger.error("Error interacting with Ollama", e);
        }

        String aiResponse = aiResponseBuilder.length() > 0 ? aiResponseBuilder.toString() : "无响应";
        
        // 记录聊天历史(包含向量嵌入)
        chatHistoryLogger.logChat(model, prompt, aiResponse);
        
        return aiResponse;
    }
    

    private List<String> getChatHistoryFromElasticsearch(String queryPrompt) {
        List<String> history = new ArrayList<>();
        try {
            // 1. 生成查询向量
            List<Float> queryEmbedding = chatHistoryLogger.getEmbedding(queryPrompt);
            if (queryEmbedding.isEmpty()) {
                System.err.println("无法为查询生成嵌入,跳过向量检索。");
                return history;
            }

            String searchUrl = elasticsearchBaseUrl + "/chat-history-*/_search";
            
            // 2. 构建 Elasticsearch k-NN 查询体
            Map<String, Object> esQueryBody = new HashMap<>();
            Map<String, Object> knnMap = new HashMap<>();
            knnMap.put("field", "response_embedding"); // 用于向量嵌入的字段
            knnMap.put("query_vector", queryEmbedding);
            knnMap.put("k", 5); // 返回最相似的 5 个结果
            knnMap.put("num_candidates", 10); // 搜索候选数量,通常 k 的几倍

            esQueryBody.put("knn", knnMap);

            HttpHeaders headers = new HttpHeaders();
            headers.setContentType(MediaType.APPLICATION_JSON);
            headers.setBasicAuth("elastic", "elastic"); // 请替换为您的实际用户名和密码

            HttpEntity<Map<String, Object>> entity = new HttpEntity<>(esQueryBody, headers);

            ResponseEntity<String> response = restTemplate.postForEntity(searchUrl, entity, String.class);
            
            if (response.getBody() != null) {
                Map<String, Object> responseMap = objectMapper.readValue(response.getBody(), Map.class);
                List<Map<String, Object>> hits = (List<Map<String, Object>>)((Map<String, Object>) responseMap.get("hits")).get("hits");
                
                for (Map<String, Object> hit : hits) {
                    Map<String, Object> source = (Map<String, Object>) hit.get("_source");
                    String historicalPrompt = (String) source.get("prompt");
                    String historicalResponse = (String) source.get("response");
                    String role = (String) source.get("role");
                    String content = (String) source.get("content");

                    // Use prompt and response directly from the source
                    if (historicalPrompt != null && historicalResponse != null) {
                        history.add("用户: " + historicalPrompt + "\nAI: " + historicalResponse);
                    } else if (role != null && content != null) {
                        history.add(role + ": " + content);
                    }
                }
            }
        } catch (Exception e) {
            System.err.println("从Elasticsearch获取历史记录失败: " + e.getMessage());
        }
        return history;
    }
} 
package com.example.demo.logging;

import com.fasterxml.jackson.databind.ObjectMapper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.http.*;
import org.springframework.stereotype.Component;
import org.springframework.web.client.RestTemplate;

import java.time.LocalDateTime;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

@Component
public class ChatHistoryLogger {
    private static final Logger logger = LoggerFactory.getLogger(ChatHistoryLogger.class);
    private final ObjectMapper objectMapper = new ObjectMapper();
    private final RestTemplate restTemplate = new RestTemplate();

    @Value("${ollama.base-url:http://localhost:11434}")
    private String ollamaBaseUrl;

    public void logChat(String model, String prompt, String response) {
        try {
            // 获取向量嵌入
            List<Float> promptEmbedding = getEmbedding(prompt);
            List<Float> responseEmbedding = getEmbedding(response);

            // 构建内部日志条目
            Map<String, Object> innerLogEntry = new HashMap<>();
            innerLogEntry.put("timestamp", LocalDateTime.now().toString());
            innerLogEntry.put("model", model);
            innerLogEntry.put("prompt", prompt);
            innerLogEntry.put("response", response);
            innerLogEntry.put("prompt_embedding", promptEmbedding);
            innerLogEntry.put("response_embedding", responseEmbedding);
            
            // 将内部日志条目包装在新的顶级字段中
            Map<String, Object> logEntry = new HashMap<>();
            logEntry.put("chat_log_data", innerLogEntry);

            String jsonLog = objectMapper.writeValueAsString(logEntry);
            logger.info(jsonLog);
        } catch (Exception e) {
            logger.error("Failed to log chat history", e);
        }
    }

    /**使用 Ollama 的 embeddings API 生成向量 */
    public List<Float> getEmbedding(String text) {
        try {
            Map<String, Object> body = new HashMap<>();
            body.put("model", "qwen:7b");
            body.put("prompt", text);

            HttpHeaders headers = new HttpHeaders();
            headers.setContentType(MediaType.APPLICATION_JSON);
            HttpEntity<Map<String, Object>> entity = new HttpEntity<>(body, headers);

            ResponseEntity<Map> response = restTemplate.postForEntity(
                ollamaBaseUrl + "/api/embeddings",
                entity,
                Map.class
            );

            if (response.getBody() != null && response.getBody().containsKey("embedding")) {
                @SuppressWarnings("unchecked")
                List<Float> embedding = (List<Float>) response.getBody().get("embedding");
                return embedding;
            }
        } catch (Exception e) {
            logger.error("Failed to get embedding", e);
        }
        return new ArrayList<>();
    }
} 

下面时工作空间结构,由于文件太大,这里就不贴源码了,后面会上传到csdn,从csdn下载,谢谢大家的支持。

Logo

中国智能体开发者社区,聚焦智能体与大模型开发,提供前沿资讯、实用工具链、开源项目及行业案例。通过技术沙龙、开发者大赛等活动,促进经验交流与协作,助力开发者快速构建创新智能应用。

更多推荐