第二部分 RAG构建

摄取就是将文档转换为计算机可以理解和分析的数字,并将其存储在特殊类型的数据库中以便有效检索的过程。这些数字在形式上被称为嵌入,这种特殊类型的数据库被称为向量存储。

  1. 提取文本

  2. 分块

  3. 嵌入

  4. 向量存储

文本提取

纯文本处理

# 使用TextLoader类将来自不同来源的数据加载到由文本和相关元数据组成的document类
from langchain_community.document_loaders import TextLoader
# 换成自己文件地址
loader = TextLoader('./prompt/summarize.txt', encoding="utf-8")
docs = loader.load()
​
print(docs)

网页内容处理

# 前提:pip install beautifulsoup4
# 用WebBaseLoader从web url加载HTML并将其解析为文本
from langchain_community.document_loaders import WebBaseLoader
​
loader = WebBaseLoader('https://www.langchain.com/')
docs = loader.load()
​
print(docs)

PDF处理

# 使用PDFLoader从PDF文档中提取文本
from langchain_community.document_loaders import PyPDFLoader
​
loader = PyPDFLoader('./test/test.pdf')
pages = loader.load()
​
print(pages)

分块

文本分块

RecursiveCharacterTextSplitter类实现了

  • 按重要性排列一个分隔符列表。默认情况下,它们是:

    a.段落分离器:\n\n

    b.行分离器:\n

    c.字分离器:空格字符

  • 为了遵循给定的块大小,例如1000个字符,首先拆分段落。

  • 对于任何超过所需块大小的段落,用下一个分离器分隔:lines。继续执行,直到所有块都小于所需长度,或者没有其他分隔符可以尝试。

  • 将每个块作为Document发出,并传入原始文档的元数据和关于原始文档中位置的附加信息。

# 用RecursiveCharacterTextSplitter对文件进行分块
from langchain_text_splitters import RecursiveCharacterTextSplitter
​
from langchain_community.document_loaders import TextLoader
​
loader = TextLoader('./prompt/summarize.txt', encoding="utf-8")
docs = loader.load()
​
# chunk_size表示允许的最大的分块长度
# chunk_overlep分块之中的重叠部分
splitter = RecursiveCharacterTextSplitter(chunk_size=50, chunk_overlap=20)
splitted_docs = splitter.split_documents(docs)
​
print(splitted_docs)

代码分块

RecursiveCharacterTextSplitter包含了许多流行语言的分隔符,比如Python、JS、Markdown、HTML等等,并实现例如,每个函数体都保持在同一个块中等分块策略。

# 对源代码进行拆分
from langchain_text_splitters import (
    Language,
    RecursiveCharacterTextSplitter,
)
​
PYTHON_CODE = """ def hello_world(): print("Hello, World!") # Call the function hello_world() """
# 使用from_language()创建实例
python_splitter = RecursiveCharacterTextSplitter.from_language(
    language=Language.PYTHON, chunk_size=50, chunk_overlap=0
)
​
python_docs = python_splitter.create_documents([PYTHON_CODE])
print(python_docs)
# 对markdown进行拆分
from langchain_text_splitters import (
    Language,
    RecursiveCharacterTextSplitter,
)
markdown_text = """ # 🦜🔗 LangChain ⚡ Building applications with LLMs through composability ⚡ ## Quick Install ```bash pip install langchain ``` As an open source project in a rapidly developing field, we are extremely open     to contributions. """
​
# 文本沿着Markdown文档中的自然停止点拆分;例如,标题放在一个块中,标题下的文本行放在另一个块中,依此类推。
# 对markdown和源代码进行拆分后返回的是一个特殊对象,所以要用到create_documents再进行处理
# 特殊对象<langchain_text_splitters.character.RecursiveCharacterTextSplitter object at 0x000001B3A1E3E120>
md_splitter = RecursiveCharacterTextSplitter.from_language(
    language=Language.MARKDOWN, chunk_size=60, chunk_overlap=0
)
​
# create_documents方法
# 第一个参数是要进行切分的数据
# 第二个参数是要传入的元数据 类似于来源、文件名之列的数据
md_docs = md_splitter.create_documents(
    [markdown_text], [{"source": "https://www.langchain.com"}])
​
print(md_docs)

文本嵌入

Embeddings类,用于与文本嵌入模型交互,并生成文本的矢量表示。该类提供了两个方法:一个用于嵌入文档,另一个用于嵌入查询。前者接受文本字符串列表作为输入,而后者接受单个文本字符串。

# 翻译过程中我使用了开源的嵌入工具 如果你需要使用API 请自行替换嵌入模型
from langchain_openai import OpenAIEmbeddings
model = OpenAIEmbeddings()
embeddings = model.embed_documents([
"Hi there!",
"Oh, hello!",
"What's your name?",
"My friends call me World",
"Hello World!"
])
# 同时嵌入多个文档
from langchain_huggingface import HuggingFaceEmbeddings
​
embeddings = HuggingFaceEmbeddings(
    model_name="sentence-transformers/all-MiniLM-L6-v2"  # 高效的语义模型
)
​
# 输入文本
texts = ["你好,世界。", "LangChain 是一个强大的工具。"]
​
# 生成嵌入向量
vectors = embeddings.embed_documents(texts)
​
# 打印结果
for idx, vector in enumerate(vectors):
    print(f"文本 {idx + 1}: {texts[idx]}")
    print(f"嵌入向量: {vector[:5]}... (维度: {len(vector)})\n")

复习案例

# 批量进行嵌入操作
from langchain_huggingface import HuggingFaceEmbeddings
from langchain_ollama import OllamaLLM
from langchain_community.document_loaders import TextLoader
from langchain_text_splitters import RecursiveCharacterTextSplitter
​
# 引用模型
deepseek = OllamaLLM(
    model="deepseek-r1:32b",
)
# 加载文档
loader = TextLoader("./test/deeplearning.txt", encoding="utf-8")
doc = loader.load()
​
# 文档分割
splitter = RecursiveCharacterTextSplitter(chunk_size=1000, chunk_overlap=200)
chunks = splitter.split_documents(doc)
​
# 生成嵌入
embeddings_model = HuggingFaceEmbeddings(
    model_name="sentence-transformers/all-MiniLM-L6-v2"  
)
embeddings = embeddings_model.embed_documents(
    [chunk.page_content for chunk in chunks]
)
​
print(embeddings)

向量存储

准备活动

# 需要安装docker
# 运行下面命令 在6024端口启动Postgres
docker run \
    --name pgvector-container \
    -e POSTGRES_USER=langchain \
    -e POSTGRES_PASSWORD=langchain \
    -e POSTGRES_DB=langchain \
    -p 6024:5432 \
    -d pgvector/pgvector:pg16
# 可在docker仪器表观察运行
# 重要连接字符串
postgresql+psycopg://langchain:langchain@localhost:6024/langchain

向量存储

# PGVector初始化参数
1. connection_string PostgreSQL 数据库连接字符串
2. collection_name 向量集合名称,用于在数据库中标识不同的向量集合
3. embedding_function 使用的嵌入模型
可选参数
4. pre_delete_collection 是否在创建新集合前删除同名集合
5. distance_strategy
- 可选值 :
  - "cosine" :余弦相似度
  - "euclidean" :欧几里得距离
  - "manhattan" :曼哈顿距离
- 说明 :用于计算向量间距离的策略
​
PGVector 类提供了以下五个主要方法:
from_documents:从文档和嵌入模型创建一个向量存储实例。
similarity_search:在向量存储中执行相似性搜索,返回与查询最相似的前 k 个文档。
add_documents:向向量存储中添加新文档。
delete:从向量存储中删除文档。
get_by_ids:根据文档 ID 获取文档。
# 需要安装langchain_postgres
from langchain_community.document_loaders import TextLoader
from langchain_huggingface import HuggingFaceEmbeddings
from langchain_text_splitters import RecursiveCharacterTextSplitter
from langchain_postgres.vectorstores import PGVector
from langchain_core.documents import Document
import uuid
​
# 请参阅上面的docker命令以启动启用了pgvector的PostgreSQL实例。
# 用于连接启用了pgvector的PostgreSQL数据库的连接字符串
connection = "postgresql+psycopg://langchain:langchain@localhost:6024/langchain"
​
# 加载文档并将其拆分为多个小块
# 从指定路径以UTF-8编码加载原始文档
raw_documents = TextLoader('./test/deeplearning.txt', encoding="utf-8").load()
​
# 初始化文本拆分器以将文档拆分成较小的块
text_splitter = RecursiveCharacterTextSplitter(
    chunk_size=1000,  # 每个块的最大大小
    chunk_overlap=200  # 连续块之间的重叠部分
)
# 将原始文档拆分成较小的块
documents = text_splitter.split_documents(raw_documents)
​
# 为文档创建嵌入
# 使用预训练的HuggingFace模型初始化用于生成嵌入的模型
embeddings_model = HuggingFaceEmbeddings(
    model_name="sentence-transformers/all-MiniLM-L6-v2"  # 高效的语义模型
)
​
# 从文档列表生成一个向量存储。此存储将文档的嵌入向量保存在 PostgreSQL 数据库中,以便后续进行相似性搜索等操作。
db = PGVector.from_documents(
    documents,  # 要存储的文档
    embeddings_model,  # 用于生成嵌入的模型
    connection=connection  # PostgreSQL数据库的连接字符串
)
​
# 在向量存储(db)中找到最相似的文档
results = db.similarity_search("query", k=4)  # 搜索与查询最相似的前4个文档
​
# 向 向量存储中生成新文档
# 为新文档生成唯一ID
ids = [str(uuid.uuid4()), str(uuid.uuid4())]
# 使用生成的ID向向量存储中添加新文档
db.add_documents(
    [
        Document(
            page_content="池塘里有猫",  # 文档的内容
            metadata={"location": "池塘", "topic": "动物"},  # 与文档相关的元数据
        ),
        Document(
            page_content="池塘里也有鸭子",  # 文档的内容
            metadata={"location": "池塘", "topic": "动物"},  # 与文档相关的元数据
        ),
    ],
    ids=ids,  # 新文档的ID
)
​
# 打印文档添加的确认信息
print("文档添加成功。\n获取的文档数量:",
      len(db.get_by_ids(ids)))  # 获取并打印添加的文档数量
​
# 从向量存储中删除文档
print("删除ID为", ids[1], "的文档")
db.delete({"ids": ids})  # 删除具有指定ID的文档
​
# 打印文档删除的确认信息
print("文档删除成功。\n获取的文档数量:",
      len(db.get_by_ids(ids)))  # 获取并打印剩余的文档数量

修改跟踪

SQLRecordManager用于追踪向量数据库中的文档索引状态。

  1. 文档索引追踪:记录哪些文档已被索引到向量数据库中,以及索引的时间

  2. 增量更新支持:识别哪些文档需要新增、更新或保持不变

  3. 文档去重:防止重复索引相同的文档

  4. 高效清理:提供多种清理模式,删除过时的文档向量

主要方法

  • create_schema():创建必要的数据库表结构

  • update():更新记录状态,记录文档索引时间

  • exists():检查文档是否已被索引

  • list_keys():列出符合条件的记录

  • delete_keys():删除指定记录

  • get_time():获取服务器时间戳

# 初始化SQL记录管理器
1. namespace :用于标识记录管理器的命名空间。命名空间可以帮助区分不同的记录集合,避免冲突。
2. db_url :数据库连接字符串,指定了连接到 PostgreSQL 数据库的详细信息。
   - 格式 : postgresql+psycopg://username:password@host:port/database
     - username :数据库用户名
     - password :数据库密码
     - host :数据库主机地址(如 localhost )
     - port :数据库端口号(如 6024 )
     - database :数据库名称(如 langchain )
SQLRecordManager 是 langchain 库中的一个类,用于管理数据库中的记录。它提供了创建、更新、删除和查询记录的功能。通过指定 namespace 和 db_url ,你可以初始化一个记录管理器来操作特定数据库中的记录。
# SQLRecordManager通常与index函数搭配使用
# index部分参数介绍
必需参数
1. docs :一个document对象
2. record_manager :SQLRecordManager 的实例
3. vectorstore :PGVector 的实例
可选参数
4. cleanup :指定如何处理重复文档。
   - 可选值 :
     - "incremental" :增量清理,防止重复添加相同的文档。
     - "full" :完全清理,删除所有旧版本并添加新版本。
5. source_id_key :确保每个文档在数据库中都有唯一的标识。
# 需要安装chroma db
from langchain.indexes import index
from langchain_community.vectorstores import Chroma
from langchain.docstore.document import Document
from langchain_huggingface import HuggingFaceEmbeddings
from langchain.indexes import SQLRecordManager
​
# 初始化SQL记录管理器
record_manager = SQLRecordManager(
    namespace="my_docs_namespace",  # 命名空间,用于隔离不同索引集
    db_url="postgresql+psycopg://langchain:langchain@localhost:6024/langchain"  # 数据库连接URL
)
​
# 创建必要的数据库表结构
record_manager.create_schema()
​
embeddings = HuggingFaceEmbeddings(
    model_name="sentence-transformers/all-MiniLM-L6-v2"  # 高效的语义模型
)
# 这个即将被移除 按照报错进行修改
vectorstore = Chroma(embedding_function=embeddings, collection_name="my_collection")
​
# 准备文档
docs = [
    Document(page_content="这是第一个文档", metadata={"source": "doc1.txt"}),
    Document(page_content="这是第二个文档", metadata={"source": "doc2.txt"}),
]
​
# 索引文档(增量模式)
result = index(
    docs,
    record_manager,
    vectorstore,
    cleanup="incremental",  # 增量清理模式
    source_id_key="source",  # 使用source字段作为文档源标识
)
​
print(f"新增: {result['num_added']}, 更新: {result['num_updated']}, "
      f"跳过: {result['num_skipped']}, 删除: {result['num_deleted']}")
# 输出 新增: 2, 更新: 0, 跳过: 0, 删除: 0
​
# 假设文档内容有变化
updated_docs = [
    Document(page_content="这是修改后的第一个文档", metadata={"source": "doc1.txt"}),
    Document(page_content="这是第二个文档", metadata={"source": "doc2.txt"}),
    Document(page_content="这是新增的第三个文档", metadata={"source": "doc3.txt"}),
]
​
# 再次索引
result = index(
    updated_docs,
    record_manager,
    vectorstore,
    cleanup="incremental",
    source_id_key="source",
)
​
print(f"新增: {result['num_added']}, 更新: {result['num_updated']}, "
      f"跳过: {result['num_skipped']}, 删除: {result['num_deleted']}")

索引优化

使用MultiVectorRetriever对索引进行了优化

from langchain_community.document_loaders import TextLoader
from langchain_ollama import ChatOllama
from langchain_text_splitters import RecursiveCharacterTextSplitter
from langchain_postgres.vectorstores import PGVector
from langchain_core.output_parsers import StrOutputParser
from langchain_core.prompts import ChatPromptTemplate
from pydantic import BaseModel
from langchain_core.runnables import RunnablePassthrough
from langchain_core.documents import Document
from langchain.retrievers.multi_vector import MultiVectorRetriever
from langchain.storage import InMemoryStore
import uuid
from langchain_huggingface import HuggingFaceEmbeddings
# 步骤一:使用大模型生成摘要
# PostgreSQL数据库的连接字符串
connection = "postgresql+psycopg://langchain:langchain@localhost:6024/langchain"
collection_name = "summaries"
​
# 初始化嵌入模型
embeddings_model = HuggingFaceEmbeddings(
    model_name="sentence-transformers/all-MiniLM-L6-v2"  # 高效的语义模型
)
​
# 加载文档
loader = TextLoader("./test/deeplearning.txt", encoding="utf-8")
docs = loader.load()
​
​
# 分割文档
splitter = RecursiveCharacterTextSplitter(chunk_size=1000, chunk_overlap=200)
chunks = splitter.split_documents(docs)
​
# 定义提示模板
prompt_text = "Summarize the following document:\n\n{doc}"
prompt = ChatPromptTemplate.from_template(prompt_text)
​
# 初始化Ollama模型
llm = ChatOllama(
    model="deepseek-r1:32b",
)
​
# 定义摘要链
# {"doc": lambda x: x.page_content} 是一个字典
summarize_chain = {
    "doc": lambda x: x.page_content
} | prompt | llm | StrOutputParser()
​
# 并行处理文档块进行摘要
# 同时指定了最大的并发数 {"max_concurrency": 5}
summaries = summarize_chain.batch(chunks, {"max_concurrency": 5})
print(summaries)
​
# 步骤二 vectorstore和store用于存储原始摘要及其嵌入
# 初始化向量存储
vectorstore = PGVector(
    embeddings=embeddings_model,
    collection_name=collection_name,
    connection=connection,
    use_jsonb=True,
)
​
# 初始化内存存储 用于临时数据存储
store = InMemoryStore()
# 定义"doc_id"作为关联两个存储系统的关键字段
id_key = "doc_id"
​
# 初始化多向量检索器
retriever = MultiVectorRetriever(
    vectorstore=vectorstore,
    docstore=store,
    id_key=id_key,
)
​
# 为每个文档块生成唯一的ID
doc_ids = [str(uuid.uuid4()) for _ in chunks]
​
# 将摘要文本包装为Document对象,并在元数据中嵌入ID,后添加到向量数据库,进行向量化处理
summary_docs = [
    Document(page_content=s, metadata={id_key: doc_ids[i]})
    for i, s in enumerate(summaries)
]
retriever.vectorstore.add_documents(summary_docs)
​
# 将原始文档存储在内存存储中,并将其与摘要文档通过ID关联
retriever.docstore.mset(list(zip(doc_ids, chunks)))
​
# 从向量存储中检索与查询最相似的摘要文档
sub_docs = retriever.vectorstore.similarity_search(
    "chapter on philosophy", k=2
)
print("sub docs: ", sub_docs[0].page_content)
print("length of sub docs:\n", len(sub_docs[0].page_content))
​
# 步骤三 查询检索相关的完整上下文文档
# 使用检索器进行查询,会返回较大的源文档块
retrieved_docs = retriever.invoke("chapter on philosophy")
print(retrieved_docs)
print("length of retrieved docs: ", len(retrieved_docs[0].page_content))
​
# 输入阶段:从文件系统读取文本文档
# 分块阶段:将大型文档分割成较小的块
# 摘要阶段:使用DeepSeek-R1:32b模型生成每个块的摘要
# 存储阶段:
# - 摘要通过嵌入模型向量化,存储在PostgreSQL数据库
# - 原始文档块保存在内存中
# - 两者通过UUID相互关联
# 检索阶段:
# - 使用语义相似度在向量数据库中查找相关摘要
# - 通过ID映射找到对应的原始文档块
# - 返回完整的原始文档作为最终结果

RAG系统的核心流程

  • 编入索引:对外部数据源进行预处理,并将表示数据的嵌入存储在向量存储中,以便于检索。

  • 检索:根据用户的查询检索存储在向量存储中的相关嵌入和数据。

  • 生成:将原始提示与检索到的相关文档合成,作为发送给模型进行预测的最终提示。

检索 添加 删除

from langchain_postgres.vectorstores import PGVector
from langchain_huggingface import HuggingFaceEmbeddings
​
# 数据库连接配置
connection = "postgresql+psycopg://langchain:langchain@localhost:6024/langchain"
​
# 初始化嵌入模型
embeddings_model = HuggingFaceEmbeddings(
    model_name="sentence-transformers/all-MiniLM-L6-v2"
)
​
# 连接到现有的向量存储
db = PGVector(
    connection=connection,
    embeddings=embeddings_model,
    collection_name="langchain"  # 确保这是你之前使用的collection_name
)
​
# 检索操作
query = "你想要查询的内容"
# 基础相似度检索
# k=2 表示返回2个最相关的文档
# 直接返回相关文档内容和元数据
docs = db.similarity_search(query, k=2) 
​
# 方法2:使用检索器 实现灵活配置
retriever = db.as_retriever(search_kwargs={"k": 2})
docs = retriever.invoke(query)
​
​
# 方法3:带分数的相似度搜索
# 除了返回相关文档外,还会返回相似度分数
# 适用于需要根据相似度分数进行筛选或排序的场景适用于需要根据相似度分数进行筛选或排序的场景
docs_and_scores = db.similarity_search_with_score(query, k=2)
​
# 输出查询结果
for doc in docs:
    print("文档内容:", doc.page_content)
    print("元数据:", doc.metadata)
    
# 删除操作
# 方法1:删除指定的文档
ids_to_delete = ["021ce213-54bf-44e2-aac1-f70db8bed151", "68bd0b6a-4146-4c8b-98b5-a410cd2ef1b6"]  # 文档ID列表
db.delete(ids_to_delete)
​
# 方法2:删除整个集合
db.delete_collection()
​
# 方法3:条件删除(基于元数据)
filter = {"source": "specific_source"}  # 根据元数据中的source字段筛选
db.delete(filter=filter)
​
# 添加操作
# 方法1:添加单个文档
doc = Document(
    page_content="这是新添加的文档内容",
    metadata={"source": "manual_input", "author": "user"}
)
db.add_documents([doc])
​
# 方法2:批量添加文档
docs = [
    Document(page_content="文档1", metadata={"id": "1"}),
    Document(page_content="文档2", metadata={"id": "2"}),
    Document(page_content="文档3", metadata={"id": "3"})
]
db.add_documents(docs)
​
# 方法3:从文本直接添加
texts = ["这是第一段文本", "这是第二段文本", "这是第三段文本"]
metadatas = [
    {"source": "text1"},
    {"source": "text2"},
    {"source": "text3"}
]
db.add_texts(texts, metadatas=metadatas)

简单索引实战

# 效果不是很好 可能是因为输入的文本实在太大了
from langchain_community.document_loaders import TextLoader
from langchain_huggingface import HuggingFaceEmbeddings
from langchain_text_splitters import RecursiveCharacterTextSplitter
from langchain_postgres.vectorstores import PGVector
from langchain_ollama import ChatOllama
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.runnables import chain
​
connection = "postgresql+psycopg://langchain:langchain@localhost:6024/langchain"
​
# 加载数据并进行分块
raw_documents = TextLoader('./test/劳动法.txt', encoding='utf-8').load()
text_splitter = RecursiveCharacterTextSplitter(
    chunk_size=1000, chunk_overlap=200)
documents = text_splitter.split_documents(raw_documents)
​
# 生成嵌入模型
embeddings_model = HuggingFaceEmbeddings(
    model_name="sentence-transformers/all-MiniLM-L6-v2"  # 高效的语义模型
)
​
db = PGVector.from_documents(
    documents, embeddings_model, connection=connection)
​
# 创建检索器以检索2个相关文档
retriever = db.as_retriever(search_kwargs={"k": 2})
​
query = '劳动者的收入可以低于当地的最低工资吗?'
​
# 获取相关文档
docs = retriever.invoke(query)
print("输出相关文档")
print(docs)
print("输出检索后的第一部分:")
print(docs[0].page_content)
​
prompt = ChatPromptTemplate.from_template(
    """根据以下内容回答问题:{context} 问题:{question} """
)
llm = ChatOllama(model="deepseek-r1:32b")
llm_chain = prompt | llm
​
# 基于相关文档回答问题
result = llm_chain.invoke({"context": docs, "question": query})
print("最后输出的结果是:")
print(result)
​
# 再次运行,但这次封装逻辑以提高效率
# @chain 装饰器将此函数转换为LangChain可运行对象,
# 使其与LangChain的链操作和管道兼容
​
print("再次运行,但这次封装逻辑以提高效率\n")
@chain
def qa(input):
    # 获取相关文档
    docs = retriever.invoke(input)
    # 格式化提示
    formatted = prompt.invoke({"context": docs, "question": input})
    # 生成答案
    answer = llm.invoke(formatted)
    return answer
​
# 运行
result = qa.invoke(query)
print(result.content)

查询转化

重写-检索-读取策略

RAG过于依赖于用户查询的质量,无法生成准确的输出。在生产环境中,用户可能以不完整、含糊不清或措辞不佳的方式构建查询,从而导致模型幻觉。

from langchain_core.prompts import ChatPromptTemplate
from langchain_ollama import ChatOllama
​
# 查询以无关信息开头,然后才问相关问题
query = '今天我起床刷牙,然后坐下来阅读新闻。然后我忘记了炉子上的食物。古希腊哲学史上的一些重要人物是谁?'
​
# 重写查询以提高准确性
rewrite_prompt = ChatPromptTemplate.from_template(
    """ 我将输入一段文字 里面是用于的提问 请删去其中无用的描述 仅输出最简介的问题 内容:{x}"""
)
​
def parse_rewriter_output(message):
    return message.content.strip('"').strip("**")
​
rewriter = rewrite_prompt | ChatOllama(model="deepseek-r1:32b") | parse_rewriter_output
new_query = rewriter.invoke({"x": query})
print("重写后的查询:", new_query)
# 去除思考过程后 输出:古希腊哲学史上的一些重要人物是谁?
# 将这个作为RAG大模型的输入提示词

多查询检索策略

多查询检索策略指示大型语言模型根据用户的初始查询生成多个查询,对数据源中的每个查询执行并行检索,然后将检索到的结果插入提示上下文,以生成最终的模型输出。此策略对于单个问题可能依赖于多个透视图来提供答案的用例特别有用。

# 定义提示词
perspectives_prompt = ChatPromptTemplate.from_template(
    """你是一个AI语言模型助手。你的任务是生成五个不同版本的用户问题,以便从向量数据库中检索相关文档。
    通过生成用户问题的多个视角,你的目标是帮助用户克服基于距离的相似性搜索的一些局限性。
    请将这些替代问题用换行符分隔。
    原始问题: {question}""")
​
# 要提的问题
question = "要怎么反应意见"
​
query_gen = perspectives_prompt | llm
# invoke返回的是一个AImessage对象
# deepseek的返回的AImessage对象的conten内容中含有<think></think>
question = remove_think(query_gen.invoke({"question": question}).content)
# 将生成的问题处理成列表结构
questionList = question.split("\n")
# 然后将生成的问题输入到列表中即可
# 下文中的操作就是先使用了多查询检索 然后使用RRF对其进行排序

RAG-Fusion

RAG-Fusion策略将使用互反秩融合(RRF)算法对所有检索到的文档应用最后的重新排序步骤。反秩融合(RRF)算法算法涉及将不同搜索结果的秩组合在一起,以产生单一的、统一的排序。通过组合来自不同查询的排名,我们将最相关的文档拉到最终列表的顶部。RRF非常适合组合来自可能具有不同尺度或分数分布的查询的结果。

from langchain_community.document_loaders import TextLoader
from langchain_core.runnables import chain
from langchain_huggingface import HuggingFaceEmbeddings
from langchain_text_splitters import RecursiveCharacterTextSplitter
from langchain_postgres.vectorstores import PGVector
from langchain_ollama import ChatOllama
from langchain_core.prompts import ChatPromptTemplate
​
from Deepseek import remove_message_think_tag
​
# 文件加载以及一些固定内容
connection = "postgresql+psycopg://langchain:langchain@localhost:6024/langchain"
raw_documents = TextLoader('document/员工手册.txt', encoding='utf-8').load()
text_splitter = RecursiveCharacterTextSplitter(
    chunk_size=1000, chunk_overlap=200)
documents = text_splitter.split_documents(raw_documents)
embeddings_model = HuggingFaceEmbeddings(
    model_name="sentence-transformers/all-MiniLM-L6-v2"  # 高效的语义模型
)
db = PGVector.from_documents(
    documents, embeddings_model, connection=connection)
retriever = db.as_retriever(search_kwargs={"k": 5})
llm = ChatOllama(model="deepseek-r1:32b", temperature=0)
​
# 相关提示词
prompt_rag_fusion = ChatPromptTemplate.from_template(
    """你是一个有帮助的助手,根据一个输入查询生成多个搜索查询。\n
    生成与以下内容相关的多个搜索查询: {question} \n
    输出(4个查询):""")
​
def parse_queries_output(message):
    return message.content.split('\n')
​
# 问题生成链
query_gen = prompt_rag_fusion | llm |remove_message_think_tag|parse_queries_output
​
query = "女工享受什么权益?"
​
# 查看query_gen输出内容 一个问题列表
# generated_queries = query_gen.invoke({"question": query})
​
​
# RRF的实现
# k值:控制排名权重衰减速度,k越大,排名靠后的项目得分差异越小 通常k值设为60,这是一个经验值,可以根据具体场景调整
def reciprocal_rank_fusion(results, k=60):
    fused_scores = {}
    documents = {}
    for docs in results:
        for rank, doc in enumerate(docs):
            doc_str = doc.page_content
            if doc_str not in fused_scores:
                fused_scores[doc_str] = 0
                documents[doc_str] = doc
            fused_scores[doc_str] += 1 / (rank + k)
    reranked_doc_strs = sorted(
        fused_scores, key=lambda d: fused_scores[d], reverse=True)
    return [documents[doc_str] for doc_str in reranked_doc_strs]
​
​
# 文件检索链 使用了前面的问题生成链
retrieval_chain = query_gen | retriever.batch | reciprocal_rank_fusion
​
# 返回的是多个Document对象 有id、metadata、page_content几个属性
# result = retrieval_chain.invoke({"question": query})
​
​
prompt = ChatPromptTemplate.from_template(
    """根据以下内容回答问题:{context} 问题:{question} """
)
​
# RAG检索链
@chain
def rag_fusion(input):
    # 获取相关文档
    docs = retrieval_chain.invoke(input)  # 格式化提示
    formatted = prompt.invoke(
        {"context": docs, "question": input})  # 生成答案
    answer = llm.invoke(formatted)
    return answer
​
​
result = rag_fusion.invoke(query)
print(remove_message_think_tag(result).content)
​

假设文本嵌入

假设文档嵌入(Hypothetical Document Embeddings, HyDE)是一种基于用户查询创建假设文档、嵌入文档和基于向量相似性检索相关文档的策略。HyDE背后的直觉是,llm生成的假设文档将比原始查询更类似于最相关的文档。

from langchain_community.document_loaders import TextLoader
from langchain_text_splitters import RecursiveCharacterTextSplitter
from langchain_postgres.vectorstores import PGVector
from langchain_ollama import ChatOllama, OllamaEmbeddings
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.runnables import chain
from langchain_core.output_parsers import StrOutputParser
from Util.DeepseekUtil import remove_message_think_tag
​
# 文件加载以及一些固定内容
connection = "postgresql+psycopg://langchain:langchain@localhost:6024/langchain"
raw_documents = TextLoader('document/员工手册.txt', encoding='utf-8').load()
text_splitter = RecursiveCharacterTextSplitter(
    chunk_size=1000, chunk_overlap=200)
documents = text_splitter.split_documents(raw_documents)
embeddings_model = OllamaEmbeddings(
    model="nomic-embed-text:latest",
)
db = PGVector.from_documents(
    documents, embeddings_model, connection=connection)
retriever = db.as_retriever(search_kwargs={"k": 5})
​
# 1.定义用于生成假设文档的工作流
prompt_hyde = ChatPromptTemplate.from_template(
    """请写一段话来回答这个问题。\n 问题: {question} \n 段落:""")
​
generate_doc = (prompt_hyde | ChatOllama(model="deepseek-r1:32b", temperature=0) | StrOutputParser())
​
# 2.将假设文档作为检索器的输入
retrieval_chain = generate_doc | retriever
query = "妇女在工作中享有什么权益?"
​
prompt = ChatPromptTemplate.from_template(
    """根据以下内容回答问题:{context} 问题: {question} """
)
llm = ChatOllama(model="deepseek-r1:32b", temperature=0)
​
​
@chain
def qa(input):
    # 获取基于假设文档检索到的内容
    docs = retrieval_chain.invoke(input)
    # 格式化提示
    formatted = prompt.invoke({"context": docs, "question": input})
    # 生成答案
    answer = llm.invoke(formatted)
    # 返回的是一个AIMessage类型对象
    return answer
​
​
result = qa.invoke(query)
print(remove_message_think_tag(result).content)

重写查询可以采用多种形式,但通常采用类似的方式:获取用户的原始查询(您编写的提示),并要求大型语言模型编写一个或多个新查询。

检索路由

逻辑路由

在逻辑路由中,我们给大型语言模型提供各种数据源的知识,然后让大型语言模型根据用户的查询判断应用哪个数据源

from typing import Literal
from langchain_core.prompts import ChatPromptTemplate
from pydantic import BaseModel, Field
from langchain_ollama import ChatOllama
from langchain_core.runnables import RunnableLambda
​
​
# 数据模型类
class RouteQuery(BaseModel):
    """将用户查询路由到最相关的数据源。"""
    datasource: Literal["python_docs", "js_docs"] = Field(
        ...,
        description="根据用户问题,选择最适合回答问题的数据源",
    )
​
​
# 提示模板
# 带函数调用的LLM
llm = ChatOllama(model="deepseek-r1:32b", temperature=0)
# 使用增强型大模型实例
structured_llm = llm.with_structured_output(RouteQuery)
​
# 提示
system = """你是路由用户问题到适当数据源的专家。根据问题所涉及的编程语言,将其路由到相关的数据源。"""
prompt = ChatPromptTemplate.from_messages(
    [("system", system), ("human", "{question}")]
)
​
# 定义路由器
router = prompt | structured_llm
question = """为什么以下代码不起作用: 
from langchain_core.prompts 
import ChatPromptTemplate 
prompt = ChatPromptTemplate.from_messages(["human", "speak in {language}"]) 
prompt.invoke("french") """
​
result = router.invoke({"question": question})
print("\n路由到: ", result)
# 路由到:  datasource='python_docs'
​
# 定义一个简单的路由选择函数 用于处理大模型的输出
# 简单进行了个字符匹配
def choose_route(result):
    if "python_docs" in result.datasource.lower():
        return "chain for python_docs"
    else:
        return "chain for js_docs"
​
# 新建了一条链 增加了对路由进行处理的方法
full_chain = router | RunnableLambda(choose_route)
result = full_chain.invoke({"question": question})
print("\n选择路由: ", result)
# 选择路由:  chain for python_docs

语义路由

语义路由的工作流程是:当用户提交查询后,系统首先通过嵌入模型将查询文本转换为高维向量表示,同时,各个可能的目标处理路径(如专家模板、知识库或处理器)也已被预先转换为相应的向量;接着,系统计算查询向量与各目标向量之间的余弦相似度或其他相似度指标,找出语义上最匹配的目标;然后,系统将查询自动转发给相似度最高的处理路径,由专门设计的处理器(如特定领域的LLM提示模板)进行响应生成;最后,将处理结果返回给用户,整个过程无需硬编码规则,而是基于语义理解实现智能化分流。

from langchain.utils.math import cosine_similarity
from langchain_core.output_parsers import StrOutputParser
from langchain_core.prompts import PromptTemplate
from langchain_core.runnables import chain
from langchain_ollama import ChatOllama, OllamaEmbeddings
​
# 物理学模板
physics_template = """你是一位非常聪明的物理学教授。你擅长以简明易懂的方式回答物理学问题。
当你不知道问题的答案时,你会坦诚地说你不知道。这是一个问题:{query}"""
​
# 数学模板
math_template = """你是一位非常优秀的数学家。你擅长回答数学问题。
你之所以这么擅长,是因为你能够将复杂的问题分解为各个组成部分,回答这些组成部分,然后
再将它们组合起来回答更广泛的问题。这是一个问题:{query}"""
​
# 嵌入提示
embeddings = OllamaEmbeddings(
    model="nomic-embed-text:latest",
)
prompt_templates = [physics_template, math_template]
prompt_embeddings = embeddings.embed_documents(prompt_templates)
​
# 将问题路由至合适的模板
@chain
def prompt_router(query):
    query_embedding = embeddings.embed_query(query)
    # 添加相似度分数的打印,帮助调试
    similarity = cosine_similarity([query_embedding], prompt_embeddings)[0]
    print("物理模板相似度:", similarity[0])
    print("数学模板相似度:", similarity[1])
    most_similar = prompt_templates[similarity.argmax()]
    print("选择模板:", "数学模板" if most_similar == math_template else "物理学模板")
    return PromptTemplate.from_template(most_similar)
​
# 语义路由器
semantic_router = (prompt_router | ChatOllama(model="deepseek-r1:32b", temperature=0) | StrOutputParser())
​
# 调用并输出结果
result = semantic_router.invoke("什么是黑洞")
print("\n语义路由器结果: ", result)

查询构建

查询构建是大模型应用中的核心技术,指的是将用户原始输入转化为能够最大化模型性能的结构化提示的过程。

元数据过滤器

文本到元数据过滤器是一个将非结构化文本转换为结构化元数据的工具。它能够从原始文本中提取有意义的信息,并将其组织成可查询的结构化格式。

SelfQueryRetriever 是 LangChain 中的一个高级检索器组件,它能够将自然语言查询分解为用于相似性搜索的语义查询部分以及用元数据过滤条件。元数据过滤能够基于文档的元数据属性进行精确过滤,例如根据日期、作者、类别等筛选文档。

  1. 用户提供自然语言查询,例如:"找到2023年由张三撰写的关于人工智能的文章"

  2. SelfQueryRetriever 使用 LLM 将此查询解析为:

    • 语义查询:"人工智能"(用于相似性搜索)

    • 过滤条件:{year: 2023, author: "张三"}

  3. 系统先应用过滤条件筛选文档,然后在过滤后的结果中执行语义搜索

from langchain.chains.query_constructor.base import AttributeInfo
from langchain.retrievers.self_query.base import SelfQueryRetriever
from langchain_postgres.vectorstores import PGVector
from langchain_core.documents import Document
from langchain_ollama import ChatOllama, OllamaEmbeddings
​
connection = "postgresql+psycopg://langchain:langchain@localhost:6024/langchain"
​
# 定义文档列表
docs = [
    Document(
        page_content="一群科学家带回了恐龙,引发了一场混乱",
        metadata={"year": 1993, "rating": 7.7, "genre": "科幻"},
    ),
    Document(
        page_content="莱昂纳多·迪卡普里奥在梦中迷失,梦中有梦,梦中有梦,梦中有...",
        metadata={"year": 2010, "director": "克里斯托弗·诺兰", "rating": 8.2},
    ),
    Document(
        page_content="一位心理学家/侦探在一系列的梦中迷失,梦中有梦,梦中有梦,《盗梦空间》借用了这个想法",
        metadata={"year": 2006, "director": "今敏", "rating": 8.6},
    ),
    Document(
        page_content="一群正常身材的女性非常温馨,一些男性对她们心生爱慕",
        metadata={"year": 2019, "director": "格蕾塔·葛韦格", "rating": 8.3},
    ),
    Document(
        page_content="玩具们活了过来,玩得很开心",
        metadata={"year": 1995, "genre": "动画"},
    ),
    Document(
        page_content="三个男人走进了禁区,三个男人走出了禁区",
        metadata={
            "year": 1979,
            "director": "安德烈·塔科夫斯基",
            "genre": "惊悚",
            "rating": 9.9,
        },
    ),
]
​
embeddings = OllamaEmbeddings(
    model="nomic-embed-text:latest",
)
​
vectorstore = PGVector.from_documents(
    docs, embedding=embeddings, connection=connection)
​
# 定义查询的字段
fields = [
    AttributeInfo(
        name="genre",
        description="电影的类型",
        type="string 或 list[string]",
    ),
    AttributeInfo(
        name="year",
        description="电影的上映年份",
        type="integer",
    ),
    AttributeInfo(
        name="director",
        description="电影导演的名字",
        type="string",
    ),
    AttributeInfo(
        name="rating",
        description="电影的评分,范围是1-10",
        type="float",
    ),
]
​
description = "电影的简要概述"
llm =  ChatOllama(model="deepseek-r1:32b", temperature=0)
# 参数: 语言模型实例、向量存储实例、文档集合描述、可查询字段定义
# 返回检索器
retriever = SelfQueryRetriever.from_llm(llm, vectorstore, description, fields)
​
# 此示例仅指定一个过滤器 电影的评分
print(retriever.invoke("我想看评分高于8.5的电影"))
​
# 此示例指定了多个过滤器 电影的评分以及电影的类型
print(retriever.invoke(
    "有没有评分高于8.5的科幻电影?"))

自然语言到SQL

# 基本的实现是可以的 但是目前主要的问题是deepseek的输出
import os
from langchain_community.utilities import SQLDatabase
from langchain.chains import create_sql_query_chain
from langchain_community.tools import QuerySQLDatabaseTool
from langchain_ollama import ChatOllama
​
from Util.DeepseekUtil import remove_message_think_tag, remove_think
​
connection = "postgresql+psycopg://langchain:langchain@localhost:6024/langchain"
​
def setup_text_to_sql():
        db = SQLDatabase.from_uri(connection)
        llm = ChatOllama(model="deepseek-r1:32b", temperature=0)
        # 创建SQL查询链
        write_query = create_sql_query_chain(llm, db)
        # 创建SQL执行工具
        execute_query = QuerySQLDatabaseTool(db=db)
        # 组合链:写入查询 -> 执行查询
        combined_chain = write_query|remove_think| execute_query
        return combined_chain, db
​
# 使用自然查询数据库
def query_database(chain, question):
    result = chain.invoke({"question": question})
    return result
​
# 创建示例表格
def create_example_tables(db):
    try:
        # 执行SQL语句创建示例表
        db.run("CREATE TABLE IF NOT EXISTS employees ("
               "employee_id SERIAL PRIMARY KEY, "
               "first_name VARCHAR(50), "
               "last_name VARCHAR(50), "
               "email VARCHAR(100), "
               "hire_date DATE, "
               "department VARCHAR(50), "
               "salary NUMERIC(10, 2))")
​
        db.run("CREATE TABLE IF NOT EXISTS departments ("
               "department_id SERIAL PRIMARY KEY, "
               "department_name VARCHAR(50), "
               "location VARCHAR(100), "
               "manager_id INTEGER)")
​
        # 添加示例数据到employees表
        db.run("INSERT INTO employees (first_name, last_name, email, hire_date, department, salary) "
               "VALUES ('张', '三', 'zhang@example.com', '2023-01-15', '技术', 8500.00), "
               "('李', '四', 'li@example.com', '2023-02-20', '市场', 7800.00), "
               "('王', '五', 'wang@example.com', '2023-03-10', '财务', 9200.00), "
               "('赵', '六', 'zhao@example.com', '2024-01-05', '技术', 8200.00) "
               "ON CONFLICT (employee_id) DO NOTHING")
​
        # 添加示例数据到departments表
        db.run("INSERT INTO departments (department_name, location, manager_id) "
               "VALUES ('技术', '北京', 1), "
               "('市场', '上海', 2), "
               "('财务', '广州', 3), "
               "('人力资源', '深圳', NULL) "
               "ON CONFLICT (department_id) DO NOTHING")
​
        print("示例表和数据创建成功")
    except Exception as e:
        print(f"创建示例表和数据时出错: {str(e)}")
​
def main():
    chain, db = setup_text_to_sql()
    if db is not None:
        create_example_tables(db)
​
    result = query_database(chain, "公司里有所少人")
    print(f"\n查询结果:\n{result}")
​
​
if __name__ == "__main__":
    main()

第X部分 拓展

结构化输出

由于大模型的输出具有随机性,所以要对大模型的输出进行弹性处理。

  1. 可靠性:确保 LLM 输出遵循预定义的数据结构

  2. 简化处理:无需编写额外代码来解析 LLM 的文本输出

  3. 类型安全:得到的是 Python 对象,有类型提示和验证

  4. 集成便捷:输出可以直接用于下游处理逻辑,如路由决策

from typing import List, Literal
from pydantic import BaseModel, Field
from langchain_core.prompts import ChatPromptTemplate
from langchain_ollama import ChatOllama
​
# 定义结构化输出模型
# 客户分析结果类
class FeedbackAnalysis(BaseModel):
    # 限定了只能输出三种类型
    sentiment: Literal["positive", "neutral", "negative"] = Field(
        description="反馈的整体情感倾向"
    )
    main_topics: List[str] = Field(
        description="反馈中提到的主要话题或产品功能",
        max_length=3
    )
    # 使用ge以及le限定输出的范围
    urgency_level: int = Field(
        description="问题的紧急程度,1-5分,5分最紧急",
        ge=1, le=5
    )
    action_needed: bool = Field(
        description="是否需要客服团队跟进"
    )
​
# 创建基础LLM
base_llm = ChatOllama(model="deepseek-r1:32b", temperature=0)
​
# 创建增强型LLM
structured_llm = base_llm.with_structured_output(FeedbackAnalysis)
​
# 定义提示模板
prompt = ChatPromptTemplate.from_messages([
    ("system", "你是客户反馈分析专家。分析以下客户反馈:"),
    ("human", "{feedback}")
])
​
# 创建分析链
analysis_chain = prompt | structured_llm
​
# 示例客户反馈
feedback = """我昨天购买的新手机壳不合适,完全盖不上我的手机。这已经是第二次从你们网站买到不合适的配件了。真是太失望了,希望能尽快解决。"""
​
# 执行分析
result = analysis_chain.invoke({"feedback": feedback})
​
print("分析结果类型:", type(result))
print("分析结果内容:", result)
print("情感倾向:", result.sentiment)
print("需要行动?", result.action_needed)
​
# 分析结果类型: <class '__main__.FeedbackAnalysis'>
# 分析结果内容: sentiment='negative' main_topics=['product fit', 'customer dissatisfaction'] urgency_level=2 action_needed=False
# 情感倾向: negative
# 需要行动? False
​
normal_chain = prompt | base_llm
normal_result = normal_chain.invoke({"feedback": feedback})
# 效果对比
print(remove_message_think_tag(normal_result).content)
# 根据您提供的客户反馈,以下是分析结果:
# 
# ### 1. **产品适配性问题**
#    - 客户提到手机壳“完全盖不上”手机,这表明产品可能存在设计或尺寸上的问题。建议检查产品的描述、规格和适用设备列表是否准确无误,并确保提供清晰的图片或视频供客户参考。
# 
# ### 2. **客户体验与信任度下降**
#    - 这是客户第二次从网站购买到不合适的配件,显示出客户对网站的信任度已经受到严重影响。重复出现问题可能导致客户流失或负面口碑传播。
# 
# ### 3. **沟通与反馈机制**
#    - 客户表达了“失望”情绪,并希望尽快解决问题。建议建立更高效的客户沟通渠道和售后服务流程,确保客户问题能够及时得到关注和解决。
# 
# ### 4. **潜在改进措施**
#    - 提供更详细的产品信息和用户评价。
#    - 建立快速响应的售后服务团队。
#    - 考虑提供退款、换货或补偿优惠券等解决方案以弥补客户的不满情绪。
# 
# 希望以上分析对您有所帮助!如果需要进一步讨论,请随时告诉我。

Logo

火山引擎开发者社区是火山引擎打造的AI技术生态平台,聚焦Agent与大模型开发,提供豆包系列模型(图像/视频/视觉)、智能分析与会话工具,并配套评测集、动手实验室及行业案例库。社区通过技术沙龙、挑战赛等活动促进开发者成长,新用户可领50万Tokens权益,助力构建智能应用。

更多推荐