OpenClaw 实战案例:企业知识库系统构建
·
目录
摘要
本文通过一个完整的企业知识库系统案例,演示如何使用 OpenClaw 构建智能知识管理平台。文章涵盖知识采集、知识组织、智能检索、知识问答等核心功能,帮助开发者掌握 OpenClaw 在企业知识管理场景的应用。文中给出了详细的系统设计和代码实现,帮助读者了解知识库系统的完整构建过程。📚
1. 引言 - 知识库系统概述
1.1 企业知识管理痛点
企业知识管理面临诸多挑战,传统方案难以满足现代企业需求:
| 痛点 | 传统方案 | OpenClaw方案 |
|---|---|---|
| 知识分散 | 文档散落各处 | 统一知识中心 |
| 检索困难 | 关键词匹配 | 语义检索 |
| 更新滞后 | 手动维护 | 自动采集 |
| 利用率低 | 被动查找 | 主动推荐 |
| 难以传承 | 人员流失 | 知识沉淀 |
1.2 知识库系统架构
1.3 核心功能规划
| 功能模块 | 核心能力 | 技术实现 |
|---|---|---|
| 知识采集 | 多源数据导入 | 文档解析 + 网页抓取 |
| 知识处理 | 结构化抽取 | NLP + 知识抽取 |
| 智能检索 | 语义搜索 | 向量检索 + 排序 |
| 知识问答 | 自然语言问答 | RAG + LLM |
| 知识图谱 | 关系可视化 | 图数据库 + 可视化 |
2. 知识采集模块
2.1 多源数据导入
import os
import re
import time
import uuid
from abc import ABC, abstractmethod
from dataclasses import dataclass
from typing import List, Dict, Optional
@dataclass
class KnowledgeDocument:
    """One document held by the knowledge base.

    A plain record: the class has no behaviour beyond what ``@dataclass``
    generates (constructor, ``repr`` and field-wise equality).
    """

    # Identifier of the document.
    id: str
    # Human-readable title.
    title: str
    # Full text of the document.
    content: str
    # Where the document came from (importers in this file store a file
    # path or a URL here).
    source: str
    # Kind of source; this file uses 'file', 'web' and 'manual'.
    source_type: str
    # Free-form extra information about the document.
    metadata: Dict
    # Creation and last-update times as floats (call sites in this file
    # pass ``time.time()`` values).
    created_at: float
    updated_at: float
class DataImporter(ABC):
    """Abstract interface shared by every knowledge-source importer."""

    @abstractmethod
    def import_data(self, source: str) -> List[KnowledgeDocument]:
        """Load documents from ``source`` and return them as a list."""
        ...

    @abstractmethod
    def supported_formats(self) -> List[str]:
        """Return the formats this importer is able to handle."""
        ...
class DocumentImporter(DataImporter):
    """Import local files, or whole directory trees, as knowledge documents.

    The parser is chosen by lower-cased file extension; files whose
    extension has no registered parser are skipped.
    """

    def __init__(self):
        # Extension -> parser. Every parser takes a file path and returns
        # a ``(content, metadata)`` tuple.
        self.parsers = {
            '.txt': self._parse_txt,
            '.md': self._parse_markdown,
            '.pdf': self._parse_pdf,
            '.docx': self._parse_docx,
            '.html': self._parse_html,
        }

    def import_data(self, source: str) -> List[KnowledgeDocument]:
        """Import a single file or every supported file under a directory.

        Args:
            source: Path to a file or to a directory.

        Returns:
            The imported documents. The list is empty when ``source`` is
            neither an existing file nor a directory, or when no file had
            a supported extension.
        """
        if os.path.isfile(source):
            doc = self._import_file(source)
            return [doc] if doc is not None else []
        if os.path.isdir(source):
            return self._import_directory(source)
        return []

    def _import_file(self, file_path: str) -> Optional[KnowledgeDocument]:
        """Parse one file; return ``None`` when its extension is unsupported."""
        ext = os.path.splitext(file_path)[1].lower()
        parser = self.parsers.get(ext)
        if parser is None:
            return None
        content, metadata = parser(file_path)
        # Read the clock once so created_at and updated_at are identical.
        now = time.time()
        return KnowledgeDocument(
            # A random UUID instead of a millisecond timestamp: several
            # files imported within the same millisecond used to share an id.
            id=f"doc_{uuid.uuid4().hex}",
            title=metadata.get('title', os.path.basename(file_path)),
            content=content,
            source=file_path,
            source_type='file',
            metadata=metadata,
            created_at=now,
            updated_at=now,
        )

    def _import_directory(self, dir_path: str) -> List[KnowledgeDocument]:
        """Walk ``dir_path`` recursively and import every supported file."""
        documents = []
        for root, _, files in os.walk(dir_path):
            for file_name in files:
                doc = self._import_file(os.path.join(root, file_name))
                if doc is not None:
                    documents.append(doc)
        return documents

    def _parse_txt(self, file_path: str) -> tuple:
        """Read a UTF-8 text file verbatim."""
        with open(file_path, 'r', encoding='utf-8') as f:
            content = f.read()
        return content, {'format': 'txt'}

    def _parse_markdown(self, file_path: str) -> tuple:
        """Read a UTF-8 Markdown file and derive its title.

        The title is the first level-1 heading (``# ...``); when there is
        none, the file name is used instead.
        """
        with open(file_path, 'r', encoding='utf-8') as f:
            content = f.read()
        title_match = re.search(r'^#\s+(.+)$', content, re.MULTILINE)
        title = title_match.group(1) if title_match else os.path.basename(file_path)
        return content, {'format': 'markdown', 'title': title}

    def _parse_pdf(self, file_path: str) -> tuple:
        """Placeholder PDF parser: returns fixed text and does not read the file.

        A real implementation needs a PDF library such as PyPDF2 or
        pdfplumber.
        """
        return "PDF内容...", {'format': 'pdf'}

    def _parse_docx(self, file_path: str) -> tuple:
        """Placeholder Word parser: returns fixed text and does not read the file.

        A real implementation needs python-docx.
        """
        return "Word内容...", {'format': 'docx'}

    def _parse_html(self, file_path: str) -> tuple:
        """Read a UTF-8 HTML file and strip everything that looks like a tag.

        This is a simplification; a real implementation should use an
        HTML parser such as BeautifulSoup.
        """
        with open(file_path, 'r', encoding='utf-8') as f:
            content = f.read()
        text = re.sub(r'<[^>]+>', '', content)
        return text, {'format': 'html'}

    def supported_formats(self) -> List[str]:
        """Return the file extensions that have a registered parser."""
        return list(self.parsers)
class WebScraper(DataImporter):
    """Importer that turns a web page into a knowledge document.

    Page fetching is stubbed out: ``_fetch_page`` returns fixed text and
    performs no network access.
    """

    def __init__(self):
        # Reserved for an HTTP session; nothing in this class uses it yet.
        self.session = None

    def import_data(self, source: str) -> List[KnowledgeDocument]:
        """Fetch ``source`` and wrap the page in a single document.

        Args:
            source: URL of the page.

        Returns:
            A one-element list, or an empty list when the fetch produced
            no content.
        """
        content = self._fetch_page(source)
        if not content:
            return []
        # Read the clock once so created_at and updated_at are identical.
        now = time.time()
        return [KnowledgeDocument(
            # A random UUID instead of a millisecond timestamp: pages
            # fetched within the same millisecond used to share an id.
            id=f"web_{uuid.uuid4().hex}",
            title=self._extract_title(content),
            content=content,
            source=source,
            source_type='web',
            metadata={'url': source},
            created_at=now,
            updated_at=now,
        )]

    def _fetch_page(self, url: str) -> Optional[str]:
        """Return the page content for ``url`` (placeholder implementation).

        The intended implementation calls the web_fetch tool:
            result = web_fetch(url=url)
            return result.get('content')
        """
        return "网页内容..."

    def _extract_title(self, content: str) -> str:
        """Return the text of the ``<title>`` element, or "无标题" if absent.

        Matching ignores case, tolerates attributes on the tag and titles
        that span several lines, and trims surrounding whitespace.
        """
        match = re.search(
            r'<title[^>]*>(.*?)</title>', content, re.IGNORECASE | re.DOTALL
        )
        title = match.group(1).strip() if match else ""
        return title or "无标题"

    def supported_formats(self) -> List[str]:
        """Return the URL schemes this importer handles."""
        return ['http', 'https']
# Usage example: create one importer of each kind.
doc_importer = DocumentImporter()
web_scraper = WebScraper()
# Import documents from a local path and report how many were loaded.
docs = doc_importer.import_data("/path/to/documents")
print(f"导入 {len(docs)} 个文档")
# Import a single web page and report how many documents it produced.
web_docs = web_scraper.import_data("https://example.com/article")
print(f"抓取 {len(web_docs)} 个网页")
2.2 知识抽取与结构化
from typing import List, Dict, Tuple
import re
class KnowledgeExtractor:
    """Rule-based extraction of entities, relations, keywords and a summary.

    Everything here is regex- or frequency-based; no NLP model is involved.
    """

    def __init__(self):
        self.entity_patterns = self._define_entity_patterns()
        self.relation_patterns = self._define_relation_patterns()

    def extract(self, document: KnowledgeDocument) -> Dict:
        """Run every extraction step over ``document.content``.

        Args:
            document: The document to analyse; only ``id`` and ``content``
                are read.

        Returns:
            A dict with the keys ``document_id``, ``entities``,
            ``relations``, ``keywords``, ``summary`` and ``questions``.
        """
        content = document.content
        entities = self._extract_entities(content)
        return {
            "document_id": document.id,
            "entities": entities,
            "relations": self._extract_relations(content, entities),
            "keywords": self._extract_keywords(content),
            "summary": self._generate_summary(content),
            "questions": self._generate_questions(content),
        }

    def _define_entity_patterns(self) -> Dict:
        """Return the regexes used per entity type."""
        return {
            "person": [
                r'[\u4e00-\u9fa5]{2,4}(?=说|表示|认为|指出)',
            ],
            "organization": [
                r'[\u4e00-\u9fa5]+公司',
                r'[\u4e00-\u9fa5]+集团',
                r'[\u4e00-\u9fa5]+部门',
            ],
            "location": [
                r'[\u4e00-\u9fa5]+省',
                r'[\u4e00-\u9fa5]+市',
                r'[\u4e00-\u9fa5]+区',
            ],
            "date": [
                r'\d{4}年\d{1,2}月\d{1,2}日',
                r'\d{4}-\d{2}-\d{2}',
            ],
            "money": [
                r'\d+万?元',
                r'\d+\.?\d*万美元',
            ],
        }

    def _define_relation_patterns(self) -> List:
        """Return the relation regexes together with their relation label."""
        return [
            {
                "pattern": r'(.+?)是(.+?)的(.+)',
                "relation": "属性关系",
            },
            {
                "pattern": r'(.+?)属于(.+)',
                "relation": "归属关系",
            },
            {
                "pattern": r'(.+?)负责(.+)',
                "relation": "责任关系",
            },
        ]

    def _extract_entities(self, content: str) -> List[Dict]:
        """Return every entity-pattern match with its type and character span."""
        return [
            {
                "type": entity_type,
                "text": match.group(),
                "start": match.start(),
                "end": match.end(),
            }
            for entity_type, patterns in self.entity_patterns.items()
            for pattern in patterns
            for match in re.finditer(pattern, content)
        ]

    def _extract_relations(self, content: str, entities: List[Dict]) -> List[Dict]:
        """Return every relation-pattern match.

        The first capture group becomes the subject and the second the
        object; ``entities`` is accepted for interface compatibility but
        is not consulted.
        """
        relations = []
        for rel_pattern in self.relation_patterns:
            label = rel_pattern["relation"]
            for match in re.finditer(rel_pattern["pattern"], content):
                groups = match.groups()
                relations.append({
                    "type": label,
                    "subject": groups[0] if groups else None,
                    "predicate": label,
                    "object": groups[1] if len(groups) > 1 else None,
                    "text": match.group(),
                })
        return relations

    def _extract_keywords(self, content: str, top_k: int = 10) -> List[str]:
        """Return the ``top_k`` most frequent terms of ``content``.

        Terms are runs of non-CJK word characters (at least two characters
        long) and, because Chinese has no word delimiters, every two-character
        window of each CJK run. Ties keep first-occurrence order. A real
        system should use TF-IDF or TextRank with a proper tokenizer.

        The previous implementation split the text into single characters
        and then discarded every single character, so it always returned
        an empty list.
        """
        word_freq = {}
        for run in re.findall(r'[\u4e00-\u9fa5]+|[^\W\u4e00-\u9fa5]+', content):
            if '\u4e00' <= run[0] <= '\u9fa5':
                terms = [run[i:i + 2] for i in range(len(run) - 1)]
            else:
                terms = [run] if len(run) > 1 else []
            for term in terms:
                word_freq[term] = word_freq.get(term, 0) + 1
        # sorted() is stable, so equally frequent terms stay in the order
        # in which they first appeared.
        ranked = sorted(word_freq.items(), key=lambda item: item[1], reverse=True)
        return [term for term, _ in ranked[:top_k]]

    def _generate_summary(self, content: str, max_length: int = 200) -> str:
        """Return the leading sentences of ``content`` within ``max_length``.

        Sentences are split on "。". The terminator is only re-added to
        sentences that actually had one, empty fragments are skipped, and
        when even the first sentence is too long the stripped text is
        truncated to ``max_length`` characters instead of returning nothing.
        """
        parts = content.split('。')
        last = len(parts) - 1
        kept = []
        used = 0
        for index, part in enumerate(parts):
            if not part.strip():
                continue
            # The final fragment is whatever followed the last "。" and
            # therefore was not terminated in the original text.
            piece = part + "。" if index < last else part
            if used + len(piece) > max_length:
                break
            kept.append(piece)
            used += len(piece)
        if not kept:
            return content.strip()[:max_length]
        return "".join(kept)

    def _generate_questions(self, content: str) -> List[str]:
        """Return candidate questions for ``content``.

        Only one rule exists: a "what is <title>" question built from the
        first level-1 Markdown heading, if there is one.
        """
        questions = []
        title_match = re.search(r'^#\s+(.+)$', content, re.MULTILINE)
        if title_match:
            questions.append(f"什么是{title_match.group(1)}?")
        return questions
# Usage example: run the extractor over a hand-written document.
extractor = KnowledgeExtractor()
doc = KnowledgeDocument(
    id="doc_001",
    title="OpenClaw介绍",
    content="# OpenClaw介绍\n\nOpenClaw是一个强大的AI Agent框架...",
    source="manual",
    source_type="manual",
    metadata={},
    created_at=time.time(),
    updated_at=time.time(),
)
result = extractor.extract(doc)
# Show three of the extracted fields.
print(f"实体: {result['entities']}")
print(f"关键词: {result['keywords']}")
print(f"摘要: {result['summary']}")
3. 知识存储与索引
3.1 向量化存储
from typing import List, Dict, Optional
import numpy as np
from dataclasses import dataclass
@dataclass
class VectorRecord:
    """A stored vector together with its id and metadata."""

    # Key under which the record is stored.
    id: str
    # The embedding itself.
    vector: np.ndarray
    # Arbitrary payload kept next to the vector.
    metadata: Dict
class VectorStore:
    """In-memory vector store with brute-force cosine-similarity search.

    Every query scans all stored records. ``index`` is an unused
    placeholder; a real deployment would use FAISS or Milvus.
    """

    def __init__(self, dimension: int = 768):
        self.dimension = dimension
        self.records: Dict[str, VectorRecord] = {}
        self.index = None

    def add(self, id: str, vector: np.ndarray, metadata: Optional[Dict] = None):
        """Store ``vector`` under ``id``, replacing any existing record.

        Args:
            id: Record key.
            vector: Embedding whose length must equal ``self.dimension``.
            metadata: Optional payload; defaults to an empty dict.

        Raises:
            ValueError: If the vector length does not match the store.
        """
        if len(vector) != self.dimension:
            raise ValueError(f"向量维度不匹配: {len(vector)} != {self.dimension}")
        self.records[id] = VectorRecord(
            id=id,
            vector=vector,
            metadata=metadata or {},
        )

    def batch_add(self, records: List[Tuple[str, np.ndarray, Dict]]):
        """Add several ``(id, vector, metadata)`` triples one after another."""
        for record_id, vector, metadata in records:
            self.add(record_id, vector, metadata)

    def search(self, query_vector: np.ndarray, top_k: int = 10) -> List[Dict]:
        """Return the ``top_k`` records most similar to ``query_vector``.

        Args:
            query_vector: Query embedding.
            top_k: Maximum number of hits; zero or a negative value yields
                an empty list.

        Returns:
            Dicts with the keys ``id``, ``score`` (cosine similarity) and
            ``metadata``, best match first.
        """
        if top_k <= 0:
            # Guard against results[:-n], which would silently drop the
            # worst n hits instead of returning nothing.
            return []
        results = [
            {
                "id": record.id,
                "score": self._cosine_similarity(query_vector, record.vector),
                "metadata": record.metadata,
            }
            for record in self.records.values()
        ]
        results.sort(key=lambda hit: hit["score"], reverse=True)
        return results[:top_k]

    def _cosine_similarity(self, v1: np.ndarray, v2: np.ndarray) -> float:
        """Return the cosine similarity of two vectors as a Python float.

        A zero-length vector has no direction; 0.0 is returned in that
        case instead of the NaN that 0/0 would produce, because NaN scores
        make the result ordering unreliable.
        """
        denominator = np.linalg.norm(v1) * np.linalg.norm(v2)
        if denominator == 0:
            return 0.0
        return float(np.dot(v1, v2) / denominator)

    def delete(self, id: str):
        """Remove the record stored under ``id``; unknown ids are ignored."""
        self.records.pop(id, None)

    def get(self, id: str) -> Optional[VectorRecord]:
        """Return the record stored under ``id``, or ``None``."""
        return self.records.get(id)
class EmbeddingGenerator:
    """Placeholder text-embedding generator.

    No embedding model is called. Each text is mapped to a pseudo-random
    vector seeded from the text itself, so the same text always yields the
    same vector. The vectors carry no semantic meaning; a real system
    should call an embedding model (for example sentence-transformers).
    """

    def __init__(self, model_name: str = "default", dimension: int = 768):
        """
        Args:
            model_name: Name of the embedding model; stored but not used
                by this placeholder.
            dimension: Length of the generated vectors. It should match
                the dimension of the vector store the vectors go into.
        """
        self.model_name = model_name
        self.dimension = dimension

    def encode(self, text: str) -> np.ndarray:
        """Return a deterministic pseudo-random vector for ``text``.

        Args:
            text: Input text.

        Returns:
            A 1-D array of length ``self.dimension``.
        """
        # Imported here so the block does not depend on a file-level import.
        import hashlib

        # Seed from a content hash rather than hash(): the built-in string
        # hash is salted per process and would change between runs.
        digest = hashlib.sha256(text.encode("utf-8")).digest()
        seed = int.from_bytes(digest[:8], "big")
        return np.random.default_rng(seed).standard_normal(self.dimension)

    def batch_encode(self, texts: List[str]) -> List[np.ndarray]:
        """Encode each text in ``texts`` and return the vectors in order."""
        return [self.encode(text) for text in texts]
# Usage example: a 768-dimensional store and an embedding generator.
vector_store = VectorStore(dimension=768)
embedding_gen = EmbeddingGenerator()
# Embed one text and store the vector under the id "doc_001".
text = "OpenClaw是一个强大的AI Agent框架"
vector = embedding_gen.encode(text)
vector_store.add("doc_001", vector, {"text": text, "title": "OpenClaw介绍"})
# Embed a query and ask for at most five of the most similar records.
query = "AI Agent框架"
query_vector = embedding_gen.encode(query)
results = vector_store.search(query_vector, top_k=5)
print(f"找到 {len(results)} 个相似文档")
3.2 知识索引构建
from typing import Dict, List, Set
from collections import defaultdict
import re
class KnowledgeIndex:
    """Inverted index over keywords, categories and tags.

    Keywords are stored lower-cased. Text is tokenized with
    ``_tokenize``, which is applied identically to documents and queries.
    """

    # A run of CJK characters, or a run of other word characters. The two
    # are kept apart because ``\w`` also matches CJK: mixed text such as
    # "OpenClaw是一个框架" used to become one unsearchable token.
    _TOKEN_RE = re.compile(r'[\u4e00-\u9fa5]+|[^\W\u4e00-\u9fa5]+')

    def __init__(self):
        self.keyword_index: Dict[str, Set[str]] = defaultdict(set)
        self.category_index: Dict[str, Set[str]] = defaultdict(set)
        self.tag_index: Dict[str, Set[str]] = defaultdict(set)
        self.documents: Dict[str, KnowledgeDocument] = {}

    def add_document(self, document: KnowledgeDocument, keywords: List[str] = None):
        """Register ``document`` in every index.

        Args:
            document: Document to index. ``metadata["category"]`` and
                ``metadata["tags"]`` feed the category and tag indexes.
            keywords: Explicit keywords; when missing or empty, keywords
                are extracted from ``document.content``.
        """
        doc_id = document.id
        self.documents[doc_id] = document

        terms = keywords if keywords else self._extract_keywords(document.content)
        for term in terms:
            self.keyword_index[term.lower()].add(doc_id)

        category = document.metadata.get("category")
        if category:
            self.category_index[category].add(doc_id)

        # ``or []`` tolerates metadata that stores tags as None.
        for tag in document.metadata.get("tags") or []:
            self.tag_index[tag].add(doc_id)

    def _lookup(self, doc_ids) -> List[KnowledgeDocument]:
        """Resolve ids to documents, skipping ids that are no longer stored."""
        return [self.documents[doc_id] for doc_id in doc_ids if doc_id in self.documents]

    def search_by_keyword(self, keyword: str) -> List[KnowledgeDocument]:
        """Return documents indexed under ``keyword`` (case-insensitive)."""
        return self._lookup(self.keyword_index.get(keyword.lower(), set()))

    def search_by_category(self, category: str) -> List[KnowledgeDocument]:
        """Return documents whose category equals ``category``."""
        return self._lookup(self.category_index.get(category, set()))

    def search_by_tags(self, tags: List[str]) -> List[KnowledgeDocument]:
        """Return documents that carry every tag in ``tags``.

        An empty ``tags`` list yields an empty result.
        """
        if not tags:
            return []
        # .get() rather than indexing so that unknown tags do not create
        # empty entries in the defaultdict.
        tag_sets = [self.tag_index.get(tag, set()) for tag in tags]
        return self._lookup(set.intersection(*tag_sets))

    def search(self, query: str) -> List[KnowledgeDocument]:
        """Return documents matching any query token, best match first.

        A document's score is the number of query tokens it is indexed
        under.
        """
        matched_docs = defaultdict(int)
        for word in self._tokenize(query.lower()):
            for doc_id in self.keyword_index.get(word, set()):
                matched_docs[doc_id] += 1
        ranked = sorted(matched_docs, key=matched_docs.get, reverse=True)
        return self._lookup(ranked)

    @classmethod
    def _tokenize(cls, text: str) -> List[str]:
        """Split ``text`` into index terms.

        Every run matched by ``_TOKEN_RE`` is a term. Chinese has no word
        delimiters, so CJK runs longer than two characters additionally
        contribute each two-character window, which lets a short query
        match inside a longer phrase.
        """
        tokens = []
        for run in cls._TOKEN_RE.findall(text):
            tokens.append(run)
            if len(run) > 2 and '\u4e00' <= run[0] <= '\u9fa5':
                tokens.extend(run[i:i + 2] for i in range(len(run) - 1))
        return tokens

    def _extract_keywords(self, content: str) -> List[str]:
        """Return the distinct terms of ``content`` in first-seen order."""
        return list(dict.fromkeys(self._tokenize(content)))

    def get_statistics(self) -> Dict:
        """Return the number of documents and of distinct index keys."""
        return {
            "total_documents": len(self.documents),
            "total_keywords": len(self.keyword_index),
            "total_categories": len(self.category_index),
            "total_tags": len(self.tag_index),
        }
# Usage example: index one document, then query the index.
index = KnowledgeIndex()
doc1 = KnowledgeDocument(
    id="doc_001",
    title="OpenClaw入门指南",
    content="OpenClaw是一个强大的AI Agent框架...",
    source="manual",
    source_type="manual",
    metadata={"category": "教程", "tags": ["OpenClaw", "入门"]},
    created_at=time.time(),
    updated_at=time.time(),
)
index.add_document(doc1)
# Combined keyword search.
results = index.search("OpenClaw 教程")
print(f"找到 {len(results)} 个文档")
4. 智能检索与问答
4.1 语义检索
from typing import List, Dict, Tuple
import numpy as np
class SemanticRetriever:
    """Chunk documents, embed the chunks and retrieve them by similarity."""

    def __init__(self, vector_store: VectorStore, embedding_gen: EmbeddingGenerator):
        self.vector_store = vector_store
        self.embedding_gen = embedding_gen
        self.documents: Dict[str, KnowledgeDocument] = {}

    def index_document(self, document: KnowledgeDocument):
        """Split ``document`` into chunks and store one vector per chunk.

        Chunk ids have the form ``<document id>_chunk_<n>``. Chunks left
        over from an earlier, longer version of the same document are
        deleted so they cannot show up in later searches.
        """
        chunks = self._chunk_document(document)
        for i, chunk in enumerate(chunks):
            self.vector_store.add(
                f"{document.id}_chunk_{i}",
                self.embedding_gen.encode(chunk),
                {
                    "document_id": document.id,
                    "chunk_index": i,
                    "content": chunk,
                },
            )
        # Chunk numbers are contiguous, so the first missing id ends the scan.
        stale = len(chunks)
        while self.vector_store.get(f"{document.id}_chunk_{stale}") is not None:
            self.vector_store.delete(f"{document.id}_chunk_{stale}")
            stale += 1
        self.documents[document.id] = document

    def _chunk_document(self, document: KnowledgeDocument, chunk_size: int = 500) -> List[str]:
        """Split ``document.content`` into chunks of at most ``chunk_size`` characters.

        Paragraphs (separated by a blank line) are packed greedily into
        chunks. Blank paragraphs are dropped, so empty content yields no
        chunks, and a paragraph longer than ``chunk_size`` is cut into
        fixed-size pieces instead of becoming one oversized chunk.

        Raises:
            ValueError: If ``chunk_size`` is not positive.
        """
        if chunk_size <= 0:
            raise ValueError("chunk_size must be positive")
        chunks = []
        current = ""
        for raw in document.content.split('\n\n'):
            para = raw.strip()
            if not para:
                continue
            if len(para) > chunk_size:
                # Flush what has been collected, then hard-split the
                # over-long paragraph.
                if current:
                    chunks.append(current)
                    current = ""
                chunks.extend(
                    para[i:i + chunk_size] for i in range(0, len(para), chunk_size)
                )
                continue
            candidate = f"{current}\n\n{para}" if current else para
            if len(candidate) <= chunk_size:
                current = candidate
            else:
                chunks.append(current)
                current = para
        if current:
            chunks.append(current)
        return chunks

    def retrieve(self, query: str, top_k: int = 5) -> List[Dict]:
        """Return the ``top_k`` chunks most similar to ``query``.

        Args:
            query: Query text.
            top_k: Number of hits to return.

        Returns:
            Hits as produced by the vector store's ``search``.
        """
        query_vector = self.embedding_gen.encode(query)
        # Over-fetch so that a re-ranker has candidates to choose from.
        candidates = self.vector_store.search(query_vector, top_k=top_k * 2)
        return self._rerank(query, candidates)[:top_k]

    def _rerank(self, query: str, results: List[Dict]) -> List[Dict]:
        """Return ``results`` unchanged (placeholder for a re-ranking model)."""
        return results

    def hybrid_search(self, query: str, keyword_results: List, top_k: int = 5,
                      semantic_weight: float = 0.7,
                      keyword_weight: float = 0.3) -> List[Dict]:
        """Merge semantic hits with keyword hits into one ranked list.

        Args:
            query: Query text.
            keyword_results: Documents found by keyword search; each one
                counts as a keyword score of 1.0.
            top_k: Number of results to return.
            semantic_weight: Weight of the semantic score.
            keyword_weight: Weight of the keyword score.

        Returns:
            One dict per document with ``document_id``, ``semantic_score``,
            ``keyword_score``, ``content`` and ``final_score``, best first.
        """
        merged = {}
        # Only the first hit per document is kept; assuming hits arrive
        # best first, that is the document's best-scoring chunk.
        for hit in self.retrieve(query, top_k=top_k):
            doc_id = hit["metadata"]["document_id"]
            if doc_id not in merged:
                merged[doc_id] = {
                    "document_id": doc_id,
                    "semantic_score": hit["score"],
                    "keyword_score": 0,
                    "content": hit["metadata"]["content"],
                }
        for doc in keyword_results:
            if doc.id in merged:
                merged[doc.id]["keyword_score"] = 1.0
            else:
                merged[doc.id] = {
                    "document_id": doc.id,
                    "semantic_score": 0,
                    "keyword_score": 1.0,
                    "content": doc.content[:500],
                }
        for entry in merged.values():
            entry["final_score"] = (
                semantic_weight * entry["semantic_score"]
                + keyword_weight * entry["keyword_score"]
            )
        ranked = sorted(merged.values(), key=lambda entry: entry["final_score"], reverse=True)
        return ranked[:top_k]
# Usage example: build a retriever on top of the existing store and generator.
retriever = SemanticRetriever(vector_store, embedding_gen)
# Chunk, embed and store the document.
retriever.index_document(doc1)
# Retrieve the chunks closest to the query and print score plus a preview.
results = retriever.retrieve("如何使用OpenClaw")
for result in results:
    # Both prints belong to the loop body; without the indentation the
    # snippet is a syntax error.
    print(f"分数: {result['score']:.4f}")
    print(f"内容: {result['metadata']['content'][:100]}...\n")
4.2 知识问答
from typing import Dict, List, Optional
class KnowledgeQA:
    """Question answering on top of a retriever (retrieve, then generate).

    Answer generation is a placeholder that returns a fixed template; the
    retrieved context is built but not yet consumed by a language model.
    """

    def __init__(self, retriever: SemanticRetriever):
        self.retriever = retriever

    def answer(self, question: str, top_k: int = 3) -> Dict:
        """Answer ``question`` from the knowledge base.

        Args:
            question: The question text.
            top_k: Number of chunks to retrieve.

        Returns:
            A dict with ``question``, ``answer``, ``sources`` (document id,
            first 200 characters of the chunk, score) and ``confidence``.
        """
        results = self.retriever.retrieve(question, top_k=top_k)
        if not results:
            return {
                "question": question,
                "answer": "抱歉,我没有找到相关信息。",
                "sources": [],
                "confidence": 0,
            }

        context = self._build_context(results)
        sources = [
            {
                "document_id": hit["metadata"]["document_id"],
                "content": hit["metadata"]["content"][:200],
                "score": hit["score"],
            }
            for hit in results
        ]
        return {
            "question": question,
            "answer": self._generate_answer(question, context),
            "sources": sources,
            "confidence": self._calculate_confidence(results),
        }

    def _build_context(self, results: List[Dict]) -> str:
        """Join the retrieved chunks into one numbered context string."""
        return "\n\n".join(
            f"[文档{position}]\n{hit['metadata']['content']}"
            for position, hit in enumerate(results, start=1)
        )

    def _generate_answer(self, question: str, context: str) -> str:
        """Return the answer text (placeholder implementation).

        The intended implementation sends the context and the question to
        OpenClaw:
            prompt = f"根据以下信息回答问题:\\n\\n{context}\\n\\n问题:{question}"
            answer = openclaw.generate(prompt)
        """
        return f"根据知识库信息,{question}的答案是..."

    def _calculate_confidence(self, results: List[Dict]) -> float:
        """Return the top retrieval score clamped to the range 0..1.

        Cosine scores can be negative, so the lower bound is clamped as
        well; previously only the upper bound was, and a negative
        "confidence" could be returned.
        """
        if not results:
            return 0
        return max(0.0, min(float(results[0]["score"]), 1.0))

    def batch_answer(self, questions: List[str]) -> List[Dict]:
        """Answer every question in ``questions`` with the default ``top_k``."""
        return [self.answer(question) for question in questions]
# Usage example: wrap the retriever in a QA front end.
qa = KnowledgeQA(retriever)
# Ask one question and print the fields of the returned dict.
result = qa.answer("OpenClaw是什么?")
print(f"问题: {result['question']}")
print(f"回答: {result['answer']}")
print(f"置信度: {result['confidence']:.2f}")
print(f"来源: {len(result['sources'])} 个文档")
5. 知识图谱
5.1 图谱构建
from typing import Dict, List, Set
from dataclasses import dataclass
@dataclass
class Entity:
    """A node of the knowledge graph."""

    # Key under which the graph stores the entity.
    id: str
    # Display name; the graph also indexes entities by this value.
    name: str
    # Entity category (the usage example uses "产品" and "人物").
    type: str
    # Free-form attributes of the entity.
    properties: Dict
@dataclass
class Relation:
    """A directed edge of the knowledge graph."""

    # Key under which the graph stores the relation.
    id: str
    # Id of the entity the edge starts from.
    source_id: str
    # Id of the entity the edge points to.
    target_id: str
    # Label of the relation (the usage example uses "开发").
    relation_type: str
    # Free-form attributes of the relation.
    properties: Dict
class KnowledgeGraph:
    """In-memory directed graph of entities and relations.

    Relations are kept in a flat dict, so every relation lookup scans all
    relations.
    """

    def __init__(self):
        self.entities: Dict[str, Entity] = {}
        self.relations: Dict[str, Relation] = {}
        # Entity name -> ids of the entities carrying that name.
        self.entity_index: Dict[str, Set[str]] = {}

    def add_entity(self, entity: Entity):
        """Add ``entity``, replacing any entity stored under the same id.

        When the replaced entity had a different name, its old name-index
        entry is removed so that name lookups do not return the entity
        under a name it no longer has.
        """
        previous = self.entities.get(entity.id)
        if previous is not None and previous.name != entity.name:
            old_ids = self.entity_index.get(previous.name)
            if old_ids is not None:
                old_ids.discard(entity.id)
                if not old_ids:
                    del self.entity_index[previous.name]
        self.entities[entity.id] = entity
        self.entity_index.setdefault(entity.name, set()).add(entity.id)

    def add_relation(self, relation: Relation):
        """Add ``relation``, replacing any relation stored under the same id."""
        self.relations[relation.id] = relation

    def get_entity(self, entity_id: str) -> Optional[Entity]:
        """Return the entity stored under ``entity_id``, or ``None``."""
        return self.entities.get(entity_id)

    def get_entity_by_name(self, name: str) -> List[Entity]:
        """Return every entity whose name equals ``name``."""
        return [
            self.entities[eid]
            for eid in self.entity_index.get(name, set())
            if eid in self.entities
        ]

    def get_relations(self, entity_id: str, direction: str = "both") -> List[Relation]:
        """Return the relations attached to ``entity_id``.

        Args:
            entity_id: Id of the entity.
            direction: "out" (entity is the source), "in" (entity is the
                target) or "both". Any other value yields an empty list.

        Returns:
            Matching relations in insertion order.
        """
        want_out = direction in ("out", "both")
        want_in = direction in ("in", "both")
        return [
            relation
            for relation in self.relations.values()
            if (want_out and relation.source_id == entity_id)
            or (want_in and relation.target_id == entity_id)
        ]

    def find_path(self, start_id: str, end_id: str, max_depth: int = 5) -> List[List[str]]:
        """Return every cycle-free directed path from ``start_id`` to ``end_id``.

        Args:
            start_id: Id of the first entity.
            end_id: Id of the last entity.
            max_depth: Maximum number of entities a path may contain.

        Returns:
            Paths as lists of entity ids, each starting with ``start_id``.
        """
        paths = []
        self._dfs(start_id, end_id, [start_id], paths, max_depth)
        return paths

    def _dfs(self, current: str, target: str, path: List[str], paths: List, max_depth: int):
        """Depth-first search that appends every found path to ``paths``."""
        # The length check comes first, so a path longer than max_depth is
        # rejected even when it ends at the target.
        if len(path) > max_depth:
            return
        if current == target:
            paths.append(path.copy())
            return
        for relation in self.get_relations(current, "out"):
            next_entity = relation.target_id
            if next_entity in path:
                # Already on the current path: following it would loop.
                continue
            path.append(next_entity)
            self._dfs(next_entity, target, path, paths, max_depth)
            path.pop()

    def get_neighbors(self, entity_id: str, depth: int = 1) -> Dict:
        """Return the subgraph reachable from ``entity_id`` within ``depth`` hops.

        Only outgoing relations are followed.

        Args:
            entity_id: Id of the start entity.
            depth: Maximum number of hops.

        Returns:
            ``{"entities": [...], "relations": [...]}``; every returned
            relation connects two entity ids inside the neighbourhood.
        """
        visited = set()
        entities = []
        relations = []
        self._bfs(entity_id, depth, visited, entities, relations)
        return {
            "entities": entities,
            "relations": relations,
        }

    def _bfs(self, start: str, depth: int, visited: Set, entities: List, relations: List):
        """Breadth-first search that fills ``visited``, ``entities`` and ``relations``.

        The graph is expanded level by level, which avoids ``list.pop(0)``.
        Relations are collected afterwards and only when their target was
        reached too; previously edges leaving the neighbourhood at the
        depth boundary were returned although their target was not.
        """
        visited.add(start)
        if depth < 0:
            return
        order = [start]
        frontier = [start]
        for _ in range(depth):
            next_frontier = []
            for node in frontier:
                for relation in self.get_relations(node, "out"):
                    target = relation.target_id
                    if target not in visited:
                        visited.add(target)
                        order.append(target)
                        next_frontier.append(target)
            if not next_frontier:
                break
            frontier = next_frontier
        # A local set, because the caller may pass a pre-filled ``visited``.
        reached = set(order)
        for node in order:
            if node in self.entities:
                entities.append(self.entities[node])
            relations.extend(
                relation
                for relation in self.get_relations(node, "out")
                if relation.target_id in reached
            )
# Usage example: a two-node graph with one directed relation.
kg = KnowledgeGraph()

# Entities.
kg.add_entity(Entity(
    id="ent_001",
    name="OpenClaw",
    type="产品",
    properties={"描述": "AI Agent框架"},
))
kg.add_entity(Entity(
    id="ent_002",
    name="张龙生",
    type="人物",
    properties={"角色": "开发者"},
))

# Relation from ent_002 to ent_001.
kg.add_relation(Relation(
    id="rel_001",
    source_id="ent_002",
    target_id="ent_001",
    relation_type="开发",
    properties={},
))

# Query every relation attached to ent_002 (both directions by default).
relations = kg.get_relations("ent_002")
print(f"张龙生的关系: {len(relations)} 个")
6. 最佳实践
6.1 系统设计原则
| 原则 | 说明 | 实践 |
|---|---|---|
| 可扩展 | 支持多数据源 | 插件式导入器 |
| 高性能 | 快速检索 | 向量索引 + 缓存 |
| 准确性 | 精准回答 | 混合检索 + 重排序 |
| 可解释 | 来源可追溯 | 引用来源展示 |
6.2 常见问题
| 问题 | 原因 | 解决方案 |
|---|---|---|
| 检索不准 | 向量质量差 | 优化嵌入模型 |
| 回答错误 | 知识不完整 | 补充知识库 |
| 性能慢 | 索引未优化 | 优化索引结构 |
7. 总结
7.1 核心要点
本文通过完整的知识库系统案例,展示了 OpenClaw 在企业知识管理场景的应用:
| 模块 | 核心功能 | 技术要点 |
|---|---|---|
| 知识采集 | 多源导入 | 文档解析 + 网页抓取 |
| 知识处理 | 结构化抽取 | NLP + 实体关系 |
| 知识存储 | 向量化存储 | 向量数据库 |
| 智能检索 | 语义搜索 | 向量检索 + 混合 |
| 知识问答 | RAG问答 | 检索 + 生成 |
7.2 下一步学习
- 第74篇:OpenClaw 实战案例:自动化运维系统
参考资料
更多推荐


所有评论(0)