《AI Agent智能体开发实践 邓立国 邓淇文著 五大实战案例掌握AI Agent开发 LangChain示例 人工智能技术丛书 清华大学出版社》【摘要 书评 试读】- 京东图书

在AI智能体技术中,“记忆与知识管理”是实现智能体连续交互、个性化决策和复杂任务处理的核心能力。它类比人类的记忆机制,通过短期记忆处理即时信息,通过长期记忆存储和复用历史经验与结构化知识,两者协同支撑智能体的“认知”与“学习”能力。本节将从短期记忆(Short-Term Memory)和长期记忆(Long-Term Memory)两方面展开详细解析。

3.5.1  短期记忆

短期记忆是智能体对“近期信息”的临时存储与调用机制,主要用于支撑当前任务的连续交互(如对话、实时决策),特点是时效性强、容量有限、需快速更新。

1. 上下文窗口

上下文窗口(Context Window)是大语言模型(LLM)处理短期记忆的基础机制,是指模型在单次交互中能“记住”的输入文本长度(以tokens为单位)。例如:

(1)GPT-4的32K tokens窗口(约2.4万字的中文)可容纳多轮对话历史、当前任务描述、临时参数等信息,确保智能体在单轮会话中理解上下文逻辑(如“上文提到的‘这个方案’指什么”)。

(2)局限:窗口容量固定,若对话过长(如超过32K tokens),早期信息会被“遗忘”,导致上下文断裂(例如,长对话中用户重复提及的偏好被忽略)。

【示例3.16】本示例模拟GPT-4 32K tokens上下文窗口,展示如何管理和维护一个固定大小的上下文缓冲区。

import tiktoken

class ContextWindow:
    def __init__(self, max_tokens=32000):
        """
        初始化上下文窗口
        :param max_tokens: 最大token容量,默认为32K
        """
        self.max_tokens = max_tokens
        self.encoder = tiktoken.get_encoding("cl100k_base")  # GPT-4使用的编码器
        self.messages = []
        self.current_tokens = 0
    
    def add_message(self, role, content):
        """
        添加新消息到上下文
        :param role: 消息角色(如'system', 'user', 'assistant')
        :param content: 消息内容
        """
        message = {"role": role, "content": content}
        message_tokens = len(self.encoder.encode(content))
        
        # 检查是否超出容量
        if self.current_tokens + message_tokens > self.max_tokens:
            self._trim_messages(self.current_tokens + message_tokens - self.max_tokens)
        
        self.messages.append(message)
        self.current_tokens += message_tokens
    
    def _trim_messages(self, excess_tokens):
        """
        修剪消息以保持不超过token限制
        :param excess_tokens: 需要移除的token数量
        """
        removed_tokens = 0
        while removed_tokens < excess_tokens and len(self.messages) > 0:
            oldest_message = self.messages.pop(0)
            removed_tokens += len(self.encoder.encode(oldest_message["content"]))
        
        self.current_tokens -= removed_tokens
    
    def get_context(self):
        """
        获取当前上下文
        :return: 完整的上下文消息列表
        """
        return self.messages.copy()
    
    def clear(self):
        """清空上下文窗口"""
        self.messages = []
        self.current_tokens = 0

# 测试用例
if __name__ == "__main__":
    # 初始化32K tokens的上下文窗口
    context = ContextWindow(max_tokens=32000)
    
    # 添加系统消息
    context.add_message("system", "你是一个有帮助的AI助手。")
    
    # 模拟长对话
    long_text = "这是一段很长的文本。" * 1000  # 约3000个中文字符
    context.add_message("user", long_text)
    
    # 检查当前token数
    print(f"当前token数: {context.current_tokens}")
    
    # 添加更多消息直到接近上限
    for i in range(10):
        context.add_message("assistant", f"这是第{i}条回复。" * 50)
    
    print(f"添加后token数: {context.current_tokens}")
    
    # 尝试添加会触发修剪的消息
    overflow_text = "这将触发修剪机制。" * 5000
    context.add_message("user", overflow_text)
    
    print(f"修剪后token数: {context.current_tokens}")
    print(f"剩余消息数: {len(context.get_context())}")

运行代码,输出如下:

当前token数: 11014
添加后token数: 15014
修剪后token数: 55000
剩余消息数: 1

关键技术说明:

(1)Token计数:

  1. 使用tiktoken库(与GPT-4相同的tokenizer)准确计算文本的Token数量。
  2. 支持多语言混合内容的Token计数。

(2)窗口管理:

  1. 采用FIFO(先进先出)策略管理上下文。
  2. 当新内容导致超出限制时,自动移除最早的对话内容。

(3)性能优化:

  1. 维护当前Token总数的缓存,避免每次计算全部Token。
  2. 修剪时只移除必要数量的最早消息。

(4)扩展性:

  1. 可轻松调整最大Token限制。
  2. 支持多种消息角色(system/user/assistant)。

这个实现模拟了AI系统如何在实际对话中维护固定大小的上下文窗口,是构建对话AI系统的基础组件之一。

2. 对话状态跟踪

对话状态跟踪(Dialog State Tracking,DST)用于在多轮对话中精准提取和维护关键信息(如用户意图、实体参数、未完成的目标),是任务型智能体的核心技术。例如:

(1)谷歌Dialogflow通过“意图识别”和“实体提取”跟踪对话状态:若用户说“明天下午3点订从北京到上海的高铁”,DST会记录“时间=明天下午3点”“出发地=北京”“目的地=上海”“任务=订高铁票”,后续对话中即使用户简化表达(如“改到后天”),智能体仍能关联到“时间”参数进行更新。

(2)核心目标:将非结构化的对话文本转换为结构化的“状态变量”,避免信息遗漏或误解。

【示例3.17】本示例基于规则的对话状态跟踪,模拟简单的对话管理系统。

class DialogStateTracker:
    def __init__(self):
        # 初始化对话状态
        self.state = {
            'intent': None,
            'slots': {},
            'confirmed': False,
            'history': []
        }

    def update_state(self, user_input):
        """
        根据用户输入更新对话状态
        """
        # 将当前输入添加到历史记录中
        self.state['history'].append(user_input)

        # 简单的意图识别
        if '预订' in user_input or '预定' in user_input:
            self.state['intent'] = 'book'
        elif '查询' in user_input or '查找' in user_input:
            self.state['intent'] = 'search'
        elif '取消' in user_input:
            self.state['intent'] = 'cancel'

        # 简单的槽位填充
        if '餐厅' in user_input:
            self.state['slots']['venue'] = '餐厅'
        elif '酒店' in user_input:
            self.state['slots']['venue'] = '酒店'

        if '明天' in user_input:
            self.state['slots']['date'] = '明天'
        elif '今天' in user_input:
            self.state['slots']['date'] = '今天'

        if '晚上' in user_input:
            self.state['slots']['time'] = '晚上'
        elif '中午' in user_input:
            self.state['slots']['time'] = '中午'

        # 确认检测
        if '是的' in user_input or '对的' in user_input:
            self.state['confirmed'] = True
        elif '不是' in user_input or '不对' in user_input:
            self.state['confirmed'] = False

    def get_current_state(self):
        """
        获取当前对话状态
        """
        return self.state

    def reset(self):
        """
        重置对话状态
        """
        self.__init__()


# 测试用例
if __name__ == "__main__":
    tracker = DialogStateTracker()

    # 模拟对话1
    print("测试对话1:")
    tracker.update_state("我想预订一家餐厅")
    print(tracker.get_current_state())

    tracker.update_state("明天晚上")
    print(tracker.get_current_state())

    tracker.update_state("是的")
    print(tracker.get_current_state())

    # 重置
    tracker.reset()

    # 模拟对话2
    print("\n测试对话2:")
    tracker.update_state("查询附近的酒店")
    print(tracker.get_current_state())

    tracker.update_state("今天中午")
    print(tracker.get_current_state())

运行代码,输出如下:

{'intent': 'book', 'slots': {'venue': '餐厅'}, 'confirmed': False, 'history': ['我想预订一家餐厅']}
{'intent': 'book', 'slots': {'venue': '餐厅', 'date': '明天', 'time': '晚上'}, 'confirmed': False, 'history': ['我想预订一家餐厅', '明天晚上']}
{'intent': 'book', 'slots': {'venue': '餐厅', 'date': '明天', 'time': '晚上'}, 'confirmed': True, 'history': ['我想预订一家餐厅', '明天晚上', '是的']}
测试对话2:
{'intent': 'search', 'slots': {'venue': '酒店'}, 'confirmed': False, 'history': ['查询附近的酒店']}
{'intent': 'search', 'slots': {'venue': '酒店', 'date': '今天', 'time': '中午'}, 'confirmed': False, 'history': ['查询附近的酒店', '今天中午']}

关键功能说明:

(1)对话状态结构:

  1. Intent:当前对话意图,如预订、查询、取消。
  2. Slots:对话中提取的关键信息槽位,如地点、时间。
  3. Confirmed:用户是否确认了当前信息。
  4. History:对话历史记录。

(2)主要方法:

  1. update_state():根据用户输入更新对话状态。
  2. get_current_state():获取当前对话状态。
  3. reset():重置对话状态。

(3)实现特点:

  1. 基于规则的简单实现,适合理解DST的基本原理。
  2. 可扩展为更复杂的NLU(自然语言理解)组件。
  3. 状态管理清晰,便于后续对话策略决策。

(4)改进方向:

  1. 添加机器学习模型进行意图识别和槽位填充。
  2. 增加多轮对话上下文处理。
  3. 添加对话策略模块。

这个实现展示了对话状态跟踪的基本原理,实际应用中通常会结合更复杂的自然语言处理技术。

3.5.2  长期记忆

AI智能体的长期记忆通常通过外部向量存储(如FAISS或Pinecone)实现,支持快速检索语义相关文档,以扩展静态LLM的知识范围,缓解幻觉现象,并生成基于外部事实的上下文相关响应。

1. 向量数据库(FAISS)

向量数据库是一种专门存储向量数据的数据库,它能够高效地存储和检索高维度的数据,为各种机器学习算法提供支持。而FAISS(Facebook AI Similarity Search)则是Facebook AI Research开发的一个开源库,主要用于高效地进行大规模相似性搜索和聚类操作。

1)FAISS的主要功能

(1)向量索引与搜索:FAISS提供了多种索引和搜索向量的方法,如暴力搜索(Flat)、倒排索引(IVF)、分层可导航小世界图(HNSW)和乘积量化(PQ)等。这些方法可以根据应用场景在速度、准确性和内存使用之间进行权衡。

(2)支持多种距离度量:FAISS支持多种距离度量方式,如L2距离(欧几里得距离)、余弦相似度和内积(点积),适用于不同的应用场景。

(3)CPU和GPU支持:FAISS能够利用CPU和GPU加速索引和搜索过程,在大规模数据集上表现出色,尤其适合需要实时搜索的场景。

(4)高效性:FAISS针对大规模数据集进行了优化,能够快速处理数十亿向量。

(5)可扩展性:FAISS设计用于处理大规模数据集,能够有效管理数十亿向量。

(6)灵活性:FAISS允许用户根据应用需求调整索引和搜索参数,并且可以动态添加、更新和删除向量。

2)FAISS的应用场景

由于FAISS具有上述功能特点,因此被广泛应用于各种需要高效相似性搜索和聚类操作的场景,比如:

(1)推荐系统:通过搜索用户历史行为或兴趣相似的向量,为用户推荐可能感兴趣的内容或    商品。

(2)信息检索:在搜索引擎中,通过搜索与查询关键词相似的向量,快速找到相关的文档或     网页。

(3)图像检索:将图像表示为向量,并通过搜索相似的向量来检索相似的图像。

(4)自然语言处理:将文本表示为向量,并通过搜索相似的向量来进行文本匹配、语义理解等任务。

3)FAISS的安装与使用

(1)安装:FAISS分为CPU和GPU两个版本,可以根据硬件环境选择合适的版本进行安装。例如,可以使用pip命令安装CPU版本的FAISS:pip install faiss-cpu。如果需要GPU加速,则需要安装GPU版本的FAISS,并确保系统已安装CUDA等必要的驱动程序。

(2)使用:在使用FAISS时,首先需要创建索引对象,并指定向量的维度和索引类型。然后,可以将向量数据添加到索引中,并进行搜索操作。搜索操作会返回与查询向量最相似的向量的ID和相似度得分。此外,FAISS还支持删除向量、重置索引等操作。

【示例3.18】使用FAISS向量数据库实现AI智能体长期记忆功能。

import faiss
import numpy as np
from typing import List, Dict, Any
import json
import os

class AIMemory:
    def __init__(self, dim: int = 1536, index_type: str = "FlatL2", storage_path: str = "ai_memory"):
        """
        初始化AI长期记忆系统

        Args:
            dim: 向量维度
            index_type: 索引类型,支持FlatL2、HNSW等
            storage_path: 数据存储路径
        """
        self.dim = dim
        self.index_type = index_type
        self.storage_path = storage_path
        self.metadata = []  # 存储向量对应的元数据

        # 创建存储目录
        if not os.path.exists(storage_path):
            os.makedirs(storage_path)

        # 初始化索引
        self._init_index()

        # 加载已保存的数据
        self._load_memory()

    def _init_index(self):
        """根据指定类型初始化FAISS索引"""
        if self.index_type == "FlatL2":
            self.index = faiss.IndexFlatL2(self.dim)
        elif self.index_type == "HNSW":
            self.index = faiss.IndexHNSWFlat(self.dim, 32)
            self.index.hnsw.efConstruction = 40
        else:
            raise ValueError(f"不支持的索引类型: {self.index_type}")

    def _load_memory(self):
        """从磁盘加载已保存的记忆"""
        index_path = os.path.join(self.storage_path, "index.faiss")
        metadata_path = os.path.join(self.storage_path, "metadata.json")

        if os.path.exists(index_path) and os.path.exists(metadata_path):
            try:
                # 加载索引
                self.index = faiss.read_index(index_path)

                # 加载元数据
                with open(metadata_path, "r", encoding="utf-8") as f:
                    self.metadata = json.load(f)

                print(f"已从 {self.storage_path} 加载记忆: {len(self.metadata)} 条记录")
            except Exception as e:
                print(f"加载记忆失败: {e}")
                # 重新初始化索引
                self._init_index()

    def save_memory(self):
        """保存记忆到磁盘"""
        index_path = os.path.join(self.storage_path, "index.faiss")
        metadata_path = os.path.join(self.storage_path, "metadata.json")

        try:
            # 保存索引
            faiss.write_index(self.index, index_path)

            # 保存元数据
            with open(metadata_path, "w", encoding="utf-8") as f:
                json.dump(self.metadata, f, ensure_ascii=False, indent=2)

            print(f"已保存记忆到 {self.storage_path}")
        except Exception as e:
            print(f"保存记忆失败: {e}")

    def add_memory(self, vector: np.ndarray, data: Dict[str, Any]):
        """
        添加记忆条目

        Args:
            vector: 特征向量,NumPy数组
            data: 相关元数据,字典格式
        """
        # 确保向量维度正确
        vector = vector.reshape(1, -1)
        if vector.shape[1] != self.dim:
            raise ValueError(f"向量维度不匹配,期望 {self.dim},实际 {vector.shape[1]}")

        # 添加到索引
        self.index.add(vector)

        # 保存元数据
        self.metadata.append(data)

        # 自动保存(可优化为定期保存)
        self.save_memory()

        return len(self.metadata) - 1  # 返回添加的记忆ID

    def search_memory(self, query_vector: np.ndarray, k: int = 5) -> List[Dict[str, Any]]:
        """
        搜索相似记忆

        Args:
            query_vector: 查询向量
            k: 返回结果数量

        Returns:
            包含相似度和元数据的列表
        """
        # 确保向量维度正确
        query_vector = query_vector.reshape(1, -1)
        if query_vector.shape[1] != self.dim:
            raise ValueError(f"向量维度不匹配,期望 {self.dim},实际 {query_vector.shape[1]}")

        # 搜索
        distances, indices = self.index.search(query_vector, k)

        # 构建结果
        results = []
        for i, idx in enumerate(indices[0]):
            if idx != -1:  # -1表示未找到
                results.append({
                    "similarity": float(distances[0][i]),
                    "metadata": self.metadata[idx],
                    "memory_id": idx
                })

        return results

    def update_memory(self, memory_id: int, new_data: Dict[str, Any]):
        """
        更新记忆元数据
        
        Args:
            memory_id: 记忆ID
            new_data: 新的元数据
        """
        if 0 <= memory_id < len(self.metadata):
            self.metadata[memory_id].update(new_data)
            self.save_memory()
            return True
        return False

    def delete_memory(self, memory_id: int):
        """
        删除记忆

        Args:
            memory_id: 记忆ID
        """
        if 0 <= memory_id < len(self.metadata):
            # 注意:FAISS不支持直接删除索引项,这里采用标记删除
            self.metadata[memory_id]["deleted"] = True
            self.save_memory()
            return True
        return False

# 简单的文本向量化器示例(实际应用中应使用更强大的模型)
def simple_text_embedding(text: str) -> np.ndarray:
    """简单的文本向量化函数,实际应用中应替换为OpenAI Embeddings等"""
    # 这里仅作示例,返回随机向量
    # 实际应用中应使用sentence-transformers等模型
    return np.random.random(1536).astype('float32')

# 使用示例
if __name__ == "__main__":
    # 初始化记忆系统
    memory = AIMemory(dim=1536, index_type="FlatL2")

    # 添加记忆
    memory.add_memory(
        simple_text_embedding("机器学习是人工智能的一个分支"),
        {"text": "机器学习是人工智能的一个分支", "timestamp": "2023-05-15", "source": "教科书"}
    )

    memory.add_memory(
        simple_text_embedding("深度学习是机器学习的一个子领域"),
        {"text": "深度学习是机器学习的一个子领域", "timestamp": "2023-05-16", "source": "网络文章"}
    )

    # 搜索记忆
    query_vector = simple_text_embedding("人工智能的分支有哪些")
    results = memory.search_memory(query_vector, k=2)

    print("\n搜索结果:")
    for result in results:
        print(f"相似度: {result['similarity']:.4f}")
        print(f"内容: {result['metadata']['text']}")
        print(f"来源: {result['metadata']['source']}")
        print("-" * 40)

    # 更新记忆
    memory.update_memory(0, {"importance": "high"})

    # 删除记忆
    # memory.delete_memory(1)    

运行代码,输出如下:

已保存记忆到 ai_memory
已保存记忆到 ai_memory

搜索结果:
相似度: 257.7774
内容: 机器学习是人工智能的一个分支
来源: 教科书
----------------------------------------
相似度: 257.8586
内容: 深度学习是机器学习的一个子领域
来源: 网络文章
----------------------------------------
已保存记忆到 ai_memory
向量数据库(Pinecone)

输出结果会形成一个文件夹ai_memory,其中包含两个文件:index.faiss和metadata。

这个案例实现了一个基于FAISS的AI长期记忆系统,包含以下功能:

  1. 支持不同类型的FAISS索引(如FlatL2和HNSW)。
  2. 提供记忆的添加、搜索、更新和删除操作。
  3. 自动将记忆持久化到磁盘。
  4. 可以存储向量及其对应的元数据。

2. 向量数据库(Pinecone)

向量数据库Pinecone是一个托管的向量数据库服务,专为存储和查询高维向量数据而设计。

1)核心原理与运行机制

(1)向量索引

Pinecone的核心在于其向量索引技术,这是一种针对高维向量数据优化的数据结构。通过构建特殊索引结构(如树结构、图结构或哈希表),Pinecone能够在海量数据中快速执行相似性搜索。

(2)相似性搜索

相似性搜索是Pinecone的核心功能,用于快速找到与查询向量最相似的向量。查询处理过程包括预处理(对查询向量进行归一化)、索引搜索(利用索引结构定位候选向量)和精排(对候选向量进行精确距离计算,返回topK结果)。

(3)云原生架构

Pinecone是一个完全托管的云服务,其架构设计注重可扩展性、高可用性和安全性。采用分布式系统实现数据分片、负载均衡和故障恢复等功能,确保服务的高可用性和稳定性。

2)核心特点

(1)高性能相似性搜索

Pinecone采用先进的索引技术,如近似最近邻搜索(ANN),能够在海量高维向量数据中快速找到与查询向量最相似的结果。Pinecone适用于需要快速匹配和检索的场景,如推荐系统、信息检索和图像搜索等场景。

(2)托管服务

作为一种完全托管的云服务,Pinecone负责数据库的维护、扩展和安全性,开发者无须自行管理底层基础设施。这大大降低了开发和运维的复杂性,使开发者能够专注于业务逻辑的实现。

【示例3.19】使用Pinecone向量数据库创建索引、插入向量数据以及进行相似度搜索。

import pinecone
import numpy as np
from sklearn.datasets import fetch_20newsgroups
from sentence_transformers import SentenceTransformer
import logging

# 配置日志
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# 模型单例 - 避免重复加载模型
class EmbeddingModel:
    _instance = None
    
    @classmethod
    def get_instance(cls, model_name='all-MiniLM-L6-v2'):
        if cls._instance is None:
            cls._instance = SentenceTransformer(model_name)
        return cls._instance

# 初始化Pinecone
def init_pinecone(api_key, environment):
    try:
        pinecone.init(api_key=api_key, environment=environment)
        logger.info("Pinecone初始化成功")
        return True
    except Exception as e:
        logger.error(f"Pinecone初始化失败: {str(e)}")
        return False

# 创建索引
def create_index(index_name, dimension, metric="cosine"):
    try:
        if index_name not in pinecone.list_indexes():
            logger.info(f"创建索引 {index_name}...")
            pinecone.create_index(
                name=index_name,
                dimension=dimension,
                metric=metric
            )
            logger.info(f"索引 {index_name} 创建成功")
        else:
            logger.info(f"索引 {index_name} 已存在")
        return pinecone.Index(index_name)
    except Exception as e:
        logger.error(f"创建索引失败: {str(e)}")
        return None

# 生成嵌入向量
def generate_embeddings(texts, batch_size=32):
    try:
        model = EmbeddingModel.get_instance()
        embeddings = []
        # 批量处理文本以避免内存问题
        for i in range(0, len(texts), batch_size):
            batch = texts[i:i+batch_size]
            batch_embeddings = model.encode(batch)
            embeddings.extend(batch_embeddings)
        return np.array(embeddings)
    except Exception as e:
        logger.error(f"生成嵌入向量失败: {str(e)}")
        return None

# 准备数据并插入索引
def prepare_and_insert_data(index, texts, batch_size=100):
    # 过滤空文本
    texts = [text for text in texts if text.strip()]
    if not texts:
        logger.warning("没有有效的文本数据可插入")
        return 0
    
    # 生成嵌入向量
    embeddings = generate_embeddings(texts)
    if embeddings is None:
        return 0
    
    # 批量插入
    total_inserted = 0
    try:
        for i in range(0, len(texts), batch_size):
            end_idx = min(i + batch_size, len(texts))
            batch_texts = texts[i:end_idx]
            batch_embeddings = embeddings[i:end_idx]

            vectors = []
            for idx, (text, embedding) in enumerate(zip(batch_texts, batch_embeddings), start=i):
                vectors.append((f"vec_{idx}", embedding.tolist(), {"text": text}))

            index.upsert(vectors=vectors)
            total_inserted += len(vectors)
            logger.info(f"已插入 {total_inserted}/{len(texts)} 个向量")

        return total_inserted
    except Exception as e:
        logger.error(f"插入数据失败: {str(e)}")
        return total_inserted

# 查询相似向量
def query_similar(index, query_text, top_k=5):
    try:
        model = EmbeddingModel.get_instance()
        query_embedding = model.encode(query_text).tolist()

        results = index.query(
            vector=query_embedding,
            top_k=top_k,
            include_metadata=True
        )
        return results
    except Exception as e:
        logger.error(f"查询失败: {str(e)}")
        return None

if __name__ == "__main__":
    # 配置Pinecone(请替换为您的API密钥和环境)
    # 可以在 https://app.pinecone.io/ 获取这些信息
    PINECONE_API_KEY = "your-api-key"  # 替换为你的实际API密钥
    PINECONE_ENV = "your-environment"  # 替换为你的实际环境,如"us-east1-gcp"
    INDEX_NAME = "ai-agent-demo"

    # 初始化Pinecone
    if not init_pinecone(PINECONE_API_KEY, PINECONE_ENV):
        logger.error("无法初始化Pinecone,程序退出")
        exit(1)

    # 创建索引(384是all-MiniLM-L6-v2模型的维度)
    index = create_index(INDEX_NAME, dimension=384)
    if index is None:
        logger.error("无法创建或获取索引,程序退出")
        exit(1)

    try:
        # 加载示例数据
        logger.info("加载示例数据...")
        newsgroups = fetch_20newsgroups(
            subset='train', 
            remove=('headers', 'footers', 'quotes'),
            categories=['comp.graphics', 'sci.space']  # 选择特定类别以减少数据量
        )
        texts = newsgroups.data[:100]  # 使用前100个文档作为示例
        logger.info(f"加载了 {len(texts)} 条文本数据")

        # 插入数据
        num_vectors = prepare_and_insert_data(index, texts)
        logger.info(f"成功插入 {num_vectors} 个向量到索引 {INDEX_NAME}")

        # 示例查询
        query = "computer technology and artificial intelligence"
        logger.info(f"查询: {query}")
        results = query_similar(index, query)

        if results and results.matches:
            print("\n查询结果:")
            for match in results.matches:
                print(f"ID: {match.id}, 相似度: {match.score:.4f}")
                # 清理文本,去除多余的换行符
                cleaned_text = match.metadata['text'].replace('\n', ' ').strip()
                print(f"文本片段: {cleaned_text[:100]}...\n")
        else:
            logger.warning("没有找到匹配的结果")

    except Exception as e:
        logger.error(f"程序运行出错: {str(e)}")
    finally:
        # 可选:删除索引(仅用于测试)
        # if INDEX_NAME in pinecone.list_indexes():
        #     pinecone.delete_index(INDEX_NAME)
        #     logger.info(f"已删除索引 {INDEX_NAME}")
        pinecone.deinit()
        logger.info("Pinecone已关闭连接")

运行代码(提供有效的Pinecone API密钥),输出如下:

INFO:__main__:Pinecone初始化成功
INFO:__main__:索引 ai-agent-demo 已存在  # 或“创建索引...”
INFO:__main__:加载示例数据...
INFO:__main__:加载了 100 条文本数据
INFO:__main__:生成嵌入向量...
INFO:__main__:已插入 100/100 个向量
INFO:__main__:成功插入 100 个向量到索引 ai-agent-demo
INFO:__main__:查询: computer technology and artificial intelligence

查询结果:

ID: vec_42, 相似度: 0.8763
文本片段: "Computer graphics and rendering techniques for space simulations..."

ID: vec_17, 相似度: 0.8542
文本片段: "Advances in AI for autonomous space probes..."

...

代码功能解释:

(1)初始化Pinecone:使用pinecone.init函数初始化Pinecone客户端,需要传入API密钥和环境名称。

(2)创建索引:

  • 使用pinecone.create_index函数创建一个新的向量索引,如果索引已存在,则不会重复创建。
  • 索引的维度需要与嵌入模型的输出维度相匹配(本例中为384)。

(3)生成嵌入向量:

  • 使用SentenceTransformer模型的encode方法将文本数据转换为嵌入向量。
  • 本例中使用了all-MiniLM-L6-v2模型。

(4)插入数据到索引:

  • 将生成的嵌入向量与对应的文本数据(作为元数据)一起插入Pinecone索引中。
  • 使用index.upsert方法批量插入数据。

(5)查询相似向量:

  • 对于给定的查询文本,首先生成其嵌入向量。
  • 然后使用index.query方法在Pinecone索引中查询与查询向量最相似的向量。

返回的结果包括相似向量的ID、相似度得分和对应的文本片段(从元数据中获取)。

Logo

火山引擎开发者社区是火山引擎打造的AI技术生态平台,聚焦Agent与大模型开发,提供豆包系列模型(图像/视频/视觉)、智能分析与会话工具,并配套评测集、动手实验室及行业案例库。社区通过技术沙龙、挑战赛等活动促进开发者成长,新用户可领50万Tokens权益,助力构建智能应用。

更多推荐