Trae Agent网络请求处理:API调用与数据获取技巧

【免费下载链接】trae-agent Trae 代理是一个基于大型语言模型(LLM)的通用软件开发任务代理。它提供了一个强大的命令行界面(CLI),能够理解自然语言指令,并使用各种工具和LLM提供者执行复杂的软件开发工作流程。 【免费下载链接】trae-agent 项目地址: https://gitcode.com/gh_mirrors/tr/trae-agent

引言:解决Trae Agent网络请求的核心痛点

在使用Trae Agent(基于大型语言模型的通用软件开发任务代理)时,你是否曾遇到过API调用失败、网络超时、数据获取不完整等问题?作为连接LLM(大型语言模型)与外部工具的关键纽带,网络请求处理直接影响Trae Agent的任务执行效率和稳定性。本文将深入剖析Trae Agent的网络请求架构,提供API调用的最佳实践、错误处理策略以及高级数据获取技巧,帮助开发者充分发挥Trae Agent的强大能力。

读完本文,你将能够:

  • 理解Trae Agent网络请求的核心组件与工作流程
  • 掌握LLM API调用的配置方法与优化技巧
  • 实现健壮的网络错误处理与重试机制
  • 高效获取和处理外部数据
  • 解决常见的网络请求问题

Trae Agent网络请求架构概览

Trae Agent的网络请求处理系统采用模块化设计,主要包含四大核心组件:LLM客户端、MCP(多工具协作协议)客户端、配置管理和工具集成。这些组件协同工作,确保Trae Agent能够无缝与各种外部服务和API进行通信。

核心组件架构

mermaid

网络请求工作流程

Trae Agent的网络请求处理遵循以下工作流程:

  1. 配置解析:从配置文件或环境变量中加载API密钥、基础URL等网络相关配置
  2. 客户端初始化:根据目标服务类型(如OpenRouter、Ollama等)初始化相应的LLM客户端
  3. 请求构建:根据用户指令和工具需求,构建符合API规范的请求参数
  4. 请求发送:通过HTTP或WebSocket等协议发送网络请求
  5. 响应处理:解析API响应,提取关键信息
  6. 错误处理:检测并处理网络错误、API错误等异常情况
  7. 结果记录:将请求和响应信息记录到轨迹日志(可选)

mermaid

LLM API调用配置与实现

Trae Agent支持多种LLM服务提供商,包括OpenRouter、Ollama、Anthropic等。每种提供商都有其特定的API要求和配置选项。正确配置LLM客户端是确保Trae Agent正常工作的基础。

基础配置方法

LLM客户端配置主要通过ModelConfig类实现,包含API密钥、基础URL、API版本等关键参数。以下是配置LLM客户端的基本步骤:

  1. 创建模型配置:指定模型提供商、API密钥、基础URL等信息
  2. 初始化LLM客户端:根据模型配置创建相应的LLM客户端实例
  3. 设置轨迹记录器:(可选)配置轨迹记录器以记录API交互
from trae_agent.utils.config import ModelConfig, ProviderConfig
from trae_agent.utils.llm_clients.openrouter_client import OpenRouterClient

# 创建模型配置
provider_config = ProviderConfig(
    api_key="your_openrouter_api_key",
    base_url="https://openrouter.ai/api/v1",
    api_version=None
)

model_config = ModelConfig(
    model_name="gpt-4",
    model_provider=provider_config,
    supports_tool_calling=True
)

# 初始化LLM客户端
llm_client = OpenRouterClient(model_config)

# 设置轨迹记录器(可选)
# llm_client.set_trajectory_recorder(trajectory_recorder)

主流LLM服务配置详解

OpenRouter配置

OpenRouter是一个聚合多种LLM模型的服务,支持主流模型如GPT-4、Claude、Llama等。Trae Agent通过OpenRouterClient类实现与OpenRouter的交互。

# 配置OpenRouter客户端
from trae_agent.utils.llm_clients.openrouter_client import OpenRouterClient

# 默认配置
openrouter_config = ModelConfig(
    model_name="gpt-4",
    model_provider=ProviderConfig(
        api_key=os.getenv("OPENROUTER_API_KEY"),
        # 当base_url未指定时,将自动使用默认值:https://openrouter.ai/api/v1
    ),
    supports_tool_calling=True
)

client = OpenRouterClient(openrouter_config)

# 自定义基础URL配置
custom_openrouter_config = ModelConfig(
    model_name="claude-3-opus",
    model_provider=ProviderConfig(
        api_key=os.getenv("OPENROUTER_API_KEY"),
        base_url="https://custom-openrouter-proxy.example.com/api/v1"
    ),
    supports_tool_calling=True
)

custom_client = OpenRouterClient(custom_openrouter_config)

OpenRouter客户端还支持额外的HTTP头配置,如引用站点URL和站点名称,这些信息有助于提高API请求的优先级和可靠性:

# 在OpenRouterProvider类中
def get_extra_headers(self) -> dict[str, str]:
    """Get OpenRouter-specific headers."""
    extra_headers: dict[str, str] = {}

    openrouter_site_url = os.getenv("OPENROUTER_SITE_URL")
    if openrouter_site_url:
        extra_headers["HTTP-Referer"] = openrouter_site_url

    openrouter_site_name = os.getenv("OPENROUTER_SITE_NAME")
    if openrouter_site_name:
        extra_headers["X-Title"] = openrouter_site_name

    return extra_headers
Ollama配置

Ollama是一个本地运行的LLM服务,适合隐私敏感或需要低延迟的场景。Trae Agent通过OllamaClient类与Ollama服务交互:

from trae_agent.utils.llm_clients.ollama_client import OllamaClient

# 默认配置(连接本地Ollama服务)
ollama_config = ModelConfig(
    model_name="llama3",
    model_provider=ProviderConfig(
        api_key=None,  # Ollama默认不需要API密钥
        base_url="http://localhost:11434/v1"  # Ollama默认API地址
    ),
    supports_tool_calling=True
)

ollama_client = OllamaClient(ollama_config)

# 远程Ollama服务配置
remote_ollama_config = ModelConfig(
    model_name="mistral",
    model_provider=ProviderConfig(
        api_key=None,
        base_url="http://ollama-server.example.com:11434/v1"
    ),
    supports_tool_calling=True
)

remote_ollama_client = OllamaClient(remote_ollama_config)

配置最佳实践

  1. 环境变量管理API密钥:避免硬编码API密钥,使用环境变量或配置文件管理敏感信息。
# 推荐做法
import os
from trae_agent.utils.config import ModelConfig, ProviderConfig

api_key = os.getenv("OPENROUTER_API_KEY")
if not api_key:
    raise ValueError("OPENROUTER_API_KEY环境变量未设置")

model_config = ModelConfig(
    model_name="gpt-4",
    model_provider=ProviderConfig(
        api_key=api_key,
        base_url="https://openrouter.ai/api/v1"
    ),
    supports_tool_calling=True
)
  1. 使用合规访问服务:对于国外LLM服务,考虑使用合规的国内镜像或服务以提高访问速度和稳定性。
# 使用合规访问服务示例
model_config = ModelConfig(
    model_name="gpt-4",
    model_provider=ProviderConfig(
        api_key=api_key,
        base_url="https://openai-proxy.example.com/v1"  # 合规代理地址
    ),
    supports_tool_calling=True
)
  1. 按环境区分配置:为开发、测试和生产环境创建不同的配置,避免环境间的干扰。
# 环境特定配置示例
def get_model_config(env: str) -> ModelConfig:
    if env == "production":
        return ModelConfig(
            model_name="gpt-4",
            model_provider=ProviderConfig(
                api_key=os.getenv("PROD_OPENROUTER_API_KEY"),
                base_url="https://openrouter.ai/api/v1"
            ),
            supports_tool_calling=True
        )
    elif env == "development":
        return ModelConfig(
            model_name="llama3",
            model_provider=ProviderConfig(
                api_key=None,
                base_url="http://localhost:11434/v1"
            ),
            supports_tool_calling=True
        )
    else:  # testing
        return ModelConfig(
            model_name="mock-model",
            model_provider=ProviderConfig(
                api_key=None,
                base_url="https://mock-llm.example.com/v1"
            ),
            supports_tool_calling=True
        )
  1. 动态配置切换:根据任务需求动态选择不同的LLM模型和配置。
def select_model_config(task_type: str) -> ModelConfig:
    """根据任务类型选择合适的模型配置"""
    if task_type == "code_generation":
        # 代码生成任务使用更强大的模型
        return ModelConfig(
            model_name="gpt-4",
            model_provider=ProviderConfig(
                api_key=os.getenv("OPENROUTER_API_KEY"),
                base_url="https://openrouter.ai/api/v1"
            ),
            supports_tool_calling=True
        )
    elif task_type == "simple_qa":
        # 简单问答任务使用更高效的模型
        return ModelConfig(
            model_name="llama3",
            model_provider=ProviderConfig(
                api_key=None,
                base_url="http://localhost:11434/v1"
            ),
            supports_tool_calling=True
        )
    else:
        # 默认配置
        return ModelConfig(
            model_name="claude-3-sonnet",
            model_provider=ProviderConfig(
                api_key=os.getenv("ANTHROPIC_API_KEY"),
                base_url="https://api.anthropic.com"
            ),
            supports_tool_calling=True
        )

API调用实现与优化

Trae Agent的LLM API调用实现基于OpenAI兼容的客户端架构,提供了统一的接口来与不同的LLM服务进行交互。这种设计不仅简化了代码结构,还使得添加新的LLM提供商变得更加容易。

API调用核心实现

OpenAI兼容的客户端基类OpenAICompatibleClient提供了API调用的核心功能:

# 核心API调用实现(简化版)
class OpenAICompatibleClient(BaseLLMClient):
    def __init__(self, model_config: ModelConfig, provider: ProviderConfig):
        super().__init__(model_config)
        self.provider = provider
        self.client = self.provider.create_client(
            api_key=self.api_key,
            base_url=self.base_url,
            api_version=self.api_version
        )
        self.chat_history: list[LLMMessage] = []
        
    def set_chat_history(self, messages: list[LLMMessage]) -> None:
        """设置聊天历史"""
        self.chat_history = messages.copy()
        
    def chat(
        self,
        messages: list[LLMMessage],
        model_config: ModelConfig,
        tools: list[Tool] | None = None,
        reuse_history: bool = True,
    ) -> LLMResponse:
        """发送聊天消息到LLM API"""
        # 构建完整消息列表
        full_messages = []
        if reuse_history:
            full_messages.extend(self.chat_history)
        full_messages.extend(messages)
        
        # 准备工具调用参数(如果需要)
        tool_params = None
        if tools and self.supports_tool_calling(model_config):
            tool_params = {
                "tools": [tool.to_dict() for tool in tools],
                "tool_choice": "auto"
            }
            
        # 发送API请求
        try:
            response = self.client.chat.completions.create(
                model=model_config.model_name,
                messages=full_messages,
                **(tool_params or {}),
                stream=False  # 非流式响应
            )
            
            # 记录聊天历史
            self.chat_history = full_messages
            self.chat_history.append({
                "role": "assistant",
                "content": response.choices[0].message.content
            })
            
            # 记录轨迹(如果启用)
            if self.trajectory_recorder:
                self.trajectory_recorder.record_llm_interaction(
                    request=full_messages,
                    response=response
                )
                
            return LLMResponse.from_api_response(response)
            
        except Exception as e:
            # 错误处理
            self.handle_api_error(e)
            raise

API调用优化技巧

  1. 请求批处理:将多个独立请求合并为批处理请求,减少API调用次数,降低延迟和成本。
def batch_chat(
    self,
    batches: list[list[LLMMessage]],
    model_config: ModelConfig,
    tools: list[Tool] | None = None
) -> list[LLMResponse]:
    """批处理多个聊天请求"""
    results = []
    for batch in batches:
        results.append(self.chat(batch, model_config, tools, reuse_history=False))
    return results
  1. 流式响应处理:对于大型响应或需要实时反馈的场景,使用流式响应模式。
def stream_chat(
    self,
    messages: list[LLMMessage],
    model_config: ModelConfig,
    tools: list[Tool] | None = None,
    reuse_history: bool = True,
) -> Generator[LLMResponseChunk, None, None]:
    """流式聊天响应"""
    # 构建完整消息列表(与非流式类似)
    full_messages = []
    if reuse_history:
        full_messages.extend(self.chat_history)
    full_messages.extend(messages)
    
    # 准备工具调用参数(如果需要)
    tool_params = None
    if tools and self.supports_tool_calling(model_config):
        tool_params = {
            "tools": [tool.to_dict() for tool in tools],
            "tool_choice": "auto"
        }
        
    # 发送流式API请求
    response = self.client.chat.completions.create(
        model=model_config.model_name,
        messages=full_messages,
        **(tool_params or {}),
        stream=True  # 流式响应
    )
    
    # 处理流式响应
    for chunk in response:
        yield LLMResponseChunk.from_api_chunk(chunk)
  1. 上下文窗口管理:动态管理对话历史,确保不超过模型的上下文窗口限制。
def manage_context_window(
    self,
    new_messages: list[LLMMessage],
    max_tokens: int = 8192,
    token_estimator: Callable[[list[LLMMessage]], int] = estimate_tokens
) -> list[LLMMessage]:
    """管理上下文窗口,确保不超过最大token限制"""
    # 估算当前历史消息的token数
    current_tokens = token_estimator(self.chat_history)
    new_tokens = token_estimator(new_messages)
    
    # 如果加上新消息会超过限制,需要截断历史
    if current_tokens + new_tokens > max_tokens:
        # 尝试只保留最近的消息
        truncated_history = []
        remaining_tokens = max_tokens - new_tokens
        
        # 从后往前添加历史消息,直到接近token限制
        for msg in reversed(self.chat_history):
            msg_tokens = token_estimator([msg])
            if remaining_tokens - msg_tokens < 0:
                break
            truncated_history.append(msg)
            remaining_tokens -= msg_tokens
            
        # 反转回正常顺序
        truncated_history.reverse()
        
        # 更新历史
        self.chat_history = truncated_history
        
    return self.chat_history + new_messages
  1. 模型缓存:缓存频繁使用的相同或相似请求的响应,减少重复计算和API调用。
from functools import lru_cache
import hashlib
import json

def get_request_hash(messages: list[LLMMessage], model_name: str) -> str:
    """生成请求的唯一哈希值,用于缓存"""
    request_key = json.dumps({
        "messages": messages,
        "model": model_name
    }, sort_keys=True)
    return hashlib.sha256(request_key.encode()).hexdigest()

class CachedLLMClient(OpenAICompatibleClient):
    """带缓存的LLM客户端"""
    
    def __init__(self, model_config: ModelConfig):
        super().__init__(model_config)
        self.cache = {}  # 内存缓存,生产环境可使用Redis等分布式缓存
        
    @lru_cache(maxsize=1000)  # 限制缓存大小
    def cached_chat(
        self,
        request_hash: str,
        messages: list[LLMMessage],
        model_config: ModelConfig,
        tools: list[Tool] | None = None,
        reuse_history: bool = True,
    ) -> LLMResponse:
        """带缓存的聊天方法"""
        return super().chat(messages, model_config, tools, reuse_history)
        
    def chat(
        self,
        messages: list[LLMMessage],
        model_config: ModelConfig,
        tools: list[Tool] | None = None,
        reuse_history: bool = True,
        use_cache: bool = True
    ) -> LLMResponse:
        """重写chat方法,添加缓存支持"""
        if use_cache and not tools:  # 工具调用不缓存
            request_hash = get_request_hash(messages, model_config.model_name)
            if request_hash in self.cache:
                return self.cache[request_hash]
                
            # 调用缓存版本的chat方法
            response = self.cached_chat(
                request_hash, messages, model_config, tools, reuse_history
            )
            self.cache[request_hash] = response
            return response
        else:
            return super().chat(messages, model_config, tools, reuse_history)

网络错误处理与重试机制

网络请求过程中难免会遇到各种错误,如连接超时、API限流、服务器错误等。Trae Agent提供了健壮的错误处理机制,确保在遇到这些问题时能够优雅地恢复或给出有意义的错误信息。

常见网络错误类型及处理策略

错误类型 描述 处理策略 重试建议
连接超时 无法与服务器建立连接 检查网络连接、服务器状态 是,使用指数退避策略
连接拒绝 服务器拒绝连接 检查URL和端口是否正确 是,短暂延迟后重试
API密钥无效 认证失败 检查API密钥是否正确 否,需要人工干预
权限不足 没有足够权限执行请求 检查API密钥权限 否,需要人工干预
请求频率超限 API调用频率超过限制 减少请求频率 是,等待限流周期后重试
请求大小超限 请求超过API大小限制 减小请求大小 否,需要调整请求
服务器错误 服务器内部错误 等待服务器恢复 是,延迟后重试
响应格式错误 API返回非预期格式 检查API文档和版本 否,需要代码修复

错误处理实现

Trae Agent的错误处理机制主要通过handle_api_error方法实现,该方法位于OpenAICompatibleClient类中:

def handle_api_error(self, error: Exception) -> None:
    """处理API错误"""
    error_msg = f"API调用失败: {str(error)}"
    
    # 区分不同类型的错误
    if isinstance(error, openai.AuthenticationError):
        # 认证错误
        raise AuthenticationError(
            f"API认证失败: {str(error)}. "
            "请检查您的API密钥是否正确。"
        ) from error
        
    elif isinstance(error, openai.PermissionDeniedError):
        # 权限错误
        raise PermissionError(
            f"API权限不足: {str(error)}. "
            "请检查您的API密钥是否具有足够的权限。"
        ) from error
        
    elif isinstance(error, openai.RateLimitError):
        # 限流错误
        retry_after = getattr(error, 'headers', {}).get('Retry-After', 60)
        raise RateLimitError(
            f"API请求频率超限: {str(error)}. "
            f"请在{retry_after}秒后重试。"
        ) from error
        
    elif isinstance(error, openai.APIConnectionError):
        # 连接错误
        raise ConnectionError(
            f"API连接失败: {str(error)}. "
            "请检查您的网络连接和API URL是否正确。"
        ) from error
        
    elif isinstance(error, openai.Timeout):
        # 超时错误
        raise TimeoutError(
            f"API请求超时: {str(error)}. "
            "请检查网络连接或尝试增加超时时间。"
        ) from error
        
    elif isinstance(error, openai.BadRequestError):
        # 请求错误
        raise RequestError(
            f"API请求格式错误: {str(error)}. "
            "请检查请求参数是否符合API要求。"
        ) from error
        
    elif isinstance(error, openai.NotFoundError):
        # 资源未找到
        raise ResourceNotFoundError(
            f"API资源未找到: {str(error)}. "
            "请检查请求的资源路径是否正确。"
        ) from error
        
    else:
        # 其他未知错误
        raise APIError(f"API调用发生未知错误: {str(error)}") from error

重试机制实现

Trae Agent使用重试装饰器实现网络请求的自动重试功能,结合指数退避策略,在网络不稳定的情况下提高请求成功率:

import time
from functools import wraps
from typing import Callable, TypeVar, Any

# 定义泛型类型变量
F = TypeVar('F', bound=Callable[..., Any])

def retry_api_call(
    max_retries: int = 3,
    initial_delay: float = 1.0,
    backoff_factor: float = 2.0,
    retry_on_exceptions: tuple[type[Exception], ...] = (
        ConnectionError, TimeoutError, RateLimitError
    )
) -> Callable[[F], F]:
    """
    API调用重试装饰器
    
    Args:
        max_retries: 最大重试次数
        initial_delay: 初始延迟(秒)
        backoff_factor: 退避因子,每次重试延迟 = initial_delay * (backoff_factor **重试次数)
        retry_on_exceptions: 需要重试的异常类型
    """
    def decorator(func: F) -> F:
        @wraps(func)
        def wrapper(*args: Any, **kwargs: Any) -> Any:
            retry_count = 0
            while retry_count <= max_retries:
                try:
                    return func(*args, **kwargs)
                except retry_on_exceptions as e:
                    # 检查是否还有重试次数
                    if retry_count == max_retries:
                        raise  # 达到最大重试次数,抛出异常
                        
                    # 计算重试延迟
                    delay = initial_delay * (backoff_factor **retry_count)
                    retry_count += 1
                    
                    # 记录重试信息
                    print(f"API调用失败: {str(e)}. "
                          f"将在{delay:.2f}秒后进行第{retry_count}次重试(共{max_retries}次)。")
                    
                    # 等待重试
                    time.sleep(delay)
            raise  # 理论上不会到达这里
        return wrapper  # type: ignore
    return decorator

# 使用示例
class OpenAICompatibleClient(BaseLLMClient):
    # ... 其他代码 ...
    
    @retry_api_call(
        max_retries=3,
        initial_delay=1.0,
        backoff_factor=2.0
    )
    def chat(
        self,
        messages: list[LLMMessage],
        model_config: ModelConfig,
        tools: list[Tool] | None = None,
        reuse_history: bool = True,
    ) -> LLMResponse:
        # ... 聊天实现 ...

错误监控与告警

为了及时发现和解决网络请求问题,建议实现错误监控与告警机制:

import logging
from datetime import datetime

# 配置日志
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger("trae_agent.network")

class ErrorMonitor:
    """错误监控器"""
    
    def __init__(self, alert_threshold: int = 5, alert_window: int = 60):
        """
        Args:
            alert_threshold: 告警阈值,在alert_window时间内超过此数量的错误将触发告警
            alert_window: 告警窗口大小(秒)
        """
        self.alert_threshold = alert_threshold
        self.alert_window = alert_window
        self.error_history: list[datetime] = []
        self.alert_sent = False
        
    def record_error(self, error: Exception) -> None:
        """记录错误"""
        now = datetime.now()
        self.error_history.append(now)
        
        # 移除窗口外的错误记录
        self.error_history = [
            t for t in self.error_history
            if (now - t).total_seconds() <= self.alert_window
        ]
        
        # 检查是否需要触发告警
        if len(self.error_history) >= self.alert_threshold and not self.alert_sent:
            self.trigger_alert()
            self.alert_sent = True  # 防止重复告警
            
        # 如果错误数量低于阈值,重置告警状态
        if len(self.error_history) < self.alert_threshold:
            self.alert_sent = False
            
        # 记录错误详情
        logger.error(f"网络请求错误: {str(error)}", exc_info=True)
        
    def trigger_alert(self) -> None:
        """触发告警"""
        error_count = len(self.error_history)
        logger.critical(
            f"网络请求错误率过高!在过去{self.alert_window}秒内发生了{error_count}次错误。"
        )
        
        # 这里可以添加发送邮件、短信或其他通知的代码
        # send_alert_email(error_count, self.alert_window)

MCP客户端与工具网络请求

除了LLM API调用外,Trae Agent还通过MCP(多工具协作协议)客户端与各种外部工具进行通信。MCP客户端负责管理与工具服务的连接,协调工具调用,并处理工具返回的数据。

MCP客户端实现

MCP客户端的核心功能是连接到MCP服务器,发现可用工具,并调用这些工具完成特定任务:

from contextlib import AsyncExitStack
from enum import Enum

from mcp import ClientSession, StdioServerParameters
from mcp.client.stdio import stdio_client

from ..tools.mcp_tool import MCPTool
from .config import MCPServerConfig


class MCPServerStatus(Enum):
    DISCONNECTED = "disconnected"  # 服务器未连接或出现错误
    CONNECTING = "connecting"      # 正在连接服务器
    CONNECTED = "connected"        # 服务器已连接并就绪


class MCPClient:
    def __init__(self):
        # 初始化会话和客户端对象
        self.session: ClientSession | None = None
        self.exit_stack = AsyncExitStack()
        self.mcp_servers_status: dict[str, MCPServerStatus] = {}

    def get_mcp_server_status(self, mcp_server_name: str) -> MCPServerStatus:
        """获取MCP服务器状态"""
        return self.mcp_servers_status.get(mcp_server_name, MCPServerStatus.DISCONNECTED)

    def update_mcp_server_status(self, mcp_server_name, status: MCPServerStatus):
        """更新MCP服务器状态"""
        self.mcp_servers_status[mcp_server_name] = status

    async def connect_and_discover(
        self,
        mcp_server_name: str,
        mcp_server_config: MCPServerConfig,
        mcp_tools_container: list,
        model_provider,
    ):
        """连接到MCP服务器并发现可用工具"""
        transport = None
        if mcp_server_config.http_url:
            # HTTP传输(尚未实现)
            raise NotImplementedError("HTTP传输尚未实现")
        elif mcp_server_config.url:
            # WebSocket传输(尚未实现)
            raise NotImplementedError("WebSocket传输尚未实现")
        elif mcp_server_config.command:
            # 标准输入输出传输
            params = StdioServerParameters(
                command=mcp_server_config.command,
                args=mcp_server_config.args,
                env=mcp_server_config.env,
                cwd=mcp_server_config.cwd,
            )
            transport = await self.exit_stack.enter_async_context(stdio_client(params))
        else:
            # 无效配置
            raise ValueError(
                f"{mcp_server_name}的MCP服务器配置无效。"
                "请提供命令或URL。"
            )
            
        try:
            # 连接服务器
            await self.connect_to_server(mcp_server_name, transport)
            
            # 列出可用工具
            mcp_tools = await self.list_tools()
            
            # 创建MCP工具对象
            for tool in mcp_tools.tools:
                mcp_tool = MCPTool(self, tool, model_provider)
                mcp_tools_container.append(mcp_tool)
                
        except Exception as e:
            # 连接或发现过程中出错
            logger.error(f"MCP服务器连接或工具发现失败: {str(e)}")
            raise

    async def connect_to_server(self, mcp_server_name, transport):
        """连接到MCP服务器"""
        if self.get_mcp_server_status(mcp_server_name) != MCPServerStatus.CONNECTED:
            self.update_mcp_server_status(mcp_server_name, MCPServerStatus.CONNECTING)
            try:
                stdio, write = transport
                self.session = await self.exit_stack.enter_async_context(
                    ClientSession(stdio, write)
                )
                await self.session.initialize()
                self.update_mcp_server_status(mcp_server_name, MCPServerStatus.CONNECTED)
                logger.info(f"成功连接到MCP服务器: {mcp_server_name}")
            except Exception as e:
                self.update_mcp_server_status(mcp_server_name, MCPServerStatus.DISCONNECTED)
                logger.error(f"MCP服务器连接失败: {str(e)}")
                raise

    async def call_tool(self, name, args):
        """调用MCP工具"""
        if not self.session:
            raise ConnectionError("MCP会话未初始化,请先连接到服务器")
            
        try:
            output = await self.session.call_tool(name, args)
            return output
        except Exception as e:
            logger.error(f"MCP工具调用失败: {name}, 参数: {args}, 错误: {str(e)}")
            raise

    async def list_tools(self):
        """列出可用工具"""
        if not self.session:
            raise ConnectionError("MCP会话未初始化,请先连接到服务器")
            
        try:
            tools = await self.session.list_tools()
            return tools
        except Exception as e:
            logger.error(f"获取MCP工具列表失败: {str(e)}")
            raise

    async def cleanup(self, mcp_server_name):
        """清理资源"""
        await self.exit_stack.aclose()
        self.update_mcp_server_status(mcp_server_name, MCPServerStatus.DISCONNECTED)
        logger.info(f"MCP服务器连接已关闭: {mcp_server_name}")

工具网络请求示例

以下是一个使用HTTP请求获取外部数据的工具实现示例:

import requests
from trae_agent.tools.base import Tool, ToolParameter, ToolResult

class HttpGetTool(Tool):
    """HTTP GET请求工具,用于获取网页内容或API数据"""
    
    def __init__(self):
        super().__init__(
            name="http_get",
            description="使用HTTP GET方法获取指定URL的内容",
            parameters=[
                ToolParameter(
                    name="url",
                    type="string",
                    description="要获取的URL地址",
                    required=True
                ),
                ToolParameter(
                    name="timeout",
                    type="integer",
                    description="请求超时时间(秒)",
                    required=False,
                    default=

【免费下载链接】trae-agent Trae 代理是一个基于大型语言模型(LLM)的通用软件开发任务代理。它提供了一个强大的命令行界面(CLI),能够理解自然语言指令,并使用各种工具和LLM提供者执行复杂的软件开发工作流程。 【免费下载链接】trae-agent 项目地址: https://gitcode.com/gh_mirrors/tr/trae-agent

Logo

火山引擎开发者社区是火山引擎打造的AI技术生态平台,聚焦Agent与大模型开发,提供豆包系列模型(图像/视频/视觉)、智能分析与会话工具,并配套评测集、动手实验室及行业案例库。社区通过技术沙龙、挑战赛等活动促进开发者成长,新用户可领50万Tokens权益,助力构建智能应用。

更多推荐