RAG(检索增强生成)的文档处理流程详解
RAG(检索增强生成)的文档处理流程详解

文章目录
一、RAG技术概述
检索增强生成(Retrieval-Augmented Generation,简称RAG)是近年来自然语言处理领域的一项重要技术,它将信息检索与文本生成相结合,有效解决了传统生成模型容易产生"幻觉"(hallucination)的问题。RAG的核心思想是在生成回答时,先从外部知识库中检索相关文档片段,然后将这些片段与问题一起输入生成模型,从而产生基于事实的准确回答。
RAG系统通常由三个主要组件构成:
- 检索器(Retriever):负责从大规模文档集合中查找与输入问题相关的文档片段
- 生成器(Generator):基于检索到的文档和原始问题生成最终回答
- 文档处理流水线:将原始文档转换为便于检索的格式
本文将重点解析RAG系统中的文档处理流程,这是构建高效RAG系统的关键基础。
二、RAG文档处理整体流程
一个完整的RAG文档处理流程通常包含以下几个关键步骤:
- 文档加载:从各种来源获取原始文档
- 文档预处理:清洗和规范化文本
- 文档分块:将大文档分割为适当大小的片段
- 向量化处理:将文本块转换为向量表示
- 索引构建:创建高效的检索索引
- 存储:将处理后的数据持久化保存
下面我们通过流程图来直观展示这一过程:
三、文档加载
文档加载是RAG流程的第一步,需要从各种来源获取原始文档数据。文档可能存在于多种格式和位置:
- 本地文件系统(PDF、Word、TXT等)
- 远程URL(网页抓取)
- 数据库(SQL、NoSQL)
- 云存储(S3、Google Drive等)
- 专业数据库(PubMed、arXiv等)
代码示例:使用LangChain加载多种格式文档
from langchain.document_loaders import (
PyPDFLoader,
Docx2txtLoader,
TextLoader,
WebBaseLoader
)
# 加载PDF文档
pdf_loader = PyPDFLoader("example.pdf")
pdf_pages = pdf_loader.load()
# 加载Word文档
docx_loader = Docx2txtLoader("example.docx")
docx_pages = docx_loader.load()
# 加载纯文本文件
txt_loader = TextLoader("example.txt")
txt_pages = txt_loader.load()
# 加载网页内容
web_loader = WebBaseLoader(["https://example.com"])
web_pages = web_loader.load()
四、文档预处理
加载原始文档后,需要进行预处理以提高后续处理的质量。预处理步骤通常包括:
-
文本清洗:
- 去除特殊字符、乱码
- 标准化标点符号
- 处理HTML/XML标签(如果是网页内容)
- 修正编码问题
-
文本规范化:
- 大小写统一
- 拼写检查与纠正
- 缩写扩展
- 数字规范化(如"100万"转换为"1000000")
-
语言特定处理:
- 分词(对中文等非空格分隔语言尤为重要)
- 词干提取/词形还原(英文)
- 停用词移除
代码示例:文档预处理实现
import re
import unicodedata
from typing import List
def clean_text(text: str) -> str:
"""基础文本清洗函数"""
# 标准化Unicode字符
text = unicodedata.normalize("NFKC", text)
# 移除特殊字符和多余空白
text = re.sub(r"[^\w\s.,!?;:()\-]", "", text)
text = re.sub(r"\s+", " ", text).strip()
return text
def normalize_text(text: str) -> str:
"""文本规范化"""
# 统一为小写(英文场景)
text = text.lower()
# 处理缩写
abbreviations = {
"can't": "cannot",
"won't": "will not",
"i'm": "i am",
# 可添加更多缩写映射
}
for abbr, full in abbreviations.items():
text = text.replace(abbr, full)
return text
def preprocess_documents(docs: List[str]) -> List[str]:
"""完整的文档预处理流程"""
processed = []
for doc in docs:
cleaned = clean_text(doc)
normalized = normalize_text(cleaned)
processed.append(normalized)
return processed
五、文档分块(Chunking)
文档分块是将大文档分割为较小片段的过程,这对RAG系统至关重要,原因包括:
- 上下文窗口限制:生成模型有最大token限制
- 检索精度:小块文档能更精确匹配查询
- 计算效率:小块向量更容易处理和比较
常见的分块策略包括:
- 固定大小分块:简单但可能切断语义连贯性
- 滑动窗口分块:重叠分块保留上下文
- 基于语义分块:利用文本结构(段落、标题等)
- 递归分块:混合不同粒度级别
代码示例:高级文档分块实现
from langchain.text_splitter import (
RecursiveCharacterTextSplitter,
MarkdownHeaderTextSplitter
)
# 基本分块方法
def basic_chunking(text: str, chunk_size: int = 512, overlap: int = 50) -> List[str]:
splitter = RecursiveCharacterTextSplitter(
chunk_size=chunk_size,
chunk_overlap=overlap,
length_function=len,
separators=["\n\n", "\n", "。", "!", "?", ";", ",", " "]
)
return splitter.split_text(text)
# 针对Markdown文档的智能分块
def markdown_chunking(md_text: str) -> List[str]:
headers_to_split_on = [
("#", "Header 1"),
("##", "Header 2"),
("###", "Header 3"),
]
splitter = MarkdownHeaderTextSplitter(headers_to_split_on=headers_to_split_on)
return splitter.split_text(md_text)
# 中文文本的特殊处理
def chinese_text_chunking(text: str) -> List[str]:
# 使用句号、问号、感叹号等中文标点作为分隔符
splitter = RecursiveCharacterTextSplitter(
separators=["\n\n", "\n", "。", "!", "?", ";", ",", "、", " "],
chunk_size=300,
chunk_overlap=30
)
return splitter.split_text(text)
六、文本向量化
文本向量化是将文本转换为数值向量(嵌入)的过程,使计算机能够理解和比较文本语义。常见的嵌入模型包括:
-
基于Transformer的模型:
- OpenAI的text-embedding-ada-002
- Google的Universal Sentence Encoder
- HuggingFace的sentence-transformers
-
传统方法:
- TF-IDF
- Word2Vec
- GloVe
代码示例:使用多种嵌入模型
from sentence_transformers import SentenceTransformer
from langchain.embeddings import OpenAIEmbeddings, HuggingFaceEmbeddings
# 使用OpenAI的嵌入模型
def get_openai_embeddings(texts: List[str]) -> List[List[float]]:
embedder = OpenAIEmbeddings(model="text-embedding-ada-002")
return embedder.embed_documents(texts)
# 使用HuggingFace的句子转换器
def get_hf_embeddings(texts: List[str]) -> List[List[float]]:
model = SentenceTransformer('paraphrase-multilingual-MiniLM-L12-v2')
return model.encode(texts).tolist()
# 本地运行的轻量级嵌入模型
def get_local_embeddings(texts: List[str]) -> List[List[float]]:
embedder = HuggingFaceEmbeddings(
model_name="GanymedeNil/text2vec-large-chinese"
)
return embedder.embed_documents(texts)
七、索引构建与向量存储
为了高效检索相关文档,我们需要将向量化的文本存储在专门的向量数据库中,并构建适当的索引。常见的向量数据库包括:
- Pinecone:全托管的向量数据库
- Weaviate:开源的向量搜索引擎
- FAISS:Facebook的高效相似性搜索库
- Chroma:轻量级嵌入式向量数据库
- Milvus:高性能开源向量数据库
代码示例:构建和查询向量索引
import faiss
import numpy as np
from typing import List, Tuple
class VectorIndex:
def __init__(self, dimension: int = 768):
self.dimension = dimension
self.index = faiss.IndexFlatIP(dimension) # 使用内积作为相似度度量
self.documents = []
def add_documents(self, embeddings: List[List[float]], documents: List[str]):
"""添加文档到索引"""
if len(documents) != len(embeddings):
raise ValueError("文档数量和嵌入数量不匹配")
# 转换为numpy数组
emb_array = np.array(embeddings).astype('float32')
self.index.add(emb_array)
self.documents.extend(documents)
def search(self, query_embedding: List[float], k: int = 5) -> List[Tuple[str, float]]:
"""检索最相似的k个文档"""
query_array = np.array([query_embedding]).astype('float32')
distances, indices = self.index.search(query_array, k)
results = []
for i, idx in enumerate(indices[0]):
if idx >= 0: # FAISS可能返回-1表示无效索引
doc = self.documents[idx]
score = distances[0][i]
results.append((doc, float(score)))
return results
# 使用示例
if __name__ == "__main__":
# 假设我们已经有一些文档和它们的嵌入
docs = ["文档1内容", "文档2内容", "文档3内容", "..."]
embeddings = [[0.1, 0.2, ...], [0.3, 0.4, ...], ...] # 实际应用中这些是真实的嵌入
# 创建索引并添加文档
index = VectorIndex(dimension=len(embeddings[0]))
index.add_documents(embeddings, docs)
# 查询
query = "示例查询"
query_embedding = [0.15, 0.25, ...] # 实际应用中需要先用嵌入模型转换查询
results = index.search(query_embedding, k=3)
for doc, score in results:
print(f"相似度: {score:.4f}, 文档: {doc[:50]}...")
八、完整RAG文档处理流水线实现
下面我们整合上述所有步骤,实现一个完整的RAG文档处理流水线:
from typing import List, Dict, Any
import hashlib
import json
from pathlib import Path
class RAGDocumentProcessor:
def __init__(self, config: Dict[str, Any]):
self.config = config
self.embedding_model = self._init_embedding_model()
self.text_splitter = self._init_text_splitter()
self.vector_db = self._init_vector_db()
# 创建缓存目录
self.cache_dir = Path(config.get("cache_dir", ".rag_cache"))
self.cache_dir.mkdir(exist_ok=True)
def _init_embedding_model(self):
"""初始化嵌入模型"""
model_type = self.config["embedding"]["model_type"]
if model_type == "openai":
return OpenAIEmbeddings(model="text-embedding-ada-002")
elif model_type == "huggingface":
return HuggingFaceEmbeddings(
model_name="GanymedeNil/text2vec-large-chinese"
)
else:
raise ValueError(f"不支持的嵌入模型类型: {model_type}")
def _init_text_splitter(self):
"""初始化文本分割器"""
return RecursiveCharacterTextSplitter(
chunk_size=self.config["chunking"]["size"],
chunk_overlap=self.config["chunking"]["overlap"],
separators=["\n\n", "\n", "。", "!", "?", ";", ",", " "]
)
def _init_vector_db(self):
"""初始化向量数据库"""
db_type = self.config["vector_db"]["type"]
if db_type == "faiss":
return FAISS # 实际应用中需要更详细的初始化
elif db_type == "chroma":
return Chroma # 实际应用中需要更详细的初始化
else:
raise ValueError(f"不支持的向量数据库类型: {db_type}")
def _get_cache_key(self, document: str) -> str:
"""生成文档缓存键"""
return hashlib.md5(document.encode()).hexdigest()
def _load_from_cache(self, cache_key: str) -> List[List[float]]:
"""从缓存加载嵌入"""
cache_file = self.cache_dir / f"{cache_key}.json"
if cache_file.exists():
with open(cache_file, "r") as f:
return json.load(f)
return None
def _save_to_cache(self, cache_key: str, embeddings: List[List[float]]):
"""保存嵌入到缓存"""
cache_file = self.cache_dir / f"{cache_key}.json"
with open(cache_file, "w") as f:
json.dump(embeddings, f)
def process_document(self, document: str) -> List[Dict[str, Any]]:
"""处理单个文档"""
# 1. 预处理
cleaned = clean_text(document)
normalized = normalize_text(cleaned)
# 2. 分块
chunks = self.text_splitter.split_text(normalized)
# 3. 向量化(使用缓存)
cache_key = self._get_cache_key(normalized)
cached_embeddings = self._load_from_cache(cache_key)
if cached_embeddings is not None and len(cached_embeddings) == len(chunks):
embeddings = cached_embeddings
else:
embeddings = self.embedding_model.embed_documents(chunks)
self._save_to_cache(cache_key, embeddings)
# 4. 准备元数据
processed_chunks = []
for i, (chunk, embedding) in enumerate(zip(chunks, embeddings)):
processed_chunks.append({
"text": chunk,
"embedding": embedding,
"chunk_id": f"{cache_key}_{i}",
"metadata": {
"chunk_index": i,
"total_chunks": len(chunks)
}
})
return processed_chunks
def process_documents(self, documents: List[str]) -> List[List[Dict[str, Any]]]:
"""批量处理文档"""
return [self.process_document(doc) for doc in documents]
def add_to_vector_db(self, processed_chunks: List[List[Dict[str, Any]]]):
"""将处理后的文档块添加到向量数据库"""
# 展平列表
all_chunks = [chunk for sublist in processed_chunks for chunk in sublist]
# 提取文本和嵌入
texts = [chunk["text"] for chunk in all_chunks]
embeddings = [chunk["embedding"] for chunk in all_chunks]
metadatas = [chunk["metadata"] for chunk in all_chunks]
# 添加到向量数据库
self.vector_db.add_embeddings(
texts=texts,
embeddings=embeddings,
metadatas=metadatas
)
# 配置示例
config = {
"embedding": {
"model_type": "huggingface",
"model_name": "GanymedeNil/text2vec-large-chinese"
},
"chunking": {
"size": 512,
"overlap": 50
},
"vector_db": {
"type": "chroma",
"persist_dir": "./chroma_db"
},
"cache_dir": "./embedding_cache"
}
# 使用示例
if __name__ == "__main__":
# 初始化处理器
processor = RAGDocumentProcessor(config)
# 加载文档(实际应用中可能从文件或网络加载)
documents = [
"这是第一个文档的内容...",
"这是第二个文档的内容...",
# 更多文档...
]
# 处理文档
processed = processor.process_documents(documents)
# 添加到向量数据库
processor.add_to_vector_db(processed)
print(f"成功处理并存储了{len(documents)}个文档")
九、性能优化与高级技巧
构建生产级RAG文档处理系统时,需要考虑以下优化策略:
-
并行处理:
- 使用多线程/多进程加速文档处理
- 批量处理嵌入计算
-
缓存机制:
- 缓存嵌入结果避免重复计算
- 实现增量更新,只处理变更文档
-
混合检索策略:
- 结合密集向量检索和稀疏检索(如BM25)
- 使用重排序(re-ranking)提高结果质量
-
分块优化:
- 动态调整分块大小
- 基于语义的分块(使用LLM识别最佳分界点)
-
元数据增强:
- 为每个块添加丰富元数据(来源、时间、重要性等)
- 支持基于元数据的过滤检索
代码示例:带重排序的混合检索
from rank_bm25 import BM25Okapi
from typing import List, Tuple
class HybridRetriever:
def __init__(self, vector_retriever, documents: List[str]):
self.vector_retriever = vector_retriever
self.bm25 = BM25Okapi([doc.split() for doc in documents])
self.documents = documents
def retrieve(self, query: str, k: int = 10) -> List[Tuple[str, float]]:
# 向量检索
vector_results = self.vector_retriever.search(query, k=k*2)
# BM25检索
tokenized_query = query.split()
bm25_scores = self.bm25.get_scores(tokenized_query)
bm25_results = sorted(
zip(self.documents, bm25_scores),
key=lambda x: x[1],
reverse=True
)[:k*2]
# 合并结果
all_results = {}
for doc, score in vector_results:
all_results[doc] = score * 0.7 # 向量检索权重
for doc, score in bm25_results:
if doc in all_results:
all_results[doc] += score * 0.3 # BM25权重
else:
all_results[doc] = score * 0.3
# 重排序
reranked = sorted(
all_results.items(),
key=lambda x: x[1],
reverse=True
)[:k]
return reranked
十、评估与监控
构建RAG系统后,需要建立评估机制来监控其性能:
-
检索质量评估:
- 命中率(Hit Rate)
- 平均倒数排名(MRR)
- 精确率@k(Precision@k)
-
生成质量评估:
- 事实一致性(Factual Consistency)
- 相关性(Relevance)
- 流畅度(Fluency)
-
系统性能监控:
- 处理延迟
- 吞吐量
- 资源利用率
代码示例:基础评估实现
from typing import List, Dict
class RAGEvaluator:
def __init__(self, golden_data: Dict[str, List[str]]):
"""
golden_data: 包含查询和预期文档ID的字典
"""
self.golden_data = golden_data
def evaluate_retrieval(self, query: str, retrieved_ids: List[str], k: int = 5) -> Dict[str, float]:
"""评估检索结果"""
relevant_ids = self.golden_data.get(query, [])
# 计算指标
hits = len(set(retrieved_ids[:k]) & set(relevant_ids))
precision = hits / k if k > 0 else 0
recall = hits / len(relevant_ids) if relevant_ids else 0
mrr = 0
for i, id in enumerate(retrieved_ids[:k], 1):
if id in relevant_ids:
mrr = 1 / i
break
return {
"precision@k": precision,
"recall@k": recall,
"mrr@k": mrr,
"hit@k": 1 if hits > 0 else 0
}
def evaluate_generation(self, generated: str, reference: str) -> Dict[str, float]:
"""评估生成质量(简化版)"""
# 实际应用中可以使用更复杂的指标如BERTScore、BLEU等
generated_words = set(generated.lower().split())
reference_words = set(reference.lower().split())
overlap = generated_words & reference_words
precision = len(overlap) / len(generated_words) if generated_words else 0
recall = len(overlap) / len(reference_words) if reference_words else 0
f1 = 2 * precision * recall / (precision + recall) if (precision + recall) > 0 else 0
return {
"precision": precision,
"recall": recall,
"f1": f1
}
十一、实际应用中的挑战与解决方案
在实际部署RAG文档处理系统时,可能会遇到以下挑战:
-
文档质量参差不齐:
- 解决方案:实现强大的预处理流水线,包括质量检测模块
-
多语言支持:
- 解决方案:使用多语言嵌入模型(如paraphrase-multilingual-MiniLM-L12-v2)
-
领域适应:
- 解决方案:领域特定微调嵌入模型
- 解决方案:添加领域特定词汇和同义词扩展
-
实时更新:
- 解决方案:实现增量索引更新机制
- 解决方案:设置文档过期时间并定期刷新
-
规模化问题:
- 解决方案:分布式文档处理
- 解决方案:分层索引结构
十二、未来发展方向
RAG技术仍在快速发展中,未来可能的方向包括:
- 动态分块:根据查询内容动态调整分块策略
- 多模态RAG:结合文本、图像、表格等多种数据形式
- 主动检索:生成模型主动决定何时及如何检索
- 迭代检索:基于初步结果进行多轮精炼检索
- 解释性增强:提供检索结果的来源和可信度解释
十三、总结
本文详细介绍了RAG系统中的文档处理流程,从原始文档加载到最终向量存储的完整过程。我们探讨了每个关键步骤的技术细节,并提供了实用的代码实现。一个高效的文档处理流水线是构建高质量RAG系统的基础,需要根据具体应用场景调整和优化各个组件。
随着大语言模型和向量数据库技术的不断发展,RAG系统的文档处理流程也将持续演进,为企业知识管理、智能问答等应用场景提供更强大的支持。

火山引擎开发者社区是火山引擎打造的AI技术生态平台,聚焦Agent与大模型开发,提供豆包系列模型(图像/视频/视觉)、智能分析与会话工具,并配套评测集、动手实验室及行业案例库。社区通过技术沙龙、挑战赛等活动促进开发者成长,新用户可领50万Tokens权益,助力构建智能应用。
更多推荐
所有评论(0)