一、为什么 RAG 是当前 AI 落地的最佳路径?

1.1 大模型的三大痛点

问题 说明 RAG 如何解决
幻觉(Hallucination) 模型编造不存在的事实 仅基于检索到的真实文档回答
知识滞后 训练数据截止 2024 年 可注入最新企业文档(如 2025 年财报)
私有数据无法使用 不能上传敏感数据到公有云 本地向量库 + 私有部署模型

RAG = Retrieval(检索) + Augmentation(增强) + Generation(生成)

1.2 典型企业场景

  • HR 部门:员工问“年假怎么休?” → 自动从《员工手册》找答案
  • IT 支持:问“如何重置密码?” → 返回内部 Wiki 步骤
  • 销售团队:问“竞品 A 的定价策略?” → 检索市场分析报告

💡 核心价值把沉睡的企业文档变成可对话的知识资产


二、整体架构设计

🔑 关键设计

  • 多租户:每个部门(tenant_id)有独立 collection
  • 流式响应:避免长时间等待
  • 模块解耦:文档处理、检索、生成分离

三、环境准备

3.1 安装依赖

# 创建虚拟环境
python -m venv rag-env
source rag-env/bin/activate  # Linux/Mac
# rag-env\Scripts\activate   # Windows

# 安装核心包
pip install langchain==0.2.0 \
            langchain-community==0.2.0 \
            langchain-core==0.2.0 \
            chromadb==0.4.24 \
            unstructured[local-inference]==0.14.9 \  # PDF/Word 解析
            python-dotenv \
            fastapi \
            uvicorn \
            qwen-agent \  # 阿里官方 SDK
            httpx \
            sse-starlette

⚠️ 注意

  • unstructured 需要额外安装 libmagic(Mac: brew install libmagic
  • 中文分块推荐使用 ChineseRecursiveTextSplitter(后文详解)

3.2 获取 Qwen API Key

  1. 访问 阿里云百炼平台
  2. 创建应用,选择 qwen-max 模型
  3. 获取 API_KEY,写入 .env 文件:
# .env
QWEN_API_KEY=sk-xxxxxxxxxxxxxxxxxxxxxxxx
QWEN_MODEL=qwen-max

四、文档加载与向量化(核心!)

4.1 支持多种格式(PDF/Word/TXT)

# document_loader.py
from langchain_community.document_loaders import (
    PyPDFLoader,
    UnstructuredWordDocumentLoader,
    TextLoader
)
from pathlib import Path

def load_documents(file_path: str) -> list:
    """根据文件后缀加载文档"""
    path = Path(file_path)
    if path.suffix.lower() == ".pdf":
        loader = PyPDFLoader(str(path))
    elif path.suffix.lower() in [".docx", ".doc"]:
        loader = UnstructuredWordDocumentLoader(str(path))
    elif path.suffix.lower() == ".txt":
        loader = TextLoader(str(path), encoding="utf-8")
    else:
        raise ValueError(f"Unsupported file type: {path.suffix}")
    
    return loader.load()

Unstructured 库能较好保留 Word/PDF 的段落结构


4.2 中文文本分块(避免语义断裂)

通用 RecursiveCharacterTextSplitter 对中文效果差!改用:

# text_splitter.py
from langchain_text_splitters import RecursiveCharacterTextSplitter
import re

class ChineseRecursiveTextSplitter(RecursiveCharacterTextSplitter):
    def __init__(self, *args, **kwargs):
        separators = [
            "\n\n", "\n", "。|!|?", "…", "”", "’", "】", 
            ")", "]", "}", " ", ""
        ]
        super().__init__(*args, separators=separators, **kwargs)

# 使用示例
splitter = ChineseRecursiveTextSplitter(
    chunk_size=500,      # 每块约 500 字符
    chunk_overlap=50,    # 重叠 50 字符,避免断句
    length_function=len
)
chunks = splitter.split_documents(documents)

💡 经验

  • 技术文档:chunk_size=300
  • 合同/法律:chunk_size=800(需完整条款)
  • 重叠部分能显著提升检索召回率!

4.3 向量化并存入 Chroma(带租户隔离)

# vector_store.py
import chromadb
from chromadb.utils import embedding_functions
from langchain_community.vectorstores import Chroma
from langchain_community.embeddings import HuggingFaceEmbeddings

class TenantAwareChroma:
    def __init__(self, persist_directory: str = "./chroma_db"):
        self.client = chromadb.PersistentClient(path=persist_directory)
        # 使用中文 Embedding 模型(无需 API Key)
        self.embedding_func = HuggingFaceEmbeddings(
            model_name="GanymedeNil/text2vec-large-chinese"
        )

    def add_documents(self, tenant_id: str, documents: list):
        """为指定租户添加文档"""
        collection_name = f"kb_{tenant_id}"
        
        # 创建或获取 collection
        collection = self.client.get_or_create_collection(
            name=collection_name,
            embedding_function=self._get_embedding_func()
        )
        
        # LangChain 封装(便于后续检索)
        vectorstore = Chroma(
            client=self.client,
            collection_name=collection_name,
            embedding_function=self.embedding_func
        )
        vectorstore.add_documents(documents)
        return vectorstore

    def get_retriever(self, tenant_id: str, top_k: int = 4):
        """获取指定租户的检索器"""
        collection_name = f"kb_{tenant_id}"
        vectorstore = Chroma(
            client=self.client,
            collection_name=collection_name,
            embedding_function=self.embedding_func
        )
        return vectorstore.as_retriever(search_kwargs={"k": top_k})

优势

  • 使用 开源中文 Embedding 模型,无调用成本;
  • 每个租户独立 collection,天然隔离;
  • PersistentClient 支持持久化到磁盘。

五、构建 LangChain RAG Pipeline

5.1 核心链:检索 + 生成

# rag_chain.py
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.runnables import RunnablePassthrough
from langchain_core.output_parsers import StrOutputParser
from qwen_agent.llm import get_chat_model

def create_rag_chain(retriever, model_name: str = "qwen-max"):
    # 1. 定义 Prompt(中文优化!)
    prompt = ChatPromptTemplate.from_template(
        """你是一个专业的企业知识助手,请根据以下上下文回答问题。
        如果不知道答案,请说“根据现有资料无法回答”,不要编造。

        上下文:
        {context}

        问题:{question}
        """
    )

    # 2. 初始化 Qwen 模型(通过阿里百炼)
    llm = get_chat_model({
        "model": model_name,
        "api_key": os.getenv("QWEN_API_KEY"),
        "model_server": "dashscope"  # 阿里 DashScope
    })

    # 3. 构建链
    rag_chain = (
        {"context": retriever, "question": RunnablePassthrough()}
        | prompt
        | llm
        | StrOutputParser()
    )
    return rag_chain

💡 Prompt 设计要点

  • 明确禁止幻觉(“不要编造”);
  • 强调“根据上下文”,约束模型行为;
  • 中文指令更符合 Qwen 训练偏好。

5.2 支持流式响应(SSE)

# stream_rag.py
from sse_starlette.sse import EventSourceResponse
import asyncio

async def stream_rag_answer(rag_chain, question: str):
    """异步流式生成答案"""
    loop = asyncio.get_event_loop()
    
    # 使用迭代器逐步获取 token
    async def event_generator():
        full_response = ""
        async for chunk in rag_chain.astream(question):
            full_response += chunk
            yield {"data": chunk}
        
        # 最后发送结束标记
        yield {"data": "[DONE]"}
    
    return EventSourceResponse(event_generator())

✅ 前端可通过 EventSource 接收流式数据,实现“打字机”效果。


六、FastAPI 后端服务

6.1 核心接口

# main.py
from fastapi import FastAPI, HTTPException, Depends
from pydantic import BaseModel
from typing import Optional

app = FastAPI(title="企业知识库问答系统")

class QueryRequest(BaseModel):
    tenant_id: str  # 部门ID,如 "hr", "it"
    question: str
    stream: Optional[bool] = False

@app.post("/query")
async def query_knowledge_base(request: QueryRequest):
    try:
        # 1. 获取租户专属检索器
        retriever = vector_store.get_retriever(request.tenant_id)
        
        # 2. 构建 RAG 链
        rag_chain = create_rag_chain(retriever)
        
        # 3. 流式 or 非流式
        if request.stream:
            return await stream_rag_answer(rag_chain, request.question)
        else:
            answer = await rag_chain.ainvoke(request.question)
            return {"answer": answer}
            
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))

6.2 文档上传接口(支持多文件)

from fastapi import File, UploadFile
import tempfile
import os

@app.post("/upload/{tenant_id}")
async def upload_documents(tenant_id: str, files: list[UploadFile] = File(...)):
    with tempfile.TemporaryDirectory() as tmpdir:
        documents = []
        for file in files:
            # 保存临时文件
            file_path = os.path.join(tmpdir, file.filename)
            with open(file_path, "wb") as f:
                f.write(await file.read())
            
            # 加载并分块
            docs = load_documents(file_path)
            chunks = chinese_splitter.split_documents(docs)
            documents.extend(chunks)
        
        # 向量化存储
        vector_store.add_documents(tenant_id, documents)
    
    return {"message": f"成功上传 {len(files)} 个文件到 {tenant_id} 知识库"}

七、前端简易界面(Vue3 + SSE)

<!-- App.vue -->
<template>
  <div>
    <h1>企业知识库问答</h1>
    <select v-model="tenantId">
      <option value="hr">人力资源部</option>
      <option value="it">IT支持</option>
    </select>
    
    <input v-model="question" @keyup.enter="ask" placeholder="输入问题..." />
    <button @click="ask">提问</button>
    
    <div class="answer">{{ answer }}</div>
  </div>
</template>

<script setup>
import { ref } from 'vue'

const tenantId = ref('hr')
const question = ref('')
const answer = ref('')

const ask = () => {
  answer.value = ''
  const eventSource = new EventSource(
    `/query?tenant_id=${tenantId.value}&question=${encodeURIComponent(question.value)}&stream=true`
  )
  
  eventSource.onmessage = (event) => {
    if (event.data === '[DONE]') {
      eventSource.close()
    } else {
      answer.value += event.data
    }
  }
}
</script>

✅ 实现 实时流式输出,用户体验接近 ChatGPT。


八、企业级增强功能

8.1 权限控制(JWT 验证)

# auth.py
from fastapi.security import HTTPBearer
from jose import jwt

security = HTTPBearer()

def verify_token(token: str = Depends(security)):
    try:
        payload = jwt.decode(token.credentials, SECRET_KEY, algorithms=["HS256"])
        return payload["tenant_id"]  # 从 token 提取租户ID
    except:
        raise HTTPException(status_code=401, detail="Invalid token")

前端登录后获取 JWT,确保用户只能访问授权租户。


8.2 性能优化:重排序(Rerank)

初始检索可能召回不相关片段,用 bge-reranker 二次排序:

from FlagEmbedding import FlagReranker

reranker = FlagReranker('BAAI/bge-reranker-large', use_fp16=True)

def rerank_docs(query: str, docs: list, top_k: int = 3):
    scores = reranker.compute_score([[query, doc.page_content] for doc in docs])
    sorted_docs = [doc for _, doc in sorted(zip(scores, docs), reverse=True)]
    return sorted_docs[:top_k]

📊 实测:Rerank 可将问答准确率提升 15–30%


8.3 缓存高频问题

from functools import lru_cache

@lru_cache(maxsize=1000)
def cached_rag_answer(tenant_id: str, question: str):
    retriever = vector_store.get_retriever(tenant_id)
    rag_chain = create_rag_chain(retriever)
    return rag_chain.invoke(question)

对“年假政策”等高频问题,响应速度从 2s → 50ms。


九、Docker 一键部署

# Dockerfile
FROM python:3.10-slim

WORKDIR /app
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt

COPY . .

EXPOSE 8000
CMD ["uvicorn", "main:app", "--host", "0.0.0.0", "--port", "8000"]
# docker-compose.yml
version: '3'
services:
  rag-app:
    build: .
    ports:
      - "8000:8000"
    environment:
      - QWEN_API_KEY=your_key_here
    volumes:
      - ./chroma_db:/app/chroma_db  # 持久化向量库

执行 docker-compose up -d 即可启动完整服务!


十、避坑指南(血泪经验)

问题 解决方案
PDF 表格识别乱码 改用 pdfplumber 替代 PyPDFLoader
Qwen 返回 JSON 格式错误 在 Prompt 中明确要求“纯文本回答”
Chroma 内存占用高 使用 PersistentClient + 定期清理旧 collection
中文分块丢失标题 在 ChineseRecursiveTextSplitter 中加入 \n# 分隔符
流式响应卡顿 增加 asyncio.sleep(0.01) 避免事件循环阻塞

十一、效果演示

用户提问

“实习生转正需要满足哪些条件?”

系统返回(来自《HR 手册.pdf》):

根据《员工转正管理办法》第3.2条,实习生需同时满足:

  1. 试用期满3个月;
  2. 月度考核平均分≥80;
  3. 通过部门转正答辩。
    详细流程请参考 HR 系统 > 入职管理 > 转正申请。

全程无需人工干预,答案可追溯到原文位置


AI 工程化的本质,不是取代人,而是把人从重复劳动中解放出来
你不需要成为算法科学家,只需掌握 LangChain + 向量数据库 + 大模型 API 这一组合拳,就能在 2025 年的技术浪潮中占据先机。

👍 如果你觉得有帮助,欢迎点赞、收藏!
💬 你打算用 RAG 解决什么业务问题?欢迎评论区交流!

Logo

火山引擎开发者社区是火山引擎打造的AI技术生态平台,聚焦Agent与大模型开发,提供豆包系列模型(图像/视频/视觉)、智能分析与会话工具,并配套评测集、动手实验室及行业案例库。社区通过技术沙龙、挑战赛等活动促进开发者成长,新用户可领50万Tokens权益,助力构建智能应用。

更多推荐