AgentScope扩展开发:自定义工具创建

【免费下载链接】agentscope 【免费下载链接】agentscope 项目地址: https://gitcode.com/GitHub_Trending/ag/agentscope

概述

在AgentScope框架中,工具(Tool)是智能体执行具体任务的核心组件。通过自定义工具,开发者可以扩展智能体的能力范围,使其能够处理特定领域的任务。本文将深入探讨如何在AgentScope中创建和使用自定义工具。

工具系统架构

AgentScope的工具系统采用模块化设计,支持同步/异步函数、流式/非流式返回,并提供完整的工具管理功能。

核心组件

mermaid

基础工具创建

同步工具函数

最简单的工具是同步执行的函数,返回ToolResponse对象:

from agentscope.tool import ToolResponse
from agentscope.message import TextBlock

def calculate_sum(a: int, b: int) -> ToolResponse:
    """计算两个整数的和
    
    Args:
        a (int): 第一个整数
        b (int): 第二个整数
        
    Returns:
        ToolResponse: 包含计算结果的响应
    """
    result = a + b
    return ToolResponse(
        content=[
            TextBlock(
                type="text",
                text=f"计算结果: {a} + {b} = {result}"
            )
        ]
    )

异步工具函数

对于需要I/O操作的工具,推荐使用异步函数:

import asyncio
from agentscope.tool import ToolResponse
from agentscope.message import TextBlock

async def fetch_data_from_api(url: str, timeout: int = 30) -> ToolResponse:
    """从API获取数据
    
    Args:
        url (str): API端点URL
        timeout (int): 请求超时时间,默认30秒
        
    Returns:
        ToolResponse: 包含API响应数据的工具响应
    """
    import aiohttp
    
    try:
        async with aiohttp.ClientSession() as session:
            async with session.get(url, timeout=timeout) as response:
                if response.status == 200:
                    data = await response.json()
                    return ToolResponse(
                        content=[
                            TextBlock(
                                type="text",
                                text=f"API响应: {data}"
                            )
                        ]
                    )
                else:
                    return ToolResponse(
                        content=[
                            TextBlock(
                                type="text",
                                text=f"请求失败,状态码: {response.status}"
                            )
                        ]
                    )
    except Exception as e:
        return ToolResponse(
            content=[
                TextBlock(
                    type="text",
                    text=f"请求异常: {str(e)}"
                )
            ]
        )

流式工具实现

同步流式工具

对于需要逐步返回结果的工具,可以使用生成器实现流式返回:

from agentscope.tool import ToolResponse
from agentscope.message import TextBlock
from typing import Generator

def process_large_data(data: list, chunk_size: int = 100) -> Generator[ToolResponse, None, None]:
    """处理大数据集并流式返回结果
    
    Args:
        data (list): 要处理的数据列表
        chunk_size (int): 每批处理的数据量
        
    Returns:
        Generator[ToolResponse]: 流式工具响应生成器
    """
    total = len(data)
    for i in range(0, total, chunk_size):
        chunk = data[i:i + chunk_size]
        processed_chunk = [item * 2 for item in chunk]  # 示例处理逻辑
        
        yield ToolResponse(
            content=[
                TextBlock(
                    type="text",
                    text=f"处理进度: {min(i + chunk_size, total)}/{total}\n"
                         f"当前批次结果: {processed_chunk}"
                )
            ],
            stream=True,
            is_last=(i + chunk_size >= total)
        )

异步流式工具

异步流式工具适用于需要异步处理并逐步返回的场景:

import asyncio
from agentscope.tool import ToolResponse
from agentscope.message import TextBlock
from typing import AsyncGenerator

async def async_data_processor(data: list, delay: float = 0.1) -> AsyncGenerator[ToolResponse, None]:
    """异步处理数据并流式返回
    
    Args:
        data (list): 要处理的数据
        delay (float): 每个项目的处理延迟
        
    Returns:
        AsyncGenerator[ToolResponse]: 异步流式响应生成器
    """
    for i, item in enumerate(data):
        # 模拟异步处理
        await asyncio.sleep(delay)
        processed_item = f"处理结果_{i}: {item * 3}"
        
        yield ToolResponse(
            content=[
                TextBlock(
                    type="text",
                    text=processed_item
                )
            ],
            stream=True,
            is_last=(i == len(data) - 1)
        )

工具注册与管理

基本工具注册

from agentscope.tool import Toolkit

# 创建工具包实例
toolkit = Toolkit()

# 注册基本工具
toolkit.register_tool_function(calculate_sum)
toolkit.register_tool_function(fetch_data_from_api)

# 注册流式工具
toolkit.register_tool_function(process_large_data)
toolkit.register_tool_function(async_data_processor)

工具分组管理

对于复杂的工具集合,可以使用工具分组进行组织:

# 创建数学工具组
toolkit.create_tool_group(
    group_name="math_tools",
    description="数学计算相关工具",
    active=True,
    notes="使用数学工具时请确保输入参数为有效数值"
)

# 创建数据处理工具组  
toolkit.create_tool_group(
    group_name="data_processing",
    description="数据处理和分析工具",
    active=False,
    notes="大数据处理工具可能需要较长时间执行"
)

# 将工具注册到特定分组
toolkit.register_tool_function(
    calculate_sum,
    group_name="math_tools",
    func_description="计算两个整数的加法结果"
)

toolkit.register_tool_function(
    process_large_data, 
    group_name="data_processing",
    func_description="批量处理大型数据集并流式返回结果"
)

预设参数配置

可以为工具预设一些参数,这些参数不会暴露给智能体:

# 预设API密钥和其他配置参数
toolkit.register_tool_function(
    fetch_data_from_api,
    preset_kwargs={
        "timeout": 60,
        "headers": {"Authorization": "Bearer your_api_key"}
    },
    func_description="从指定URL获取数据"
)

高级工具特性

工具后处理函数

后处理函数允许在工具执行完成后对结果进行额外处理:

from agentscope.message import ToolUseBlock

def postprocess_tool_result(
    tool_call: ToolUseBlock,
    tool_response: ToolResponse
) -> ToolResponse:
    """工具结果后处理函数
    
    Args:
        tool_call: 工具调用信息
        tool_response: 原始工具响应
        
    Returns:
        处理后的工具响应
    """
    # 添加执行时间戳信息
    import time
    timestamp = time.strftime("%Y-%m-%d %H:%M:%S")
    
    # 在原有内容基础上添加元数据
    new_content = tool_response.content + [
        TextBlock(
            type="text", 
            text=f"\n执行时间: {timestamp}\n工具名称: {tool_call['name']}"
        )
    ]
    
    return ToolResponse(
        content=new_content,
        metadata=tool_response.metadata or {},
        stream=tool_response.stream,
        is_last=tool_response.is_last,
        is_interrupted=tool_response.is_interrupted
    )

# 注册带后处理的工具
toolkit.register_tool_function(
    calculate_sum,
    postprocess_func=postprocess_tool_result
)

自定义JSON Schema

对于需要精确控制参数格式的工具,可以手动指定JSON Schema:

custom_schema = {
    "type": "function",
    "function": {
        "name": "advanced_calculation",
        "description": "执行高级数学计算",
        "parameters": {
            "type": "object",
            "properties": {
                "operation": {
                    "type": "string",
                    "description": "计算操作类型",
                    "enum": ["add", "subtract", "multiply", "divide"]
                },
                "values": {
                    "type": "array",
                    "description": "要计算的数值数组",
                    "items": {"type": "number"}
                },
                "precision": {
                    "type": "integer", 
                    "description": "结果精度(小数位数)",
                    "minimum": 0,
                    "maximum": 10
                }
            },
            "required": ["operation", "values"]
        }
    }
}

def advanced_calculation(operation: str, values: list, precision: int = 2) -> ToolResponse:
    """高级数学计算工具"""
    # 实现计算逻辑
    pass

toolkit.register_tool_function(
    advanced_calculation,
    json_schema=custom_schema
)

工具使用最佳实践

错误处理与重试机制

from typing import Optional
import tenacity

@tenacity.retry(
    stop=tenacity.stop_after_attempt(3),
    wait=tenacity.wait_exponential(multiplier=1, min=4, max=10),
    retry=tenacity.retry_if_exception_type((ConnectionError, TimeoutError))
)
async def robust_api_call(
    url: str,
    method: str = "GET",
    payload: Optional[dict] = None,
    timeout: int = 30
) -> ToolResponse:
    """带重试机制的API调用工具
    
    Args:
        url: API端点
        method: HTTP方法
        payload: 请求负载
        timeout: 超时时间
        
    Returns:
        工具响应
    """
    import aiohttp
    
    try:
        async with aiohttp.ClientSession() as session:
            async with session.request(
                method=method,
                url=url,
                json=payload,
                timeout=timeout
            ) as response:
                response.raise_for_status()
                data = await response.json()
                
                return ToolResponse(
                    content=[TextBlock(type="text", text=f"成功: {data}")],
                    metadata={"status_code": response.status}
                )
                
    except Exception as e:
        return ToolResponse(
            content=[TextBlock(type="text", text=f"失败: {str(e)}")],
            metadata={"error": str(e)}
        )

性能监控工具

import time
from functools import wraps

def monitor_performance(func):
    """工具性能监控装饰器"""
    @wraps(func)
    async def wrapper(*args, **kwargs):
        start_time = time.time()
        
        try:
            result = await func(*args, **kwargs)
            execution_time = time.time() - start_time
            
            # 添加性能元数据
            if hasattr(result, 'metadata'):
                result.metadata = result.metadata or {}
                result.metadata.update({
                    "execution_time": execution_time,
                    "performance": "good" if execution_time < 1.0 else "slow"
                })
            else:
                result.metadata = {
                    "execution_time": execution_time,
                    "performance": "good" if execution_time < 1.0 else "slow"
                }
                
            return result
            
        except Exception as e:
            execution_time = time.time() - start_time
            return ToolResponse(
                content=[TextBlock(type="text", text=f"工具执行失败: {str(e)}")],
                metadata={
                    "execution_time": execution_time,
                    "error": str(e),
                    "performance": "error"
                }
            )
    
    return wrapper

@monitor_performance
async def monitored_tool_function(param: str) -> ToolResponse:
    """带性能监控的工具函数"""
    await asyncio.sleep(0.5)  # 模拟工作负载
    return ToolResponse(
        content=[TextBlock(type="text", text=f"处理结果: {param.upper()}")]
    )

工具测试与调试

单元测试示例

import pytest
from agentscope.tool import Toolkit

class TestCustomTools:
    """自定义工具测试类"""
    
    def setup_method(self):
        self.toolkit = Toolkit()
        self.toolkit.register_tool_function(calculate_sum)
    
    @pytest.mark.asyncio
    async def test_calculate_sum(self):
        """测试加法工具"""
        from agentscope.message import ToolUseBlock
        
        tool_call = ToolUseBlock(
            name="calculate_sum",
            input={"a": 5, "b": 3}
        )
        
        async for response in self.toolkit.call_tool_function(tool_call):
            assert "8" in response.content[0].text
            assert not response.is_interrupted
    
    @pytest.mark.asyncio 
    async def test_calculate_sum_negative(self):
        """测试负数加法"""
        from agentscope.message import ToolUseBlock
        
        tool_call = ToolUseBlock(
            name="calculate_sum", 
            input={"a": -5, "b": 3}
        )
        
        async for response in self.toolkit.call_tool_function(tool_call):
            assert "-2" in response.content[0].text

调试技巧

# 启用详细日志记录
import logging
logging.basicConfig(level=logging.DEBUG)

# 工具执行跟踪
def debug_tool_execution(func):
    """工具执行调试装饰器"""
    @wraps(func)
    async def wrapper(*args, **kwargs):
        print(f"调用工具: {func.__name__}")
        print(f"参数: args={args}, kwargs={kwargs}")
        
        try:
            result = await func(*args, **kwargs)
            print(f"工具执行成功: {result}")
            return result
        except Exception as e:
            print(f"工具执行失败: {e}")
            raise
    
    return wrapper

总结

AgentScope的自定义工具系统提供了强大的扩展能力,支持从简单的同步函数到复杂的异步流式处理。通过合理的工具设计、分组管理和错误处理,可以构建出健壮且高效的智能体工具生态系统。

关键要点

特性 说明 适用场景
同步工具 简单直接,易于实现 计算密集型任务
异步工具 非阻塞执行,提高并发性 I/O密集型任务
流式工具 逐步返回结果,实时反馈 大数据处理、长时任务
工具分组 逻辑组织,按需激活 复杂工具集合管理
后处理函数 结果加工,统一格式化 结果标准化、元数据添加
预设参数 隐藏敏感配置,简化调用 API密钥、固定配置

通过掌握这些工具开发技巧,您可以为AgentScope智能体创建各种强大的自定义能力,满足不同应用场景的需求。

【免费下载链接】agentscope 【免费下载链接】agentscope 项目地址: https://gitcode.com/GitHub_Trending/ag/agentscope

Logo

中国智能体开发者社区,聚焦智能体与大模型开发,提供前沿资讯、实用工具链、开源项目及行业案例。通过技术沙龙、开发者大赛等活动,促进经验交流与协作,助力开发者快速构建创新智能应用。

更多推荐