引言:为什么检索后处理比模型更重要?

当我们谈论 RAG(检索增强生成)系统的优化时,我们大多数人的第一反应是:

  • "哎?要不换个更好的 embedding 模型?"
  • "或者试试不同的向量数据库?"
  • "调调 prompt 让它更精准?"

但实际上,RAG 最大的痛点不是"检索不到",而是"检索到了但没排对/没选对/没展示好"

想象一个场景:你用向量检索找到了 50 个相关文档片段,其中第 23 个包含了最准确的答案,但你只把前 5 个送给了 LLM。结果就是:明明答案在库里,模型却答不出来,或者给出了基于前 5 个噪声内容的错误答案。

核心观点:改善匹配 ≠ 完成检索。关键是把最有用的信息以最适合 LLM 消化的方式送给大模型

这就是我们今天所讨论的检索后处理(Post-Retrieval Processing)要解决的问题。

一、什么是 RAG 的检索后处理?

检索后处理是指在完成初步检索后、送入 LLM 之前,对检索结果进行的一系列优化操作。

在 RAG 流程中的位置:

用户查询
    ↓
【检索阶段】向量检索 / BM25 / 混合检索
    ↓
【检索后处理】← 我们在这里
    ├─ 重排(Rerank)
    └─ 压缩(Compression)
    └─ 其他
    ↓
【生成阶段】LLM 生成答案

所以,检索后处理不是为了增加召回,而是为了:

  1. 提升信息质量 - 去除噪声,保留关键内容
  2. 优化排序 - 让最相关的内容排在最前面
  3. 减少幻觉 - 给 LLM 更干净、更聚焦的上下文

这里介绍两种主要的检索后处理技术: 重排 上下文压缩

二、重排(Rerank)——让真正相关的内容排在最前面

首先,我们需要知道为什么需要重排?

这是因为原始向量检索的 Top-K 结果存在三大问题:

  1. 排序不稳定 - 余弦相似度 0.78 和 0.76 的差异可能并不代表真实相关性
  2. 易被噪声稀释 - 包含查询关键词但语义无关的文档可能排名靠前
  3. 单一检索源局限 - 仅用向量检索容易遗漏精确匹配的内容

举个栗子:用户问"Python 如何读取 CSV 文件?",向量检索返回的前 3 条是:

  1. "Python 数据处理库介绍"(泛泛而谈)
  2. "CSV 文件格式规范"(相关但不是答案)
  3. "使用 pandas.read_csv() 方法"(真正的答案!)

重排的目标:重新计算 Query 与 chunk 的相关性分数,调整排名。没有重排,LLM 可能基于前两条生成含糊的回答。

常见重排方式:

重排方案 核心逻辑 适用场景 优缺点
RRF (Reciprocal Rank Fusion) 基于多个召回列表的倒数排名融合:
score(d) = Σ 1/(k + rank_i)
k=60为默认平滑参数,忽略原始分数,只用排名位置
• 多路召回融合(BM25 + 向量)
• 无标注数据场景
• Elasticsearch/OpenSearch混合检索
: 简单、快速、无需训练、对分数尺度不敏感、对召回不一致鲁棒
: 不处理深层语义,仅基于排名位置
Cross-Encoder 交叉编码器(BERT-based)输入query-doc对,输出相关性分数。基于[CLS] token进行二分类或回归 • 二阶段检索精排(top-100→top-10)
• 高精度技术文档/指令匹配
• RAG中过滤噪声文档
: 语义捕捉强,精度高(nDCG@10提升10-20%)
: 计算密集(逐对forward),推荐GPU加速,不适合大批量实时场景
ColBERT Late Interaction架构:query和doc分别编码为token embeddings,通过MaxSim(Qi, Dj)计算相关性。支持离线索引 • 大规模检索(百万级文档预索引)
• token级细粒度匹配
• 速度和精度平衡的生产场景
: 可预计算、速度快(比Cross-Encoder快20-100x)、精度接近Cross-Encoder
: 索引占用空间大,需离线建索引
Cohere Rerank 商业跨编码器API(rerank-english-v3.0/multilingual-v3.0),支持100+语言、长上下文(4096 tokens)。Listwise评分 • 企业RAG/搜索(AWS Bedrock集成)
• 多语言场景
• 需要开箱即用的生产方案
: API易用、多语言优秀(100+种)、精度高(BEIR上领先5-10%)
: 按token计费(成本较高),仅API无私有部署,依赖网络
RankLLM LLM-based(GPT-4/Claude)通过prompt实现pointwise/pairwise/listwise重排。支持零样本或few-shot,可用滑动窗口处理大候选集 • 复杂推理查询(多意图/模糊需求)
• 低资源场景(零-shot rerank)
• 结合RAG的代理系统
: 灵活(prompt可调)、零-shot能力强(TREC DL上nDCG@10提升2-5%)
: 延迟高、token消耗大、不稳定(幻觉风险),适合小候选集(<50)
时效性重排 (Temporal Reranking) 在相关性分数上乘以时效因子:
score' = score × exp(-λ × age)
λ控制衰减速率。可结合季节性权重或分段函数
• 新闻/社交媒体搜索
• 动态内容推荐(视频/博客)
• 与其他方法混合使用
: 简单、提升新鲜度感知(MRR提升10-15%)
: 忽略语义质量,需调参,过度依赖时间可能损失相关性
1、RRF (Reciprocal Rank Fusion) - 基础融合

核心思想:不同检索方法各有优势,通过投票机制融合结果

示例代码如下:

from typing import List, Dict
from collections import defaultdict

def reciprocal_rank_fusion(
    results_list: List[List[Dict]], 
    k: int = 60
) -> List[Dict]:
    """
    RRF 重排算法
    results_list: 多个检索源的结果列表
    k: RRF 常数(通常取 60)
    """
    scores = defaultdict(float)
    doc_map = {}
    
    for results in results_list:
        for rank, doc in enumerate(results, start=1):
            doc_id = doc['id']
            # RRF 公式: 1 / (k + rank)
            scores[doc_id] += 1.0 / (k + rank)
            doc_map[doc_id] = doc
    
    # 按分数降序排序
    ranked_ids = sorted(scores.keys(), key=lambda x: scores[x], reverse=True)
    return [doc_map[doc_id] for doc_id in ranked_ids]

# 使用示例
vector_results = [
    {'id': 'doc1', 'text': 'pandas read_csv 教程', 'score': 0.85},
    {'id': 'doc2', 'text': 'Python 数据处理', 'score': 0.80},
]

bm25_results = [
    {'id': 'doc3', 'text': 'CSV 文件读取方法', 'score': 12.5},
    {'id': 'doc1', 'text': 'pandas read_csv 教程', 'score': 11.2},
]

reranked = reciprocal_rank_fusion([vector_results, bm25_results])
print(f"最相关文档: {reranked[0]['text']}")
2. Cross-Encoder - 精准成对打分

核心思想: 将 query 和 document 拼接后直接判断相关性

示例代码如下:

from sentence_transformers import CrossEncoder

def cross_encoder_rerank(
    query: str, 
    documents: List[str], 
    top_k: int = 5
) -> List[Dict]:
    """
    使用 Cross-Encoder 对文档重排
    """
    model = CrossEncoder('cross-encoder/ms-marco-MiniLM-L-6-v2')
    
    # 构建 query-document 对
    pairs = [[query, doc] for doc in documents]
    
    # 批量打分
    scores = model.predict(pairs)
    
    # 排序
    ranked_indices = sorted(
        range(len(scores)), 
        key=lambda i: scores[i], 
        reverse=True
    )[:top_k]
    
    return [
        {
            'text': documents[i], 
            'score': float(scores[i]),
            'rank': rank + 1
        } 
        for rank, i in enumerate(ranked_indices)
    ]

# 使用示例
query = "如何用 Python 读取 CSV"
docs = [
    "Python 是一种编程语言",
    "使用 pandas.read_csv() 可以读取 CSV 文件",
    "CSV 是逗号分隔值文件格式",
    "pandas 提供了强大的数据处理功能"
]

reranked = cross_encoder_rerank(query, docs, top_k=2)
for item in reranked:
    print(f"Rank {item['rank']}: {item['text']} (score: {item['score']:.3f})")
3. ColBERT - Late Interaction 架构

核心思想: Query 和 Document 分别编码成 token embeddings,通过 MaxSim 操作计算相关性

示例代码如下:

from colbert import Searcher
from colbert.infra import Run, RunConfig

# 初始化 ColBERT (需要先训练或下载模型)
def setup_colbert_index(documents: List[str], index_name: str):
    """
    构建 ColBERT 索引
    """
    with Run().context(RunConfig(nranks=1, experiment="myexp")):
        from colbert.data import Queries, Collection
        from colbert import Indexer
        
        # 准备文档集合
        collection = Collection(data=documents)
        
        # 建立索引
        indexer = Indexer(checkpoint="colbert-ir/colbertv2.0")
        indexer.index(
            name=index_name,
            collection=collection,
            overwrite=True
        )

def colbert_rerank(
    query: str,
    index_name: str,
    top_k: int = 5
) -> List[Dict]:
    """
    使用 ColBERT 重排
    """
    with Run().context(RunConfig(experiment="myexp")):
        searcher = Searcher(index=index_name)
        results = searcher.search(query, k=top_k)
        
        return [
            {
                'passage_id': pid,
                'rank': rank,
                'score': score
            }
            for rank, (pid, _, score) in enumerate(results, start=1)
        ]

# 使用示例
documents = [
    "Python 是一种高级编程语言",
    "使用 pandas.read_csv() 函数可以轻松读取 CSV 文件",
    "CSV 文件格式使用逗号分隔数据",
    "pandas 是 Python 最流行的数据分析库"
]

# 建立索引 (只需执行一次)
setup_colbert_index(documents, "my_docs")

# 查询
query = "Python 如何读取 CSV 文件"
results = colbert_rerank(query, "my_docs", top_k=3)
for res in results:
    print(f"Rank {res['rank']}: Document {res['passage_id']} (score: {res['score']:.3f})")
4. Cohere Rerank - 生产级商业方案

示例代码如下:

from langchain_cohere import CohereRerank
from langchain_core.documents import Document
from langchain_community.retrievers import BM25Retriever
from dotenv import load_dotenv
load_dotenv()

# 准备示例文档
documents = [
    Document(
        page_content="深度学习是一种通过多层神经网络自动学习数据特征的机器学习方法,常用于图像识别和自然语言处理。",
        metadata={"source": "AI教程·深度学习基础"}
    ),
    Document(
        page_content="强化学习是一种基于奖励机制的学习范式,智能体通过与环境交互不断优化决策策略。",
        metadata={"source": "AI教程·强化学习指南"}
    ),
    Document(
        page_content="知识图谱用于表示实体及其关系,常用于推荐系统、智能搜索与问答系统中。",
        metadata={"source": "AI教程·知识图谱入门"}
    ),
    Document(
        page_content="大语言模型通过使用海量文本数据训练,用于生成自然语言回答、代码补全、文本摘要等任务,例如 GPT 和 Qwen。",
        metadata={"source": "AI教程·大模型技术"}
    )
]

# 创建 BM25 检索器
retriever = BM25Retriever.from_documents(documents)
retriever.k = 4  # 返回前 4 个检索结果

# 设置 Cohere 重排序器
reranker = CohereRerank(model="rerank-multilingual-v2.0")

# 查询
query = "人工智能中用于处理自然语言的核心技术有哪些?"

# 先获取初始检索结果
initial_docs = retriever.invoke(query)

# 使用重排序器对结果进行重排
reranked_docs = reranker.compress_documents(documents=initial_docs, query=query)

# 打印重排结果
print(f"查询:{query}\n")
print("重排序后的结果:")
for i, doc in enumerate(reranked_docs, 1):
    print(f"{i}. {doc.page_content}")
5. RankLLM - LLM 驱动的重排

核心思想: 利用大语言模型的理解能力进行排序

示例代码如下:

from typing import List, Dict
from openai import OpenAI

def rank_with_llm(
    query: str,
    documents: List[str],
    top_k: int = 5
) -> List[Dict]:
    """
    使用 OpenAI 大模型对文档排序
    """
    client = OpenAI()  # 会自动读取环境变量 OPENAI_API_KEY
    
    # 构建文档列表
    doc_list = "\n".join([
        f"[{i+1}] {doc}"
        for i, doc in enumerate(documents)
    ])
    
    prompt = f"""你是一个文档相关性排序专家。给定用户查询和候选文档列表,请按相关性从高到低排序。

用户查询: {query}

候选文档:
{doc_list}

要求:
1. 只返回文档编号,用逗号分隔
2. 从最相关到最不相关
3. 只返回前 {top_k} 个
4. 不要返回其他说明或内容

排序结果示例(格式: 2,5,3...):"""

    resp = client.chat.completions.create(
        model="gpt-4o-mini",  # 可换成 gpt-4.1 / gpt-4o / o3-mini 等
        messages=[{"role": "user", "content": prompt}],
        max_tokens=50
    )

    ranking_str = resp.choices[0].message.content.strip()
    ranked_indices = [int(x.strip()) - 1 for x in ranking_str.split(',')]

    return [
        {
            'text': documents[idx],
            'rank': rank + 1,
            'original_index': idx
        }
        for rank, idx in enumerate(ranked_indices[:top_k])
    ]


# 使用示例
if __name__ == "__main__":
    query = "深度学习在自然语言处理中的应用"
    docs = [
        "深度学习是机器学习的一个子集",
        "BERT 和 GPT 是基于 Transformer 的自然语言处理模型",
        "Python 是数据科学的主要编程语言",
        "注意力机制极大地提升了 NLP 任务的性能",
        "卷积神经网络主要用于计算机视觉"
    ]

    ranked = rank_with_llm(query, docs, top_k=3)
    for item in ranked:
        print(f"{item['rank']}. {item['text']}")
6.时效性重排 (Temporal Reranking)

核心思想:指数衰减

示例代码如下:

import math
from datetime import datetime, timedelta
from typing import List, Dict

def temporal_rerank_exponential(
    documents: List[Dict],
    query_score_key: str = 'relevance_score',
    lambda_decay: float = 0.01,
    current_time: datetime = None
) -> List[Dict]:
    """
    使用指数衰减对文档进行时效性重排
    
    参数:
        documents: 包含相关性分数和时间戳的文档列表
        query_score_key: 原始相关性分数的字段名
        lambda_decay: 衰减率,越大则时间影响越大
        current_time: 当前时间,默认为now()
    
    公式: final_score = relevance_score × exp(-λ × age_in_days)
    """
    if current_time is None:
        current_time = datetime.now()
    
    for doc in documents:
        # 计算文档年龄(天数)
        doc_time = doc.get('timestamp', current_time)
        age_days = (current_time - doc_time).total_seconds() / 86400
        
        # 计算时效因子
        time_factor = math.exp(-lambda_decay * age_days)
        
        # 计算最终分数
        original_score = doc.get(query_score_key, 0)
        doc['temporal_factor'] = time_factor
        doc['final_score'] = original_score * time_factor
    
    # 按最终分数排序
    return sorted(documents, key=lambda x: x['final_score'], reverse=True)

# 使用示例
documents = [
    {
        'id': 'doc1',
        'text': 'Python 3.12 新特性发布',
        'relevance_score': 0.95,
        'timestamp': datetime.now() - timedelta(days=1)  # 1天前
    },
    {
        'id': 'doc2',
        'text': 'Python 编程基础教程',
        'relevance_score': 0.90,
        'timestamp': datetime.now() - timedelta(days=365)  # 1年前
    },
    {
        'id': 'doc3',
        'text': 'Python 3.11 性能提升分析',
        'relevance_score': 0.88,
        'timestamp': datetime.now() - timedelta(days=180)  # 半年前
    }
]

# 使用较小的lambda(0.01),时间影响温和
reranked = temporal_rerank_exponential(documents, lambda_decay=0.01)

print("重排结果 (lambda=0.01):")
for doc in reranked:
    print(f"{doc['id']}: 原始分数={doc['relevance_score']:.3f}, "
          f"时效因子={doc['temporal_factor']:.3f}, "
          f"最终分数={doc['final_score']:.3f}")
```

**输出示例**:
```
重排结果 (lambda=0.01):
doc1: 原始分数=0.950, 时效因子=0.990, 最终分数=0.941
doc3: 原始分数=0.880, 时效因子=0.835, 最终分数=0.735
doc2: 原始分数=0.900, 时效因子=0.694, 最终分数=0.625

三、上下文压缩(Compression)——让模型只看到重点

同样的,我需要思考为什么需要压缩?

即使经过重排,我们仍然面临三个问题:

  1. LLM 对噪声敏感 - 一段 500 字的文档中,可能只有 50 字是关键信息
  2. 长文档稀释关键词 - "迷失在中间"(Lost in the Middle)效应:LLM 容易忽略长上下文中段的信息
  3. 过多上下文增加幻觉 - 信息越多,LLM 越容易"编造"不存在的关联

举个栗子:

  • 错误示例:检索结果 800 字 → 喂给模型模型无法判断优先级 → 回答混乱甚至幻觉
  • 正确方式:检索结果 800 字→ 压缩成与 Query 高度相关的200~400字 → 送入模型生成回答

研究数据:

  • 上下文长度从 2K 增加到 8K tokens,答案准确率可能下降 15-30%
  • 噪声比例超过 60% 时,幻觉率显著上升
  • 关键信息在文档中间位置时,被忽略的概率最高

注意:压缩不是摘要,而是“去噪保真”

常见压缩方式:

方法

核心逻辑 适用场景 优缺点

Contextual-CompressionRetriever

包装基础检索器,使用 LLM 或嵌入模型(如 LLMChainExtractor)根据查询上下文过滤/压缩文档,支持管道式处理(文本分割 + 相似度过滤) • RAG 后检索精炼(top-100 → top-10)
• 长文档场景(PDF/网页去噪)
• LangChain 生态集成
• 需要高精度但预算有限的查询

优:上下文感知强,精准度高、压缩比 2-3x,准确性提升 10-20%、与 LangChain 生态无缝集成
劣:依赖 LLM 调用,延迟增加 0.5-1s、 需要调整阈值参数

LLMLingua

使用小模型(GPT-2/LLaMA)计算 perplexity/entropy 移除低信息 token;LLMLingua-2 用 GPT-4 数据训练 BERT 分类器实现任务无关压缩 • 超长上下文场景(40k+ tokens)
• 需要极高压缩比的应用(如 CoT/ICL)
• 多任务场景(总结、QA、代码生成)
• API 成本敏感的生产环境

优:缩比极高(4-20x),成本降低 1.6-2.9x、任务无关,泛化能力强、保持自然性和可读性
劣:可能引入少量幻觉、需要部署小模型,有一定技术门槛

Sentence-EmbeddingOptimizer

用句子级嵌入相似度(BGE 模型)比较查询与句子,保留高相似度片段(如 percentile=0.5),常作为 LlamaIndex postprocessor • 句子/短 chunk 检索(知识库 QA)
• 嵌入驱动 RAG 精炼
• LlamaIndex 生态
• 关注 token 效率的实时查询

优:速度极快(~7 tokens/查询)、Token 降低约 50%,时间节省 20-30%、计算开销低,适合实时场景

劣:仅句子级,忽略深层语义、对长文档效果较弱,需结合分割器

1. Contextual Compression - 基于上下文的动态过滤

核心思想: 结合检索和压缩,只保留与查询直接相关的片段

from langchain.retrievers import ContextualCompressionRetriever
from langchain.retrievers.document_compressors import LLMChainExtractor
from langchain_openai import ChatOpenAI
from langchain.vectorstores import FAISS
from langchain_openai import OpenAIEmbeddings
from langchain.schema import Document
from typing import List
import os

class ContextualCompressor:
    """
    使用 LangChain 的 ContextualCompressionRetriever
    """
    def __init__(self, openai_api_key: str):
        # 初始化 OpenAI LLM
        self.llm = ChatOpenAI(
            model="gpt-4o-mini",  # 使用更经济的模型
            temperature=0,
            openai_api_key=openai_api_key
        )
        
        # 初始化压缩器
        self.compressor = LLMChainExtractor.from_llm(self.llm)
        
        # 初始化 embeddings
        self.embeddings = OpenAIEmbeddings(openai_api_key=openai_api_key)
    
    def build_retriever(self, documents: List[str]):
        """
        构建基础检索器
        """
        # 转换为 Document 对象
        docs = [Document(page_content=doc) for doc in documents]
        
        # 创建向量存储
        vectorstore = FAISS.from_documents(docs, self.embeddings)
        
        # 创建基础检索器
        base_retriever = vectorstore.as_retriever(
            search_kwargs={"k": 10}  # 先检索 10 个文档
        )
        
        # 创建压缩检索器
        compression_retriever = ContextualCompressionRetriever(
            base_compressor=self.compressor,
            base_retriever=base_retriever
        )
        
        return compression_retriever
    
    def compress_and_retrieve(
        self, 
        query: str, 
        documents: List[str]
    ) -> List[dict]:
        """
        检索并压缩文档
        """
        # 构建检索器
        retriever = self.build_retriever(documents)
        
        # 执行压缩检索
        compressed_docs = retriever.get_relevant_documents(query)
        
        # 格式化结果
        results = []
        for idx, doc in enumerate(compressed_docs, 1):
            results.append({
                'rank': idx,
                'content': doc.page_content,
                'length': len(doc.page_content)
            })
        
        return results

# 使用示例
def test_contextual_compression():
    # 设置 API Key
    api_key = os.getenv("OPENAI_API_KEY")
    
    # 准备文档
    documents = [
        """
        Python 是一种高级编程语言,由 Guido van Rossum 于 1991 年创建。
        它以简洁的语法和强大的功能而闻名,广泛应用于 Web 开发、数据分析、
        人工智能、科学计算等多个领域。Python 的设计哲学强调代码的可读性。
        """,
        """
        pandas 是 Python 中最流行的数据分析库之一。它提供了 DataFrame 数据结构,
        使得数据处理变得简单高效。要读取 CSV 文件,可以使用 pd.read_csv() 函数。
        这个函数支持多种参数,如 sep(分隔符)、header(表头)、encoding(编码)等。
        """,
        """
        处理大型 CSV 文件时,需要注意内存管理。可以使用 chunksize 参数分块读取,
        例如: df = pd.read_csv('large_file.csv', chunksize=10000)。
        这样可以逐块处理数据,避免内存溢出。另外,使用 usecols 参数可以只读取需要的列。
        """,
        """
        CSV 是 Comma-Separated Values 的缩写,是一种常见的数据交换格式。
        它使用逗号作为字段分隔符,每行代表一条记录。CSV 文件可以用文本编辑器打开,
        也可以被 Excel、Google Sheets 等电子表格软件识别。
        """,
        """
        Python 的标准库也包含 csv 模块,可以用来读写 CSV 文件。
        相比 pandas,csv 模块更底层,但提供了更精细的控制。
        适合处理特殊格式的 CSV 文件或需要逐行处理的场景。
        """
    ]
    
    # 创建压缩器
    compressor = ContextualCompressor(api_key)
    
    # 查询
    query = "如何用 Python 读取大型 CSV 文件"
    
    print(f"查询: {query}\n")
    print("=" * 80)
    
    # 执行压缩检索
    results = compressor.compress_and_retrieve(query, documents)
    
    # 输出结果
    original_length = sum(len(doc) for doc in documents)
    compressed_length = sum(r['length'] for r in results)
    
    print(f"\n压缩前总长度: {original_length} 字符")
    print(f"压缩后总长度: {compressed_length} 字符")
    print(f"压缩率: {compressed_length/original_length*100:.1f}%\n")
    
    for result in results:
        print(f"\n--- 文档 {result['rank']} (长度: {result['length']}) ---")
        print(result['content'])
        print("-" * 80)

# test_contextual_compression()
2. LLMLingua - LLM 驱动的智能压缩
from openai import OpenAI
import re
from typing import List, Dict

class LLMLinguaCompressor:
    """
    使用 OpenAI API 实现类似 LLMLingua 的压缩逻辑
    通过让 GPT 识别并删除低信息量的 token/句子
    """
    def __init__(self, api_key: str):
        self.client = OpenAI(api_key=api_key)
    
    def compress(
        self,
        query: str,
        context: str,
        compression_rate: float = 0.5,
        preserve_question_related: bool = True
    ) -> Dict:
        """
        使用 GPT 进行智能压缩
        
        参数:
            query: 用户查询
            context: 需要压缩的上下文
            compression_rate: 目标压缩率 (0.5 表示压缩到 50%)
            preserve_question_related: 是否优先保留与问题相关的内容
        """
        target_length = int(len(context) * compression_rate)
        
        # 构建压缩提示
        system_prompt = """你是一个文本压缩专家。你的任务是压缩文本,同时保留最重要的信息。

压缩原则:
1. 删除冗余和重复的内容
2. 删除过于宽泛的背景信息
3. 保留与用户问题直接相关的核心事实
4. 保留关键的数字、名称、术语
5. 使用更简洁的表达,但不改变原意
6. 删除修饰性词语和不必要的细节"""

        user_prompt = f"""请压缩以下文本到大约 {target_length} 字符。

用户问题: {query}

原始文本:
{context}

要求:
- 目标长度: 约 {target_length} 字符
- 保留与问题最相关的信息
- 删除冗余和无关内容
- 保持原文的准确性

压缩后的文本:"""

        # 调用 OpenAI API
        response = self.client.chat.completions.create(
            model="gpt-4o-mini",
            messages=[
                {"role": "system", "content": system_prompt},
                {"role": "user", "content": user_prompt}
            ],
            temperature=0.3,
            max_tokens=2000
        )
        
        compressed_text = response.choices[0].message.content.strip()
        
        # 计算统计信息
        original_tokens = len(context.split())
        compressed_tokens = len(compressed_text.split())
        
        return {
            'compressed_text': compressed_text,
            'original_length': len(context),
            'compressed_length': len(compressed_text),
            'original_tokens': original_tokens,
            'compressed_tokens': compressed_tokens,
            'compression_ratio': len(compressed_text) / len(context),
            'token_reduction': f"{(1 - compressed_tokens/original_tokens) * 100:.1f}%"
        }
    
    def batch_compress(
        self,
        query: str,
        documents: List[str],
        compression_rate: float = 0.5
    ) -> List[Dict]:
        """
        批量压缩多个文档
        """
        results = []
        for idx, doc in enumerate(documents, 1):
            print(f"压缩文档 {idx}/{len(documents)}...")
            result = self.compress(query, doc, compression_rate)
            result['doc_index'] = idx
            results.append(result)
        
        return results

# 使用示例
def test_llmlingua():
    api_key = os.getenv("OPENAI_API_KEY")
    compressor = LLMLinguaCompressor(api_key)
    
    # 长文本示例
    context = """
    Python 是一种广泛使用的高级编程语言,最初由 Guido van Rossum 在 1991 年设计并发布。
    Python 的设计哲学强调代码的可读性和简洁的语法,特别是使用空格缩进来划分代码块。
    Python 支持多种编程范式,包括面向对象、命令式、函数式和过程式编程。
    
    在数据处理领域,pandas 是最受欢迎的 Python 库之一。pandas 提供了高性能、
    易于使用的数据结构和数据分析工具。其中最重要的数据结构是 DataFrame,
    它是一个二维的、大小可变的、潜在异构的表格数据结构。
    
    要读取 CSV 文件,pandas 提供了 read_csv() 函数。这个函数非常强大,
    支持众多参数来处理各种格式的 CSV 文件。基本用法很简单: 
    df = pd.read_csv('filename.csv')。
    
    对于大型 CSV 文件,有几种优化策略。首先,可以使用 chunksize 参数将文件
    分成多个小块逐一处理,这样可以避免将整个文件加载到内存中导致内存溢出。
    例如: for chunk in pd.read_csv('large_file.csv', chunksize=10000): process(chunk)。
    
    其次,可以使用 usecols 参数只读取需要的列,而不是读取所有列。
    这可以显著减少内存使用。还可以指定 dtype 参数来优化数据类型,
    例如将整数列指定为 int32 而不是默认的 int64,可以节省一半的内存。
    """
    
    query = "如何用 Python 处理大型 CSV 文件"
    
    print(f"查询: {query}\n")
    print("=" * 80)
    
    # 压缩到 40%
    result = compressor.compress(query, context, compression_rate=0.4)
    
    print(f"\n原始长度: {result['original_length']} 字符 ({result['original_tokens']} tokens)")
    print(f"压缩后长度: {result['compressed_length']} 字符 ({result['compressed_tokens']} tokens)")
    print(f"压缩率: {result['compression_ratio']*100:.1f}%")
    print(f"Token 减少: {result['token_reduction']}")
    
    print(f"\n压缩后的文本:")
    print("-" * 80)
    print(result['compressed_text'])
    print("-" * 80)

# test_llmlingua()
3. SentenceEmbeddingOptimizer - LlamaIndex
from openai import OpenAI
import numpy as np
from typing import List, Dict
import re

class SentenceEmbeddingOptimizer:
    """
    使用 OpenAI Embeddings 进行句子级别的相似度过滤
    """
    def __init__(self, api_key: str):
        self.client = OpenAI(api_key=api_key)
        self.embedding_model = "text-embedding-3-small"  # 更经济的模型
    
    def get_embedding(self, text: str) -> List[float]:
        """
        获取文本的 embedding
        """
        response = self.client.embeddings.create(
            model=self.embedding_model,
            input=text
        )
        return response.data[0].embedding
    
    def split_sentences(self, text: str) -> List[str]:
        """
        分句 (支持中英文)
        """
        # 使用正则表达式分句
        sentences = re.split(r'[。.!?;!?;]\s*', text)
        sentences = [s.strip() for s in sentences if len(s.strip()) > 10]
        return sentences
    
    def cosine_similarity(self, vec1: List[float], vec2: List[float]) -> float:
        """
        计算余弦相似度
        """
        vec1 = np.array(vec1)
        vec2 = np.array(vec2)
        return np.dot(vec1, vec2) / (np.linalg.norm(vec1) * np.linalg.norm(vec2))
    
    def compress(
        self,
        query: str,
        document: str,
        threshold: float = 0.5,
        max_sentences: int = 5,
        percentile: float = None
    ) -> Dict:
        """
        使用句子 embedding 相似度进行压缩
        
        参数:
            query: 用户查询
            document: 需要压缩的文档
            threshold: 相似度阈值 (0-1)
            max_sentences: 最多保留句子数
            percentile: 百分位数 (如 0.5 表示保留前 50% 的句子)
        """
        # 分句
        sentences = self.split_sentences(document)
        
        if len(sentences) == 0:
            return {
                'compressed_text': document,
                'selected_sentences': 0,
                'total_sentences': 0,
                'compression_ratio': 1.0
            }
        
        print(f"文档包含 {len(sentences)} 个句子")
        
        # 获取 query embedding
        print("计算查询 embedding...")
        query_embedding = self.get_embedding(query)
        
        # 批量获取句子 embeddings
        print("计算句子 embeddings...")
        sentence_embeddings = []
        for idx, sent in enumerate(sentences, 1):
            print(f"  处理句子 {idx}/{len(sentences)}", end='\r')
            emb = self.get_embedding(sent)
            sentence_embeddings.append(emb)
        print()  # 换行
        
        # 计算相似度
        similarities = []
        for emb in sentence_embeddings:
            sim = self.cosine_similarity(query_embedding, emb)
            similarities.append(sim)
        
        # 选择句子
        if percentile is not None:
            # 使用百分位数
            threshold = np.percentile(similarities, percentile * 100)
            print(f"使用百分位数 {percentile*100}%,阈值为 {threshold:.3f}")
        
        selected = []
        for i, (sent, sim) in enumerate(zip(sentences, similarities)):
            if sim > threshold:
                selected.append({
                    'index': i,
                    'text': sent,
                    'similarity': sim
                })
        
        # 按相似度排序,取 top_k
        selected.sort(key=lambda x: x['similarity'], reverse=True)
        selected = selected[:max_sentences]
        
        # 恢复原文顺序
        selected.sort(key=lambda x: x['index'])
        
        # 拼接文本
        compressed_text = '。'.join([s['text'] for s in selected])
        if compressed_text and not compressed_text.endswith(('。', '.', '!', '?')):
            compressed_text += '。'
        
        return {
            'compressed_text': compressed_text,
            'selected_sentences': len(selected),
            'total_sentences': len(sentences),
            'compression_ratio': len(compressed_text) / len(document),
            'avg_similarity': np.mean([s['similarity'] for s in selected]) if selected else 0,
            'sentence_details': selected
        }
    
    def compress_multiple_docs(
        self,
        query: str,
        documents: List[str],
        threshold: float = 0.5,
        max_sentences_per_doc: int = 3
    ) -> List[Dict]:
        """
        压缩多个文档
        """
        results = []
        for idx, doc in enumerate(documents, 1):
            print(f"\n压缩文档 {idx}/{len(documents)}")
            print("=" * 60)
            result = self.compress(
                query, doc, 
                threshold=threshold, 
                max_sentences=max_sentences_per_doc
            )
            result['doc_index'] = idx
            results.append(result)
        
        return results

# 使用示例
def test_sentence_embedding_optimizer():
    api_key = os.getenv("OPENAI_API_KEY")
    optimizer = SentenceEmbeddingOptimizer(api_key)
    
    document = """
    Python 是一种解释型、面向对象、动态数据类型的高级程序设计语言。
    它由 Guido van Rossum 于 1989 年底发明,第一个公开发行版发行于 1991 年。
    Python 语法简洁清晰,特色之一是强制用空白符作为语句缩进。
    pandas 是基于 NumPy 的一种工具,该工具是为了解决数据分析任务而创建的。
    pandas 纳入了大量库和一些标准的数据模型,提供了高效地操作大型数据集所需的工具。
    要读取 CSV 文件,最简单的方法是使用 pandas 的 read_csv() 函数。
    这个函数会返回一个 DataFrame 对象,包含了 CSV 文件中的所有数据。
    对于大型 CSV 文件,可以使用 chunksize 参数来分块读取数据。
    例如: df_chunk = pd.read_csv('large_file.csv', chunksize=10000)。
    这样可以避免一次性将整个文件加载到内存中,防止内存溢出。
    另外,使用 usecols 参数可以只读取指定的列,进一步减少内存使用。
    """
    
    query = "如何用 pandas 读取大型 CSV"
    
    print(f"查询: {query}\n")
    print("=" * 80)
    
    # 方法 1: 使用固定阈值
    print("\n方法 1: 使用固定阈值 0.5")
    print("-" * 80)
    result1 = optimizer.compress(
        query, document, 
        threshold=0.5, 
        max_sentences=4
    )
    
    print(f"\n选中句子: {result1['selected_sentences']}/{result1['total_sentences']}")
    print(f"压缩率: {result1['compression_ratio']*100:.1f}%")
    print(f"平均相似度: {result1['avg_similarity']:.3f}")
    print(f"\n压缩后文本:")
    print(result1['compressed_text'])
    
    # 方法 2: 使用百分位数
    print("\n\n方法 2: 使用百分位数 (保留前 50%)")
    print("-" * 80)
    result2 = optimizer.compress(
        query, document, 
        percentile=0.5, 
        max_sentences=5
    )
    
    print(f"\n选中句子: {result2['selected_sentences']}/{result2['total_sentences']}")
    print(f"压缩率: {result2['compression_ratio']*100:.1f}%")
    print(f"\n压缩后文本:")
    print(result2['compressed_text'])
    
    # 显示详细的句子相似度
    print("\n\n句子相似度详情:")
    print("-" * 80)
    for detail in result2['sentence_details']:
        print(f"相似度 {detail['similarity']:.3f}: {detail['text'][:60]}...")

# test_sentence_embedding_optimizer()

四、结语

RAG 的效果提升,不来自换更大的模型、调更多 prompt,而来自优化检索管线

重排解决:信息“排不对”
压缩解决:信息“太嘈杂”

最终目标不是“找到很多内容”,
而是——

让 LLM 看到决定答案的那段内容

ps:感谢观看,如有不对请各位指正,代码只是举例,按照自己实际需求写就行

Logo

火山引擎开发者社区是火山引擎打造的AI技术生态平台,聚焦Agent与大模型开发,提供豆包系列模型(图像/视频/视觉)、智能分析与会话工具,并配套评测集、动手实验室及行业案例库。社区通过技术沙龙、挑战赛等活动促进开发者成长,新用户可领50万Tokens权益,助力构建智能应用。

更多推荐