参考: https://blog.csdn.net/qq_56591814/article/details/131376763
参考: https://zhuanlan.zhihu.com/p/1934598099780756665

使用langchain构建一个基于本地知识库回答的机器人
关键步骤:

  1. 文档加载,切片
  2. 文档向量化后存入向量数据库
  3. langchain从向量库中获取参考文本,然后组成prompt,调用大模型回答

文档加载代码:

def load_and_process_data(data_dir: str, chunk_size: int = 512):
    """Load all .txt files under *data_dir* and split them into chunks.

    Args:
        data_dir: directory scanned recursively for UTF-8 text files.
        chunk_size: maximum characters per chunk.

    Returns:
        list of split Document chunks.

    Raises:
        ValueError: if *data_dir* does not exist.
    """
    if not os.path.exists(data_dir):
        raise ValueError(f"数据目录 {data_dir} 不存在")

    # Force TextLoader with explicit UTF-8 so non-ASCII files load cleanly.
    txt_loader = DirectoryLoader(
        data_dir,
        glob="**/*.txt",
        loader_cls=TextLoader,
        loader_kwargs={"encoding": "utf-8"},
        show_progress=True,
    )
    docs = txt_loader.load()

    # Cut the loaded documents into overlapping chunks.
    splitter = CharacterTextSplitter(chunk_size=chunk_size, chunk_overlap=50)
    chunks = splitter.split_documents(docs)
    print(
        f"文档分割为 {len(chunks)} 个片段(单片段最大{chunk_size}字符)", flush=True
    )
    return chunks

文档向量化后 存入数据库
Pinecone版本: 注意维度和index_name 提前在pinecone上创建


def configure_embedding():
    """Load the local sentence-transformer embedding model.

    Returns:
        HuggingFaceEmbeddings wrapping the local bge-small model, producing
        L2-normalized vectors (ready for cosine similarity).
    """
    import time  # local import keeps this blog snippet self-contained

    # Path to a pre-trained sentence-transformer model
    # (embedding-bge-small-en-v1.5, downloaded from ModelScope).
    model_path = r"./llm-model/embedding-bge-small-en-v1.5"
    print(f"[步骤1/3] 加载本地模型:{model_path}")
    start = time.perf_counter()
    model = HuggingFaceEmbeddings(
        model_name=model_path,
        model_kwargs={"device": "cpu"},  # use 'cuda' for GPU (requires CUDA)
        encode_kwargs={"normalize_embeddings": True},  # normalize embedding vectors
    )
    # Bug fix: the original printed "耗时 秒" with no number — now measured.
    print(f"[完成] 模型加载耗时 {time.perf_counter() - start:.2f} 秒")
    return model

存入向量库

def build_pinecone_index(split_docs):
    """Embed *split_docs* and upsert them into the configured Pinecone index."""
    embeddings = configure_embedding()
    store = PineconeVectorStore(
        index_name=INDEX_NAME,
        embedding=embeddings,
        pinecone_api_key=API_KEY,
    )
    store.add_documents(split_docs)
    print(f"成功向Pinecone添加 {len(split_docs)} 个文档片段")
    return store

chroma版本
(我windows系统,本地chromadb无法执行下去,连报错都没有就结束了)

import os
import time
from langchain_community.document_loaders import DirectoryLoader, TextLoader
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain_community.vectorstores import Chroma
from langchain_huggingface import HuggingFaceEmbeddings
from chromadb.config import Settings  # 新增:用于配置远程连接
from chromadb.api.fastapi import FastAPI


def load_documents(data_dir):
    """Recursively load every .txt file under *data_dir* as UTF-8 text."""
    txt_loader = DirectoryLoader(
        path=data_dir,
        glob="**/*.txt",
        loader_cls=TextLoader,
        loader_kwargs={"encoding": "utf-8"},
    )
    docs = txt_loader.load()
    print(f"成功加载 {len(docs)} 个文档", flush=True)
    return docs


def split_documents(documents, chunk_size=500, chunk_overlap=50):
    """Split *documents* into chunks of at most *chunk_size* characters.

    Separators are tried from paragraph level down to single characters,
    so Chinese punctuation is respected before hard cuts are made.
    """
    splitter = RecursiveCharacterTextSplitter(
        chunk_size=chunk_size,
        chunk_overlap=chunk_overlap,
        separators=["\n\n", "\n", "。", ",", " ", ""],
    )
    pieces = splitter.split_documents(documents)
    print(
        f"文档分割为 {len(pieces)} 个片段(单片段最大{chunk_size}字符)", flush=True
    )
    return pieces


def init_embedding_model(local_model_path=None):
    """Initialize the embedding model, preferring a local copy when present.

    Falls back to the remote shibing624/text2vec-base-chinese model when
    *local_model_path* is not given or does not exist on disk.
    """
    if local_model_path and os.path.exists(local_model_path):
        print(f"使用本地嵌入模型:{local_model_path}", flush=True)
        model_name = local_model_path
        model_kwargs = {"device": "cpu", "trust_remote_code": True}
    else:
        print("使用远程嵌入模型:shibing624/text2vec-base-chinese", flush=True)
        model_name = "shibing624/text2vec-base-chinese"
        model_kwargs = {"device": "cpu"}
    embeddings = HuggingFaceEmbeddings(
        model_name=model_name,
        model_kwargs=model_kwargs,
        encode_kwargs={"normalize_embeddings": True},
    )
    print("嵌入模型初始化完成", flush=True)
    return embeddings


def vectorize_and_store(
    split_docs,
    embeddings,
    remote_host="localhost",
    remote_port=8000,
    collection_name="docs_collection",
):
    """Embed *split_docs* and store them in a remote Chroma server over HTTP.

    Args:
        split_docs: pre-split Document chunks to index.
        embeddings: embedding model used to vectorize the chunks.
        remote_host: IP/hostname of the Chroma server.
        remote_port: HTTP port the Chroma server listens on.
        collection_name: server-side collection to create/use.

    Returns:
        the connected Chroma vector store.
    """
    print("进入vectorize_and_store函数,开始向量化流程...", flush=True)
    start_time = time.time()

    # Configure the remote Chroma connection.
    chroma_settings = Settings(
        chroma_server_host=remote_host,  # remote server IP
        chroma_server_http_port=remote_port,  # key fix: `http_port`, not `server_port`
        chroma_api_impl="chromadb.api.fastapi.FastAPI",
    )
    print(f"开始连接远程Chroma服务:{remote_host}:{remote_port}...", flush=True)  # progress log

    # Connect to the remote service and create the collection.
    vector_store = Chroma.from_documents(
        documents=split_docs,
        embedding=embeddings,
        collection_name=collection_name,
        client_settings=chroma_settings,  # remote connection settings
        # NOTE: remote mode needs no persist_directory (data lives server-side)
    )

    print("远程Chroma向量存储创建完成...", flush=True)  # progress log

    # Verify the stored count via the (private) underlying collection handle.
    collection = vector_store._collection
    print(
        f"向量化完成!耗时 {time.time() - start_time:.2f}秒,共存储 {collection.count()} 个片段",
        flush=True,
    )
    return vector_store


def retrieve_similar_docs(vector_store, query, top_k=3):
    """Return the *top_k* chunks most similar to *query* from the store."""
    t0 = time.time()
    hits = vector_store.similarity_search(query=query, k=top_k)
    print(
        f"检索完成!耗时 {time.time() - t0:.4f}秒,找到 {len(hits)} 个相似片段",
        flush=True,
    )
    return hits


if __name__ == "__main__":
    # Configuration — point these at the actual remote Chroma service.
    DATA_DIR = r"/llm-demo/temp-data"
    REMOTE_CHROMA_HOST = "192.168.1.100"  # replace with the real server IP
    REMOTE_CHROMA_PORT = 8000  # must match the port the server was started with
    LOCAL_MODEL_PATH = r"llm-demo/llm-model/embedding-bge-small-en-v1.5"

    try:
        # Pipeline: load -> split -> embed -> store remotely -> retrieve.
        documents = load_documents(DATA_DIR)
        split_docs = split_documents(documents)
        embeddings = init_embedding_model(local_model_path=LOCAL_MODEL_PATH)
        vector_store = vectorize_and_store(
            split_docs=split_docs,
            embeddings=embeddings,
            remote_host=REMOTE_CHROMA_HOST,
            remote_port=REMOTE_CHROMA_PORT,
        )

        # Retrieval example.
        query = "文档中提到了哪些核心概念?"
        similar_docs = retrieve_similar_docs(vector_store, query, top_k=3)

        # Print the retrieved chunks.
        print("\n===== 检索结果 =====")
        for idx, doc in enumerate(similar_docs, 1):
            print(f"片段 {idx}(来源:{doc.metadata.get('source', '未知')}):")
            print(f"{doc.page_content[:200]}...\n")
    except Exception as e:
        print(f"程序执行出错:{str(e)}", flush=True)
        import traceback
        traceback.print_exc()

自定义LLM需要继承 from langchain.llms.base import LLM
直接调用豆包的API, 本地部署大模型太费电
代码在下面

整个demo共两个文件
MyCustomLLM.py

import json
from httpx import get
from langchain.llms.base import LLM
from typing import Optional, List, Any, Mapping
import requests

API_KEY = "*****************"
API_URL = "*****************"
MODEL = "doubao-seed-1-6-flash-250715"


def get_llm():
    """Build a MyCustomLLM configured for the Doubao chat-completions API."""
    return MyCustomLLM(
        api_key=API_KEY,
        api_url=API_URL,
        model_name=MODEL,
        temperature=0.5,
        max_tokens=512,
    )


class MyCustomLLM(LLM):
    """Custom LLM that integrates the Doubao chat-completions HTTP API.

    Subclasses LangChain's `LLM` base class so it drops into any chain;
    `_call` delegates to `complete`, which performs a blocking
    (non-streaming) POST against the API.
    """

    # Model parameters (tune per deployment).
    model_name: str = MODEL  # model identifier sent to the API
    temperature: float = 0.7  # sampling temperature (output randomness)
    max_tokens: int = 1024  # maximum number of generated tokens
    api_url: Optional[str] = API_URL  # chat-completions endpoint (if using an API)
    api_key: Optional[str] = API_KEY  # bearer token for the endpoint (if using an API)

    @property
    def _llm_type(self) -> str:
        """LLM type tag used internally by LangChain.

        Bug fix: the base class declares this as a property; without
        `@property` LangChain would receive the bound method object
        instead of the string.
        """
        return "my_custom_llm"

    @property
    def _identifying_params(self) -> Mapping[str, Any]:
        """Identifying parameters used for logging and serialization.

        Bug fix: also a property on the base class — see `_llm_type`.
        """
        return {
            "model_name": self.model_name,
            "temperature": self.temperature,
            "max_tokens": self.max_tokens,
            "api_url": self.api_url,  # optional, only relevant for API usage
        }

    def _call(
        self, prompt: str, stop: Optional[List[str]] = None, **kwargs: Any
    ) -> str:
        """Core method: take a prompt, return the model's generated text.

        All interaction with the custom model happens in `complete`.
        """
        return self.complete(prompt)

    def complete(self, prompt: str) -> str:
        """POST *prompt* to the Doubao API and return the reply text.

        Returns an error-description string (instead of raising) when the
        request or response parsing fails.
        """
        headers = {
            "Content-Type": "application/json",
            "Authorization": f"Bearer {self.api_key}",
        }
        data = {
            "model": self.model_name,
            "messages": [
                {"content": [{"text": prompt, "type": "text"}], "role": "user"}
            ],
            "stream": False,  # fixed comment: streaming is DISABLED here
        }
        try:
            response = requests.post(
                self.api_url,
                headers=headers,
                data=json.dumps(data),
                stream=False,
                timeout=60,  # robustness: don't hang forever on a dead endpoint
            )
            response.raise_for_status()
            content = response.text
            print("prompt:  " + prompt)
            print("result:  " + content)
            payload = json.loads(content)
            # Extract the generated text from the first choice.
            return payload["choices"][0]["message"]["content"]
        except Exception as e:
            return f"调用豆包API时出错: {str(e)}"


# Smoke test
if __name__ == "__main__":
    # Build the custom LLM and run a single prompt through it.
    custom_llm = get_llm()

    prompt = "请解释什么是大语言模型?"
    response = custom_llm.invoke(prompt)
    print(f"提示:{prompt}")
    print(f"响应:{response}")

主程序
langchain_pinecone_demo.py

import os
from langchain_community.document_loaders import DirectoryLoader, TextLoader
from langchain_pinecone import PineconeVectorStore
from langchain.text_splitter import CharacterTextSplitter
from langchain_huggingface import HuggingFaceEmbeddings
from MyCustomLLM import get_llm
from langchain.prompts import ChatPromptTemplate
from langchain_core.output_parsers import StrOutputParser
from langchain_core.runnables import RunnablePassthrough

API_KEY = "***************"  # pinecone APIkey
INDEX_NAME = "langchain-text-embedding-index"


def load_and_process_data(data_dir: str, chunk_size: int = 512):
    """Load and process document data.

    Args:
        data_dir: directory scanned recursively for UTF-8 .txt files.
        chunk_size: maximum characters per split chunk.

    Returns:
        list of split Document chunks.

    Raises:
        ValueError: if *data_dir* does not exist.
    """
    # Make sure the directory exists
    if not os.path.exists(data_dir):
        raise ValueError(f"数据目录 {data_dir} 不存在")

    # Read the documents
    loader = DirectoryLoader(
        data_dir,
        glob="**/*.txt",
        loader_cls=TextLoader,  # force the plain-text loader
        loader_kwargs={"encoding": "utf-8"},  # avoid encoding errors
        show_progress=True,
    )
    documents = loader.load()

    # Chunking
    text_splitter = CharacterTextSplitter(chunk_size=chunk_size, chunk_overlap=50)
    # Split the loaded documents
    split_docs = text_splitter.split_documents(documents)
    print(
        f"文档分割为 {len(split_docs)} 个片段(单片段最大{chunk_size}字符)", flush=True
    )
    return split_docs


def configure_embedding():
    """Load the local sentence-transformer embedding model.

    Returns:
        HuggingFaceEmbeddings with L2-normalized output vectors.
    """
    import time  # local import keeps the file-level import block untouched

    # Path to the pre-trained sentence-transformer model.
    model_path = r"llm-demo/llm-model/embedding-bge-small-en-v1.5"
    print(f"[步骤1/3] 加载本地模型:{model_path}")
    start = time.perf_counter()
    model = HuggingFaceEmbeddings(
        model_name=model_path,
        model_kwargs={"device": "cpu"},  # use 'cuda' for GPU (requires CUDA)
        encode_kwargs={"normalize_embeddings": True},  # normalize embedding vectors
    )
    # Bug fix: the original printed "耗时 秒" with no number — now measured.
    print(f"[完成] 模型加载耗时 {time.perf_counter() - start:.2f} 秒")
    return model


def build_pinecone_index(split_docs):
    """Embed *split_docs* and add them to the configured Pinecone index.

    NOTE(review): assumes the index INDEX_NAME already exists in Pinecone
    with a dimension matching the embedding model — confirm before running.
    """
    embed_model = configure_embedding()
    vectorstore = PineconeVectorStore(
        index_name=INDEX_NAME,
        embedding=embed_model,
        pinecone_api_key=API_KEY,
    )
    vectorstore.add_documents(split_docs)
    print(f"成功向Pinecone添加 {len(split_docs)} 个文档片段")
    return vectorstore


def create_qa_chain(llm, vectorstore):  # vectorstore supplied by the caller
    """Create a RAG question-answering chain with the modern LCEL API.

    Replaces the deprecated `load_qa_chain`: retrieved chunks are stuffed
    into a single prompt, sent to *llm*, and parsed to a plain string.
    """
    # Prompt template; {context} and {question} are filled by the chain below.
    prompt = ChatPromptTemplate.from_template(
        """
    请根据以下参考文档回答问题。如果文档中没有相关信息,请回答"没有找到相关信息"。
    
    参考文档:
    {context}
    
    问题:
    {question}
    """
    )

    # "stuff" strategy: concatenate all retrieved chunks into one context string.
    def format_docs(docs):
        return "\n\n".join(doc.page_content for doc in docs)

    retriever = vectorstore.as_retriever()

    # Assemble the chain: retrieve -> fill prompt -> call LLM -> parse string.
    return (
        {
            "context": retriever | format_docs,  # retrieved chunks -> one string
            "question": RunnablePassthrough(),  # the raw question passes through
        }
        | prompt
        | llm
        | StrOutputParser()
    )


if __name__ == "__main__":
    temp_dir = "llm-demo\\temp-data"
    print(f"数据目录:{temp_dir}")

    # Build the index: load + split the corpus, then push into Pinecone.
    split_docs = load_and_process_data(temp_dir)
    print(f"分割后的文档数量:{len(split_docs)}")
    vectorstore = build_pinecone_index(split_docs)

    # Ask one question through the RAG chain.
    query = "线缆怎么检测"
    llm = get_llm()
    chain = create_qa_chain(llm, vectorstore)
    answer = chain.invoke(query)
    print("\n===== 回答 =====")
    print(answer)



有个大坑 就是其中各种依赖版本太乱了

当前python 3.12.8
当前环境依赖 requirements.txt pip install -r requirements.txt

aiofiles==24.1.0
aiohappyeyeballs==2.6.1
aiohttp==3.12.15
aiohttp-retry==2.9.1
aiosignal==1.4.0
aiosqlite==0.21.0
altair==5.5.0
annotated-types==0.7.0
anyio==4.10.0
attrs==25.3.0
backoff==2.2.1
banks==2.2.0
bcrypt==4.3.0
beautifulsoup4==4.13.4
black==25.1.0
blinker==1.9.0
build==1.3.0
cachetools==5.5.2
certifi==2025.8.3
cffi==1.17.1
charset-normalizer==3.4.2
chromadb==1.0.16
click==8.2.1
colorama==0.4.6
coloredlogs==15.0.1
contourpy==1.3.1
cryptography==45.0.6
cycler==0.12.1
dataclasses-json==0.6.7
defusedxml==0.7.1
Deprecated==1.2.18
dirtyjson==1.0.8
distro==1.9.0
durationpy==0.10
emoji==2.14.1
filelock==3.18.0
filetype==1.2.0
Flask==3.1.1
flask-cors==6.0.1
flatbuffers==25.2.10
fonttools==4.55.3
frozenlist==1.7.0
fsspec==2025.7.0
gitdb==4.0.12
GitPython==3.1.45
google-auth==2.40.3
googleapis-common-protos==1.70.0
greenlet==3.2.4
griffe==1.11.0
grpcio==1.74.0
h11==0.16.0
html5lib==1.1
httpcore==1.0.9
httptools==0.6.4
httpx==0.28.1
httpx-sse==0.4.1
huggingface-hub==0.34.3
humanfriendly==10.0
idna==3.10
importlib_metadata==8.7.0
importlib_resources==6.5.2
iniconfig==2.1.0
itsdangerous==2.2.0
Jinja2==3.1.6
jiter==0.10.0
joblib==1.5.1
jsonpatch==1.33
jsonpointer==3.0.0
jsonschema==4.25.0
jsonschema-specifications==2025.4.1
kiwisolver==1.4.8
kubernetes==33.1.0
langchain==0.3.27
langchain-chroma==0.2.5
langchain-community==0.3.27
langchain-core==0.3.74
langchain-huggingface==0.3.1
langchain-openai==0.3.30
langchain-pinecone==0.2.11
langchain-tests==0.3.20
langchain-text-splitters==0.3.9
langdetect==1.0.9
langsmith==0.4.14
llama-cloud==0.1.35
llama-cloud-services==0.6.54
llama-index==0.13.0
llama-index-cli==0.5.0
llama-index-core==0.13.0
llama-index-embeddings-huggingface==0.6.0
llama-index-embeddings-openai==0.5.0
llama-index-indices-managed-llama-cloud==0.9.0
llama-index-instrumentation==0.4.0
llama-index-llms-openai==0.5.2
llama-index-readers-file==0.5.0
llama-index-readers-llama-parse==0.5.0
llama-index-vector-stores-chroma==0.5.0
llama-index-workflows==1.3.0
llama-parse==0.6.54
load-dotenv==0.1.0
lxml==6.0.0
markdown-it-py==3.0.0
MarkupSafe==3.0.2
marshmallow==3.26.1
matplotlib==3.10.5
mdurl==0.1.2
mmh3==5.2.0
modelscope==1.28.2
mplfinance==0.12.10b0
mpmath==1.3.0
multidict==6.6.3
mypy_extensions==1.1.0
narwhals==2.1.0
nest-asyncio==1.6.0
networkx==3.5
nltk==3.9.1
numpy==2.3.2
oauthlib==3.3.1
olefile==0.47
onnxruntime==1.20.1
openai==1.99.9
opentelemetry-api==1.36.0
opentelemetry-exporter-otlp-proto-common==1.36.0
opentelemetry-exporter-otlp-proto-grpc==1.36.0
opentelemetry-proto==1.36.0
opentelemetry-sdk==1.36.0
opentelemetry-semantic-conventions==0.57b0
orjson==3.11.1
overrides==7.7.0
packaging==24.2
pandas==2.2.3
pathspec==0.12.1
pillow==11.3.0
pinecone==7.3.0
pinecone-plugin-assistant==1.7.0
pinecone-plugin-interface==0.0.7
platformdirs==4.3.8
pluggy==1.6.0
posthog==5.4.0
propcache==0.3.2
protobuf==6.31.1
psutil==7.0.0
py-cpuinfo==9.0.0
pyarrow==21.0.0
pyasn1==0.6.1
pyasn1_modules==0.4.2
pybase64==1.4.2
pycparser==2.22
pydantic==2.11.7
pydantic-settings==2.10.1
pydantic_core==2.33.2
pydeck==0.9.1
Pygments==2.19.2
pyparsing==3.2.0
pypdf==5.9.0
PyPika==0.48.9
pyproject_hooks==1.2.0
pyreadline3==3.5.4
pytest==8.4.1
pytest-asyncio==0.26.0
pytest-benchmark==5.1.0
pytest-codspeed==4.0.0
pytest-recording==0.13.4
pytest-socket==0.7.0
python-dateutil==2.9.0.post0
python-dotenv==1.1.1
python-iso639==2025.2.18
python-magic==0.4.27
python-oxmsg==0.0.2
pytz==2025.2
PyYAML==6.0.2
RapidFuzz==3.13.0
referencing==0.36.2
regex==2025.7.34
requests==2.32.4
requests-oauthlib==2.0.0
requests-toolbelt==1.0.0
rich==14.1.0
rpds-py==0.27.0
rsa==4.9.1
ruff==0.8.4
safetensors==0.6.1
scikit-learn==1.7.1
scipy==1.16.1
sentence-transformers==5.1.0
setuptools==80.9.0
shellingham==1.5.4
six==1.17.0
smmap==5.0.2
sniffio==1.3.1
soupsieve==2.7
SQLAlchemy==2.0.42
sseclient-py==1.8.0
streamlit==1.48.0
striprtf==0.0.26
sympy==1.14.0
syrupy==4.9.1
tenacity==9.1.2
threadpoolctl==3.6.0
tiktoken==0.10.0
tokenizers==0.21.4
toml==0.10.2
torch==2.8.0
tornado==6.5.2
tqdm==4.67.1
transformers==4.55.0
typer==0.16.0
typing-inspect==0.9.0
typing-inspection==0.4.1
typing_extensions==4.14.1
tzdata==2025.2
unstructured==0.18.13
unstructured-client==0.42.3
urllib3==2.5.0
uvicorn==0.35.0
vcrpy==7.0.0
watchdog==6.0.0
watchfiles==1.1.0
webencodings==0.5.1
websocket-client==1.8.0
websockets==15.0.1
Werkzeug==3.1.3
wrapt==1.17.2
yarl==1.20.1
zipp==3.23.0
zstandard==0.23.0


Logo

火山引擎开发者社区是火山引擎打造的AI技术生态平台,聚焦Agent与大模型开发,提供豆包系列模型(图像/视频/视觉)、智能分析与会话工具,并配套评测集、动手实验室及行业案例库。社区通过技术沙龙、挑战赛等活动促进开发者成长,新用户可领50万Tokens权益,助力构建智能应用。

更多推荐