RAG数据准备:维基百科知识库本地数据处理全流程技术详解
本文系统介绍了维基百科数据在构建RAG知识库中的全流程处理,包括数据获取、清洗与存储。主要内容包括:维基百科提供多种格式的数据转储文件(XML、SQL、JSON等),可通过wget或BitTorrent下载;使用WikiExtractor工具进行多进程并行处理,提取文本并去除标记语言;详细的数据清洗流程涵盖HTML实体解码、特殊页面过滤等步骤。文章还解析了维基百科XML文件结构和命名规范,为构建高
一、引言
在构建基于检索增强生成(Retrieval-Augmented Generation, RAG)的智能系统时,高质量的知识库是系统性能的基石。维基百科作为全球最大的开放知识库之一,提供了丰富、结构化且持续更新的数据源。
本文将深入探讨维基百科数据的完整处理流程,从原始数据获取、数据清洗、文本切分到最终存储,为构建企业级知识库系统提供技术参考。
二、维基百科数据获取
2.1 维基百科数据转储(Database Dumps)概述
维基百科提供免费的数据库副本供用户下载和使用,这些数据可用于镜像、个人使用、离线访问或数据库查询等场景。
所有文本内容遵循知识共享署名-相同方式共享4.0许可协议(CC-BY-SA),大部分内容还额外遵循GNU自由文档许可协议(GFDL)。
维基百科的数据转储文件托管在官方服务器上,可通过以下地址访问:
- 英文维基:
https://dumps.wikimedia.org/enwiki/latest/ - 中文维基:
https://dumps.wikimedia.org/zhwiki/latest/
2.2 文件格式详解
维基百科提供多种格式的数据转储文件,每种格式针对不同的使用场景:
| 文件格式 | 压缩算法 | 适用场景 | 优势 |
|---|---|---|---|
.xml.bz2 |
Bzip2 | 完整文章内容提取 | 高压缩率,适合大规模数据 |
.xml.gz |
Gzip | 快速解压场景 | 解压速度快 |
.sql.gz |
Gzip | 数据库直接导入 | 便于MySQL等数据库系统 |
.jsonl |
无/Gzip | 流式处理 | 逐行解析,内存友好 |
2.3 文件命名规范解析
维基百科转储文件遵循特定命名规范:
{language}wiki-{date}-{type}.xml.bz2
示例:
zhwiki-20241020-pages-articles.xml.bz2- 中文维基完整文章enwiki-20241020-pages-articles-multistream.xml.bz2- 英文维基多流格式
关键文件类型说明:
| 文件类型后缀 | 内容描述 | 推荐用途 |
|---|---|---|
pages-articles |
完整文章内容,包含文本、链接和元数据 | NLP任务和数据挖掘 |
pages-articles-multistream |
多数据流格式,支持快速随机访问 | 大规模并行处理 |
abstract |
文章摘要 | 快速了解文章主题 |
pages-meta-current |
最新版本的元数据和编辑历史 | 分析编辑动态 |
2.4 数据下载最佳实践
2.4.1 使用wget下载
# 基础下载命令
wget https://dumps.wikimedia.org/zhwiki/latest/zhwiki-latest-pages-articles1.xml-p1p187712.bz2
# 支持断点续传的下载
wget -c https://dumps.wikimedia.org/zhwiki/latest/zhwiki-latest-pages-articles1.xml-p1p187712.bz2
# 限速下载(避免占用过多带宽)
wget --limit-rate=1m https://dumps.wikimedia.org/zhwiki/latest/zhwiki-latest-pages-articles1.xml-p1p187712.bz2
2.4.2 使用BitTorrent下载
维基百科推荐使用BitTorrent客户端下载数据转储,这种方式具有诸多优势,包括减轻服务器负载和节约带宽成本。
# 安装transmission-cli
sudo apt-get install transmission-cli
# 使用torrent文件下载
transmission-cli zhwiki-latest-pages-articles.xml.bz2.torrent
2.5 XML数据结构解析
维基百科转储的XML文件结构由两类主要对象组成:siteinfo对象和page对象。
2.5.1 Siteinfo节点
<siteinfo>
<sitename>Wikipedia</sitename>
<dbname>zhwiki</dbname>
<base>https://zh.wikipedia.org/wiki/Wikipedia:首页</base>
<generator>MediaWiki 1.41.0-wmf.11</generator>
<case>first-letter</case>
<namespaces>
<namespace key="0" case="first-letter" />
<namespace key="1" case="first-letter">Talk</namespace>
<!-- 更多命名空间 -->
</namespaces>
</siteinfo>
2.5.2 Page节点
<page>
<title>人工智能</title>
<ns>0</ns>
<id>12345</id>
<revision>
<id>67890</id>
<timestamp>2024-10-20T12:00:00Z</timestamp>
<contributor>
<username>Example</username>
<id>54321</id>
</contributor>
<text xml:space="preserve">
<!-- 文章的维基标记文本 -->
</text>
</revision>
</page>
三、数据提取与清洗
3.1 WikiExtractor工具详解
WikiExtractor是一个Python脚本,用于从维基百科数据库备份转储中提取和清洗文本。该工具能够有效去除Wiki标记语言、HTML标签和其他格式信息,生成纯文本数据。
3.1.1 WikiExtractor核心特性
- 模板扩展机制:
- WikiExtractor通过预处理整个转储文件并提取模板定义来执行模板扩展
- 支持自定义模板文件,加速重复提取过程
- 多进程并行处理:
- 利用多进程并行处理文章,显著提升处理速度
- 保持解析模板的缓存,优化重复提取性能
- 输出格式灵活性:
- 支持JSON格式输出
- 支持自定义文档格式
- 可配置输出文件大小
3.1.2 WikiExtractor安装与使用
# 克隆WikiExtractor仓库
git clone https://github.com/attardi/wikiextractor.git
cd wikiextractor
# 或直接通过pip安装
pip install wikiextractor
# 基础使用命令
python -m wikiextractor.WikiExtractor \
--json \
--filter_disambig_pages \
--processes 4 \
-o output_directory \
zhwiki-latest-pages-articles.xml.bz2
# 高级用法:保存模板以加速后续处理
python -m wikiextractor.WikiExtractor \
--json \
--templates extracted_templates.txt \
--processes 8 \
-o output_directory \
zhwiki-latest-pages-articles.xml.bz2
参数说明:
| 参数 | 说明 | 推荐值 |
|---|---|---|
--json |
输出JSON格式 | 推荐开启 |
--filter_disambig_pages |
过滤消歧义页面 | 推荐开启 |
--processes |
并行进程数 | CPU核心数 |
--no-templates |
不展开模板(加速) | 根据需求 |
-b, --bytes |
每个输出文件大小 | 默认1M |
3.1.3 WikiExtractor输出格式
JSON格式输出示例:
{
"id": "12345",
"url": "https://zh.wikipedia.org/wiki/人工智能",
"title": "人工智能",
"text": "人工智能(Artificial Intelligence, AI)是指由人制造出来的机器所表现出来的智能..."
}
3.2 高级文本清洗技术
维基百科文本包含大量标记语言和特殊格式,需要进行深度清洗。
3.2.1 清洗流程架构
3.2.2 HTML实体解码
import html
def decode_html_entities(text):
"""
解码HTML实体,将编码的特殊字符恢复为原始字符
示例:
" -> "
& -> &
< -> <
> -> >
"""
return html.unescape(text)
# 示例
raw_text = "机器学习&深度学习"人工智能""
clean_text = decode_html_entities(raw_text)
# 输出: 机器学习&深度学习"人工智能"
3.2.3 消歧义页面和特殊页面过滤
import re
def filter_special_pages(title, text):
"""
过滤不适合作为知识库内容的特殊页面
过滤规则:
1. 消歧义页面
2. 列表页面
3. 索引页面
4. 重定向页面
"""
# 消歧义页面检测
if '(disambiguation)' in title.lower():
return None, None
if '(disambiguation page)' in title.lower():
return None, None
# 列表、索引、大纲页面检测
if re.match(r'(List of .+)|(Index of .+)|(Outline of .+)', title):
return None, None
# 重定向页面检测
if text.startswith("REDIRECT") or text.startswith("redirect"):
return None, None
return title, text
3.2.4 Wiki标记和格式清理
Wiki标记语言包含大量用于排版和引用的特殊语法,需要系统性清除:
import re
def clean_wiki_markup(text):
"""
清理Wiki标记语言和格式标签
"""
# 移除引用标记
text = re.sub(r'\{\{cite .*?\}\}', ' ', text, flags=re.DOTALL)
# 移除表格标记
text = text.replace(r"TABLETOREPLACE", " ")
# 移除Wiki链接标记
text = text.replace(r"[[", " ")
text = text.replace(r"]]", " ")
text = text.replace(r"{{", " ")
text = text.replace(r"}}", " ")
# 移除粗体和斜体标记
text = text.replace(r"'''", " ")
text = text.replace(r"''", " ")
# 移除HTML标签
text = text.replace("<br>", " ")
text = text.replace("<br/>", " ")
# 处理HTML实体
text = text.replace(""", '"')
text = text.replace("&", "&")
text = text.replace(" ", " ")
# 移除数学公式标签(保留内容不现实,直接移除)
text = re.sub(r'<math.*?</math>', '', text, flags=re.DOTALL)
text = re.sub(r'<chem.*?</chem>', '', text, flags=re.DOTALL)
text = re.sub(r'<score.*?</score>', '', text, flags=re.DOTALL)
return text
3.2.5 样式属性和格式清理
维基百科表格和信息框包含大量样式属性,这些对文本理解无益:
def remove_style_attributes(text):
"""
移除表格和信息框的样式属性
"""
# 移除各类style属性
style_patterns = [
r'\| ?item[0-9]?_?style= ?.*? ',
r'\| ?col[0-9]?_?style= ?.*? ',
r'\| ?row[0-9]?_?style= ?.*? ',
r'\| ?style= ?.*? ',
r'\| ?bodystyle= ?.*? ',
r'\| ?frame_?style= ?.*? ',
r'\| ?data_?style= ?.*? ',
r'\| ?label_?style= ?.*? ',
r'\| ?headerstyle= ?.*? ',
r'\| ?list_?style= ?.*? ',
r'\| ?title_?style= ?.*? ',
r'\| ?ul_?style= ?.*? ',
r'\| ?li_?style= ?.*? ',
r'\| ?border-style= ?.*? ',
]
for pattern in style_patterns:
text = re.sub(pattern, ' ', text)
# 移除HTML属性
html_attrs = [
r'\|? ?style=".*?"',
r'\|? ?rowspan=".*?"',
r'\|? ?colspan=".*?"',
r'\|? ?scope=".*?"',
r'\|? ?align=".*?"',
r'\|? ?valign=".*?"',
r'\|? ?lang=".*?"',
r'\|? ?bgcolor=".*?"',
r'\|? ?width=".*?"',
r'\|? ?height=[0-9]+',
r'\|? ?width=[0-9]+',
r'\|? ?rowspan=[0-9]+',
r'\|? ?colspan=[0-9]+',
]
for pattern in html_attrs:
text = re.sub(pattern, '', text)
return text
3.2.6 文本规范化
def normalize_text(text):
"""
文本规范化处理
"""
# 替换换行符和制表符为空格
text = re.sub(r'[\n\t]', ' ', text)
# 移除空标签
text = re.sub(r'<.*?/>', '', text)
# 移除ref标签内容
text = re.sub(r'<ref>.*?</ref>', ' ', text)
text = re.sub(r'<.*?>', ' ', text)
# 移除文件链接
text = re.sub(r'File:[A-Za-z0-9 ]+\.[a-z]{3,4}(\|[0-9]+px)?', '', text)
# 移除来源标注
text = re.sub(r'Source: \[.*?\]', '', text)
# 移除多余空格
text = re.sub(r'\s+', ' ', text)
text = text.strip()
return text
3.3 关键词过滤与语料筛选
在实际应用中,往往不需要全部维基百科内容,而是需要特定领域的语料(可使用ac自动机提高检索速度,后续其他文章介绍):
def filter_by_keywords(corpus, keywords, max_count=None):
"""
根据关键词过滤语料
Args:
corpus: 语料列表
keywords: 关键词列表
max_count: 最大保留数量
Returns:
过滤后的语料列表
"""
filtered_corpus = []
for item in corpus:
text = item.get('text', '')
# 检查是否包含任何关键词
if any(keyword in text for keyword in keywords):
filtered_corpus.append(item)
# 达到最大数量则停止
if max_count and len(filtered_corpus) >= max_count:
break
return filtered_corpus
# 使用示例
keywords = [
'人工智能',
'机器学习',
'深度学习',
'自然语言处理',
'计算机视觉',
'神经网络'
]
filtered_data = filter_by_keywords(corpus, keywords, max_count=10000)
四、文本切分(Chunking)策略
文本切分是RAG系统中至关重要的一环,有效的切分策略能够确保搜索结果准确捕捉用户查询的本质。
4.1 文本切分的重要性
4.1.1 为什么需要文本切分
- 模型Token限制:
- 许多AI模型(如BERT)有最大输入长度限制(例如512或1024个token)
- 超过限制的段落需要进一步分割或截断
- 检索精度提升:
- 通过找到有效的切分策略,可以确保搜索结果准确反映用户查询的精髓
- 合适大小的块更容易匹配查询意图
- 上下文保持:
- 切分的文本块如果对人类来说在没有周围上下文的情况下仍有意义,那么对语言模型也同样有意义
- 保持语义完整性是切分的核心原则
- 计算效率:
- 适当大小的块可以优化嵌入计算和检索速度
- 减少不必要的计算开销
4.2 常见切分策略对比
文本切分方法各有优缺点,需要根据具体用例选择最合适的方法。
| 策略类型 | 优势 | 劣势 | 适用场景 |
|---|---|---|---|
| 固定大小切分 | 实现简单,处理快速 | 可能破坏语义边界 | 对语义要求不高的场景 |
| 句子切分 | 保持语义完整性 | 句子长度不一 | 需要精确语义的任务 |
| 段落切分 | 保持主题连贯性 | 段落大小差异大 | 结构化文档 |
| 递归切分 | 平衡结构和大小 | 实现复杂 | 多层次文档 |
| 语义切分 | 语义连贯性最佳 | 计算成本高 | 高质量要求场景 |
4.3 基于spaCy的智能切分
spaCy是一个高性能的NLP库,提供复杂的句子分割功能,能够有效地将文本分成独立的句子,从而在结果块中实现更好的上下文保留。
4.3.1 spaCy中文模型详解
spaCy提供多个中文模型,各有特点:
| 模型名称 | 模型大小 | 词向量数量 | 适用场景 |
|---|---|---|---|
zh_core_web_sm |
~50MB | 无词向量 | 快速分词、轻量级应用 |
zh_core_web_md |
~200MB | 20,000个向量 | 一般NLP任务 |
zh_core_web_lg |
~500MB | 500,000个向量 | 高精度要求任务 |
zh_core_web_trf |
~400MB | Transformer | 最高精度 |
中文语言类支持三种分词选项:char(字符)、jieba和pkuseg。
在spaCy v3.0中,默认分词器已从Jieba切换到字符分割。
4.3.2 安装和加载spaCy中文模型
# 安装spaCy
pip install spacy
# 下载中文大模型(推荐用于知识库处理)
python -m spacy download zh_core_web_lg
# 或下载中型模型(平衡性能和精度)
python -m spacy download zh_core_web_md
4.3.3 使用spaCy进行句子切分
import spacy
# 加载中文模型
nlp = spacy.load("zh_core_web_lg")
def sentence_based_chunking(text, max_sentences=5):
"""
基于句子的文本切分
Args:
text: 输入文本
max_sentences: 每个块的最大句子数
Returns:
文本块列表
"""
# 使用spaCy处理文本
doc = nlp(text)
# 提取句子
sentences = [sent.text.strip() for sent in doc.sents]
# 按指定句子数分块
chunks = []
for i in range(0, len(sentences), max_sentences):
chunk = " ".join(sentences[i:i + max_sentences])
chunks.append(chunk)
return chunks
# 使用示例
text = """
人工智能是计算机科学的一个分支。它试图理解智能的本质。
并生产出一种新的能以人类智能相似的方式做出反应的智能机器。
该领域的研究包括机器人、语言识别、图像识别、自然语言处理和专家系统等。
"""
chunks = sentence_based_chunking(text, max_sentences=2)
for i, chunk in enumerate(chunks, 1):
print(f"块 {i}: {chunk}\n")
4.4 滑动窗口切分策略
滑动窗口技术通过在连续片段之间创建共享内容来实现重叠切分,确保连续性并在边界处保留语义流。
def sliding_window_chunking(text, chunk_size=100, stride=50):
"""
滑动窗口切分策略
Args:
text: 输入文本
chunk_size: 每个块的词数
stride: 滑动步长(词数)
Returns:
文本块列表
"""
doc = nlp(text)
# 提取所有词元(tokens)
tokens = [token.text for token in doc if not token.is_space]
chunks = []
for i in range(0, len(tokens), stride):
# 提取chunk_size个词
chunk_tokens = tokens[i:i + chunk_size]
# 如果剩余词数不足,仍然保存
if len(chunk_tokens) > 0:
chunk = " ".join(chunk_tokens)
chunks.append(chunk)
# 如果已经到达末尾,停止
if i + chunk_size >= len(tokens):
break
return chunks
# 使用示例
text = "这是一段需要切分的长文本..." * 50
chunks = sliding_window_chunking(text, chunk_size=100, stride=50)
print(f"生成了 {len(chunks)} 个文本块")
滑动窗口示意图:
4.5 基于词数的固定切分
特别适合中文文本处理:
def fixed_word_count_chunking(text, words_per_chunk=100, nlp_model=None):
"""
基于固定词数的切分策略
特点:
1. 每个块包含固定数量的词(不包括标点和空格)
2. 保持原始的空格和标点格式
3. 适合中文等不以空格分词的语言
Args:
text: 输入文本
words_per_chunk: 每块的词数
nlp_model: spaCy模型(如未提供则使用全局nlp)
Returns:
文本块列表
"""
if nlp_model is None:
nlp_model = nlp
# 使用spaCy处理文本
doc = nlp_model(text)
chunks = []
word_count = 0
chunk_tokens = []
for token in doc:
# 添加token(保留空格)
chunk_tokens.append(token.text_with_ws)
# 只计数非空格非标点的词
if not token.is_space and not token.is_punct:
word_count += 1
# 达到指定词数,生成一个块
if word_count == words_per_chunk:
chunks.append(''.join(chunk_tokens))
chunk_tokens = []
word_count = 0
# 处理剩余的词
if chunk_tokens:
# 如果剩余词数太少,尝试继续收集直到达到词数或文档结束
if word_count < words_per_chunk:
for token in doc:
if len(chunk_tokens) > 0: # 确保不重复添加
continue
chunk_tokens.append(token.text_with_ws)
if not token.is_space and not token.is_punct:
word_count += 1
if word_count == words_per_chunk:
break
# 保存最后一块
chunks.append(''.join(chunk_tokens))
return chunks
4.6 语义切分策略
语义切分涉及根据意义或主题连贯性而非预定义大小或结构元素来分割文档。
这种策略利用自然语言处理技术将文本分组为代表不同想法、主题或子主题的片段。
from sklearn.metrics.pairwise import cosine_similarity
import numpy as np
def semantic_chunking(text, similarity_threshold=0.5):
"""
基于语义相似度的切分策略
原理:
1. 将文本分成句子
2. 计算相邻句子的嵌入向量
3. 根据相似度阈值决定是否分割
Args:
text: 输入文本
similarity_threshold: 相似度阈值
Returns:
语义连贯的文本块列表
"""
doc = nlp(text)
sentences = [sent.text for sent in doc.sents]
if len(sentences) <= 1:
return sentences
# 获取句子嵌入(使用spaCy的词向量平均)
embeddings = []
# spaCy 的中文模型(如 zh_core_web_lg)确实包含词向量,但质量不如专用嵌入模型
for sent in doc.sents:
if sent.vector_norm > 0: # 确保有词向量
embeddings.append(sent.vector)
else:
# 如果没有词向量,使用零向量
embeddings.append(np.zeros(300)) # spaCy默认300维
# 计算相邻句子的相似度
similarities = []
for i in range(len(embeddings) - 1):
sim = cosine_similarity(
embeddings[i].reshape(1, -1),
embeddings[i + 1].reshape(1, -1)
)[0][0]
similarities.append(sim)
# 根据相似度阈值分割
chunks = []
current_chunk = [sentences[0]]
for i, sim in enumerate(similarities):
if sim >= similarity_threshold:
# 相似度高,继续添加到当前块
current_chunk.append(sentences[i + 1])
else:
# 相似度低,开始新块
chunks.append(" ".join(current_chunk))
current_chunk = [sentences[i + 1]]
# 添加最后一块
if current_chunk:
chunks.append(" ".join(current_chunk))
return chunks
4.7 切分策略选择指南
选择建议:
- 百科类知识库(如维基百科):
- 推荐:句子切分 + 固定词数限制
- 原因:保持语义完整,便于检索
- 技术文档:
- 推荐:基于标题的层次切分
- 原因:保持文档结构,便于定位
- 新闻文章:
- 推荐:段落切分
- 原因:每个段落通常包含完整观点
- 对话数据:
- 推荐:基于轮次的切分
- 原因:保持对话上下文
五、数据存储方案
5.1 JSONL格式详解
JSONL(JSON Lines)是一种文本数据格式,其中每一行代表一个有效的JSON对象。这种格式特别适合大规模NLP任务。
5.1.1 JSONL格式优势
JSONL格式的主要优势包括内存效率和易于逐行处理:
- 内存效率:
- JSONL允许一次处理一个对象,无需将整个文件加载到内存中,非常适合处理大型数据集
- 流式处理:
- 由于每行都是独立的,因此非常适合一次处理一个对象的大型数据集,无需将所有内容加载到内存中
- 易于追加:
- 添加新数据很简单,只需在文件末尾追加一个新行并附上有效的JSON对象
- 易于并行处理:
- 每行独立,便于多线程/多进程处理
5.1.2 JSONL vs JSON对比
| 特性 | JSON | JSONL |
|---|---|---|
| 存储方式 | 单个大对象或数组 | 每行一个对象 |
| 解析方式 | 必须一次性解析整个文件 | 可逐行解析 |
| 内存占用 | 需要加载整个文件 | 只需加载当前行 |
| 修改难度 | 较难 | 容易(每行独立) |
| 流式处理 | 不支持 | 原生支持 |
| 文件大小 | 可能较大(包含完整结构) | 相对较小 |
5.1.3 JSONL格式示例
{"id": 0, "title": "人工智能", "contents": "人工智能是计算机科学的一个分支..."}
{"id": 1, "title": "机器学习", "contents": "机器学习是人工智能的一个子领域..."}
{"id": 2, "title": "深度学习", "contents": "深度学习是机器学习的一种方法..."}
5.2 JSONL文件的读写操作
5.2.1 写入JSONL文件
import json
import os
def write_to_jsonl(data_list, output_path):
"""
将数据列表写入JSONL文件
Args:
data_list: 字典列表,每个字典代表一条记录
output_path: 输出文件路径
"""
# 确保输出目录存在
os.makedirs(os.path.dirname(output_path), exist_ok=True)
with open(output_path, 'w', encoding='utf-8') as f:
for item in data_list:
# 将字典转换为JSON字符串,ensure_ascii=False保留中文
json_string = json.dumps(item, ensure_ascii=False)
# 写入文件并添加换行符
f.write(json_string + '\n')
print(f"成功写入 {len(data_list)} 条记录到 {output_path}")
# 使用示例
knowledge_base = [
{
'id': 0,
'title': '人工智能',
'contents': '人工智能是计算机科学的一个分支...'
},
{
'id': 1,
'title': '机器学习',
'contents': '机器学习是人工智能的一个子领域...'
}
]
write_to_jsonl(knowledge_base, 'knowledge_base/ai_corpus.jsonl')
5.2.2 读取JSONL文件
def read_from_jsonl(file_path, max_records=None):
"""
从JSONL文件读取数据
Args:
file_path: 输入文件路径
max_records: 最大读取记录数(None表示读取全部)
Returns:
数据列表
"""
data_list = []
with open(file_path, 'r', encoding='utf-8') as f:
for i, line in enumerate(f):
# 解析JSON行
record = json.loads(line)
data_list.append(record)
# 达到最大记录数则停止
if max_records and i + 1 >= max_records:
break
return data_list
# 使用示例
corpus = read_from_jsonl('knowledge_base/ai_corpus.jsonl', max_records=1000)
print(f"读取了 {len(corpus)} 条记录")
5.2.3 流式读取JSONL(内存友好)
def stream_jsonl(file_path):
"""
流式读取JSONL文件,适用于大文件
Args:
file_path: 输入文件路径
Yields:
每次返回一条记录
"""
with open(file_path, 'r', encoding='utf-8') as f:
for line in f:
yield json.loads(line)
# 使用示例:处理大文件而不占用太多内存
total_length = 0
for record in stream_jsonl('large_knowledge_base.jsonl'):
total_length += len(record.get('contents', ''))
# 这里可以进行其他处理,如向量化、索引等
print(f"总文本长度: {total_length}")
5.3 压缩存储
JSONL文件可以使用gzip或bzip2等流压缩器进行压缩以节省空间,生成.jsonl.gz或.jsonl.bz2文件。
import gzip
import json
def write_to_compressed_jsonl(data_list, output_path):
"""
写入压缩的JSONL文件(.jsonl.gz)
"""
with gzip.open(output_path, 'wt', encoding='utf-8') as f:
for item in data_list:
json_string = json.dumps(item, ensure_ascii=False)
f.write(json_string + '\n')
def read_from_compressed_jsonl(file_path):
"""
读取压缩的JSONL文件
"""
data_list = []
with gzip.open(file_path, 'rt', encoding='utf-8') as f:
for line in f:
data_list.append(json.loads(line))
return data_list
# 使用示例
write_to_compressed_jsonl(knowledge_base, 'knowledge_base/ai_corpus.jsonl.gz')
compressed_corpus = read_from_compressed_jsonl('knowledge_base/ai_corpus.jsonl.gz')
5.4 向量数据库存储
对于RAG系统,除了原始文本存储,还需要向量数据库存储嵌入向量以支持语义检索。
5.4.1 RAG架构中的向量数据库角色
在RAG工作流程中,向量数据库存储文档和查询的嵌入向量,并基于向量相似度进行检索。当发出查询时,数据库根据查询向量与文档向量的相似度检索相关文档。
5.4.2 常见向量数据库对比
| 向量数据库 | 特点 | 优势 | 适用场景 |
|---|---|---|---|
| Pinecone | 全托管云服务 | 易于使用,可扩展性强 | 快速原型和生产部署 |
| Weaviate | 开源,知识图谱 | GraphQL API,丰富的模式 | 复杂的知识图谱场景 |
| Chroma | 轻量级,内嵌式 | 易于集成,适合开发 | 本地开发和小规模应用 |
| FAISS | Meta开源 | 高性能,多种索引 | 大规模相似度搜索 |
| Qdrant | 开源,Rust编写 | 高性能,易部署 | 生产级应用 |
| Milvus | 云原生 | 可扩展,支持多种索引 | 大规模企业应用 |
5.4.3 使用Chroma构建本地向量库
import chromadb
from chromadb.config import Settings
def create_chroma_collection(
corpus_path,
collection_name="wiki_knowledge_base",
embedding_model="sentence-transformers/paraphrase-multilingual-MiniLM-L12-v2"
):
"""
创建Chroma向量数据库集合
Args:
corpus_path: JSONL语料库路径
collection_name: 集合名称
embedding_model: 嵌入模型名称
Returns:
collection对象
"""
# 初始化Chroma客户端
client = chromadb.Client(Settings(
chroma_db_impl="duckdb+parquet",
persist_directory="./chroma_db"
))
# 创建或获取集合
collection = client.get_or_create_collection(
name=collection_name,
metadata={"description": "维基百科知识库"}
)
# 读取语料库
documents = []
metadatas = []
ids = []
with open(corpus_path, 'r', encoding='utf-8') as f:
for line in f:
record = json.loads(line)
documents.append(record['contents'])
metadatas.append({
'title': record['title'],
'id': record['id']
})
ids.append(str(record['id']))
# 批量添加文档(Chroma会自动进行向量化)
batch_size = 1000
for i in range(0, len(documents), batch_size):
end_idx = min(i + batch_size, len(documents))
collection.add(
documents=documents[i:end_idx],
metadatas=metadatas[i:end_idx],
ids=ids[i:end_idx]
)
print(f"已添加 {end_idx}/{len(documents)} 条文档")
return collection
# 使用示例
collection = create_chroma_collection('knowledge_base/wiki_corpus.jsonl')
5.4.4 向量检索示例
def semantic_search(collection, query, top_k=5):
"""
语义检索
Args:
collection: Chroma集合对象
query: 查询文本
top_k: 返回前K个结果
Returns:
检索结果
"""
results = collection.query(
query_texts=[query],
n_results=top_k
)
return results
# 使用示例
query = "什么是深度学习?"
results = semantic_search(collection, query, top_k=3)
print(f"查询: {query}\n")
for i, (doc, metadata, distance) in enumerate(zip(
results['documents'][0],
results['metadatas'][0],
results['distances'][0]
), 1):
print(f"结果 {i}:")
print(f"标题: {metadata['title']}")
print(f"相似度: {1 - distance:.4f}")
print(f"内容: {doc[:200]}...\n")
5.5 混合存储策略
实际应用中,通常采用混合存储策略:
混合存储优势:
- JSONL文本存储:
- 保存完整原始文本
- 便于全文检索和关键词匹配
- 易于备份和迁移
- 支持快速浏览和审查
- 向量数据库:
- 支持语义检索
- 快速相似度计算
- 处理同义词和语义变体
- 支持多语言检索
- 协同工作:
- 向量检索找到候选文档
- JSONL提供完整文本内容
- 可实现混合排序(BM25 + 向量)
六、完整处理流程实现
6.1 端到端处理脚本
将前面介绍的所有步骤整合为完整的处理流程:
import argparse
import json
import os
import subprocess
import shutil
from pathlib import Path
from concurrent.futures import ThreadPoolExecutor
from multiprocessing import Pool
from tqdm import tqdm
import spacy
import html
import re
class WikiKnowledgeBaseProcessor:
"""维基百科知识库处理器"""
def __init__(self, args):
self.args = args
self.nlp = None
self.corpus = []
def load_corpus(self, dir_path):
"""
从目录加载JSONL语料,支持关键词过滤
"""
keywords = [
'人工智能', '机器学习', '深度学习',
'自然语言处理', '计算机视觉', '神经网络'
]
def iter_files(path):
"""遍历所有文件"""
if os.path.isfile(path):
yield path
elif os.path.isdir(path):
for dirpath, _, filenames in os.walk(path):
for f in filenames:
yield os.path.join(dirpath, f)
def read_jsonl_file(file_path):
"""读取JSONL文件"""
with open(file_path, 'r', encoding='utf-8') as f:
for line in f:
try:
json_data = json.loads(line)
# 关键词过滤
if any(kw in json_data.get('text', '') for kw in keywords):
self.corpus.append(json_data)
# 限制数量
if len(self.corpus) >= self.args.max_documents:
return
except json.JSONDecodeError:
continue
# 收集所有文件
all_files = list(iter_files(dir_path))
# 多线程读取
with ThreadPoolExecutor(max_workers=self.args.num_workers) as executor:
executor.map(read_jsonl_file, all_files)
print(f"加载了 {len(self.corpus)} 条语料")
return self.corpus
def basic_clean(self, title, text):
"""基础文本清洗"""
# HTML实体解码
title = html.unescape(title)
text = html.unescape(text)
text = text.strip()
# 过滤特殊页面
if '(disambiguation)' in title.lower():
return None, None
if re.match(r'(List of .+)|(Index of .+)|(Outline of .+)', title):
return None, None
if text.startswith("REDIRECT") or text.startswith("redirect"):
return None, None
# 清理Wiki标记
text = re.sub(r'\{\{cite .*?\}\}', ' ', text, flags=re.DOTALL)
text = text.replace(r"'''", " ")
text = text.replace(r"[[", " ").replace(r"]]", " ")
text = text.replace(r"{{", " ").replace(r"}}", " ")
text = text.replace("<br>", " ")
# 清理HTML标签
text = re.sub(r'<math.*?</math>', '', text, flags=re.DOTALL)
text = re.sub(r'<chem.*?</chem>', '', text, flags=re.DOTALL)
# 清理样式属性
text = re.sub(r'\| ?style= ?.*? ', ' ', text)
text = re.sub(r'[\n\t]', ' ', text)
text = re.sub(r'\s+', ' ', text)
title = title.replace("\n", " ").replace("\t", " ")
return title, text.strip()
def process_documents(self):
"""处理文档:清洗和去重"""
print("开始文档预处理...")
documents = {}
for item in tqdm(self.corpus):
title = item.get('title', '')
text = item.get('text', '')
# 清洗
title, text = self.basic_clean(title, text)
if title is None:
continue
# 合并同标题文档
if title in documents:
documents[title] += " " + text
else:
documents[title] = text
return list(documents.items())
def chunk_documents(self, documents):
"""文本切分"""
print("开始文本切分...")
# 加载spaCy模型
if self.nlp is None:
self.nlp = spacy.load("zh_core_web_lg")
clean_corpus = []
# 使用spaCy的批处理
titles = [doc[0] for doc in documents]
texts = [doc[1] for doc in documents]
for idx, doc in enumerate(tqdm(
self.nlp.pipe(texts, n_process=self.args.num_workers, batch_size=10),
total=len(texts)
)):
title = titles[idx]
# 基于词数切分
chunks = []
word_count = 0
chunk_tokens = []
for token in doc:
chunk_tokens.append(token.text_with_ws)
if not token.is_space and not token.is_punct:
word_count += 1
if word_count == self.args.words_per_chunk:
chunks.append(''.join(chunk_tokens))
chunk_tokens = []
word_count = 0
# 处理剩余token
if chunk_tokens:
chunks.append(''.join(chunk_tokens))
# 保存切分结果
for chunk in chunks:
clean_text = chunk.replace("\n", " ").replace("\t", " ")
clean_corpus.append({
"title": title,
"text": clean_text
})
return clean_corpus
def save_corpus(self, corpus):
"""保存语料库"""
print(f"保存语料库到 {self.args.save_path}...")
os.makedirs(os.path.dirname(self.args.save_path), exist_ok=True)
with open(self.args.save_path, 'w', encoding='utf-8') as f:
for idx, item in enumerate(corpus):
record = {
'id': idx,
'title': item['title'],
'contents': item['text']
}
json_string = json.dumps(record, ensure_ascii=False)
f.write(json_string + '\n')
print(f"成功保存 {len(corpus)} 条记录!")
def run(self):
"""运行完整流程"""
# 1. WikiExtractor提取
temp_dir = os.path.join(
Path(self.args.save_path).parent,
'temp'
)
os.makedirs(temp_dir, exist_ok=True)
print("运行WikiExtractor...")
subprocess.run([
'python', '-m', 'wikiextractor.WikiExtractor',
'--json',
'--filter_disambig_pages',
'--quiet',
'-o', temp_dir,
'--processes', str(self.args.num_workers),
self.args.dump_path
])
# 2. 加载语料
self.load_corpus(temp_dir)
# 3. 文档处理
documents = self.process_documents()
# 4. 文本切分
clean_corpus = self.chunk_documents(documents)
# 5. 保存结果
self.save_corpus(clean_corpus)
# 6. 清理临时文件
shutil.rmtree(temp_dir)
print("处理完成!")
def main():
parser = argparse.ArgumentParser(
description='维基百科知识库处理工具'
)
parser.add_argument(
'--dump_path',
type=str,
required=True,
help='维基百科dump文件路径'
)
parser.add_argument(
'--save_path',
type=str,
default='knowledge_base/wiki_corpus.jsonl',
help='输出JSONL文件路径'
)
parser.add_argument(
'--words_per_chunk',
type=int,
default=100,
help='每个文本块的词数'
)
parser.add_argument(
'--max_documents',
type=int,
default=10000,
help='最大文档数量'
)
parser.add_argument(
'--num_workers',
type=int,
default=4,
help='并行工作进程数'
)
args = parser.parse_args()
# 运行处理器
processor = WikiKnowledgeBaseProcessor(args)
processor.run()
if __name__ == '__main__':
main()
6.2 使用示例
# 基础使用
python wiki_processor.py \
--dump_path /path/to/zhwiki-latest-pages-articles.xml.bz2 \
--save_path knowledge_base/wiki_ai_corpus.jsonl \
--words_per_chunk 100 \
--max_documents 50000 \
--num_workers 8
# 处理大文件(更多文档,更小的块)
python wiki_processor.py \
--dump_path /path/to/zhwiki-latest-pages-articles.xml.bz2 \
--save_path knowledge_base/wiki_large_corpus.jsonl \
--words_per_chunk 50 \
--max_documents 500000 \
--num_workers 16
6.3 性能优化建议
6.3.1 多进程优化
from multiprocessing import cpu_count
# 自动检测CPU核心数
optimal_workers = max(1, cpu_count() - 1)
print(f"推荐使用 {optimal_workers} 个工作进程")
6.3.2 批处理优化
spaCy支持使用pipe方法进行批处理,能够显著提升处理效率:
# 不推荐:逐个处理
for text in texts:
doc = nlp(text) # 每次都重新初始化
# 推荐:批处理
for doc in nlp.pipe(texts, batch_size=50, n_process=4):
# 处理doc
pass
6.3.3 内存管理
import gc
def process_in_batches(items, batch_size=1000):
"""
批量处理以控制内存使用
"""
for i in range(0, len(items), batch_size):
batch = items[i:i + batch_size]
# 处理批次
process_batch(batch)
# 强制垃圾回收
gc.collect()
七、实际应用场景
7.1 构建问答系统
class WikiQASystem:
"""基于维基百科知识库的问答系统"""
def __init__(self, corpus_path, collection_name="wiki_qa"):
self.corpus_path = corpus_path
self.collection = self._build_vector_store(collection_name)
def _build_vector_store(self, collection_name):
"""构建向量存储"""
import chromadb
from chromadb.config import Settings
client = chromadb.Client(Settings(
chroma_db_impl="duckdb+parquet",
persist_directory="./chroma_db"
))
collection = client.get_or_create_collection(name=collection_name)
# 加载语料
print("加载语料到向量数据库...")
with open(self.corpus_path, 'r', encoding='utf-8') as f:
batch_docs, batch_meta, batch_ids = [], [], []
for line in tqdm(f):
record = json.loads(line)
batch_docs.append(record['contents'])
batch_meta.append({'title': record['title']})
batch_ids.append(str(record['id']))
# 批量添加
if len(batch_docs) >= 1000:
collection.add(
documents=batch_docs,
metadatas=batch_meta,
ids=batch_ids
)
batch_docs, batch_meta, batch_ids = [], [], []
# 添加剩余
if batch_docs:
collection.add(
documents=batch_docs,
metadatas=batch_meta,
ids=batch_ids
)
return collection
def answer_question(self, question, top_k=3):
"""回答问题"""
# 检索相关文档
results = self.collection.query(
query_texts=[question],
n_results=top_k
)
# 构建上下文
context = "\n\n".join(results['documents'][0])
# 这里可以集成LLM生成答案
# 简化版本:直接返回最相关的内容
return {
'question': question,
'context': context,
'sources': [m['title'] for m in results['metadatas'][0]]
}
# 使用示例
qa_system = WikiQASystem('knowledge_base/wiki_corpus.jsonl')
response = qa_system.answer_question("什么是深度学习?")
print(f"问题: {response['question']}")
print(f"来源: {', '.join(response['sources'])}")
print(f"上下文:\n{response['context'][:500]}...")
7.2 语义搜索引擎
class SemanticSearchEngine:
"""语义搜索引擎"""
def __init__(self, corpus_path):
self.corpus_path = corpus_path
self.index = self._build_index()
def _build_index(self):
"""构建搜索索引"""
# 这里可以使用更高级的索引结构
# 简化版本:使用字典
index = {}
with open(self.corpus_path, 'r', encoding='utf-8') as f:
for line in f:
record = json.loads(line)
index[record['id']] = record
return index
def search(self, query, top_k=10):
"""执行搜索"""
# 实际应用中应使用向量检索
# 这里用简单的关键词匹配演示
results = []
for doc_id, doc in self.index.items():
if query.lower() in doc['contents'].lower():
results.append(doc)
return results[:top_k]
# 使用示例
search_engine = SemanticSearchEngine('knowledge_base/wiki_corpus.jsonl')
search_results = search_engine.search("人工智能的应用")
7.3 知识图谱构建
维基百科数据可用于构建知识图谱:
八、常见问题与解决方案
8.1 处理超大文件
def process_large_dump(dump_path, chunk_size_mb=100):
"""
分块处理超大dump文件
"""
import bz2
chunk_size = chunk_size_mb * 1024 * 1024
with bz2.open(dump_path, 'rt', encoding='utf-8') as f:
buffer = ""
while True:
chunk = f.read(chunk_size)
if not chunk:
break
buffer += chunk
# 查找完整的</page>标签
while '</page>' in buffer:
end_pos = buffer.find('</page>') + 7
page_xml = buffer[:end_pos]
buffer = buffer[end_pos:]
# 处理页面
process_page(page_xml)
8.2 处理中文分词问题
中文分词是一个挑战,spaCy提供了多种分词器选项,包括jieba、pkuseg和字符级分词:
# 使用pkuseg进行分词
import spacy
# 配置使用pkuseg
config = {
"nlp": {
"tokenizer": {
"@tokenizers": "spacy.zh.ChineseTokenizer",
"segmenter": "pkuseg"
}
}
}
nlp = spacy.blank("zh")
nlp.tokenizer = nlp.tokenizer.from_config(config)
8.3 内存溢出处理
import gc
def process_with_memory_management(items, processor_func, batch_size=1000):
"""
内存安全的批量处理
"""
for i in range(0, len(items), batch_size):
batch = items[i:i + batch_size]
processor_func(batch)
# 清理内存
gc.collect()
if i % 10000 == 0:
print(f"已处理 {i}/{len(items)} 项, 内存使用: {get_memory_usage()} MB")
def get_memory_usage():
"""获取当前内存使用"""
import psutil
import os
process = psutil.Process(os.getpid())
return process.memory_info().rss / 1024 / 1024
九、最佳实践总结
9.1 数据获取阶段
✅ 推荐做法:
- 使用BitTorrent下载大文件
- 选择multistream格式以支持并行处理
- 定期下载最新版本保持数据时效性
- 保留原始dump文件作为备份
❌ 避免事项:
- 不要直接爬取维基百科网站(违反使用条款)
- 不要忽视版权和许可协议
- 不要下载不需要的语言版本
9.2 数据清洗阶段
✅ 推荐做法:
- 多阶段清洗(粗清洗 → 细清洗 → 验证)
- 保留清洗日志便于调试
- 使用正则表达式批量处理
- 进行数据质量检查
❌ 避免事项:
- 不要过度清洗导致信息丢失
- 不要忽略边界情况(如公式、表格)
- 不要跳过验证步骤
9.3 文本切分阶段
✅ 推荐做法:
- 确保切分的文本块在没有周围上下文的情况下对人类仍有意义
- 根据应用场景选择合适的切分策略
- 添加适当的重叠以保持上下文
- 测试不同的块大小找到最优值
❌ 避免事项:
- 不要使用过小的块(导致上下文不足)
- 不要使用过大的块(超过模型限制)
- 不要忽视语义边界
9.4 数据存储阶段
✅ 推荐做法:
- 使用JSONL格式存储文本数据
- 同时构建向量数据库支持语义检索
- 添加适当的元数据(标题、来源等)
- 定期备份数据
- 使用压缩减少存储空间
❌ 避免事项:
- 不要存储冗余信息
- 不要忽视数据索引
- 不要混合不同格式的数据
9.5 性能优化
✅ 推荐做法:
- 使用多进程/多线程并行处理
- 批量处理减少I/O开销
- 使用spaCy的pipe方法批处理
- 监控内存使用,及时清理
❌ 避免事项:
- 不要创建过多的进程(受CPU核心数限制)
- 不要在循环中重复加载模型
- 不要忽视内存管理
十、结语
本文详细介绍了维基百科知识库从数据获取到最终存储的完整处理流程。通过WikiExtractor进行初步提取,结合精细的文本清洗、基于spaCy的智能切分,以及JSONL+向量数据库的混合存储策略,可以构建高质量的知识库系统。
这套方法论不仅适用于维基百科,也可推广到其他开放知识源的处理,为RAG系统、问答系统、语义搜索等应用提供坚实的数据基础。
关键要点回顾:
- 数据获取: 选择合适的dump文件格式,使用WikiExtractor高效提取
- 数据清洗: 多层次清洗策略,去除Wiki标记和特殊页面
- 文本切分: 根据应用场景选择合适的切分策略,保持语义完整性
- 数据存储: JSONL文本存储+向量数据库,支持多种检索方式
- 质量保证: 完善的验证机制,确保数据质量
- 性能优化: 多进程并行、批处理、内存管理
参考文献
- Wikipedia Database Download - 维基百科官方数据库下载文档,提供了数据转储的详细说明和使用指南
https://en.wikipedia.org/wiki/Wikipedia:Database_download - WikiExtractor GitHub Repository - Giuseppe Attardi开发的维基百科文本提取工具,支持多进程处理和模板扩展
https://github.com/attardi/wikiextractor - Processing Wikipedia in a Couple of Hours - James Thorne的博客文章,介绍了高效处理维基百科转储的方法
Referenced in Stack Overflow discussions - Chunking Strategies for LLM Applications - Pinecone技术文档,详细介绍了RAG系统中的文本切分策略
https://www.pinecone.io/learn/chunking-strategies/ - S2 Chunking: A Hybrid Framework - 2025年发表的学术论文,提出了结合空间和语义分析的文档分割框架
https://arxiv.org/html/2501.05485v1 - Enhancing RAG with Hierarchical Text Segmentation - 关于使用层次化文本分割增强RAG系统的研究论文
https://arxiv.org/html/2507.09935v1 - spaCy Models & Languages Documentation - spaCy官方文档,介绍了包括中文在内的多种语言模型
https://spacy.io/usage/models - spaCy Chinese Models - spaCy中文模型的详细文档和使用说明
https://spacy.io/models/zh - JSONL Format Specification - JSON Lines格式的官方规范文档
https://jsonlines.org/ - JSONL: The data structure for LLMs - Medium文章,介绍了JSONL在机器学习中的应用
https://medium.com/@ManueleCaddeo/understanding-jsonl-bc8922129f5b - Introduction to RAG and Vector Databases - 详细介绍RAG架构和向量数据库的工作原理
https://medium.com/@sachinsoni600517/introduction-to-rag-retrieval-augmented-generation-and-vector-database-b593e8eb6a94 - Retrieval Augmented Generation for Knowledge-Intensive NLP Tasks - RAG的原始研究论文,由Facebook AI Research发表
- What is RAG in AI - Qdrant技术博客,解释RAG系统的工作原理和应用
https://qdrant.tech/articles/what-is-rag-in-ai/ - Understanding the Wikipedia Dump - DEV Community文章,深入解析维基百科转储的XML结构
https://dev.to/tobiasjc/understanding-the-wikipedia-dump-11f1 - LumberChunker: Long-Form Narrative Document Segmentation - 2024年发表的关于长文档分割的研究论文
https://arxiv.org/html/2406.17526v1 - Mastering Document Chunking for RAG - Medium技术文章,全面介绍文档切分的各种策略
https://medium.com/@sahin.samia/mastering-document-chunking-strategies-for-retrieval-augmented-generation-rag-c9c16785efc7 - From Fixed-Size to NLP Chunking - 深入探讨文本切分技术的演进
https://safjan.com/from-fixed-size-to-nlp-chunking-a-deep-dive-into-text-chunking-techniques/ - Vector Databases for Enterprise Knowledge Base - Shakudo技术博客,介绍企业级知识库的构建
https://www.shakudo.io/blog/talking-to-your-data-at-scale - Amazon Bedrock Knowledge Bases - AWS官方文档,介绍向量数据库在RAG中的应用
https://aws.amazon.com/blogs/machine-learning/dive-deep-into-vector-data-stores-using-amazon-bedrock-knowledge-bases/ - Microsoft Generative AI for Beginners - 微软开源的生成式AI学习资源,包含RAG和向量数据库的详细介绍
https://github.com/microsoft/generative-ai-for-beginners
火山引擎开发者社区是火山引擎打造的AI技术生态平台,聚焦Agent与大模型开发,提供豆包系列模型(图像/视频/视觉)、智能分析与会话工具,并配套评测集、动手实验室及行业案例库。社区通过技术沙龙、挑战赛等活动促进开发者成长,新用户可领50万Tokens权益,助力构建智能应用。
更多推荐

所有评论(0)