DeerFlow多智能体项目分析-向量数据库实现知识检索的源码解析
DeerFlow 项目采用模块化设计,支持多种向量数据库和RAG(检索增强生成)提供商,为AI研究提供强大的知识检索能力。项目地址:https://github.com/bytedance/deer-flow文件路径"""文本块""""""文档对象"""id: str"""资源描述"""uri: str = Field(..., description="资源URI")title: str = F
概述
DeerFlow 项目采用模块化设计,支持多种向量数据库和RAG(检索增强生成)提供商,为AI研究提供强大的知识检索能力。
项目地址:https://github.com/bytedance/deer-flow
LangGraph中文在线文档:https://github.langchain.ac.cn/langgraph/agents/agents/
支持的向量数据库
1. 核心抽象接口
项目定义了统一的检索器抽象基类:
文件路径: src/rag/retriever.py
class Retriever(abc.ABC):
@abc.abstractmethod
def list_resources(self, query: str | None = None) -> list[Resource]:
"""列出RAG提供商的资源"""
pass
@abc.abstractmethod
def query_relevant_documents(self, query: str, resources: list[Resource] = []) -> list[Document]:
"""从资源中查询相关文档"""
pass
2. 数据模型定义
文件路径: src/rag/retriever.py
class Chunk:
"""文本块"""
content: str
similarity: float
class Document:
"""文档对象"""
id: str
url: str | None = None
title: str | None = None
chunks: list[Chunk] = []
class Resource(BaseModel):
"""资源描述"""
uri: str = Field(..., description="资源URI")
title: str = Field(..., description="资源标题")
description: str | None = Field("", description="资源描述")
向量数据库实现
1. Milvus 实现
文件路径: src/rag/milvus.py
嵌入模型支持
class DashscopeEmbeddings:
"""OpenAI兼容的嵌入包装器"""
def embed_query(self, text: str) -> List[float]:
"""返回给定文本的嵌入向量"""
def embed_documents(self, texts: List[str]) -> List[List[float]]:
"""返回多个文档的嵌入向量(LangChain接口)"""
Milvus检索器配置
class MilvusRetriever(Retriever):
"""基于Milvus向量存储的检索器实现"""
def __init__(self) -> None:
# 连接配置
self.uri: str = get_str_env("MILVUS_URI", "http://localhost:19530")
self.collection_name: str = get_str_env("MILVUS_COLLECTION", "documents")
# 嵌入配置
self.embedding_provider = get_str_env("MILVUS_EMBEDDING_PROVIDER", "openai")
self.embedding_dim: int = self._get_embedding_dimension(self.embedding_model)
集合Schema定义
def _create_collection_schema(self) -> CollectionSchema:
"""构建Milvus集合Schema"""
fields = [
FieldSchema(name="id", dtype=DataType.VARCHAR, is_primary=True),
FieldSchema(name="embedding", dtype=DataType.FLOAT_VECTOR, dim=self.embedding_dim),
FieldSchema(name="content", dtype=DataType.VARCHAR, max_length=65535),
FieldSchema(name="title", dtype=DataType.VARCHAR, max_length=512),
FieldSchema(name="url", dtype=DataType.VARCHAR, max_length=1024),
]
2. RAGFlow 实现
文件路径: src/rag/ragflow.py
class RAGFlowProvider(Retriever):
"""使用RAGFlow检索文档的提供商"""
def query_relevant_documents(self, query: str, resources: list[Resource] = []) -> list[Document]:
"""查询相关文档"""
payload = {
"question": query,
"dataset_ids": dataset_ids,
"document_ids": document_ids,
"page_size": self.page_size,
}
response = requests.post(f"{self.api_url}/api/v1/retrieval", headers=headers, json=payload)
3. VikingDB 实现
文件路径: src/rag/vikingdb_knowledge_base.py
class VikingDBKnowledgeBaseProvider(Retriever):
"""使用VikingDB知识库API检索文档的提供商"""
def _create_signature(self, method: str, path: str, query_params: dict, headers: dict, payload: bytes) -> str:
"""创建API签名"""
# HMAC-SHA256签名实现
def _make_signed_request(self, method: str, path: str, params: dict = None, data: dict = None):
"""发起签名请求"""
配置管理
1. RAG提供商配置
文件路径: src/config/tools.py
class RAGProvider(enum.Enum):
DIFY = "dify"
RAGFLOW = "ragflow"
VIKINGDB_KNOWLEDGE_BASE = "vikingdb_knowledge_base"
MOI = "moi"
MILVUS = "milvus"
SELECTED_RAG_PROVIDER = os.getenv("RAG_PROVIDER")
2. 构建器模式
文件路径: src/rag/builder.py
def build_retriever() -> Retriever | None:
if SELECTED_RAG_PROVIDER == RAGProvider.DIFY.value:
return DifyProvider()
elif SELECTED_RAG_PROVIDER == RAGProvider.RAGFLOW.value:
return RAGFlowProvider()
elif SELECTED_RAG_PROVIDER == RAGProvider.MILVUS.value:
return MilvusProvider()
# ... 其他提供商
工具集成
检索工具
文件路径: src/tools/retriever.py
class RetrieverTool(BaseTool):
name: str = "local_search_tool"
description: str = "从带有`rag://`前缀的文件中检索信息"
def _run(self, keywords: str, run_manager: Optional[CallbackManagerForToolRun] = None) -> list[Document]:
documents = self.retriever.query_relevant_documents(keywords, self.resources)
return [doc.to_dict() for doc in documents]
def get_retriever_tool(resources: List[Resource]) -> RetrieverTool | None:
"""获取检索工具实例"""
retriever = build_retriever()
return RetrieverTool(retriever=retriever, resources=resources)
环境配置
Milvus 配置示例
文件路径: .env.example
# Milvus向量数据库配置
RAG_PROVIDER=milvus
MILVUS_URI=http://localhost:19530 # 或 ./milvus_demo.db (Lite版)
MILVUS_COLLECTION=documents
MILVUS_EMBEDDING_PROVIDER=openai
MILVUS_EMBEDDING_MODEL=text-embedding-ada-002
MILVUS_EMBEDDING_API_KEY=your_api_key
MILVUS_TOP_K=10
MILVUS_AUTO_LOAD_EXAMPLES=true
RAGFlow 配置示例
# RAGFlow配置
RAG_PROVIDER=ragflow
RAGFLOW_API_URL="http://localhost:9388"
RAGFLOW_API_KEY="ragflow-xxx"
RAGFLOW_RETRIEVAL_SIZE=10
RAGFLOW_CROSS_LANGUAGES=English,Chinese,Spanish,French,German,Japanese,Korean
VikingDB 配置示例
# VikingDB知识库配置
RAG_PROVIDER=vikingdb_knowledge_base
VIKINGDB_KNOWLEDGE_BASE_API_URL="api-knowledgebase.mlp.cn-beijing.volces.com"
VIKINGDB_KNOWLEDGE_BASE_API_AK="AKxxx"
VIKINGDB_KNOWLEDGE_BASE_API_SK="your_secret_key"
VIKINGDB_KNOWLEDGE_BASE_RETRIEVAL_SIZE=15
技术特性
1. 模块化设计
- 统一的抽象接口,易于扩展新的向量数据库
- 工厂模式动态创建检索器实例
- 配置驱动的提供商选择
2. 多模态支持
- 支持文本、图片等多种数据类型
- 灵活的嵌入模型配置
- 跨语言检索能力
3. 高性能检索
- 向量相似度计算
- 可配置的检索结果数量
- 智能文档分块和聚合
4. 企业级特性
- API签名认证(VikingDB)
- 连接池和错误处理
- 详细的日志记录
使用场景
- 私有知识库检索: 从用户上传的文档中检索相关信息
- 上下文增强: 为AI研究提供领域特定的背景知识
- 多源数据融合: 整合来自不同数据源的信息
- 实时查询: 在研究过程中动态检索相关文档
总结
DeerFlow的向量数据库架构体现了现代AI应用的最佳实践:
- 抽象化设计提供了良好的扩展性
- 多提供商支持满足不同部署需求
- 统一的工具接口简化了集成复杂度
- 丰富的配置选项支持灵活的定制化
这种设计使得DeerFlow能够适应各种企业环境和技术栈,为AI研究提供强大而灵活的知识检索能力。
火山引擎开发者社区是火山引擎打造的AI技术生态平台,聚焦Agent与大模型开发,提供豆包系列模型(图像/视频/视觉)、智能分析与会话工具,并配套评测集、动手实验室及行业案例库。社区通过技术沙龙、挑战赛等活动促进开发者成长,新用户可领50万Tokens权益,助力构建智能应用。
更多推荐
所有评论(0)