FastMCP日志工具:分布式日志管理的统一接口

【免费下载链接】fastmcp The fast, Pythonic way to build Model Context Protocol servers 🚀 【免费下载链接】fastmcp 项目地址: https://gitcode.com/GitHub_Trending/fa/fastmcp

概述

在现代AI应用开发中,分布式系统的日志管理是一个关键挑战。FastMCP作为Model Context Protocol(模型上下文协议)的Python实现框架,提供了一套完整的分布式日志解决方案,让开发者能够在MCP服务器和客户端之间实现无缝的日志传递和管理。

本文将深入探讨FastMCP的日志工具体系,展示如何利用其统一的日志接口构建可靠的分布式AI应用。

FastMCP日志架构概览

FastMCP的日志系统采用分层架构设计,支持从服务器到客户端的全链路日志传递:

mermaid

服务器端日志发送

基础日志方法

在FastMCP服务器中,通过Context对象提供丰富的日志方法:

from fastmcp import FastMCP, Context

mcp = FastMCP("AnalyticsServer")

@mcp.tool
async def analyze_data(data: dict, ctx: Context) -> dict:
    """分析数据并记录详细日志"""
    
    # 不同级别的日志记录
    await ctx.debug("开始数据分析过程")
    await ctx.info(f"接收数据大小: {len(data)} bytes")
    await ctx.warning("数据包含空值,将进行清理")
    
    try:
        # 数据处理逻辑
        result = process_data(data)
        await ctx.info("数据分析完成", extra={"processing_time": "2.3s"})
        return result
    except Exception as e:
        await ctx.error("数据分析失败", extra={"error": str(e)})
        raise

结构化日志支持

FastMCP 2.0+ 支持结构化日志,允许传递丰富的上下文信息:

@mcp.tool
async def process_user_request(user_id: int, request_data: dict, ctx: Context):
    """处理用户请求并记录结构化日志"""
    
    await ctx.info(
        "开始处理用户请求",
        extra={
            "user_id": user_id,
            "request_type": request_data.get("type"),
            "timestamp": "2024-01-15T10:30:00Z",
            "processing_stage": "initial"
        }
    )
    
    # 处理逻辑...
    
    await ctx.info(
        "请求处理完成",
        extra={
            "user_id": user_id,
            "processing_time": "150ms",
            "result_status": "success",
            "affected_records": 3
        }
    )

客户端日志处理

自定义日志处理器

客户端可以定义灵活的日志处理策略:

import logging
from fastmcp import Client
from fastmcp.client.logging import LogMessage, LogHandler

# 配置标准Python日志
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    handlers=[
        logging.FileHandler('mcp_server.log'),
        logging.StreamHandler()
    ]
)

logger = logging.getLogger(__name__)

async def advanced_log_handler(message: LogMessage) -> None:
    """高级日志处理器,支持多目标输出"""
    
    msg = message.data.get('msg', '')
    extra = message.data.get('extra', {})
    
    # 级别映射表
    level_mapping = {
        'debug': logging.DEBUG,
        'info': logging.INFO,
        'notice': logging.INFO,
        'warning': logging.WARNING,
        'error': logging.ERROR,
        'critical': logging.CRITICAL,
        'alert': logging.CRITICAL,
        'emergency': logging.CRITICAL
    }
    
    level = level_mapping.get(message.level, logging.INFO)
    
    # 添加服务器标识
    log_context = {**extra, 'source': 'mcp_server'}
    if message.logger:
        log_context['logger_name'] = message.logger
    
    logger.log(level, f"MCP: {msg}", extra=log_context)

# 创建客户端并配置日志处理器
client = Client(
    "analytics_server.py",
    log_handler=advanced_log_handler
)

多服务器日志聚合

FastMCP支持同时连接多个MCP服务器并统一管理日志:

from fastmcp import Client
import logging

# 配置集中式日志管理
logging.basicConfig(
    level=logging.DEBUG,
    format='%(asctime)s - %(name)s - %(levelname)s - [%(server)s] %(message)s'
)

async def centralized_log_handler(server_name: str):
    """为每个服务器创建专用的日志处理器"""
    
    async def handler(message: LogMessage):
        msg = message.data.get('msg', '')
        extra = {**message.data.get('extra', {}), 'server': server_name}
        
        level_map = {
            'debug': logging.DEBUG,
            'info': logging.INFO,
            'warning': logging.WARNING,
            'error': logging.ERROR,
            'critical': logging.CRITICAL
        }
        
        level = level_map.get(message.level, logging.INFO)
        logging.log(level, msg, extra=extra)
    
    return handler

# 连接多个服务器并配置各自的日志处理器
servers = {
    'analytics': {'command': 'python', 'args': ['analytics_server.py']},
    'database': {'command': 'python', 'args': ['database_server.py']},
    'weather': {'url': 'https://weather-api.example.com/mcp'}
}

clients = {}
for name, config in servers.items():
    clients[name] = Client(
        config,
        log_handler=await centralized_log_handler(name)
    )

日志级别与分类

FastMCP支持完整的日志级别体系:

MCP级别 Python映射 使用场景
debug DEBUG 详细的调试信息
info INFO 常规的运行信息
notice INFO 重要的通知信息
warning WARNING 警告信息,不影响运行
error ERROR 错误信息,功能受影响
critical CRITICAL 严重错误,系统可能崩溃
alert CRITICAL 需要立即处理的警报
emergency CRITICAL 系统不可用的紧急情况

实战案例:分布式AI应用日志系统

场景描述

构建一个包含多个MCP服务器的AI应用,需要统一的日志管理:

  • 数据分析服务器
  • 模型推理服务器
  • 数据库访问服务器
  • 外部API网关

实现方案

from fastmcp import Client, FastMCP, Context
import logging
from datetime import datetime

# 中央日志配置
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    handlers=[
        logging.FileHandler(f'app_{datetime.now().strftime("%Y%m%d")}.log'),
        logging.StreamHandler(),
        # 可以添加更多处理器:Elasticsearch、Datadog等
    ]
)

class DistributedLogger:
    """分布式日志管理器"""
    
    def __init__(self):
        self.clients = {}
        self.setup_servers()
    
    def setup_servers(self):
        """配置所有MCP服务器"""
        servers = {
            'analytics': 'analytics_server.py',
            'model': 'model_server.py', 
            'database': 'database_server.py',
            'api_gateway': 'api_gateway_server.py'
        }
        
        for name, script in servers.items():
            self.clients[name] = Client(
                script,
                log_handler=self.create_server_log_handler(name)
            )
    
    def create_server_log_handler(self, server_name: str):
        """为每个服务器创建专用的日志处理器"""
        
        async def handler(message: LogMessage):
            msg = message.data.get('msg', '')
            extra = {
                **message.data.get('extra', {}),
                'server': server_name,
                'log_level': message.level,
                'timestamp': datetime.now().isoformat()
            }
            
            # 根据级别处理日志
            if message.level in ['error', 'critical', 'alert', 'emergency']:
                logging.error(f"[{server_name}] {msg}", extra=extra)
                # 可以添加告警逻辑:发送邮件、Slack通知等
            elif message.level == 'warning':
                logging.warning(f"[{server_name}] {msg}", extra=extra)
            else:
                logging.info(f"[{server_name}] {msg}", extra=extra)
        
        return handler
    
    async def execute_workflow(self):
        """执行分布式工作流并监控日志"""
        async with self.clients['analytics'] as analytics, \
                   self.clients['model'] as model, \
                   self.clients['database'] as db:
            
            # 执行工作流
            try:
                # 数据分析
                data = await analytics.call_tool('analyze_data', {'input': 'sample'})
                
                # 模型推理
                prediction = await model.call_tool('predict', data)
                
                # 存储结果
                await db.call_tool('store_result', prediction)
                
                logging.info("工作流执行成功")
                
            except Exception as e:
                logging.error(f"工作流执行失败: {e}")
                # 详细的错误信息已经通过MCP日志传递

# 使用示例
async def main():
    logger = DistributedLogger()
    await logger.execute_workflow()

最佳实践与性能优化

1. 日志级别管理

# 根据环境配置日志级别
import os

LOG_LEVEL = os.getenv('LOG_LEVEL', 'INFO').upper()
logging.basicConfig(level=getattr(logging, LOG_LEVEL))

# 生产环境减少debug日志,开发环境增加详细日志
if os.getenv('ENVIRONMENT') == 'production':
    logging.getLogger('FastMCP').setLevel(logging.WARNING)
else:
    logging.getLogger('FastMCP').setLevel(logging.DEBUG)

2. 异步日志处理

import asyncio
from concurrent.futures import ThreadPoolExecutor

# 使用线程池处理CPU密集型的日志操作
executor = ThreadPoolExecutor(max_workers=4)

async def async_log_handler(message: LogMessage):
    """异步日志处理器,避免阻塞主线程"""
    
    def sync_log_processing():
        # 复杂的日志处理逻辑
        process_log_message(message)
        # 可以在这里添加日志压缩、归档等操作
    
    # 在线程池中执行CPU密集型操作
    await asyncio.get_event_loop().run_in_executor(
        executor, sync_log_processing
    )

3. 日志采样与降噪

class SmartLogHandler:
    """智能日志处理器,支持采样和降噪"""
    
    def __init__(self, sample_rate: float = 1.0):
        self.sample_rate = sample_rate
        self.error_count = 0
        self.last_error_time = None
    
    async def __call__(self, message: LogMessage):
        # 错误日志采样:高频错误进行降采样
        if message.level in ['error', 'critical']:
            current_time = asyncio.get_event_loop().time()
            if (self.last_error_time and 
                current_time - self.last_error_time < 1.0 and
                self.error_count > 10):
                # 高频错误,进行采样
                if random.random() > 0.1:  # 90%的采样率
                    return
            
            self.error_count += 1
            self.last_error_time = current_time
        
        # 处理日志消息
        await self.process_message(message)

监控与告警集成

Prometheus指标集成

from prometheus_client import Counter, Histogram

# 定义监控指标
LOG_MESSAGES_TOTAL = Counter(
    'mcp_log_messages_total',
    'Total MCP log messages',
    ['level', 'server']
)

LOG_PROCESSING_TIME = Histogram(
    'mcp_log_processing_seconds',
    'Time spent processing log messages',
    ['level']
)

async def monitored_log_handler(message: LogMessage, server_name: str):
    """带监控的日志处理器"""
    
    with LOG_PROCESSING_TIME.labels(level=message.level).time():
        # 记录日志数量指标
        LOG_MESSAGES_TOTAL.labels(
            level=message.level, 
            server=server_name
        ).inc()
        
        # 正常的日志处理逻辑
        await default_log_handler(message)
        
        # 错误日志触发告警
        if message.level in ['error', 'critical']:
            await trigger_alert(message, server_name)

总结

FastMCP的日志工具提供了分布式AI应用中完整的日志管理解决方案:

  1. 统一的日志接口:通过Context对象提供一致的日志API
  2. 结构化日志支持:支持丰富的元数据和上下文信息
  3. 灵活的客户端处理:可定制的日志处理器和多目标输出
  4. 分布式协调:支持多服务器环境的集中式日志管理
  5. 生产就绪:包含监控、告警、性能优化等企业级特性

通过合理利用FastMCP的日志工具,开发者可以构建出可靠、可观测的分布式AI应用系统,大大简化了复杂环境下的调试和运维工作。

提示:在实际项目中,建议结合具体的业务需求设计日志策略,平衡日志详细程度和系统性能之间的关系。

【免费下载链接】fastmcp The fast, Pythonic way to build Model Context Protocol servers 🚀 【免费下载链接】fastmcp 项目地址: https://gitcode.com/GitHub_Trending/fa/fastmcp

Logo

火山引擎开发者社区是火山引擎打造的AI技术生态平台,聚焦Agent与大模型开发,提供豆包系列模型(图像/视频/视觉)、智能分析与会话工具,并配套评测集、动手实验室及行业案例库。社区通过技术沙龙、挑战赛等活动促进开发者成长,新用户可领50万Tokens权益,助力构建智能应用。

更多推荐