Trae Agent网络请求处理:API调用与数据获取技巧
在使用Trae Agent(基于大型语言模型的通用软件开发任务代理)时,你是否曾遇到过API调用失败、网络超时、数据获取不完整等问题?作为连接LLM(大型语言模型)与外部工具的关键纽带,网络请求处理直接影响Trae Agent的任务执行效率和稳定性。本文将深入剖析Trae Agent的网络请求架构,提供API调用的最佳实践、错误处理策略以及高级数据获取技巧,帮助开发者充分发挥Trae Agent的
Trae Agent网络请求处理:API调用与数据获取技巧
引言:解决Trae Agent网络请求的核心痛点
在使用Trae Agent(基于大型语言模型的通用软件开发任务代理)时,你是否曾遇到过API调用失败、网络超时、数据获取不完整等问题?作为连接LLM(大型语言模型)与外部工具的关键纽带,网络请求处理直接影响Trae Agent的任务执行效率和稳定性。本文将深入剖析Trae Agent的网络请求架构,提供API调用的最佳实践、错误处理策略以及高级数据获取技巧,帮助开发者充分发挥Trae Agent的强大能力。
读完本文,你将能够:
- 理解Trae Agent网络请求的核心组件与工作流程
- 掌握LLM API调用的配置方法与优化技巧
- 实现健壮的网络错误处理与重试机制
- 高效获取和处理外部数据
- 解决常见的网络请求问题
Trae Agent网络请求架构概览
Trae Agent的网络请求处理系统采用模块化设计,主要包含四大核心组件:LLM客户端、MCP(多工具协作协议)客户端、配置管理和工具集成。这些组件协同工作,确保Trae Agent能够无缝与各种外部服务和API进行通信。
核心组件架构
网络请求工作流程
Trae Agent的网络请求处理遵循以下工作流程:
- 配置解析:从配置文件或环境变量中加载API密钥、基础URL等网络相关配置
- 客户端初始化:根据目标服务类型(如OpenRouter、Ollama等)初始化相应的LLM客户端
- 请求构建:根据用户指令和工具需求,构建符合API规范的请求参数
- 请求发送:通过HTTP或WebSocket等协议发送网络请求
- 响应处理:解析API响应,提取关键信息
- 错误处理:检测并处理网络错误、API错误等异常情况
- 结果记录:将请求和响应信息记录到轨迹日志(可选)
LLM API调用配置与实现
Trae Agent支持多种LLM服务提供商,包括OpenRouter、Ollama、Anthropic等。每种提供商都有其特定的API要求和配置选项。正确配置LLM客户端是确保Trae Agent正常工作的基础。
基础配置方法
LLM客户端配置主要通过ModelConfig类实现,包含API密钥、基础URL、API版本等关键参数。以下是配置LLM客户端的基本步骤:
- 创建模型配置:指定模型提供商、API密钥、基础URL等信息
- 初始化LLM客户端:根据模型配置创建相应的LLM客户端实例
- 设置轨迹记录器:(可选)配置轨迹记录器以记录API交互
from trae_agent.utils.config import ModelConfig, ProviderConfig
from trae_agent.utils.llm_clients.openrouter_client import OpenRouterClient
# 创建模型配置
provider_config = ProviderConfig(
api_key="your_openrouter_api_key",
base_url="https://openrouter.ai/api/v1",
api_version=None
)
model_config = ModelConfig(
model_name="gpt-4",
model_provider=provider_config,
supports_tool_calling=True
)
# 初始化LLM客户端
llm_client = OpenRouterClient(model_config)
# 设置轨迹记录器(可选)
# llm_client.set_trajectory_recorder(trajectory_recorder)
主流LLM服务配置详解
OpenRouter配置
OpenRouter是一个聚合多种LLM模型的服务,支持主流模型如GPT-4、Claude、Llama等。Trae Agent通过OpenRouterClient类实现与OpenRouter的交互。
# 配置OpenRouter客户端
from trae_agent.utils.llm_clients.openrouter_client import OpenRouterClient
# 默认配置
openrouter_config = ModelConfig(
model_name="gpt-4",
model_provider=ProviderConfig(
api_key=os.getenv("OPENROUTER_API_KEY"),
# 当base_url未指定时,将自动使用默认值:https://openrouter.ai/api/v1
),
supports_tool_calling=True
)
client = OpenRouterClient(openrouter_config)
# 自定义基础URL配置
custom_openrouter_config = ModelConfig(
model_name="claude-3-opus",
model_provider=ProviderConfig(
api_key=os.getenv("OPENROUTER_API_KEY"),
base_url="https://custom-openrouter-proxy.example.com/api/v1"
),
supports_tool_calling=True
)
custom_client = OpenRouterClient(custom_openrouter_config)
OpenRouter客户端还支持额外的HTTP头配置,如引用站点URL和站点名称,这些信息有助于提高API请求的优先级和可靠性:
# 在OpenRouterProvider类中
def get_extra_headers(self) -> dict[str, str]:
"""Get OpenRouter-specific headers."""
extra_headers: dict[str, str] = {}
openrouter_site_url = os.getenv("OPENROUTER_SITE_URL")
if openrouter_site_url:
extra_headers["HTTP-Referer"] = openrouter_site_url
openrouter_site_name = os.getenv("OPENROUTER_SITE_NAME")
if openrouter_site_name:
extra_headers["X-Title"] = openrouter_site_name
return extra_headers
Ollama配置
Ollama是一个本地运行的LLM服务,适合隐私敏感或需要低延迟的场景。Trae Agent通过OllamaClient类与Ollama服务交互:
from trae_agent.utils.llm_clients.ollama_client import OllamaClient
# 默认配置(连接本地Ollama服务)
ollama_config = ModelConfig(
model_name="llama3",
model_provider=ProviderConfig(
api_key=None, # Ollama默认不需要API密钥
base_url="http://localhost:11434/v1" # Ollama默认API地址
),
supports_tool_calling=True
)
ollama_client = OllamaClient(ollama_config)
# 远程Ollama服务配置
remote_ollama_config = ModelConfig(
model_name="mistral",
model_provider=ProviderConfig(
api_key=None,
base_url="http://ollama-server.example.com:11434/v1"
),
supports_tool_calling=True
)
remote_ollama_client = OllamaClient(remote_ollama_config)
配置最佳实践
- 环境变量管理API密钥:避免硬编码API密钥,使用环境变量或配置文件管理敏感信息。
# 推荐做法
import os
from trae_agent.utils.config import ModelConfig, ProviderConfig
api_key = os.getenv("OPENROUTER_API_KEY")
if not api_key:
raise ValueError("OPENROUTER_API_KEY环境变量未设置")
model_config = ModelConfig(
model_name="gpt-4",
model_provider=ProviderConfig(
api_key=api_key,
base_url="https://openrouter.ai/api/v1"
),
supports_tool_calling=True
)
- 使用合规访问服务:对于国外LLM服务,考虑使用合规的国内镜像或服务以提高访问速度和稳定性。
# 使用合规访问服务示例
model_config = ModelConfig(
model_name="gpt-4",
model_provider=ProviderConfig(
api_key=api_key,
base_url="https://openai-proxy.example.com/v1" # 合规代理地址
),
supports_tool_calling=True
)
- 按环境区分配置:为开发、测试和生产环境创建不同的配置,避免环境间的干扰。
# 环境特定配置示例
def get_model_config(env: str) -> ModelConfig:
if env == "production":
return ModelConfig(
model_name="gpt-4",
model_provider=ProviderConfig(
api_key=os.getenv("PROD_OPENROUTER_API_KEY"),
base_url="https://openrouter.ai/api/v1"
),
supports_tool_calling=True
)
elif env == "development":
return ModelConfig(
model_name="llama3",
model_provider=ProviderConfig(
api_key=None,
base_url="http://localhost:11434/v1"
),
supports_tool_calling=True
)
else: # testing
return ModelConfig(
model_name="mock-model",
model_provider=ProviderConfig(
api_key=None,
base_url="https://mock-llm.example.com/v1"
),
supports_tool_calling=True
)
- 动态配置切换:根据任务需求动态选择不同的LLM模型和配置。
def select_model_config(task_type: str) -> ModelConfig:
"""根据任务类型选择合适的模型配置"""
if task_type == "code_generation":
# 代码生成任务使用更强大的模型
return ModelConfig(
model_name="gpt-4",
model_provider=ProviderConfig(
api_key=os.getenv("OPENROUTER_API_KEY"),
base_url="https://openrouter.ai/api/v1"
),
supports_tool_calling=True
)
elif task_type == "simple_qa":
# 简单问答任务使用更高效的模型
return ModelConfig(
model_name="llama3",
model_provider=ProviderConfig(
api_key=None,
base_url="http://localhost:11434/v1"
),
supports_tool_calling=True
)
else:
# 默认配置
return ModelConfig(
model_name="claude-3-sonnet",
model_provider=ProviderConfig(
api_key=os.getenv("ANTHROPIC_API_KEY"),
base_url="https://api.anthropic.com"
),
supports_tool_calling=True
)
API调用实现与优化
Trae Agent的LLM API调用实现基于OpenAI兼容的客户端架构,提供了统一的接口来与不同的LLM服务进行交互。这种设计不仅简化了代码结构,还使得添加新的LLM提供商变得更加容易。
API调用核心实现
OpenAI兼容的客户端基类OpenAICompatibleClient提供了API调用的核心功能:
# 核心API调用实现(简化版)
class OpenAICompatibleClient(BaseLLMClient):
def __init__(self, model_config: ModelConfig, provider: ProviderConfig):
super().__init__(model_config)
self.provider = provider
self.client = self.provider.create_client(
api_key=self.api_key,
base_url=self.base_url,
api_version=self.api_version
)
self.chat_history: list[LLMMessage] = []
def set_chat_history(self, messages: list[LLMMessage]) -> None:
"""设置聊天历史"""
self.chat_history = messages.copy()
def chat(
self,
messages: list[LLMMessage],
model_config: ModelConfig,
tools: list[Tool] | None = None,
reuse_history: bool = True,
) -> LLMResponse:
"""发送聊天消息到LLM API"""
# 构建完整消息列表
full_messages = []
if reuse_history:
full_messages.extend(self.chat_history)
full_messages.extend(messages)
# 准备工具调用参数(如果需要)
tool_params = None
if tools and self.supports_tool_calling(model_config):
tool_params = {
"tools": [tool.to_dict() for tool in tools],
"tool_choice": "auto"
}
# 发送API请求
try:
response = self.client.chat.completions.create(
model=model_config.model_name,
messages=full_messages,
**(tool_params or {}),
stream=False # 非流式响应
)
# 记录聊天历史
self.chat_history = full_messages
self.chat_history.append({
"role": "assistant",
"content": response.choices[0].message.content
})
# 记录轨迹(如果启用)
if self.trajectory_recorder:
self.trajectory_recorder.record_llm_interaction(
request=full_messages,
response=response
)
return LLMResponse.from_api_response(response)
except Exception as e:
# 错误处理
self.handle_api_error(e)
raise
API调用优化技巧
- 请求批处理:将多个独立请求合并为批处理请求,减少API调用次数,降低延迟和成本。
def batch_chat(
self,
batches: list[list[LLMMessage]],
model_config: ModelConfig,
tools: list[Tool] | None = None
) -> list[LLMResponse]:
"""批处理多个聊天请求"""
results = []
for batch in batches:
results.append(self.chat(batch, model_config, tools, reuse_history=False))
return results
- 流式响应处理:对于大型响应或需要实时反馈的场景,使用流式响应模式。
def stream_chat(
self,
messages: list[LLMMessage],
model_config: ModelConfig,
tools: list[Tool] | None = None,
reuse_history: bool = True,
) -> Generator[LLMResponseChunk, None, None]:
"""流式聊天响应"""
# 构建完整消息列表(与非流式类似)
full_messages = []
if reuse_history:
full_messages.extend(self.chat_history)
full_messages.extend(messages)
# 准备工具调用参数(如果需要)
tool_params = None
if tools and self.supports_tool_calling(model_config):
tool_params = {
"tools": [tool.to_dict() for tool in tools],
"tool_choice": "auto"
}
# 发送流式API请求
response = self.client.chat.completions.create(
model=model_config.model_name,
messages=full_messages,
**(tool_params or {}),
stream=True # 流式响应
)
# 处理流式响应
for chunk in response:
yield LLMResponseChunk.from_api_chunk(chunk)
- 上下文窗口管理:动态管理对话历史,确保不超过模型的上下文窗口限制。
def manage_context_window(
self,
new_messages: list[LLMMessage],
max_tokens: int = 8192,
token_estimator: Callable[[list[LLMMessage]], int] = estimate_tokens
) -> list[LLMMessage]:
"""管理上下文窗口,确保不超过最大token限制"""
# 估算当前历史消息的token数
current_tokens = token_estimator(self.chat_history)
new_tokens = token_estimator(new_messages)
# 如果加上新消息会超过限制,需要截断历史
if current_tokens + new_tokens > max_tokens:
# 尝试只保留最近的消息
truncated_history = []
remaining_tokens = max_tokens - new_tokens
# 从后往前添加历史消息,直到接近token限制
for msg in reversed(self.chat_history):
msg_tokens = token_estimator([msg])
if remaining_tokens - msg_tokens < 0:
break
truncated_history.append(msg)
remaining_tokens -= msg_tokens
# 反转回正常顺序
truncated_history.reverse()
# 更新历史
self.chat_history = truncated_history
return self.chat_history + new_messages
- 模型缓存:缓存频繁使用的相同或相似请求的响应,减少重复计算和API调用。
from functools import lru_cache
import hashlib
import json
def get_request_hash(messages: list[LLMMessage], model_name: str) -> str:
"""生成请求的唯一哈希值,用于缓存"""
request_key = json.dumps({
"messages": messages,
"model": model_name
}, sort_keys=True)
return hashlib.sha256(request_key.encode()).hexdigest()
class CachedLLMClient(OpenAICompatibleClient):
"""带缓存的LLM客户端"""
def __init__(self, model_config: ModelConfig):
super().__init__(model_config)
self.cache = {} # 内存缓存,生产环境可使用Redis等分布式缓存
@lru_cache(maxsize=1000) # 限制缓存大小
def cached_chat(
self,
request_hash: str,
messages: list[LLMMessage],
model_config: ModelConfig,
tools: list[Tool] | None = None,
reuse_history: bool = True,
) -> LLMResponse:
"""带缓存的聊天方法"""
return super().chat(messages, model_config, tools, reuse_history)
def chat(
self,
messages: list[LLMMessage],
model_config: ModelConfig,
tools: list[Tool] | None = None,
reuse_history: bool = True,
use_cache: bool = True
) -> LLMResponse:
"""重写chat方法,添加缓存支持"""
if use_cache and not tools: # 工具调用不缓存
request_hash = get_request_hash(messages, model_config.model_name)
if request_hash in self.cache:
return self.cache[request_hash]
# 调用缓存版本的chat方法
response = self.cached_chat(
request_hash, messages, model_config, tools, reuse_history
)
self.cache[request_hash] = response
return response
else:
return super().chat(messages, model_config, tools, reuse_history)
网络错误处理与重试机制
网络请求过程中难免会遇到各种错误,如连接超时、API限流、服务器错误等。Trae Agent提供了健壮的错误处理机制,确保在遇到这些问题时能够优雅地恢复或给出有意义的错误信息。
常见网络错误类型及处理策略
| 错误类型 | 描述 | 处理策略 | 重试建议 |
|---|---|---|---|
| 连接超时 | 无法与服务器建立连接 | 检查网络连接、服务器状态 | 是,使用指数退避策略 |
| 连接拒绝 | 服务器拒绝连接 | 检查URL和端口是否正确 | 是,短暂延迟后重试 |
| API密钥无效 | 认证失败 | 检查API密钥是否正确 | 否,需要人工干预 |
| 权限不足 | 没有足够权限执行请求 | 检查API密钥权限 | 否,需要人工干预 |
| 请求频率超限 | API调用频率超过限制 | 减少请求频率 | 是,等待限流周期后重试 |
| 请求大小超限 | 请求超过API大小限制 | 减小请求大小 | 否,需要调整请求 |
| 服务器错误 | 服务器内部错误 | 等待服务器恢复 | 是,延迟后重试 |
| 响应格式错误 | API返回非预期格式 | 检查API文档和版本 | 否,需要代码修复 |
错误处理实现
Trae Agent的错误处理机制主要通过handle_api_error方法实现,该方法位于OpenAICompatibleClient类中:
def handle_api_error(self, error: Exception) -> None:
"""处理API错误"""
error_msg = f"API调用失败: {str(error)}"
# 区分不同类型的错误
if isinstance(error, openai.AuthenticationError):
# 认证错误
raise AuthenticationError(
f"API认证失败: {str(error)}. "
"请检查您的API密钥是否正确。"
) from error
elif isinstance(error, openai.PermissionDeniedError):
# 权限错误
raise PermissionError(
f"API权限不足: {str(error)}. "
"请检查您的API密钥是否具有足够的权限。"
) from error
elif isinstance(error, openai.RateLimitError):
# 限流错误
retry_after = getattr(error, 'headers', {}).get('Retry-After', 60)
raise RateLimitError(
f"API请求频率超限: {str(error)}. "
f"请在{retry_after}秒后重试。"
) from error
elif isinstance(error, openai.APIConnectionError):
# 连接错误
raise ConnectionError(
f"API连接失败: {str(error)}. "
"请检查您的网络连接和API URL是否正确。"
) from error
elif isinstance(error, openai.Timeout):
# 超时错误
raise TimeoutError(
f"API请求超时: {str(error)}. "
"请检查网络连接或尝试增加超时时间。"
) from error
elif isinstance(error, openai.BadRequestError):
# 请求错误
raise RequestError(
f"API请求格式错误: {str(error)}. "
"请检查请求参数是否符合API要求。"
) from error
elif isinstance(error, openai.NotFoundError):
# 资源未找到
raise ResourceNotFoundError(
f"API资源未找到: {str(error)}. "
"请检查请求的资源路径是否正确。"
) from error
else:
# 其他未知错误
raise APIError(f"API调用发生未知错误: {str(error)}") from error
重试机制实现
Trae Agent使用重试装饰器实现网络请求的自动重试功能,结合指数退避策略,在网络不稳定的情况下提高请求成功率:
import time
from functools import wraps
from typing import Callable, TypeVar, Any
# 定义泛型类型变量
F = TypeVar('F', bound=Callable[..., Any])
def retry_api_call(
max_retries: int = 3,
initial_delay: float = 1.0,
backoff_factor: float = 2.0,
retry_on_exceptions: tuple[type[Exception], ...] = (
ConnectionError, TimeoutError, RateLimitError
)
) -> Callable[[F], F]:
"""
API调用重试装饰器
Args:
max_retries: 最大重试次数
initial_delay: 初始延迟(秒)
backoff_factor: 退避因子,每次重试延迟 = initial_delay * (backoff_factor **重试次数)
retry_on_exceptions: 需要重试的异常类型
"""
def decorator(func: F) -> F:
@wraps(func)
def wrapper(*args: Any, **kwargs: Any) -> Any:
retry_count = 0
while retry_count <= max_retries:
try:
return func(*args, **kwargs)
except retry_on_exceptions as e:
# 检查是否还有重试次数
if retry_count == max_retries:
raise # 达到最大重试次数,抛出异常
# 计算重试延迟
delay = initial_delay * (backoff_factor **retry_count)
retry_count += 1
# 记录重试信息
print(f"API调用失败: {str(e)}. "
f"将在{delay:.2f}秒后进行第{retry_count}次重试(共{max_retries}次)。")
# 等待重试
time.sleep(delay)
raise # 理论上不会到达这里
return wrapper # type: ignore
return decorator
# 使用示例
class OpenAICompatibleClient(BaseLLMClient):
# ... 其他代码 ...
@retry_api_call(
max_retries=3,
initial_delay=1.0,
backoff_factor=2.0
)
def chat(
self,
messages: list[LLMMessage],
model_config: ModelConfig,
tools: list[Tool] | None = None,
reuse_history: bool = True,
) -> LLMResponse:
# ... 聊天实现 ...
错误监控与告警
为了及时发现和解决网络请求问题,建议实现错误监控与告警机制:
import logging
from datetime import datetime
# 配置日志
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger("trae_agent.network")
class ErrorMonitor:
"""错误监控器"""
def __init__(self, alert_threshold: int = 5, alert_window: int = 60):
"""
Args:
alert_threshold: 告警阈值,在alert_window时间内超过此数量的错误将触发告警
alert_window: 告警窗口大小(秒)
"""
self.alert_threshold = alert_threshold
self.alert_window = alert_window
self.error_history: list[datetime] = []
self.alert_sent = False
def record_error(self, error: Exception) -> None:
"""记录错误"""
now = datetime.now()
self.error_history.append(now)
# 移除窗口外的错误记录
self.error_history = [
t for t in self.error_history
if (now - t).total_seconds() <= self.alert_window
]
# 检查是否需要触发告警
if len(self.error_history) >= self.alert_threshold and not self.alert_sent:
self.trigger_alert()
self.alert_sent = True # 防止重复告警
# 如果错误数量低于阈值,重置告警状态
if len(self.error_history) < self.alert_threshold:
self.alert_sent = False
# 记录错误详情
logger.error(f"网络请求错误: {str(error)}", exc_info=True)
def trigger_alert(self) -> None:
"""触发告警"""
error_count = len(self.error_history)
logger.critical(
f"网络请求错误率过高!在过去{self.alert_window}秒内发生了{error_count}次错误。"
)
# 这里可以添加发送邮件、短信或其他通知的代码
# send_alert_email(error_count, self.alert_window)
MCP客户端与工具网络请求
除了LLM API调用外,Trae Agent还通过MCP(多工具协作协议)客户端与各种外部工具进行通信。MCP客户端负责管理与工具服务的连接,协调工具调用,并处理工具返回的数据。
MCP客户端实现
MCP客户端的核心功能是连接到MCP服务器,发现可用工具,并调用这些工具完成特定任务:
from contextlib import AsyncExitStack
from enum import Enum
from mcp import ClientSession, StdioServerParameters
from mcp.client.stdio import stdio_client
from ..tools.mcp_tool import MCPTool
from .config import MCPServerConfig
class MCPServerStatus(Enum):
DISCONNECTED = "disconnected" # 服务器未连接或出现错误
CONNECTING = "connecting" # 正在连接服务器
CONNECTED = "connected" # 服务器已连接并就绪
class MCPClient:
def __init__(self):
# 初始化会话和客户端对象
self.session: ClientSession | None = None
self.exit_stack = AsyncExitStack()
self.mcp_servers_status: dict[str, MCPServerStatus] = {}
def get_mcp_server_status(self, mcp_server_name: str) -> MCPServerStatus:
"""获取MCP服务器状态"""
return self.mcp_servers_status.get(mcp_server_name, MCPServerStatus.DISCONNECTED)
def update_mcp_server_status(self, mcp_server_name, status: MCPServerStatus):
"""更新MCP服务器状态"""
self.mcp_servers_status[mcp_server_name] = status
async def connect_and_discover(
self,
mcp_server_name: str,
mcp_server_config: MCPServerConfig,
mcp_tools_container: list,
model_provider,
):
"""连接到MCP服务器并发现可用工具"""
transport = None
if mcp_server_config.http_url:
# HTTP传输(尚未实现)
raise NotImplementedError("HTTP传输尚未实现")
elif mcp_server_config.url:
# WebSocket传输(尚未实现)
raise NotImplementedError("WebSocket传输尚未实现")
elif mcp_server_config.command:
# 标准输入输出传输
params = StdioServerParameters(
command=mcp_server_config.command,
args=mcp_server_config.args,
env=mcp_server_config.env,
cwd=mcp_server_config.cwd,
)
transport = await self.exit_stack.enter_async_context(stdio_client(params))
else:
# 无效配置
raise ValueError(
f"{mcp_server_name}的MCP服务器配置无效。"
"请提供命令或URL。"
)
try:
# 连接服务器
await self.connect_to_server(mcp_server_name, transport)
# 列出可用工具
mcp_tools = await self.list_tools()
# 创建MCP工具对象
for tool in mcp_tools.tools:
mcp_tool = MCPTool(self, tool, model_provider)
mcp_tools_container.append(mcp_tool)
except Exception as e:
# 连接或发现过程中出错
logger.error(f"MCP服务器连接或工具发现失败: {str(e)}")
raise
async def connect_to_server(self, mcp_server_name, transport):
"""连接到MCP服务器"""
if self.get_mcp_server_status(mcp_server_name) != MCPServerStatus.CONNECTED:
self.update_mcp_server_status(mcp_server_name, MCPServerStatus.CONNECTING)
try:
stdio, write = transport
self.session = await self.exit_stack.enter_async_context(
ClientSession(stdio, write)
)
await self.session.initialize()
self.update_mcp_server_status(mcp_server_name, MCPServerStatus.CONNECTED)
logger.info(f"成功连接到MCP服务器: {mcp_server_name}")
except Exception as e:
self.update_mcp_server_status(mcp_server_name, MCPServerStatus.DISCONNECTED)
logger.error(f"MCP服务器连接失败: {str(e)}")
raise
async def call_tool(self, name, args):
"""调用MCP工具"""
if not self.session:
raise ConnectionError("MCP会话未初始化,请先连接到服务器")
try:
output = await self.session.call_tool(name, args)
return output
except Exception as e:
logger.error(f"MCP工具调用失败: {name}, 参数: {args}, 错误: {str(e)}")
raise
async def list_tools(self):
"""列出可用工具"""
if not self.session:
raise ConnectionError("MCP会话未初始化,请先连接到服务器")
try:
tools = await self.session.list_tools()
return tools
except Exception as e:
logger.error(f"获取MCP工具列表失败: {str(e)}")
raise
async def cleanup(self, mcp_server_name):
"""清理资源"""
await self.exit_stack.aclose()
self.update_mcp_server_status(mcp_server_name, MCPServerStatus.DISCONNECTED)
logger.info(f"MCP服务器连接已关闭: {mcp_server_name}")
工具网络请求示例
以下是一个使用HTTP请求获取外部数据的工具实现示例:
import requests
from trae_agent.tools.base import Tool, ToolParameter, ToolResult
class HttpGetTool(Tool):
"""HTTP GET请求工具,用于获取网页内容或API数据"""
def __init__(self):
super().__init__(
name="http_get",
description="使用HTTP GET方法获取指定URL的内容",
parameters=[
ToolParameter(
name="url",
type="string",
description="要获取的URL地址",
required=True
),
ToolParameter(
name="timeout",
type="integer",
description="请求超时时间(秒)",
required=False,
default=
火山引擎开发者社区是火山引擎打造的AI技术生态平台,聚焦Agent与大模型开发,提供豆包系列模型(图像/视频/视觉)、智能分析与会话工具,并配套评测集、动手实验室及行业案例库。社区通过技术沙龙、挑战赛等活动促进开发者成长,新用户可领50万Tokens权益,助力构建智能应用。
更多推荐
所有评论(0)