5步打造专属嵌入模型:AgentScope扩展开发实战指南

【免费下载链接】agentscope 【免费下载链接】agentscope 项目地址: https://gitcode.com/GitHub_Trending/ag/agentscope

你是否在使用AgentScope时受限于内置嵌入模型?想接入企业私有向量服务或优化特定场景下的文本表征能力?本文将通过5个实战步骤,带你从零构建自定义嵌入模型,解决模型适配难题,提升向量计算效率。读完本文你将掌握:基础类继承规范、异步调用实现、缓存机制集成、多模态扩展技巧以及完整测试流程。

核心概念与架构解析

嵌入模型(Embedding Model)是将文本、图像等非结构化数据转换为向量表示的关键组件。在AgentScope中,所有嵌入模型需遵循src/agentscope/embedding/_embedding_base.py定义的抽象接口,该架构确保了不同模型间的兼容性和可替换性。

嵌入模型架构

核心基类EmbeddingModelBase包含三个必须实现的要素:

  • model_name: 模型标识字符串
  • dimensions: 输出向量维度
  • __call__: 异步嵌入计算方法

官方已实现的模型包括:

步骤1:创建基础模型类

首先创建自定义模型类文件src/agentscope/embedding/_custom_embedding.py,继承EmbeddingModelBase并实现核心属性:

from typing import Any, List
from ._embedding_base import EmbeddingModelBase
from ._embedding_response import EmbeddingResponse

class CustomEmbedding(EmbeddingModelBase):
    supported_modalities: list[str] = ["text"]  # 支持的输入类型
    
    def __init__(
        self,
        api_key: str,
        model_name: str = "custom-embedding-v1",
        dimensions: int = 768,
        **kwargs: Any,
    ) -> None:
        super().__init__(model_name, dimensions)
        self.api_key = api_key
        self.client = self._init_client(** kwargs)  # 初始化模型客户端

步骤2:实现异步调用逻辑

重点实现__call__方法处理输入并返回标准EmbeddingResponse。参考OpenAI实现的异步处理模式:

async def __call__(
    self,
    text: List[str | TextBlock],
    **kwargs: Any,
) -> EmbeddingResponse:
    # 1. 输入标准化处理
    processed_texts = self._preprocess_input(text)
    
    # 2. API调用逻辑
    start_time = datetime.now()
    response = await self.client.embed(
        input=processed_texts,
        model=self.model_name,
        **kwargs
    )
    latency = (datetime.now() - start_time).total_seconds()
    
    # 3. 结果格式化
    return EmbeddingResponse(
        embeddings=[item["vector"] for item in response["data"]],
        usage=EmbeddingUsage(
            tokens=response["usage"]["total_tokens"],
            time=latency
        )
    )
    
def _preprocess_input(self, text: List[str | TextBlock]) -> List[str]:
    """标准化输入格式为纯文本列表"""
    processed = []
    for item in text:
        if isinstance(item, TextBlock):
            processed.append(item.content)
        elif isinstance(item, str):
            processed.append(item)
        else:
            raise ValueError(f"不支持的输入类型: {type(item)}")
    return processed

步骤3:集成缓存机制

为避免重复计算,通过EmbeddingCacheBase添加缓存功能。修改__init__方法引入缓存参数:

from ._cache_base import EmbeddingCacheBase

def __init__(
    self,
    api_key: str,
    model_name: str = "custom-embedding-v1",
    dimensions: int = 768,
    embedding_cache: EmbeddingCacheBase | None = None,
    **kwargs: Any,
) -> None:
    super().__init__(model_name, dimensions)
    self.api_key = api_key
    self.client = self._init_client(** kwargs)
    self.embedding_cache = embedding_cache  # 缓存实例

__call__方法中添加缓存逻辑:

async def __call__(self, text: List[str | TextBlock], **kwargs: Any) -> EmbeddingResponse:
    # 生成缓存键
    cache_key = {"text": text, "model": self.model_name, **kwargs}
    
    # 尝试从缓存获取
    if self.embedding_cache:
        cached = await self.embedding_cache.retrieve(identifier=cache_key)
        if cached:
            return EmbeddingResponse(
                embeddings=cached,
                usage=EmbeddingUsage(tokens=0, time=0),
                source="cache"
            )
    
    # 实际计算(省略原有代码)...
    result = EmbeddingResponse(...)
    
    # 存入缓存
    if self.embedding_cache:
        await self.embedding_cache.store(
            identifier=cache_key,
            embeddings=result.embeddings
        )
    
    return result

步骤4:多模态支持扩展

如需支持图像等非文本输入,修改supported_modalities并扩展输入处理逻辑:

class CustomMultimodalEmbedding(EmbeddingModelBase):
    supported_modalities: list[str] = ["text", "image"]  # 支持文本和图像
    
    async def __call__(
        self, 
        inputs: List[dict],  # 格式: {"type": "text", "content": "..."} 或 {"type": "image", "url": "..."}
        **kwargs: Any
    ) -> EmbeddingResponse:
        processed_inputs = []
        for item in inputs:
            if item["type"] == "text":
                processed_inputs.append(self._process_text(item["content"]))
            elif item["type"] == "image":
                processed_inputs.append(self._process_image(item["url"]))
        
        # 调用多模态嵌入API...

步骤5:注册与测试验证

模型注册

src/agentscope/embedding/init.py中添加导出:

from ._custom_embedding import CustomEmbedding, CustomMultimodalEmbedding

__all__ = [
    # ... 原有模型
    "CustomEmbedding",
    "CustomMultimodalEmbedding",
]

使用示例

创建测试文件examples/functionality/embedding/custom_embedding_demo.py

import asyncio
from agentscope.embedding import CustomEmbedding
from agentscope.embedding._file_cache import FileEmbeddingCache

async def main():
    # 初始化带文件缓存的嵌入模型
    cache = FileEmbeddingCache(cache_dir="./embedding_cache")
    embedding = CustomEmbedding(
        api_key="your-api-key",
        model_name="custom-embedding-v1",
        dimensions=768,
        embedding_cache=cache
    )
    
    # 计算嵌入
    texts = ["AgentScope自定义嵌入开发", "扩展AI能力边界"]
    result = await embedding(texts)
    
    print(f"嵌入结果: {result.embeddings}")
    print(f"使用统计: {result.usage}")

if __name__ == "__main__":
    asyncio.run(main())

运行测试

执行测试命令验证功能:

python examples/functionality/embedding/custom_embedding_demo.py

查看缓存文件生成情况:

ls ./embedding_cache

高级优化与最佳实践

性能优化

1.** 批量处理 :参考OpenAI实现的批量大小限制处理 2. 异步并发 :使用asyncio.gather并行处理多个嵌入请求 3. 超时控制 **:添加API调用超时处理避免长期阻塞

错误处理

实现完善的异常处理机制:

from agentscope.exception import EmbeddingError

async def __call__(self, text: List[str | TextBlock], **kwargs: Any) -> EmbeddingResponse:
    try:
        # API调用逻辑
    except Exception as e:
        raise EmbeddingError(f"自定义嵌入计算失败: {str(e)}") from e

总结与扩展

通过本文介绍的5个步骤,你已掌握AgentScope嵌入模型的扩展方法。该模式同样适用于其他组件扩展,如自定义工具模型格式化器等。完整开发规范可参考官方文档

AgentScope扩展生态

后续可探索的方向:

如果你开发了有用的扩展,欢迎通过GitHub Issues提交贡献!

本文代码示例已同步至examples/functionality/embedding目录,可直接运行体验。

【免费下载链接】agentscope 【免费下载链接】agentscope 项目地址: https://gitcode.com/GitHub_Trending/ag/agentscope

Logo

中国智能体开发者社区,聚焦智能体与大模型开发,提供前沿资讯、实用工具链、开源项目及行业案例。通过技术沙龙、开发者大赛等活动,促进经验交流与协作,助力开发者快速构建创新智能应用。

更多推荐