RAG_文本解析&分片

文本解析方式对比

    1. Pymupdf
    1. Mineru
    1. Markitdown
    1. PaddleOCR

"""
文档解析工具对比:Markitdown vs PyMuPDF vs MinerU vs PaddleOCR (PP-Structure)

适用场景:将 PDF(尤其是中文技术文档)转换为结构化 Markdown,用于 RAG 等下游任务。
"""

# ==============================================================================
# 1. Markitdown
# ------------------------------------------------------------------------------
# ✅ 优点:
#   - 安装简单: pip install markitdown
#   - 支持多格式 (PDF, DOCX, PPTX, HTML)
#   - 输出标准 Markdown
# ❌ 缺点:
#   - 表格识别弱(依赖 PDF 文本结构)
#   - 无语义理解(无法区分标题/普通文本)
#   - 不支持 OCR(扫描版 PDF 无效)
# 🎯 适用: 快速转换可复制文本 PDF 为 Markdown
# =============================================================================
# ==============================================================================
# 2. PyMuPDF (fitz)
# ------------------------------------------------------------------------------
# ✅ 优点:
#   - 极快、轻量、稳定
#   - 精确控制文本位置、字体、图像
#   - 可提取元数据、注释
# ❌ 缺点:
#   - 无结构化能力(返回文本流)
#   - 表格支持弱(需手动拼接)
#   - 无 OCR(扫描 PDF 无效)
# 🎯 适用: 底层文本/图像提取,自研 pipeline 基础组件
# ==============================================================================
# ==============================================================================
# 3. MinerU (ModelScope)
# ------------------------------------------------------------------------------
# ✅ 优点:
#   - 端到端结构化(标题/段落/表格/公式/图像)
#   - 公式支持(LaTeX 格式)
#   - 标题带编号(如 # 8.2.1)
#   - Apple Silicon 友好(自动 Metal GPU 加速)
# ❌ 缺点:
#   - 闭源模型(需从 ModelScope 下载)
#   - 依赖 GPU(CPU 推理极慢)
#   - PDF 需先转图像(如 pdf2image)
# 🎯 适用: 高质量中文技术文档解析(白皮书、论文)
# ==============================================================================
# ==============================================================================
# 4. PaddleOCR (PP-StructureV3)
# ------------------------------------------------------------------------------
# ✅ 优点:
#   - 完全开源(Apache 2.0,可商用)
#   - 内置 OCR(支持扫描 PDF)
#   - 表格识别强(输出 HTML → Markdown)
#   - Apple Silicon 支持(Metal 后端)
# ❌ 缺点:
#   - 安装复杂(需 PaddlePaddle)
#   - 无公式识别
#   - 标题识别依赖后处理规则
# 🎯 适用: 开源项目首选,需处理扫描件
# ==============================================================================
# ==============================================================================
# 选型建议总结
# ------------------------------------------------------------------------------
# - 追求效果 & 有 GPU → MinerU
# - 开源 + 扫描 PDF → PaddleOCR
# - 简单文本提取 → PyMuPDF
# - 快速原型 → Markitdown
# ==============================================================================

1. Pymupdf

1.1 文本解析

"""
生产级 PDF 到 Markdown 转换脚本
使用 PyMuPDF4LLM,专为 RAG/Embedding 前处理设计。
"""

import fitz  # PyMuPDF
from pymupdf4llm import to_markdown

def main():
    # --- 硬编码的输入输出路径 ---
    input_pdf = "/Users/ /Projects/my_project_text/datas/2025人工智能发展白皮书.pdf"
    output_file = "/Users/*****/Projects/my_project_text/datas/md_data/025人工智能发展白皮书.md"
    
    try:
        # 使用 PyMuPDF4LLM 的 to_markdown 函数进行转换
        # 该函数会智能处理文本、表格,并为图片生成占位符和描述
        md_text = to_markdown(input_pdf)
        
        # 将结果写入文件
        with open(output_file, "w", encoding="utf-8") as f:
            f.write(md_text)
            
        print(f"✅ PDF 已成功转换为 Markdown: {output_file}")
        
    except Exception as e:
        print(f"❌ 处理 PDF 时发生错误: {e}")
        raise

if __name__ == "__main__":
    main()

1.2 文本切片

"""
将 Markdown 文件按标题结构分片,并输出为单个 JSON 文件。
每个分片(chunk)的 `text` 字段包含其完整的标题上下文路径(如 "# 第一章\n\n# 1.1 背景\n\n..."),
确保即使脱离原始文档,该 chunk 仍具备语义独立性和可理解性。
"""

import os
import re
import json


def split_markdown_to_chunks_with_header_in_text(
    md_text: str, 
    max_chunk_size: int = 2048, 
    chunk_overlap: int = 100
) -> list[dict]:
    """
    将 Markdown 文本按标题层级切分为语义完整的 chunks,并在每个 chunk 的 text 前插入其完整标题路径。

    Args:
        md_text (str): 原始 Markdown 文本内容。
        max_chunk_size (int): 单个 chunk 的最大字符长度(默认 2048,约 512 tokens)。
        chunk_overlap (int): 相邻 chunks 之间的重叠字符数(用于保持上下文连贯性,默认 100)。

    Returns:
        list[dict]: 每个元素为一个 chunk 字典,包含:
            - "text": 包含完整标题前缀的文本内容(语义独立)
            - "title_path": 标题路径字符串,如 "第一章 / 1.1 背景"
            - "token_est": 粗略 token 估算(按 1 token ≈ 4 字符)
    """
    # 第一步:按 Markdown 标题结构解析文档,得到各段落及其所属标题路径
    sections = _split_by_headers(md_text)
    all_chunks = []

    for sec in sections:
        raw_content = sec["content"]          # 当前段落的纯内容(不含标题)
        title_path = sec["title_path"]        # 标题路径字符串,如 "第一章 / 1.2 方法"
        headers_list = sec["headers_list"]    # 标题列表,如 ["第一章", "1.2 方法"]

        # 构建标题前缀:将 headers_list 转换为 Markdown 标题格式
        # 例如:["第一章", "1.1 背景"] → "# 第一章\n\n# 1.1 背景\n\n"
        header_prefix = ""
        for h in headers_list:
            if h.strip():  # 忽略空标题(理论上不应出现,但防御性处理)
                header_prefix += f"# {h}\n\n"

        # 将标题前缀拼接到内容前,形成语义完整的文本块
        full_text_with_header = header_prefix + raw_content.strip()

        # 若整体未超限,直接作为一个 chunk
        if len(full_text_with_header) <= max_chunk_size:
            all_chunks.append({
                "text": full_text_with_header,
                "title_path": title_path,
                "token_est": len(full_text_with_header) // 4  # 粗略估算 token 数
            })
        else:
            # 若超限,则对带标题的文本进行智能分片(保留语义边界)
            sub_texts = _smart_split(full_text_with_header, max_chunk_size, chunk_overlap)
            for txt in sub_texts:
                # 注意:_smart_split 已处理分片逻辑,此处直接使用分片结果
                # 所有子 chunk 共享相同的 title_path(因来自同一标题段落)
                all_chunks.append({
                    "text": txt,
                    "title_path": title_path,
                    "token_est": len(txt) // 4
                })

    return all_chunks


def _split_by_headers(text: str) -> list[dict]:
    """
    解析 Markdown 文本中的标题(# 至 ######),按标题层级组织内容段落。

    逻辑说明:
    - 遇到新标题时,将上一个标题到当前标题之间的内容归入上一个标题上下文。
    - 使用字典 `current_headers` 维护当前有效的标题栈(key: 层级, value: 标题文本)。
    - 标题层级下降时(如从 ### 回到 ##),自动清除更深的子标题。

    Returns:
        list[dict]: 每个元素代表一个“标题-内容”段落,包含:
            - "content": 该标题下的正文内容(不含任何标题行)
            - "title_path": 标题路径字符串(如 "引言 / 1.1 背景")
            - "headers_list": 从根到当前的标题列表(按层级排序)
    """
    # 匹配所有 Markdown 标题行:如 "# 标题"、"## 子标题"
    header_pattern = re.compile(r'^(#{1,6})\s*(.+)$', re.MULTILINE)
    matches = list(header_pattern.finditer(text))
    
    # 若全文无标题,则整个文档视为一个无标题段落
    if not matches:
        return [{"content": text.strip(), "title_path": "", "headers_list": []}]
    
    sections = []
    current_headers = {}  # 当前有效的标题栈:{1: "第一章", 2: "1.1 背景", ...}
    last_end = 0          # 上一个标题结束位置(用于截取内容)

    for m in matches:
        start, end = m.span()      # 标题在原文中的起止位置
        level = len(m.group(1))    # 标题层级(1~6)
        title = m.group(2).strip() # 标题文本

        # 提取上一个标题到当前标题之间的内容(即上一段正文)
        content = text[last_end:start].strip()
        if content:
            # 按层级顺序(1,2,3...)提取当前有效的标题列表
            ordered_headers = [current_headers[i] for i in sorted(current_headers)]
            title_path = " / ".join(ordered_headers)
            sections.append({
                "content": content,
                "title_path": title_path,
                "headers_list": ordered_headers
            })
        
        # 更新标题栈:设置当前层级标题,并清除更深的子标题
        current_headers[level] = title
        # 删除所有比当前层级更深的标题(如当前是 ##,则删除 ### 及以下)
        for l in list(current_headers.keys()):
            if l > level:
                del current_headers[l]
        
        last_end = end  # 更新上一个标题结束位置

    # 处理文档末尾:最后一个标题之后的内容
    final_content = text[last_end:].strip()
    if final_content:
        ordered_headers = [current_headers[i] for i in sorted(current_headers)]
        title_path = " / ".join(ordered_headers)
        sections.append({
            "content": final_content,
            "title_path": title_path,
            "headers_list": ordered_headers
        })
    
    return sections


def _smart_split(text: str, max_size: int, overlap: int) -> list[str]:
    """
    对长文本进行智能分片,优先在语义边界(如段落、句子)处切割,避免在单词中间断开。

    策略:
    1. 按优先级尝试不同分隔符:段落 > 换行 > 句号 > 逗号 > 空格
    2. 若某分隔符存在,则按其分割并尝试组合成不超过 max_size 的块
    3. 若组合后仍超限,则递归处理
    4. 若无合适分隔符,则强制按字符切分(最后手段)

    Args:
        text (str): 待分片的长文本
        max_size (int): 单个 chunk 最大长度
        overlap (int): 相邻 chunks 的重叠长度(用于上下文衔接)

    Returns:
        list[str]: 分片后的文本列表
    """
    if len(text) <= max_size:
        return [text]
    
    # 分隔符优先级:从语义粒度大到小
    separators = ["\n\n", "\n", "。", "!", "?", ". ", "; ", ", ", " "]
    
    for sep in separators:
        if sep in text:
            parts = text.split(sep)
            chunks = []
            current = ""  # 当前正在构建的 chunk

            for part in parts:
                # 尝试将当前 part 加入 current(注意:首次不加 sep)
                candidate = current + (sep if current else "") + part
                
                if len(candidate) <= max_size:
                    current = candidate
                else:
                    # 当前 chunk 已满,保存并开启新 chunk
                    if current:
                        chunks.append(current)
                        # 计算重叠部分:从 current 末尾取 overlap 长度作为新 chunk 起始
                        overlap_start = max(0, len(current) - overlap)
                        current = current[overlap_start:] + sep + part
                    else:
                        # 极端情况:单个 part 就超限,直接截断
                        current = part[:max_size]
                        chunks.append(current)
                        current = part[max_size:]  # 剩余部分继续处理
            
            # 添加最后一个未保存的 chunk
            if current:
                chunks.append(current)
            
            # 递归处理仍超限的 chunk(例如某段落极长)
            final_chunks = []
            for c in chunks:
                if len(c) > max_size:
                    final_chunks.extend(_smart_split(c, max_size, overlap))
                else:
                    final_chunks.append(c)
            return final_chunks
    
    # 万不得已:按字符强制切分(保留 overlap)
    return [text[i:i + max_size] for i in range(0, len(text), max_size - overlap)]


# --- 主程序:读取 Markdown 文件,分片后输出为单个 JSON ---
def main():
    """
    主流程:
    1. 读取指定路径的 Markdown 文件
    2. 调用分片函数生成带标题上下文的 chunks
    3. 将结果写入同名 .json 文件
    4. 打印统计信息和示例
    """
    # 硬编码输入输出路径(实际项目中建议通过命令行参数或配置文件指定)
    md_file = "/Users/*****/Projects/my_project_text/datas/md_data/空间智能研究报告.md"
    output_json = "/Users/*****/Projects/my_project_text/datas/md_data/空间智能研究报告.json"
    
    # 确保输出目录存在
    os.makedirs(os.path.dirname(output_json), exist_ok=True)
    
    # 读取 Markdown 文件
    with open(md_file, "r", encoding="utf-8") as f:
        md_text = f.read()
    
    # 执行分片(默认 2048 字符上限,100 字符重叠)
    chunks = split_markdown_to_chunks_with_header_in_text(
        md_text,
        max_chunk_size=2048,
        chunk_overlap=100
    )
    
    # 写入 JSON 文件(格式化缩进,支持中文)
    with open(output_json, "w", encoding="utf-8") as f:
        json.dump(chunks, f, ensure_ascii=False, indent=2)
    
    # 打印执行结果
    print(f"✅ 分片完成!共 {len(chunks)} 个 chunks")
    print(f"✅ 已保存至: {output_json}")
    print("\n📄 示例 chunk:")
    if chunks:
        ex = chunks[0]
        print(f"  title_path: {ex['title_path']}")
        print(f"  token_est: {ex['token_est']}")
        print(f"  text preview:\n{ex['text'][:200]}...")


if __name__ == "__main__":
    main()

2. Mineru

2.1 下载

https://mineru.net/
https://www.modelscope.cn/models/OpenDataLab/MinerU2.5-2509-1.2B

在这里插入图片描述

2.2 文本解析

"""
使用 MinerU 2.5 视觉语言模型(基于 Qwen2-VL)对 PDF 或图像文档进行结构化内容提取,
并输出为 Markdown 格式,同时保留标题、段落、表格、公式等语义结构。

支持输入格式:
  - PDF(自动转为图像,DPI=200)
  - 图像:.jpg, .jpeg, .png, .bmp

输出:
  - 1. 结构化 Markdown 文件(带标题层级、表格转为 Markdown 表格、公式用 $...$ 包裹)
  - 2. 原始提取结果 JSON(用于调试和分析)

依赖:
  - modelscope(加载 Qwen2-VL 模型)
  - pdf2image(PDF 转图像)
  - Pillow(图像处理)
  - BeautifulSoup(HTML 表格解析)
  - 自定义模块:mineru_vl_utils(封装 MinerU 推理接口)
"""

import os
import re
import json
from pathlib import Path

# 视觉语言模型相关
from modelscope import AutoProcessor, Qwen2VLForConditionalGeneration
from PIL import Image
from mineru_vl_utils import MinerUClient  # 自定义封装类,简化调用

# 文档处理
from pdf2image import convert_from_path  # 将 PDF 转为 PIL Image 列表

# HTML 解析(用于表格转换)
from bs4 import BeautifulSoup


# ==================== 配置区 ====================
# 模型本地路径(需提前下载)
LOCAL_MODEL_PATH = "/Users/*****/Projects/my_project_text/model_path/MinerU2.5-2509-1.2B"

# 输入文档路径(支持 PDF 或图像)
INPUT_DOC_PATH = "/Users/*****/Projects/my_project_text/datas/AI原生应用架构白皮书_part.pdf"


# ==================== 模型加载 ====================
def load_model():
    """
    加载本地 MinerU 2.5 视觉语言模型(基于 Qwen2-VL)。
    
    使用 `device_map="auto"` 自动分配 GPU/CPU,`dtype="auto"` 自动选择精度(如 bfloat16)。
    
    Returns:
        MinerUClient: 封装后的推理客户端,提供 high-level API(如 two_step_extract)。
    """
    print("🔍 Loading MinerU model...")
    model = Qwen2VLForConditionalGeneration.from_pretrained(
        LOCAL_MODEL_PATH,
        dtype="auto",          # 自动选择最佳数据类型(如 torch.bfloat16)
        device_map="auto"      # 自动分配设备(多卡/单卡/CPU)
    )
    processor = AutoProcessor.from_pretrained(
        LOCAL_MODEL_PATH,
        use_fast=True          # 使用快速 tokenizer(若可用)
    )
    # 使用自定义封装客户端,隐藏底层细节
    client = MinerUClient(backend="transformers", model=model, processor=processor)
    return client


# ==================== 工具函数:HTML 表格 → Markdown 表格 ====================
def html_table_to_markdown(html_str):
    """
    将 MinerU 输出的 HTML 表格字符串转换为标准 Markdown 表格。
    
    示例:
        输入: <table><tr><th>A</th><th>B</th></tr><tr><td>1</td><td>2</td></tr></table>
        输出: A | B\n---|---\n1 | 2
    
    Args:
        html_str (str): 包含 <table> 的 HTML 字符串。
    
    Returns:
        str: 转换后的 Markdown 表格,若解析失败则原样返回输入。
    """
    try:
        soup = BeautifulSoup(html_str, "html.parser")
        table = soup.find("table")
        if not table:
            return html_str  # 非表格内容,直接返回

        rows = []
        # 遍历所有 <tr> 行
        for tr in table.find_all("tr"):
            cells = []
            # 遍历 <td> 或 <th> 单元格
            for td in tr.find_all(["td", "th"]):
                # 提取纯文本并压缩空白字符
                cell_text = " ".join(td.stripped_strings).replace("\n", " ")
                cells.append(cell_text)
            if cells:  # 忽略空行
                rows.append(cells)

        if not rows:
            return ""

        # 构建 Markdown 表格
        header = rows[0]
        markdown_lines = [" | ".join(header)]
        # 分隔线:--- | ---
        markdown_lines.append(" | ".join(["---"] * len(header)))
        # 数据行
        for row in rows[1:]:
            # 补齐列数(防止错位)
            while len(row) < len(header):
                row.append("")
            markdown_lines.append(" | ".join(row))
        return "\n".join(markdown_lines)
    except Exception as e:
        print(f"⚠️  Table conversion error: {e}")
        return html_str  # 出错时回退到原始 HTML


# ==================== 工具函数:文本清洗 ====================
def clean_text(text):
    """
    清洗文本:将连续空白字符(空格、换行、制表符等)压缩为单个空格,并去除首尾空白。
    
    Args:
        text (str): 原始文本。
    
    Returns:
        str: 清洗后的文本。
    """
    text = re.sub(r"\s+", " ", text).strip()
    return text


# ==================== 文档转图像 ====================
def doc_to_images(doc_path):
    """
    将输入文档(PDF 或图像)统一转换为 PIL Image 列表。
    
    - 图像文件:直接加载为 RGB 图像。
    - PDF 文件:使用 pdf2image 转为多页图像(DPI=200,平衡清晰度与性能)。
    
    Args:
        doc_path (str or Path): 输入文件路径。
    
    Returns:
        List[PIL.Image.Image]: 图像列表,每页一个 Image 对象。
    
    Raises:
        FileNotFoundError: 文件不存在。
        ValueError: 不支持的文件格式。
    """
    doc_path = Path(doc_path)
    if not doc_path.exists():
        raise FileNotFoundError(f"Input file not found: {doc_path}")

    suffix = doc_path.suffix.lower()
    if suffix in {'.jpg', '.jpeg', '.png', '.bmp'}:
        print(f"🖼️  Loading image: {doc_path.name}")
        return [Image.open(doc_path).convert("RGB")]
    elif suffix == '.pdf':
        print(f"📄 Converting PDF to images (DPI=200): {doc_path.name}")
        images = convert_from_path(str(doc_path), dpi=200)
        print(f"✅ Converted to {len(images)} page(s)")
        return images
    else:
        raise ValueError(f"Unsupported format: {suffix}. Only PDF and images (jpg/png/bmp) allowed.")


# ==================== Block 转 Markdown 行 ====================
def block_to_md(block):
    """
    将 MinerU 提取的单个 block(字典)转换为对应的 Markdown 片段。
    
    支持类型:
      - "title"     → ## 标题
      - "text"/"paragraph" → 普通段落(清洗后)
      - "table"     → Markdown 表格(通过 html_table_to_markdown)
      - "formula"   → 行内公式 $...$
    
    Args:
        block (dict): 包含 "type" 和 "text" 字段的结构化块。
    
    Returns:
        str: 对应的 Markdown 字符串(可能为空)。
    """
    block_type = block.get("type", "text").lower()
    text = block.get("text", "").strip()
    if not text:
        return ""

    if block_type == "table":
        # 表格:前后加换行,确保独立成块
        return "\n" + html_table_to_markdown(text) + "\n"

    if block_type == "formula":
        # 公式:若含 LaTeX 特殊字符,则用 $...$ 包裹;否则视为普通文本
        if re.search(r"[\\_{}]", text):
            return f" ${text}$ "
        else:
            return f" {text} "

    if block_type == "title":
        # 标题:统一用二级标题(##),便于后续分片处理
        return f"\n## {clean_text(text)}\n"

    if block_type in ("text", "paragraph"):
        return clean_text(text)

    # 兜底:未知类型也清洗后返回
    return clean_text(text)


# ==================== 主处理流程 ====================
def main():
    """
    主执行流程:
      1. 加载输入文档(PDF/图像)→ 转为图像列表
      2. 加载 MinerU 模型
      3. 逐页调用 two_step_extract 提取结构化 blocks
      4. 将 blocks 转为 Markdown
      5. 保存 Markdown 和原始 JSON(用于调试)
    """
    print(f"🚀 Processing: {INPUT_DOC_PATH}")
    images = doc_to_images(INPUT_DOC_PATH)

    client = load_model()

    full_markdown = []          # 最终 Markdown 内容
    raw_data_for_debug = []     # 原始提取结果(用于调试)

    for i, image in enumerate(images):
        print(f"🧠 Processing page {i + 1}/{len(images)}...")
        # 调用 MinerU 的两阶段提取(布局分析 + 内容识别)
        blocks = client.two_step_extract(image)
        raw_data_for_debug.append({"page": i + 1, "blocks": blocks})

        # 将当前页所有 blocks 转为 Markdown 行
        page_md = []
        for block in blocks:
            md_line = block_to_md(block)
            if md_line.strip():  # 忽略空行
                page_md.append(md_line)
        
        # 若当前页有内容,则加入全文,并添加页面分隔符
        if page_md:
            full_markdown.append("\n".join(page_md))
            full_markdown.append("\n\n---\n\n")  # Markdown 水平线作为页分隔

    # 合并所有页面内容,并去除首尾空白
    final_md = "".join(full_markdown).strip()

    # 保存 Markdown 文件(同名 + _mineru.md)
    output_md_path = Path(INPUT_DOC_PATH).stem + "_mineru.md"
    with open(output_md_path, "w", encoding="utf-8") as f:
        f.write(final_md)
    print(f"\n✅ Markdown saved to: {output_md_path}")

    # 保存原始提取结果(JSON 格式,便于分析模型输出)
    with open("extracted_raw.json", "w", encoding="utf-8") as f:
        json.dump(raw_data_for_debug, f, ensure_ascii=False, indent=2)
    print("💾 Raw extraction result saved to: extracted_raw.json")


# ==================== 脚本入口 ====================
if __name__ == "__main__":
    main()

2.3 文本分片

"""
智能分片器(Smart Chunker)——专为 MinerU 提取的 Markdown 文档设计

目标:
  将结构化 Markdown(含编号标题如 `# 8.2.1 中兴通讯`)按 **三级标题(h3)** 切分为语义独立的 chunks。
  每个 chunk 自动继承其最近的有效一级(h1)和二级(h2)标题,形成完整上下文。

特点:
  ✅ 仅在出现三级标题(或更深)时才触发分片
  ✅ 保留原始编号(如 "8.2.1"),不删除、不重写
  ✅ 标题路径(title_path)用于展示,内容(text)保留原始 Markdown 格式
  ✅ 自动提取标题前的“前言”内容(如封面、摘要)
  ⚠️ 假设标题顺序正确(MinerU 输出通常满足此条件)

输出格式:
  每个 chunk 为 dict,包含:
    - "text": 完整上下文文本(文件名 + h1 + h2 + h3 + 内容)
    - "title_path": 可读路径,如 "白皮书 / 8 第8章 / 8.2 基础层 / 8.2.1 中兴通讯"
    - "token_est": 粗略 token 估算(字符数 // 4)
"""

import re
import json
from pathlib import Path
from typing import List, Dict


def parse_markdown_to_chunks(md_text: str, file_name: str) -> List[Dict]:
    """
    将 MinerU 生成的 Markdown 文本按三级标题分片,构建带完整标题上下文的 chunks。

    设计假设:
      - 标题编号格式为:`# <数字>[.<数字>]* <标题文本>`,如 `# 8.2.1 中兴通讯`
      - 标题层级由编号中的点数决定(8 → h1, 8.2 → h2, 8.2.1 → h3)
      - 文档结构顺序正确(即 h2 出现在 h1 之后,h3 出现在 h2 之后)

    Args:
        md_text (str): 原始 Markdown 文本。
        file_name (str): 源文件名(不含扩展名),用于上下文标识。

    Returns:
        List[Dict]: 分片结果列表,每个元素包含 text、title_path、token_est。
    """
    lines = md_text.split('\n')
    chunks = []

    # 当前标题上下文(仅保留最近出现的有效标题)
    current_h1 = ""   # 一级标题行,如 "# 8 第8章 基础层"
    current_h2 = ""   # 二级标题行,如 "# 8.2 基础层"
    current_h3 = ""   # 三级标题行,如 "# 8.2.1 中兴通讯"
    current_content = []  # 当前三级标题下的正文内容

    # ==================== 第一步:提取标题前的“前言”内容 ====================
    # 例如:封面、版权声明、摘要等无标题段落
    pre_lines = []
    i = 0
    while i < len(lines):
        line = lines[i]
        # 一旦遇到 Markdown 一级标题(# ...),即认为正文开始
        if re.match(r'^#\s+.+', line):
            break
        # 保留非空行,或已有内容时的空行(保持段落结构)
        if line.strip() or pre_lines:
            pre_lines.append(line)
        i += 1

    # 若存在前言内容,则作为一个独立 chunk
    if pre_lines:
        # 将文件名作为“虚拟根标题”加入上下文
        full_pre_text = "\n\n".join([file_name] + pre_lines).strip() or file_name
        chunks.append({
            "text": full_pre_text,
            "title_path": f"{file_name} / 前言",
            "token_est": len(full_pre_text) // 4
        })

    # ==================== 辅助函数:解析标题编号与格式化显示 ====================
    def extract_number(line: str) -> str:
        """
        从标题行提取编号部分(如 "# 8.2.1 XXX" → "8.2.1")。
        若无编号(如 "# 引言"),返回空字符串。
        """
        m = re.match(r'^#\s+(\d+(?:\.\d+)*)', line)
        return m.group(1) if m else ""

    def format_display(line: str) -> str:
        """
        将原始标题行转换为可读显示格式,保留编号并规范化空格。
        
        示例:
          输入: "# 8.2.1中兴通讯" → 输出: "8.2.1 中兴通讯"
          输入: "# 8 第8章"      → 输出: "8 第8章"
        """
        num = extract_number(line)
        if num:
            rest = line[2:].strip()  # 去掉 "# "
            # 如果剩余部分以编号开头(常见情况),则分离编号与标题文本
            if rest.startswith(num):
                title_part = rest[len(num):].strip()
                return f"{num} {title_part}" if title_part else num
            else:
                # 编号与标题间无空格,但仍保留编号前缀
                return f"{num} {rest}"
        # 无编号标题:直接返回标题文本
        return line[2:].strip()

    # ==================== 第二步:主解析循环 ====================
    for line in lines[i:]:
        stripped = line.rstrip()
        # 空行处理:仅当已进入三级标题上下文时才保留(维持段落结构)
        if not stripped:
            if current_h3:
                current_content.append(line)
            continue

        # 检测是否为标题行(以 "# " 开头)
        if re.match(r'^#\s+.+', line):
            num = extract_number(line)
            # 根据编号中的点数判断层级(8 → 1级, 8.2 → 2级, 8.2.1 → 3级)
            depth = num.count('.') + 1 if num else 1

            # ====== 触发:保存当前三级标题 chunk ======
            if current_h3:
                # 构建完整文本:文件名 + h1 + h2 + h3 + 内容
                header = [file_name]
                if current_h1: header.append(current_h1)
                if current_h2: header.append(current_h2)
                header.append(current_h3)
                full_text = "\n\n".join(header + current_content).strip()
                if full_text:
                    # 构建可读 title_path(使用 format_display 规范化显示)
                    path_parts = [file_name]
                    if current_h1: path_parts.append(format_display(current_h1))
                    if current_h2: path_parts.append(format_display(current_h2))
                    path_parts.append(format_display(current_h3))
                    chunks.append({
                        "text": full_text,
                        "title_path": " / ".join(path_parts),
                        "token_est": len(full_text) // 4
                    })
                # 重置内容缓冲区
                current_content = []

            # ====== 更新标题上下文 ======
            if not num:
                # 无编号标题:视为一级标题(如 "# 摘要")
                current_h1 = line
                current_h2 = ""
                current_h3 = ""
            elif depth == 1:
                current_h1 = line
                current_h2 = ""
                current_h3 = ""
            elif depth == 2:
                current_h2 = line
                current_h3 = ""
            elif depth >= 3:
                # 所有三级及以上标题均视为分片单元(h3, h4...)
                current_h3 = line
            # 注意:不校验编号连续性(依赖 MinerU 输出顺序正确)

        else:
            # 普通文本行:仅当处于三级标题上下文时才收集
            if current_h3:
                current_content.append(line)

    # ==================== 第三步:处理最后一个 chunk ====================
    if current_h3:
        header = [file_name]
        if current_h1: header.append(current_h1)
        if current_h2: header.append(current_h2)
        header.append(current_h3)
        full_text = "\n\n".join(header + current_content).strip()
        if full_text:
            path_parts = [file_name]
            if current_h1: path_parts.append(format_display(current_h1))
            if current_h2: path_parts.append(format_display(current_h2))
            path_parts.append(format_display(current_h3))
            chunks.append({
                "text": full_text,
                "title_path": " / ".join(path_parts),
                "token_est": len(full_text) // 4
            })

    return chunks


def chunk_file(file_path: str, output_json: str = None) -> List[Dict]:
    """
    文件级入口函数:读取 Markdown 文件,执行分片,并可选保存为 JSON。

    Args:
        file_path (str): 输入 Markdown 文件路径。
        output_json (str, optional): 输出 JSON 路径。若提供,则保存结果。

    Returns:
        List[Dict]: 分片结果列表。
    """
    file_path_obj = Path(file_path)
    with open(file_path_obj, 'r', encoding='utf-8') as f:
        text = f.read()
    file_name = file_path_obj.stem  # 例如 "白皮书"(不含 .md)
    chunks = parse_markdown_to_chunks(text, file_name)
    
    if output_json:
        # 确保输出目录存在
        Path(output_json).parent.mkdir(parents=True, exist_ok=True)
        with open(output_json, 'w', encoding='utf-8') as f:
            json.dump(chunks, f, ensure_ascii=False, indent=2)
        print(f"✅ 分片完成: {len(chunks)} 个 chunks -> {output_json}")
    return chunks


# === 开发调试入口 ===
if __name__ == "__main__":
    """
    本地调试用例:指定输入 Markdown 和输出 JSON 路径,运行分片流程。
    实际部署时,建议通过 CLI 或 API 调用 `chunk_file`。
    """
    from pathlib import Path

    # 配置你的路径(开发时硬编码,生产环境应通过参数传入)
    md_file = Path("/Users/*****/Projects/my_project_text/RAG_PRO/raw_datas/《2025人工智能发展白皮书》_MinerU__20251003063221.md")
    output_dir = Path("/Users/*****/Projects/my_project_text/RAG_PRO/chunk_datas/")
    out_file = output_dir / f"{md_file.stem}_chunks.json"

    # 执行分片并保存
    chunk_file(str(md_file), str(out_file))

3. Markitdown

3.1 文本解析

#!/usr/bin/env python3
# -*- coding: utf-8 -*-

from markitdown import MarkItDown
import os

# result = markitdown.convert("https://arxiv.org/pdf/2305.10401.pdf")

# === 配置区 ===
PDF_PATH = r"/Users/*****/Projects/my_project_text/datas/核心智慧与行动指南.pdf"          # 替换成你的 PDF 文件名
OUTPUT_MD = "output_pdf.md"            # 输出的 Markdown 文件名
OUTPUT_DIR = "./markitdown_output" # 用于保存图片等资源

# 确保路径正确(当前目录)
pdf_full_path = os.path.abspath(PDF_PATH)

if not os.path.exists(pdf_full_path):
    print(f"❌ 错误:找不到 PDF 文件 {pdf_full_path}")
    exit(1)

# 初始化 MarkItDown
print("🚀 正在加载 MarkItDown...")
markitdown = MarkItDown()

# 转换 PDF 为 Markdown
print(f"📄 正在处理: {pdf_full_path}")
result = markitdown.convert(
    pdf_full_path,
    output_dir=OUTPUT_DIR  # 图片会保存到这里
)

# 保存 Markdown 内容
with open(OUTPUT_MD, "w", encoding="utf-8") as f:
    f.write(result.text_content)

print(f"✅ 提取完成!")
print(f"📝 Markdown 已保存到: {os.path.abspath(OUTPUT_MD)}")
print(f"🖼️  资源文件夹: {os.path.abspath(OUTPUT_DIR)}")

3.2 fastapi

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
# http://localhost:8101/docs
# port=8101
"""
FastAPI 服务:使用 markitdown 将多种文档格式转换为 Markdown

支持的格式包括:
  - PDF (.pdf)
  - Word (.doc, .docx)
  - PowerPoint (.ppt, .pptx)
  - Excel (.xls, .xlsx)
  - HTML (.html, .htm)
  - Text (.txt)
  - Markdown (.md) —— 透传或简单处理

注意:markitdown 依赖底层工具(如 LibreOffice、pdf2image、pandoc 等),
请确保系统已安装必要依赖。
"""

from fastapi import FastAPI, File, UploadFile, HTTPException, status
from fastapi.responses import JSONResponse
from markitdown import MarkItDown
import os
import tempfile
import shutil
from pathlib import Path

# 支持的文件扩展名(小写)
SUPPORTED_EXTENSIONS = {
    ".pdf", ".doc", ".docx",
    ".ppt", ".pptx",
    ".xls", ".xlsx",
    ".html", ".htm",
    ".txt", ".md"
}

app = FastAPI(
    title="Universal Document to Markdown Converter",
    description="使用 markitdown 将多种文档格式(PDF/Word/PPT/Excel/HTML等)转换为 Markdown",
    version="1.1.0"
)

# 默认资源输出目录
DEFAULT_OUTPUT_DIR = "./markitdown_output"
os.makedirs(DEFAULT_OUTPUT_DIR, exist_ok=True)

@app.post("/convert", summary="上传文档并转换为 Markdown")
async def convert_document_to_markdown(
    document_file: UploadFile = File(..., description="支持 PDF、Word、PPT、Excel、HTML、TXT、MD 等格式")
):
    """
    接收任意支持的文档文件,使用 markitdown 转换为 Markdown。

    支持格式:
      - PDF: .pdf
      - Word: .doc, .docx
      - PowerPoint: .ppt, .pptx
      - Excel: .xls, .xlsx
      - Web: .html, .htm
      - Text: .txt, .md

    资源(如图片、嵌入对象)将保存到 resource_dir。
    """
    # 获取文件扩展名(转为小写)
    file_ext = Path(document_file.filename).suffix.lower()

    # 检查是否支持该格式
    if file_ext not in SUPPORTED_EXTENSIONS:
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail=f"不支持的文件格式: '{file_ext}'。支持的格式: {sorted(SUPPORTED_EXTENSIONS)}"
        )

    # 创建临时目录保存上传文件
    with tempfile.TemporaryDirectory() as temp_dir:
        temp_file_path = Path(temp_dir) / document_file.filename

        # 保存上传文件
        try:
            with open(temp_file_path, "wb") as buffer:
                shutil.copyfileobj(document_file.file, buffer)

        except Exception as e:
            raise HTTPException(
                status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
                detail=f"保存上传文件失败: {str(e)}"
            )

        # 初始化并转换
        try:
            markitdown = MarkItDown()
            result = markitdown.convert(
                str(temp_file_path),
                output_dir=DEFAULT_OUTPUT_DIR
            )
        except Exception as e:
            raise HTTPException(
                status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
                detail=f"文档转换失败: {str(e)}"
            )

    # 返回结果
    return JSONResponse(
        content={
            "success": True,
            "filename": document_file.filename,
            "format": file_ext,
            "markdown_content": result.text_content or "",
            "resource_dir": os.path.abspath(DEFAULT_OUTPUT_DIR),
            "message": "文档转换成功!"
        }
    )

@app.get("/health", summary="健康检查")
async def health_check():
    return {"status": "ok", "service": "universal-doc-to-markdown"}

@app.get("/supported-formats", summary="获取支持的文件格式")
async def get_supported_formats():
    """返回当前服务支持的文件扩展名列表"""
    return {
        "supported_formats": sorted(SUPPORTED_EXTENSIONS),
        "count": len(SUPPORTED_EXTENSIONS)
    }

if __name__ == "__main__":
    import uvicorn
    uvicorn.run(app, host="0.0.0.0", port=8101)

4. PaddleOCR

"""
使用 PaddleOCR PP-StructureV3 对 PDF 文档进行结构化解析(标题、段落、表格、图像),
并输出为带图片引用的 Markdown 文件,同时启动本地 HTTP 服务器供浏览器预览。

特点:
  ✅ 自动识别标题层级(章/节/编号)
  ✅ 表格 → Markdown 表格(使用 pandas + markdown)
  ✅ 图像 → 压缩保存 + OCR 辅助识别 + Markdown 引用
  ✅ 启动本地服务器,支持浏览器直接查看带图 Markdown
  ✅ 针对 Apple Silicon(M1/M2/M3)优化:自动启用 Metal GPU 加速

依赖:
  - paddleocr >= 2.7(需支持 PPStructureV3)
  - pandas + tabulate(用于 HTML 表格转 Markdown)
  - Pillow(图像处理)
  - Python 内置 http.server(用于预览)

输出结构:
  output/
  ├── report.md                 # 主 Markdown 文件
  └── <报告名>/
      └── images/
          ├── 0.jpg
          ├── 1.jpg
          └── ...
"""

import os
import re
import webbrowser
from pathlib import Path
from io import BytesIO
import pandas as pd
from PIL import Image
from paddleocr import PPStructureV3, PaddleOCR


# ==================== 配置区 ====================
# 输入 PDF 路径
PDF_PATH = "/Users/*****/Projects/my_project_text/datas/空间智能研究报告.pdf"

# 输出目录(包含 Markdown 和图像)
OUTPUT_DIR = "./output"

# 本地预览服务器端口
SERVER_PORT = 8901
BASE_URL = f"http://localhost:{SERVER_PORT}"
# ==============================================


def compress_and_save_image(pil_img, save_path, quality=75, max_size=(1920, 1080)):
    """
    压缩并保存 PIL 图像,控制文件大小与清晰度平衡。
    
    Args:
        pil_img (PIL.Image): 原始图像对象。
        save_path (str): 保存路径(含文件名)。
        quality (int): JPEG 质量(0-100),默认 75。
        max_size (tuple): 最大尺寸(宽, 高),默认 1920x1080。
    """
    # 按比例缩放(保持宽高比)
    pil_img.thumbnail(max_size, Image.LANCZOS)
    # 保存为 JPEG,启用优化减少体积
    pil_img.save(save_path, "JPEG", quality=quality, optimize=True)


def ocr_single_image(pil_img, ocr_engine):
    """
    对单张图像执行 OCR,提取纯文本内容。
    
    Args:
        pil_img (PIL.Image): 待识别图像。
        ocr_engine (PaddleOCR): 已初始化的 OCR 引擎。
    
    Returns:
        str: 识别出的文本,若无结果则返回空字符串。
    """
    result = ocr_engine.ocr(pil_img, cls=True)  # cls=True 启用方向分类
    if not result or not result[0]:
        return ""
    # 提取每行识别结果的文本(忽略置信度和坐标)
    return " ".join([line[1][0] for line in result[0]]).strip()


def convert_html_table_to_md(html_str):
    """
    将 PPStructure 输出的 HTML 表格字符串转换为 GitHub 风格 Markdown 表格。
    
    使用 pandas.read_html 解析 HTML,再转为 markdown。
    
    Args:
        html_str (str): 包含 <table> 的 HTML 字符串。
    
    Returns:
        str: Markdown 表格,失败时返回原始 HTML(便于调试)。
    """
    try:
        # pandas 自动解析 HTML 表格为 DataFrame
        dfs = pd.read_html(html_str, encoding="utf-8")
        if dfs:
            # 转为 GitHub 风格 Markdown 表格(需安装 tabulate)
            return dfs[0].to_markdown(index=False, tablefmt="github")
    except Exception as e:
        print(f"⚠️ 表格转换失败: {e}")
    return html_str  # 回退到原始 HTML


def detect_heading_level(text):
    """
    根据文本内容智能判断标题层级(1=章,2=节,3=子节)。
    
    规则优先级从高到低:
      1. "第一章" → level 1
      2. "第一节" → level 2
      3. "8.2.1 ..." → level 3
      4. "8.2 ..." → level 2
      5. "8. ..." 或 "A.1 ..." → level 2
      6. 特定关键词(摘要、引言等)→ level 1
      7. 默认 → level 2(保守策略)
    
    Args:
        text (str): 候选标题文本。
    
    Returns:
        int: 标题层级(1, 2, 或 3)。
    """
    s = text.strip()
    if re.match(r"^第[一二三四五六七八九十百]+章", s):
        return 1
    elif re.match(r"^第[一二三四五六七八九十百]+节", s):
        return 2
    elif re.match(r"^\d+\.\d+\.\d+", s):  # 如 8.2.1
        return 3
    elif re.match(r"^\d+\.\d+", s):       # 如 8.2
        return 2
    elif re.match(r"^\d+[\..\s]", s) or re.match(r"^[A-Z]\.\d+", s):
        return 2
    elif s in {"摘要", "引言", "参考文献", "附录"}:
        return 1
    return 2  # 默认视为二级标题


def is_likely_heading(text):
    """
    初步过滤:判断一段文本是否“可能”是标题。
    
    启发式规则:
      - 长度 ≤ 50 字符(排除长段落)
      - 匹配常见标题模式(编号、关键词等)
    
    Args:
        text (str): 待判断文本。
    
    Returns:
        bool: 是否可能为标题。
    """
    s = text.strip()
    if len(s) > 50:
        return False
    patterns = [
        r"^第[一二三四五六七八九十百]+[章节]",          # 第一章、第一节
        r"^\d+(\.\d+)*[\s\.].{2,30}$",                # 8.2.1 标题内容(2-30字)
        r"^[A-Z]\.\d+[\s\.].{2,30}$",                 # A.1 Introduction
        r"^(摘要|引言|目录|结论|参考文献|附录)$"       # 固定关键词
    ]
    return any(re.match(p, s) for p in patterns)


def main():
    """
    主执行流程:
      1. 初始化 PPStructureV3(文档结构分析)和 PaddleOCR(图像 OCR)
      2. 解析 PDF,获取每页的结构化区域(标题/文本/表格/图像)
      3. 按区域类型转换为 Markdown
      4. 保存图像并生成引用链接
      5. 启动本地 HTTP 服务器,浏览器打开预览
    """
    print("🚀 启动 PaddleOCR PPStructureV3(Apple Silicon GPU 加速)...")
    
    # ✅ 初始化 PPStructureV3(自动启用 layout/table/ocr 模块)
    # device="gpu" 在 Apple Silicon 上会自动使用 Metal 后端
    doc_analyzer = PPStructureV3(
        device="gpu",
        lang="ch"  # 中文模型
        # 注意:PPStructureV3 默认已启用布局分析、表格识别、OCR,无需额外参数
    )
    
    # 初始化独立 OCR 引擎(用于图像区域的二次识别)
    img_ocr = PaddleOCR(
        device="gpu",
        lang="ch",
        use_angle_cls=True,        # 启用方向分类(处理旋转文本)
        det_db_box_thresh=0.3,     # 检测框阈值(降低可检出更多小图)
        rec_batch_num=6            # OCR 批处理大小(提升 GPU 利用率)
    )

    # 创建输出目录结构
    os.makedirs(OUTPUT_DIR, exist_ok=True)
    report_name = Path(PDF_PATH).stem  # 如 "空间智能研究报告"
    images_dir = os.path.join(OUTPUT_DIR, report_name, "images")
    os.makedirs(images_dir, exist_ok=True)

    print(f"📄 正在解析: {PDF_PATH}")
    # PPStructureV3 直接支持 PDF 输入,返回每页的结构化结果
    pages = doc_analyzer(PDF_PATH)

    md_content = []  # 累积 Markdown 内容
    img_count = 0    # 图像计数器

    # 遍历每一页
    for page in pages:
        regions = page.get("regions", [])
        # 按垂直位置(y 坐标)排序,确保从上到下解析
        regions.sort(key=lambda r: r.get("bbox", [0, 0, 0, 0])[1])

        # 遍历当前页的每个区域
        for region in regions:
            r_type = region.get("type", "")
            res = region.get("res", [])

            if r_type == "title":
                # 合并多行标题文本
                title_text = "\n".join([line["text"] for line in res])
                if is_likely_heading(title_text):
                    level = detect_heading_level(title_text)
                    md_content.append(f"{'#' * level} {title_text}\n\n")
                else:
                    # 非标题文本(如强调句),用粗体表示
                    md_content.append(f"**{title_text}**\n\n")

            elif r_type == "figure":
                pil_img = region.get("img")  # PIL Image 对象
                if pil_img is None:
                    continue
                # 保存图像
                img_filename = f"{img_count}.jpg"
                img_path = os.path.join(images_dir, img_filename)
                compress_and_save_image(pil_img, img_path, quality=75)
                # 生成相对 URL(供本地服务器访问)
                img_url = f"{BASE_URL}/{report_name}/images/{img_filename}"
                md_content.append(f"![image]({img_url})\n\n")
                # 对图像执行 OCR,辅助可访问性
                img_text = ocr_single_image(pil_img, img_ocr)
                if img_text:
                    md_content.append(f"<!-- 图片 OCR 内容 -->\n{img_text}\n\n")
                img_count += 1

            elif r_type == "table":
                # PPStructure 输出的表格为 HTML 字符串
                html_table = region.get("res", {}).get("html", "")
                if html_table:
                    md_table = convert_html_table_to_md(html_table)
                    md_content.append(f"{md_table}\n\n")

            elif r_type == "text":
                # 普通段落:合并多行
                text = "\n".join([line["text"] for line in res])
                md_content.append(f"{text}\n\n")

        # 每页末尾添加分隔线(便于阅读)
        md_content.append("---\n\n")

    # 保存最终 Markdown
    final_md = os.path.join(OUTPUT_DIR, "report.md")
    with open(final_md, "w", encoding="utf-8") as f:
        f.write("".join(md_content))
    
    print(f"✅ 解析完成!图片: {img_count} 张 | 表格: 已转 Markdown | 标题: 已结构化")
    
    # 启动本地 HTTP 服务器(后台运行)
    print(f"🌐 启动服务器: {BASE_URL}")
    os.system(f"cd '{OUTPUT_DIR}' && python3 -m http.server {SERVER_PORT} &")
    
    # 自动打开浏览器预览
    webbrowser.open(f"{BASE_URL}/report.md")
    print(f"🎉 请在浏览器中查看: {BASE_URL}/report.md")


if __name__ == "__main__":
    main()
Logo

火山引擎开发者社区是火山引擎打造的AI技术生态平台,聚焦Agent与大模型开发,提供豆包系列模型(图像/视频/视觉)、智能分析与会话工具,并配套评测集、动手实验室及行业案例库。社区通过技术沙龙、挑战赛等活动促进开发者成长,新用户可领50万Tokens权益,助力构建智能应用。

更多推荐