AgentScope扩展开发:自定义工具创建
在AgentScope框架中,工具(Tool)是智能体执行具体任务的核心组件。通过自定义工具,开发者可以扩展智能体的能力范围,使其能够处理特定领域的任务。本文将深入探讨如何在AgentScope中创建和使用自定义工具。## 工具系统架构AgentScope的工具系统采用模块化设计,支持同步/异步函数、流式/非流式返回,并提供完整的工具管理功能。### 核心组件```mermaid...
AgentScope扩展开发:自定义工具创建
【免费下载链接】agentscope 项目地址: https://gitcode.com/GitHub_Trending/ag/agentscope
概述
在AgentScope框架中,工具(Tool)是智能体执行具体任务的核心组件。通过自定义工具,开发者可以扩展智能体的能力范围,使其能够处理特定领域的任务。本文将深入探讨如何在AgentScope中创建和使用自定义工具。
工具系统架构
AgentScope的工具系统采用模块化设计,支持同步/异步函数、流式/非流式返回,并提供完整的工具管理功能。
核心组件
基础工具创建
同步工具函数
最简单的工具是同步执行的函数,返回ToolResponse对象:
from agentscope.tool import ToolResponse
from agentscope.message import TextBlock
def calculate_sum(a: int, b: int) -> ToolResponse:
"""计算两个整数的和
Args:
a (int): 第一个整数
b (int): 第二个整数
Returns:
ToolResponse: 包含计算结果的响应
"""
result = a + b
return ToolResponse(
content=[
TextBlock(
type="text",
text=f"计算结果: {a} + {b} = {result}"
)
]
)
异步工具函数
对于需要I/O操作的工具,推荐使用异步函数:
import asyncio
from agentscope.tool import ToolResponse
from agentscope.message import TextBlock
async def fetch_data_from_api(url: str, timeout: int = 30) -> ToolResponse:
"""从API获取数据
Args:
url (str): API端点URL
timeout (int): 请求超时时间,默认30秒
Returns:
ToolResponse: 包含API响应数据的工具响应
"""
import aiohttp
try:
async with aiohttp.ClientSession() as session:
async with session.get(url, timeout=timeout) as response:
if response.status == 200:
data = await response.json()
return ToolResponse(
content=[
TextBlock(
type="text",
text=f"API响应: {data}"
)
]
)
else:
return ToolResponse(
content=[
TextBlock(
type="text",
text=f"请求失败,状态码: {response.status}"
)
]
)
except Exception as e:
return ToolResponse(
content=[
TextBlock(
type="text",
text=f"请求异常: {str(e)}"
)
]
)
流式工具实现
同步流式工具
对于需要逐步返回结果的工具,可以使用生成器实现流式返回:
from agentscope.tool import ToolResponse
from agentscope.message import TextBlock
from typing import Generator
def process_large_data(data: list, chunk_size: int = 100) -> Generator[ToolResponse, None, None]:
"""处理大数据集并流式返回结果
Args:
data (list): 要处理的数据列表
chunk_size (int): 每批处理的数据量
Returns:
Generator[ToolResponse]: 流式工具响应生成器
"""
total = len(data)
for i in range(0, total, chunk_size):
chunk = data[i:i + chunk_size]
processed_chunk = [item * 2 for item in chunk] # 示例处理逻辑
yield ToolResponse(
content=[
TextBlock(
type="text",
text=f"处理进度: {min(i + chunk_size, total)}/{total}\n"
f"当前批次结果: {processed_chunk}"
)
],
stream=True,
is_last=(i + chunk_size >= total)
)
异步流式工具
异步流式工具适用于需要异步处理并逐步返回的场景:
import asyncio
from agentscope.tool import ToolResponse
from agentscope.message import TextBlock
from typing import AsyncGenerator
async def async_data_processor(data: list, delay: float = 0.1) -> AsyncGenerator[ToolResponse, None]:
"""异步处理数据并流式返回
Args:
data (list): 要处理的数据
delay (float): 每个项目的处理延迟
Returns:
AsyncGenerator[ToolResponse]: 异步流式响应生成器
"""
for i, item in enumerate(data):
# 模拟异步处理
await asyncio.sleep(delay)
processed_item = f"处理结果_{i}: {item * 3}"
yield ToolResponse(
content=[
TextBlock(
type="text",
text=processed_item
)
],
stream=True,
is_last=(i == len(data) - 1)
)
工具注册与管理
基本工具注册
from agentscope.tool import Toolkit
# 创建工具包实例
toolkit = Toolkit()
# 注册基本工具
toolkit.register_tool_function(calculate_sum)
toolkit.register_tool_function(fetch_data_from_api)
# 注册流式工具
toolkit.register_tool_function(process_large_data)
toolkit.register_tool_function(async_data_processor)
工具分组管理
对于复杂的工具集合,可以使用工具分组进行组织:
# 创建数学工具组
toolkit.create_tool_group(
group_name="math_tools",
description="数学计算相关工具",
active=True,
notes="使用数学工具时请确保输入参数为有效数值"
)
# 创建数据处理工具组
toolkit.create_tool_group(
group_name="data_processing",
description="数据处理和分析工具",
active=False,
notes="大数据处理工具可能需要较长时间执行"
)
# 将工具注册到特定分组
toolkit.register_tool_function(
calculate_sum,
group_name="math_tools",
func_description="计算两个整数的加法结果"
)
toolkit.register_tool_function(
process_large_data,
group_name="data_processing",
func_description="批量处理大型数据集并流式返回结果"
)
预设参数配置
可以为工具预设一些参数,这些参数不会暴露给智能体:
# 预设API密钥和其他配置参数
toolkit.register_tool_function(
fetch_data_from_api,
preset_kwargs={
"timeout": 60,
"headers": {"Authorization": "Bearer your_api_key"}
},
func_description="从指定URL获取数据"
)
高级工具特性
工具后处理函数
后处理函数允许在工具执行完成后对结果进行额外处理:
from agentscope.message import ToolUseBlock
def postprocess_tool_result(
tool_call: ToolUseBlock,
tool_response: ToolResponse
) -> ToolResponse:
"""工具结果后处理函数
Args:
tool_call: 工具调用信息
tool_response: 原始工具响应
Returns:
处理后的工具响应
"""
# 添加执行时间戳信息
import time
timestamp = time.strftime("%Y-%m-%d %H:%M:%S")
# 在原有内容基础上添加元数据
new_content = tool_response.content + [
TextBlock(
type="text",
text=f"\n执行时间: {timestamp}\n工具名称: {tool_call['name']}"
)
]
return ToolResponse(
content=new_content,
metadata=tool_response.metadata or {},
stream=tool_response.stream,
is_last=tool_response.is_last,
is_interrupted=tool_response.is_interrupted
)
# 注册带后处理的工具
toolkit.register_tool_function(
calculate_sum,
postprocess_func=postprocess_tool_result
)
自定义JSON Schema
对于需要精确控制参数格式的工具,可以手动指定JSON Schema:
custom_schema = {
"type": "function",
"function": {
"name": "advanced_calculation",
"description": "执行高级数学计算",
"parameters": {
"type": "object",
"properties": {
"operation": {
"type": "string",
"description": "计算操作类型",
"enum": ["add", "subtract", "multiply", "divide"]
},
"values": {
"type": "array",
"description": "要计算的数值数组",
"items": {"type": "number"}
},
"precision": {
"type": "integer",
"description": "结果精度(小数位数)",
"minimum": 0,
"maximum": 10
}
},
"required": ["operation", "values"]
}
}
}
def advanced_calculation(operation: str, values: list, precision: int = 2) -> ToolResponse:
"""高级数学计算工具"""
# 实现计算逻辑
pass
toolkit.register_tool_function(
advanced_calculation,
json_schema=custom_schema
)
工具使用最佳实践
错误处理与重试机制
from typing import Optional
import tenacity
@tenacity.retry(
stop=tenacity.stop_after_attempt(3),
wait=tenacity.wait_exponential(multiplier=1, min=4, max=10),
retry=tenacity.retry_if_exception_type((ConnectionError, TimeoutError))
)
async def robust_api_call(
url: str,
method: str = "GET",
payload: Optional[dict] = None,
timeout: int = 30
) -> ToolResponse:
"""带重试机制的API调用工具
Args:
url: API端点
method: HTTP方法
payload: 请求负载
timeout: 超时时间
Returns:
工具响应
"""
import aiohttp
try:
async with aiohttp.ClientSession() as session:
async with session.request(
method=method,
url=url,
json=payload,
timeout=timeout
) as response:
response.raise_for_status()
data = await response.json()
return ToolResponse(
content=[TextBlock(type="text", text=f"成功: {data}")],
metadata={"status_code": response.status}
)
except Exception as e:
return ToolResponse(
content=[TextBlock(type="text", text=f"失败: {str(e)}")],
metadata={"error": str(e)}
)
性能监控工具
import time
from functools import wraps
def monitor_performance(func):
"""工具性能监控装饰器"""
@wraps(func)
async def wrapper(*args, **kwargs):
start_time = time.time()
try:
result = await func(*args, **kwargs)
execution_time = time.time() - start_time
# 添加性能元数据
if hasattr(result, 'metadata'):
result.metadata = result.metadata or {}
result.metadata.update({
"execution_time": execution_time,
"performance": "good" if execution_time < 1.0 else "slow"
})
else:
result.metadata = {
"execution_time": execution_time,
"performance": "good" if execution_time < 1.0 else "slow"
}
return result
except Exception as e:
execution_time = time.time() - start_time
return ToolResponse(
content=[TextBlock(type="text", text=f"工具执行失败: {str(e)}")],
metadata={
"execution_time": execution_time,
"error": str(e),
"performance": "error"
}
)
return wrapper
@monitor_performance
async def monitored_tool_function(param: str) -> ToolResponse:
"""带性能监控的工具函数"""
await asyncio.sleep(0.5) # 模拟工作负载
return ToolResponse(
content=[TextBlock(type="text", text=f"处理结果: {param.upper()}")]
)
工具测试与调试
单元测试示例
import pytest
from agentscope.tool import Toolkit
class TestCustomTools:
"""自定义工具测试类"""
def setup_method(self):
self.toolkit = Toolkit()
self.toolkit.register_tool_function(calculate_sum)
@pytest.mark.asyncio
async def test_calculate_sum(self):
"""测试加法工具"""
from agentscope.message import ToolUseBlock
tool_call = ToolUseBlock(
name="calculate_sum",
input={"a": 5, "b": 3}
)
async for response in self.toolkit.call_tool_function(tool_call):
assert "8" in response.content[0].text
assert not response.is_interrupted
@pytest.mark.asyncio
async def test_calculate_sum_negative(self):
"""测试负数加法"""
from agentscope.message import ToolUseBlock
tool_call = ToolUseBlock(
name="calculate_sum",
input={"a": -5, "b": 3}
)
async for response in self.toolkit.call_tool_function(tool_call):
assert "-2" in response.content[0].text
调试技巧
# 启用详细日志记录
import logging
logging.basicConfig(level=logging.DEBUG)
# 工具执行跟踪
def debug_tool_execution(func):
"""工具执行调试装饰器"""
@wraps(func)
async def wrapper(*args, **kwargs):
print(f"调用工具: {func.__name__}")
print(f"参数: args={args}, kwargs={kwargs}")
try:
result = await func(*args, **kwargs)
print(f"工具执行成功: {result}")
return result
except Exception as e:
print(f"工具执行失败: {e}")
raise
return wrapper
总结
AgentScope的自定义工具系统提供了强大的扩展能力,支持从简单的同步函数到复杂的异步流式处理。通过合理的工具设计、分组管理和错误处理,可以构建出健壮且高效的智能体工具生态系统。
关键要点
| 特性 | 说明 | 适用场景 |
|---|---|---|
| 同步工具 | 简单直接,易于实现 | 计算密集型任务 |
| 异步工具 | 非阻塞执行,提高并发性 | I/O密集型任务 |
| 流式工具 | 逐步返回结果,实时反馈 | 大数据处理、长时任务 |
| 工具分组 | 逻辑组织,按需激活 | 复杂工具集合管理 |
| 后处理函数 | 结果加工,统一格式化 | 结果标准化、元数据添加 |
| 预设参数 | 隐藏敏感配置,简化调用 | API密钥、固定配置 |
通过掌握这些工具开发技巧,您可以为AgentScope智能体创建各种强大的自定义能力,满足不同应用场景的需求。
【免费下载链接】agentscope 项目地址: https://gitcode.com/GitHub_Trending/ag/agentscope
更多推荐
所有评论(0)