摘要

本文通过一个完整的企业知识库系统案例,演示如何使用 OpenClaw 构建智能知识管理平台。文章涵盖知识采集、知识组织、智能检索、知识问答等核心功能,帮助开发者掌握 OpenClaw 在企业知识管理场景的应用。通过详细的系统设计和代码实现,让读者了解知识库系统的完整构建过程。📚


1. 引言 - 知识库系统概述

1.1 企业知识管理痛点

企业知识管理面临诸多挑战,传统方案难以满足现代企业需求:

痛点 传统方案 OpenClaw方案
知识分散 文档散落各处 统一知识中心
检索困难 关键词匹配 语义检索
更新滞后 手动维护 自动采集
利用率低 被动查找 主动推荐
难以传承 人员流失 知识沉淀

1.2 知识库系统架构

知识服务层

知识存储层

知识处理层

知识采集层

文档导入

网页抓取

API对接

手动录入

文本提取

知识抽取

向量化

索引构建

文档库

向量库

关系库

智能检索

知识问答

知识推荐

知识图谱

1.3 核心功能规划

功能模块 核心能力 技术实现
知识采集 多源数据导入 文档解析 + 网页抓取
知识处理 结构化抽取 NLP + 知识抽取
智能检索 语义搜索 向量检索 + 排序
知识问答 自然语言问答 RAG + LLM
知识图谱 关系可视化 图数据库 + 可视化

2. 知识采集模块

2.1 多源数据导入

from abc import ABC, abstractmethod
from typing import List, Dict, Optional
from dataclasses import dataclass
import os
import re

@dataclass
class KnowledgeDocument:
    """知识文档"""
    id: str
    title: str
    content: str
    source: str
    source_type: str
    metadata: Dict
    created_at: float
    updated_at: float

class DataImporter(ABC):
    """数据导入器基类"""
    
    @abstractmethod
    def import_data(self, source: str) -> List[KnowledgeDocument]:
        """导入数据"""
        pass
    
    @abstractmethod
    def supported_formats(self) -> List[str]:
        """支持的格式"""
        pass

class DocumentImporter(DataImporter):
    """文档导入器"""
    
    def __init__(self):
        self.parsers = {
            '.txt': self._parse_txt,
            '.md': self._parse_markdown,
            '.pdf': self._parse_pdf,
            '.docx': self._parse_docx,
            '.html': self._parse_html
        }
    
    def import_data(self, source: str) -> List[KnowledgeDocument]:
        """导入文档"""
        documents = []
        
        if os.path.isfile(source):
            doc = self._import_file(source)
            if doc:
                documents.append(doc)
        elif os.path.isdir(source):
            documents = self._import_directory(source)
        
        return documents
    
    def _import_file(self, file_path: str) -> Optional[KnowledgeDocument]:
        """导入单个文件"""
        ext = os.path.splitext(file_path)[1].lower()
        
        if ext not in self.parsers:
            return None
        
        parser = self.parsers[ext]
        content, metadata = parser(file_path)
        
        return KnowledgeDocument(
            id=f"doc_{int(time.time() * 1000)}",
            title=metadata.get('title', os.path.basename(file_path)),
            content=content,
            source=file_path,
            source_type='file',
            metadata=metadata,
            created_at=time.time(),
            updated_at=time.time()
        )
    
    def _import_directory(self, dir_path: str) -> List[KnowledgeDocument]:
        """导入目录"""
        documents = []
        
        for root, _, files in os.walk(dir_path):
            for file in files:
                file_path = os.path.join(root, file)
                doc = self._import_file(file_path)
                if doc:
                    documents.append(doc)
        
        return documents
    
    def _parse_txt(self, file_path: str) -> tuple:
        """解析TXT文件"""
        with open(file_path, 'r', encoding='utf-8') as f:
            content = f.read()
        return content, {'format': 'txt'}
    
    def _parse_markdown(self, file_path: str) -> tuple:
        """解析Markdown文件"""
        with open(file_path, 'r', encoding='utf-8') as f:
            content = f.read()
        
        # 提取标题
        title_match = re.search(r'^#\s+(.+)$', content, re.MULTILINE)
        title = title_match.group(1) if title_match else os.path.basename(file_path)
        
        return content, {'format': 'markdown', 'title': title}
    
    def _parse_pdf(self, file_path: str) -> tuple:
        """解析PDF文件"""
        # 需要安装 PyPDF2 或 pdfplumber
        # 这里简化实现
        return "PDF内容...", {'format': 'pdf'}
    
    def _parse_docx(self, file_path: str) -> tuple:
        """解析Word文档"""
        # 需要安装 python-docx
        # 这里简化实现
        return "Word内容...", {'format': 'docx'}
    
    def _parse_html(self, file_path: str) -> tuple:
        """解析HTML文件"""
        with open(file_path, 'r', encoding='utf-8') as f:
            content = f.read()
        
        # 提取正文(简化实现)
        # 实际应使用 BeautifulSoup
        text = re.sub(r'<[^>]+>', '', content)
        
        return text, {'format': 'html'}
    
    def supported_formats(self) -> List[str]:
        return list(self.parsers.keys())

class WebScraper(DataImporter):
    """网页抓取器"""
    
    def __init__(self):
        self.session = None
    
    def import_data(self, source: str) -> List[KnowledgeDocument]:
        """抓取网页"""
        documents = []
        
        # 使用OpenClaw的web_fetch功能
        # 这里简化实现
        content = self._fetch_page(source)
        
        if content:
            documents.append(KnowledgeDocument(
                id=f"web_{int(time.time() * 1000)}",
                title=self._extract_title(content),
                content=content,
                source=source,
                source_type='web',
                metadata={'url': source},
                created_at=time.time(),
                updated_at=time.time()
            ))
        
        return documents
    
    def _fetch_page(self, url: str) -> Optional[str]:
        """获取页面内容"""
        # 使用 web_fetch 工具
        # result = web_fetch(url=url)
        # return result.get('content')
        return "网页内容..."
    
    def _extract_title(self, content: str) -> str:
        """提取标题"""
        match = re.search(r'<title>(.+?)</title>', content)
        return match.group(1) if match else "无标题"
    
    def supported_formats(self) -> List[str]:
        return ['http', 'https']

# 使用示例
doc_importer = DocumentImporter()
web_scraper = WebScraper()

# 导入文档
docs = doc_importer.import_data("/path/to/documents")
print(f"导入 {len(docs)} 个文档")

# 抓取网页
web_docs = web_scraper.import_data("https://example.com/article")
print(f"抓取 {len(web_docs)} 个网页")

2.2 知识抽取与结构化

from typing import List, Dict, Tuple
import re

class KnowledgeExtractor:
    """知识抽取器"""
    
    def __init__(self):
        self.entity_patterns = self._define_entity_patterns()
        self.relation_patterns = self._define_relation_patterns()
    
    def extract(self, document: KnowledgeDocument) -> Dict:
        """
        抽取知识
        
        Args:
            document: 知识文档
        
        Returns:
            抽取结果
        """
        content = document.content
        
        # 实体抽取
        entities = self._extract_entities(content)
        
        # 关系抽取
        relations = self._extract_relations(content, entities)
        
        # 关键词抽取
        keywords = self._extract_keywords(content)
        
        # 摘要生成
        summary = self._generate_summary(content)
        
        # 问题生成
        questions = self._generate_questions(content)
        
        return {
            "document_id": document.id,
            "entities": entities,
            "relations": relations,
            "keywords": keywords,
            "summary": summary,
            "questions": questions
        }
    
    def _define_entity_patterns(self) -> Dict:
        """定义实体模式"""
        return {
            "person": [
                r'[\u4e00-\u9fa5]{2,4}(?=说|表示|认为|指出)',
            ],
            "organization": [
                r'[\u4e00-\u9fa5]+公司',
                r'[\u4e00-\u9fa5]+集团',
                r'[\u4e00-\u9fa5]+部门',
            ],
            "location": [
                r'[\u4e00-\u9fa5]+省',
                r'[\u4e00-\u9fa5]+市',
                r'[\u4e00-\u9fa5]+区',
            ],
            "date": [
                r'\d{4}年\d{1,2}月\d{1,2}日',
                r'\d{4}-\d{2}-\d{2}',
            ],
            "money": [
                r'\d+万?元',
                r'\d+\.?\d*万美元',
            ]
        }
    
    def _define_relation_patterns(self) -> List:
        """定义关系模式"""
        return [
            {
                "pattern": r'(.+?)是(.+?)的(.+)',
                "relation": "属性关系"
            },
            {
                "pattern": r'(.+?)属于(.+)',
                "relation": "归属关系"
            },
            {
                "pattern": r'(.+?)负责(.+)',
                "relation": "责任关系"
            }
        ]
    
    def _extract_entities(self, content: str) -> List[Dict]:
        """抽取实体"""
        entities = []
        
        for entity_type, patterns in self.entity_patterns.items():
            for pattern in patterns:
                matches = re.finditer(pattern, content)
                for match in matches:
                    entities.append({
                        "type": entity_type,
                        "text": match.group(),
                        "start": match.start(),
                        "end": match.end()
                    })
        
        return entities
    
    def _extract_relations(self, content: str, entities: List[Dict]) -> List[Dict]:
        """抽取关系"""
        relations = []
        
        for rel_pattern in self.relation_patterns:
            matches = re.finditer(rel_pattern["pattern"], content)
            for match in matches:
                relations.append({
                    "type": rel_pattern["relation"],
                    "subject": match.group(1) if len(match.groups()) > 0 else None,
                    "predicate": rel_pattern["relation"],
                    "object": match.group(2) if len(match.groups()) > 1 else None,
                    "text": match.group()
                })
        
        return relations
    
    def _extract_keywords(self, content: str, top_k: int = 10) -> List[str]:
        """抽取关键词"""
        # 简化实现:基于词频
        # 实际应使用 TF-IDF 或 TextRank
        
        # 移除标点和空白
        text = re.sub(r'[^\w\s\u4e00-\u9fa5]', '', content)
        
        # 分词(简化)
        words = list(text)
        
        # 统计词频
        word_freq = {}
        for word in words:
            if len(word) > 1:  # 过滤单字
                word_freq[word] = word_freq.get(word, 0) + 1
        
        # 排序
        sorted_words = sorted(word_freq.items(), key=lambda x: x[1], reverse=True)
        
        return [word for word, _ in sorted_words[:top_k]]
    
    def _generate_summary(self, content: str, max_length: int = 200) -> str:
        """生成摘要"""
        # 简化实现:取前N字
        # 实际应使用抽取式或生成式摘要
        
        sentences = content.split('。')
        
        summary = ""
        for sentence in sentences:
            if len(summary) + len(sentence) <= max_length:
                summary += sentence + "。"
            else:
                break
        
        return summary
    
    def _generate_questions(self, content: str) -> List[str]:
        """生成问题"""
        questions = []
        
        # 基于标题生成问题
        title_match = re.search(r'^#\s+(.+)$', content, re.MULTILINE)
        if title_match:
            title = title_match.group(1)
            questions.append(f"什么是{title}?")
        
        # 基于内容生成问题
        # 简化实现,实际应使用问答生成模型
        
        return questions

# 使用示例
extractor = KnowledgeExtractor()

doc = KnowledgeDocument(
    id="doc_001",
    title="OpenClaw介绍",
    content="# OpenClaw介绍\n\nOpenClaw是一个强大的AI Agent框架...",
    source="manual",
    source_type="manual",
    metadata={},
    created_at=time.time(),
    updated_at=time.time()
)

result = extractor.extract(doc)
print(f"实体: {result['entities']}")
print(f"关键词: {result['keywords']}")
print(f"摘要: {result['summary']}")

3. 知识存储与索引

3.1 向量化存储

from typing import List, Dict, Optional
import numpy as np
from dataclasses import dataclass

@dataclass
class VectorRecord:
    """向量记录"""
    id: str
    vector: np.ndarray
    metadata: Dict

class VectorStore:
    """向量存储"""
    
    def __init__(self, dimension: int = 768):
        self.dimension = dimension
        self.records: Dict[str, VectorRecord] = {}
        self.index = None  # 实际应使用 FAISS 或 Milvus
    
    def add(self, id: str, vector: np.ndarray, metadata: Dict = None):
        """添加向量"""
        if len(vector) != self.dimension:
            raise ValueError(f"向量维度不匹配: {len(vector)} != {self.dimension}")
        
        self.records[id] = VectorRecord(
            id=id,
            vector=vector,
            metadata=metadata or {}
        )
    
    def batch_add(self, records: List[Tuple[str, np.ndarray, Dict]]):
        """批量添加"""
        for id, vector, metadata in records:
            self.add(id, vector, metadata)
    
    def search(self, query_vector: np.ndarray, top_k: int = 10) -> List[Dict]:
        """
        搜索相似向量
        
        Args:
            query_vector: 查询向量
            top_k: 返回数量
        
        Returns:
            搜索结果
        """
        results = []
        
        for record in self.records.values():
            # 计算余弦相似度
            similarity = self._cosine_similarity(query_vector, record.vector)
            results.append({
                "id": record.id,
                "score": similarity,
                "metadata": record.metadata
            })
        
        # 排序
        results.sort(key=lambda x: x["score"], reverse=True)
        
        return results[:top_k]
    
    def _cosine_similarity(self, v1: np.ndarray, v2: np.ndarray) -> float:
        """计算余弦相似度"""
        return np.dot(v1, v2) / (np.linalg.norm(v1) * np.linalg.norm(v2))
    
    def delete(self, id: str):
        """删除向量"""
        if id in self.records:
            del self.records[id]
    
    def get(self, id: str) -> Optional[VectorRecord]:
        """获取向量"""
        return self.records.get(id)

class EmbeddingGenerator:
    """向量生成器"""
    
    def __init__(self, model_name: str = "default"):
        self.model_name = model_name
    
    def encode(self, text: str) -> np.ndarray:
        """
        生成文本向量
        
        Args:
            text: 输入文本
        
        Returns:
            向量
        """
        # 简化实现
        # 实际应调用 OpenClaw 的嵌入模型
        # 或使用 sentence-transformers
        
        # 返回随机向量作为示例
        return np.random.randn(768)
    
    def batch_encode(self, texts: List[str]) -> List[np.ndarray]:
        """批量生成向量"""
        return [self.encode(text) for text in texts]

# 使用示例
vector_store = VectorStore(dimension=768)
embedding_gen = EmbeddingGenerator()

# 添加文档向量
text = "OpenClaw是一个强大的AI Agent框架"
vector = embedding_gen.encode(text)
vector_store.add("doc_001", vector, {"text": text, "title": "OpenClaw介绍"})

# 搜索
query = "AI Agent框架"
query_vector = embedding_gen.encode(query)
results = vector_store.search(query_vector, top_k=5)
print(f"找到 {len(results)} 个相似文档")

3.2 知识索引构建

from typing import Dict, List, Set
from collections import defaultdict
import re

class KnowledgeIndex:
    """知识索引"""
    
    def __init__(self):
        self.keyword_index: Dict[str, Set[str]] = defaultdict(set)
        self.category_index: Dict[str, Set[str]] = defaultdict(set)
        self.tag_index: Dict[str, Set[str]] = defaultdict(set)
        self.documents: Dict[str, KnowledgeDocument] = {}
    
    def add_document(self, document: KnowledgeDocument, keywords: List[str] = None):
        """添加文档到索引"""
        doc_id = document.id
        self.documents[doc_id] = document
        
        # 关键词索引
        if keywords:
            for keyword in keywords:
                self.keyword_index[keyword.lower()].add(doc_id)
        else:
            # 自动提取关键词
            extracted_keywords = self._extract_keywords(document.content)
            for keyword in extracted_keywords:
                self.keyword_index[keyword.lower()].add(doc_id)
        
        # 分类索引
        category = document.metadata.get("category")
        if category:
            self.category_index[category].add(doc_id)
        
        # 标签索引
        tags = document.metadata.get("tags", [])
        for tag in tags:
            self.tag_index[tag].add(doc_id)
    
    def search_by_keyword(self, keyword: str) -> List[KnowledgeDocument]:
        """按关键词搜索"""
        doc_ids = self.keyword_index.get(keyword.lower(), set())
        return [self.documents[doc_id] for doc_id in doc_ids if doc_id in self.documents]
    
    def search_by_category(self, category: str) -> List[KnowledgeDocument]:
        """按分类搜索"""
        doc_ids = self.category_index.get(category, set())
        return [self.documents[doc_id] for doc_id in doc_ids if doc_id in self.documents]
    
    def search_by_tags(self, tags: List[str]) -> List[KnowledgeDocument]:
        """按标签搜索"""
        if not tags:
            return []
        
        # 取交集
        result_ids = None
        for tag in tags:
            tag_ids = self.tag_index.get(tag, set())
            if result_ids is None:
                result_ids = tag_ids.copy()
            else:
                result_ids &= tag_ids
        
        if result_ids is None:
            return []
        
        return [self.documents[doc_id] for doc_id in result_ids if doc_id in self.documents]
    
    def search(self, query: str) -> List[KnowledgeDocument]:
        """综合搜索"""
        # 分词
        words = re.findall(r'[\w\u4e00-\u9fa5]+', query.lower())
        
        # 收集匹配的文档
        matched_docs = defaultdict(int)
        for word in words:
            doc_ids = self.keyword_index.get(word, set())
            for doc_id in doc_ids:
                matched_docs[doc_id] += 1
        
        # 按匹配度排序
        sorted_doc_ids = sorted(matched_docs.keys(), key=lambda x: matched_docs[x], reverse=True)
        
        return [self.documents[doc_id] for doc_id in sorted_doc_ids if doc_id in self.documents]
    
    def _extract_keywords(self, content: str) -> List[str]:
        """提取关键词"""
        # 简化实现
        words = re.findall(r'[\w\u4e00-\u9fa5]+', content)
        return list(set(words))
    
    def get_statistics(self) -> Dict:
        """获取统计信息"""
        return {
            "total_documents": len(self.documents),
            "total_keywords": len(self.keyword_index),
            "total_categories": len(self.category_index),
            "total_tags": len(self.tag_index)
        }

# 使用示例
index = KnowledgeIndex()

doc1 = KnowledgeDocument(
    id="doc_001",
    title="OpenClaw入门指南",
    content="OpenClaw是一个强大的AI Agent框架...",
    source="manual",
    source_type="manual",
    metadata={"category": "教程", "tags": ["OpenClaw", "入门"]},
    created_at=time.time(),
    updated_at=time.time()
)

index.add_document(doc1)

# 搜索
results = index.search("OpenClaw 教程")
print(f"找到 {len(results)} 个文档")

4. 智能检索与问答

4.1 语义检索

from typing import List, Dict, Tuple
import numpy as np

class SemanticRetriever:
    """语义检索器"""
    
    def __init__(self, vector_store: VectorStore, embedding_gen: EmbeddingGenerator):
        self.vector_store = vector_store
        self.embedding_gen = embedding_gen
        self.documents: Dict[str, KnowledgeDocument] = {}
    
    def index_document(self, document: KnowledgeDocument):
        """索引文档"""
        # 分段
        chunks = self._chunk_document(document)
        
        # 向量化并存储
        for i, chunk in enumerate(chunks):
            vector = self.embedding_gen.encode(chunk)
            chunk_id = f"{document.id}_chunk_{i}"
            
            self.vector_store.add(chunk_id, vector, {
                "document_id": document.id,
                "chunk_index": i,
                "content": chunk
            })
        
        self.documents[document.id] = document
    
    def _chunk_document(self, document: KnowledgeDocument, chunk_size: int = 500) -> List[str]:
        """文档分段"""
        content = document.content
        
        # 按段落分割
        paragraphs = content.split('\n\n')
        
        chunks = []
        current_chunk = ""
        
        for para in paragraphs:
            if len(current_chunk) + len(para) <= chunk_size:
                current_chunk += para + "\n\n"
            else:
                if current_chunk:
                    chunks.append(current_chunk.strip())
                current_chunk = para + "\n\n"
        
        if current_chunk:
            chunks.append(current_chunk.strip())
        
        return chunks
    
    def retrieve(self, query: str, top_k: int = 5) -> List[Dict]:
        """
        检索相关内容
        
        Args:
            query: 查询文本
            top_k: 返回数量
        
        Returns:
            检索结果
        """
        # 向量化查询
        query_vector = self.embedding_gen.encode(query)
        
        # 向量搜索
        results = self.vector_store.search(query_vector, top_k=top_k * 2)
        
        # 重排序(简化)
        reranked_results = self._rerank(query, results)
        
        return reranked_results[:top_k]
    
    def _rerank(self, query: str, results: List[Dict]) -> List[Dict]:
        """重排序"""
        # 简化实现:保持原排序
        # 实际应使用重排序模型
        return results
    
    def hybrid_search(self, query: str, keyword_results: List, top_k: int = 5) -> List[Dict]:
        """
        混合检索
        
        Args:
            query: 查询文本
            keyword_results: 关键词检索结果
            top_k: 返回数量
        
        Returns:
            混合检索结果
        """
        # 语义检索
        semantic_results = self.retrieve(query, top_k=top_k)
        
        # 合并结果
        all_results = {}
        
        # 添加语义检索结果
        for result in semantic_results:
            doc_id = result["metadata"]["document_id"]
            if doc_id not in all_results:
                all_results[doc_id] = {
                    "document_id": doc_id,
                    "semantic_score": result["score"],
                    "keyword_score": 0,
                    "content": result["metadata"]["content"]
                }
        
        # 添加关键词检索结果
        for doc in keyword_results:
            if doc.id in all_results:
                all_results[doc.id]["keyword_score"] = 1.0
            else:
                all_results[doc.id] = {
                    "document_id": doc.id,
                    "semantic_score": 0,
                    "keyword_score": 1.0,
                    "content": doc.content[:500]
                }
        
        # 计算综合分数
        for doc_id in all_results:
            result = all_results[doc_id]
            result["final_score"] = 0.7 * result["semantic_score"] + 0.3 * result["keyword_score"]
        
        # 排序
        sorted_results = sorted(all_results.values(), key=lambda x: x["final_score"], reverse=True)
        
        return sorted_results[:top_k]

# 使用示例
retriever = SemanticRetriever(vector_store, embedding_gen)

# 索引文档
retriever.index_document(doc1)

# 检索
results = retriever.retrieve("如何使用OpenClaw")
for result in results:
    print(f"分数: {result['score']:.4f}")
    print(f"内容: {result['metadata']['content'][:100]}...\n")

4.2 知识问答

from typing import Dict, List, Optional

class KnowledgeQA:
    """知识问答"""
    
    def __init__(self, retriever: SemanticRetriever):
        self.retriever = retriever
    
    def answer(self, question: str, top_k: int = 3) -> Dict:
        """
        回答问题
        
        Args:
            question: 问题
            top_k: 检索数量
        
        Returns:
            回答结果
        """
        # 检索相关内容
        results = self.retriever.retrieve(question, top_k=top_k)
        
        if not results:
            return {
                "question": question,
                "answer": "抱歉,我没有找到相关信息。",
                "sources": [],
                "confidence": 0
            }
        
        # 构建上下文
        context = self._build_context(results)
        
        # 生成回答(使用OpenClaw)
        answer = self._generate_answer(question, context)
        
        # 计算置信度
        confidence = self._calculate_confidence(results)
        
        return {
            "question": question,
            "answer": answer,
            "sources": [
                {
                    "document_id": r["metadata"]["document_id"],
                    "content": r["metadata"]["content"][:200],
                    "score": r["score"]
                }
                for r in results
            ],
            "confidence": confidence
        }
    
    def _build_context(self, results: List[Dict]) -> str:
        """构建上下文"""
        context_parts = []
        
        for i, result in enumerate(results):
            content = result["metadata"]["content"]
            context_parts.append(f"[文档{i+1}]\n{content}")
        
        return "\n\n".join(context_parts)
    
    def _generate_answer(self, question: str, context: str) -> str:
        """生成回答"""
        # 使用OpenClaw生成回答
        # prompt = f"根据以下信息回答问题:\n\n{context}\n\n问题:{question}"
        # answer = openclaw.generate(prompt)
        
        # 简化实现
        return f"根据知识库信息,{question}的答案是..."
    
    def _calculate_confidence(self, results: List[Dict]) -> float:
        """计算置信度"""
        if not results:
            return 0
        
        # 基于检索分数计算
        top_score = results[0]["score"]
        
        # 归一化到0-1
        confidence = min(top_score, 1.0)
        
        return confidence
    
    def batch_answer(self, questions: List[str]) -> List[Dict]:
        """批量回答"""
        return [self.answer(q) for q in questions]

# 使用示例
qa = KnowledgeQA(retriever)

# 提问
result = qa.answer("OpenClaw是什么?")
print(f"问题: {result['question']}")
print(f"回答: {result['answer']}")
print(f"置信度: {result['confidence']:.2f}")
print(f"来源: {len(result['sources'])} 个文档")

5. 知识图谱

5.1 图谱构建

from typing import Dict, List, Set
from dataclasses import dataclass

@dataclass
class Entity:
    """实体"""
    id: str
    name: str
    type: str
    properties: Dict

@dataclass
class Relation:
    """关系"""
    id: str
    source_id: str
    target_id: str
    relation_type: str
    properties: Dict

class KnowledgeGraph:
    """知识图谱"""
    
    def __init__(self):
        self.entities: Dict[str, Entity] = {}
        self.relations: Dict[str, Relation] = {}
        self.entity_index: Dict[str, Set[str]] = {}  # 名称到ID的索引
    
    def add_entity(self, entity: Entity):
        """添加实体"""
        self.entities[entity.id] = entity
        
        # 更新索引
        if entity.name not in self.entity_index:
            self.entity_index[entity.name] = set()
        self.entity_index[entity.name].add(entity.id)
    
    def add_relation(self, relation: Relation):
        """添加关系"""
        self.relations[relation.id] = relation
    
    def get_entity(self, entity_id: str) -> Optional[Entity]:
        """获取实体"""
        return self.entities.get(entity_id)
    
    def get_entity_by_name(self, name: str) -> List[Entity]:
        """按名称获取实体"""
        entity_ids = self.entity_index.get(name, set())
        return [self.entities[eid] for eid in entity_ids if eid in self.entities]
    
    def get_relations(self, entity_id: str, direction: str = "both") -> List[Relation]:
        """
        获取实体的关系
        
        Args:
            entity_id: 实体ID
            direction: 方向 (in/out/both)
        
        Returns:
            关系列表
        """
        result = []
        
        for relation in self.relations.values():
            if direction == "out" and relation.source_id == entity_id:
                result.append(relation)
            elif direction == "in" and relation.target_id == entity_id:
                result.append(relation)
            elif direction == "both":
                if relation.source_id == entity_id or relation.target_id == entity_id:
                    result.append(relation)
        
        return result
    
    def find_path(self, start_id: str, end_id: str, max_depth: int = 5) -> List[List[str]]:
        """
        查找路径
        
        Args:
            start_id: 起始实体ID
            end_id: 目标实体ID
            max_depth: 最大深度
        
        Returns:
            路径列表
        """
        paths = []
        self._dfs(start_id, end_id, [start_id], paths, max_depth)
        return paths
    
    def _dfs(self, current: str, target: str, path: List[str], paths: List, max_depth: int):
        """深度优先搜索"""
        if len(path) > max_depth:
            return
        
        if current == target:
            paths.append(path.copy())
            return
        
        # 获取相邻实体
        relations = self.get_relations(current, "out")
        
        for relation in relations:
            next_entity = relation.target_id
            if next_entity not in path:  # 避免循环
                path.append(next_entity)
                self._dfs(next_entity, target, path, paths, max_depth)
                path.pop()
    
    def get_neighbors(self, entity_id: str, depth: int = 1) -> Dict:
        """
        获取邻居
        
        Args:
            entity_id: 实体ID
            depth: 深度
        
        Returns:
            邻居图
        """
        visited = set()
        entities = []
        relations = []
        
        self._bfs(entity_id, depth, visited, entities, relations)
        
        return {
            "entities": entities,
            "relations": relations
        }
    
    def _bfs(self, start: str, depth: int, visited: Set, entities: List, relations: List):
        """广度优先搜索"""
        queue = [(start, 0)]
        visited.add(start)
        
        while queue:
            current, d = queue.pop(0)
            
            if d > depth:
                continue
            
            # 添加实体
            if current in self.entities:
                entities.append(self.entities[current])
            
            # 获取关系
            for relation in self.get_relations(current, "out"):
                relations.append(relation)
                
                next_entity = relation.target_id
                if next_entity not in visited:
                    visited.add(next_entity)
                    queue.append((next_entity, d + 1))

# 使用示例
kg = KnowledgeGraph()

# 添加实体
kg.add_entity(Entity(
    id="ent_001",
    name="OpenClaw",
    type="产品",
    properties={"描述": "AI Agent框架"}
))

kg.add_entity(Entity(
    id="ent_002",
    name="张龙生",
    type="人物",
    properties={"角色": "开发者"}
))

# 添加关系
kg.add_relation(Relation(
    id="rel_001",
    source_id="ent_002",
    target_id="ent_001",
    relation_type="开发",
    properties={}
))

# 查询
relations = kg.get_relations("ent_002")
print(f"张龙生的关系: {len(relations)} 个")

6. 最佳实践

6.1 系统设计原则

原则 说明 实践
可扩展 支持多数据源 插件式导入器
高性能 快速检索 向量索引 + 缓存
准确性 精准回答 混合检索 + 重排序
可解释 来源可追溯 引用来源展示

6.2 常见问题

问题 原因 解决方案
检索不准 向量质量差 优化嵌入模型
回答错误 知识不完整 补充知识库
性能慢 索引未优化 优化索引结构

7. 总结

7.1 核心要点

本文通过完整的知识库系统案例,展示了 OpenClaw 在企业知识管理场景的应用:

模块 核心功能 技术要点
知识采集 多源导入 文档解析 + 网页抓取
知识处理 结构化抽取 NLP + 实体关系
知识存储 向量化存储 向量数据库
智能检索 语义搜索 向量检索 + 混合
知识问答 RAG问答 检索 + 生成

7.2 下一步学习

  • 第74篇:OpenClaw 实战案例:自动化运维系统

参考资料


Logo

中国智能体开发者社区,聚焦智能体与大模型开发,提供前沿资讯、实用工具链、开源项目及行业案例。通过技术沙龙、开发者大赛等活动,促进经验交流与协作,助力开发者快速构建创新智能应用。

更多推荐