目录

  1. 简介
  2. AuthProvider抽象接口设计
  3. 核心方法实现
  4. AuthContext注入机制
  5. API Key认证示例
  6. 认证配置项说明
  7. 线程安全与异步支持
  8. 性能优化建议

简介

本文档深入探讨了MCP(Model Context Protocol)框架中认证提供者(AuthProvider)的设计与扩展机制。通过分析OAuthAuthorizationServerProvider协议接口,详细说明如何继承和实现认证逻辑以支持JWT、API Key、LDAP等多种认证方式。文档还介绍了AuthContext如何在请求处理链中注入认证结果,并提供了自定义API Key认证的实现示例。

本节来源

AuthProvider抽象接口设计

OAuthAuthorizationServerProvider是一个泛型协议类,定义了OAuth授权服务器所需的核心方法。该接口采用类型变量AuthorizationCodeTRefreshTokenTAccessTokenT来支持不同类型的数据结构,增强了接口的灵活性和可扩展性。

接口主要包含以下方法类别:

  • 客户端管理get_clientregister_client用于客户端信息的查询与注册
  • 授权流程authorizeload_authorization_codeexchange_authorization_code处理授权码流程
  • 令牌管理load_access_tokenexchange_refresh_tokenrevoke_token负责令牌的生命周期管理
"实现"
«protocol»
OAuthAuthorizationServerProvider
+get_client(client_id)
+register_client(client_info) : None
+authorize(client, params) : str
+load_authorization_code(client, auth_code)
+exchange_authorization_code(client, auth_code) : OAuthToken
+load_refresh_token(client, refresh_token)
+exchange_refresh_token(client, refresh_token, scopes) : OAuthToken
+load_access_token(token)
+revoke_token(token) : None
SimpleOAuthProvider
-settings : SimpleAuthSettings
-clients : dict[str, OAuthClientInformationFull]
-auth_codes : dict[str, AuthorizationCode]
-tokens : dict[str, AccessToken]
+get_client(client_id)
+register_client(client_info) : None
+authorize(client, params) : str
+load_authorization_code(client, auth_code)
+exchange_authorization_code(client, auth_code) : OAuthToken
+load_access_token(token)

图源

本节来源

核心方法实现

继承AuthProvider类

要创建自定义认证提供者,需要继承OAuthAuthorizationServerProvider并实现其抽象方法。以SimpleOAuthProvider为例,该类实现了基本的OAuth流程:

"客户端" "认证提供者" "资源服务器" /authorize请求 生成state并存储状态映射 重定向到登录页面 提交登录表单 验证凭据 创建授权码 重定向到回调URL /token请求交换令牌 验证授权码 生成访问令牌 返回访问令牌 带令牌的API请求 验证令牌 返回资源 "客户端" "认证提供者" "资源服务器"

图源

authenticate方法实现

虽然接口中没有直接的authenticate方法,但认证逻辑主要在handle_simple_callback方法中实现。该方法验证用户名和密码是否与配置的演示凭据匹配:

async def handle_simple_callback(self, username: str, password: str, state: str) -> str:
    # 获取状态数据
    state_data = self.state_mapping.get(state)
    if not state_data:
        raise HTTPException(400, "Invalid state parameter")
    
    # 验证演示凭据
    if username != self.settings.demo_username or password != self.settings.demo_password:
        raise HTTPException(401, "Invalid credentials")
    
    # 创建授权码并存储
    new_code = f"mcp_{secrets.token_hex(16)}"
    auth_code = AuthorizationCode(
        code=new_code,
        client_id=client_id,
        redirect_uri=AnyHttpUrl(redirect_uri),
        expires_at=time.time() + 300,
        scopes=[self.settings.mcp_scope],
        code_challenge=code_challenge,
    )
    self.auth_codes[new_code] = auth_code
    
    # 清理状态映射
    del self.state_mapping[state]
    return construct_redirect_uri(redirect_uri, code=new_code, state=state)

本节来源

get_client_info方法实现

get_client方法用于根据客户端ID检索客户端信息,这是OAuth流程中的重要环节:

async def get_client(self, client_id: str) -> OAuthClientInformationFull | None:
    """Get OAuth client information."""
    return self.clients.get(client_id)

该方法简单地从内存字典中查找客户端信息,实际生产环境中可能需要从数据库或其他持久化存储中查询。

本节来源

AuthContext注入机制

AuthContextMiddleware负责在请求处理链中注入认证结果。该中间件从请求scope中提取经过身份验证的用户,并将其存储在上下文变量中,以便在整个请求生命周期内访问。

请求开始
从请求scope提取用户
用户已认证?
将用户设置到contextvar
处理请求
重置contextvar
请求结束

get_access_token()函数提供了便捷的方式来获取当前上下文中的访问令牌:

def get_access_token() -> AccessToken | None:
    """
    从当前上下文中获取访问令牌。

    返回:
        如果存在经过身份验证的用户,则返回访问令牌,否则返回None。
    """
    auth_user = auth_context_var.get()
    return auth_user.access_token if auth_user else None

本节来源

API Key认证示例

以下是一个自定义API Key认证提供者的实现示例:

class APIKeyAuthProvider(OAuthAuthorizationServerProvider[AuthorizationCode, RefreshToken, AccessToken]):
    def __init__(self, api_keys: dict[str, dict], settings: AuthSettings):
        self.api_keys = api_keys
        self.settings = settings
        self.tokens: dict[str, AccessToken] = {}
    
    async def authenticate(self, api_key: str) -> bool:
        """验证API Key是否有效"""
        return api_key in self.api_keys and self.api_keys[api_key].get("active", True)
    
    async def get_client_info(self, api_key: str) -> dict:
        """获取API Key关联的客户端信息"""
        if api_key in self.api_keys:
            return self.api_keys[api_key]
        raise HTTPException(401, "Invalid API Key")
    
    async def authorize(self, client: OAuthClientInformationFull, params: AuthorizationParams) -> str:
        # 对于API Key认证,可以直接生成令牌
        if "api_key" in params.scopes:
            token = f"mcp_{secrets.token_hex(32)}"
            self.tokens[token] = AccessToken(
                token=token,
                client_id=client.client_id,
                scopes=params.scopes,
                expires_at=int(time.time()) + 3600,
            )
            return f"{params.redirect_uri}?code={token}&state={params.state}"
        raise AuthorizeError("invalid_scope", "API Key scope required")
    
    async def load_access_token(self, token: str) -> AccessToken | None:
        access_token = self.tokens.get(token)
        if access_token and access_token.expires_at and access_token.expires_at < time.time():
            del self.tokens[token]
            return None
        return access_token
    
    async def revoke_token(self, token: AccessToken | RefreshToken) -> None:
        if isinstance(token, AccessToken) and token.token in self.tokens:
            del self.tokens[token.token]

本节来源

认证配置项说明

settings.py文件定义了认证相关的配置选项,主要包括:

class ClientRegistrationOptions(BaseModel):
    enabled: bool = False
    client_secret_expiry_seconds: int | None = None
    valid_scopes: list[str] | None = None
    default_scopes: list[str] | None = None

class RevocationOptions(BaseModel):
    enabled: bool = False

class AuthSettings(BaseModel):
    issuer_url: AnyHttpUrl = Field(
        ...,
        description="颁发令牌的OAuth授权服务器URL。",
    )
    service_documentation_url: AnyHttpUrl | None = None
    client_registration_options: ClientRegistrationOptions | None = None
    revocation_options: RevocationOptions | None = None
    required_scopes: list[str] | None = None
    resource_server_url: AnyHttpUrl | None = Field(
        ...,
        description="用作资源标识符的MCP服务器URL。",
    )

关键配置项解释:

  • issuer_url:授权服务器的URL,用于签发令牌
  • client_registration_options:控制客户端注册功能的启用状态和相关选项
  • revocation_options:控制令牌撤销功能的启用状态
  • required_scopes:请求资源时必需的权限范围列表
  • resource_server_url:资源服务器的URL,用作资源标识符

本节来源

线程安全与异步支持

认证提供者的设计充分考虑了线程安全和异步支持:

  1. 异步方法:所有核心方法都声明为async,确保非阻塞执行
  2. 上下文变量:使用contextvars.ContextVar来存储每个请求的认证上下文,保证线程安全
  3. 数据结构:内存存储使用字典等线程安全的数据结构

在高并发场景下,建议:

  • 使用异步数据库连接池替代内存存储
  • 实现分布式缓存来共享令牌状态
  • 使用连接池管理数据库连接
客户端请求
认证中间件
提取Authorization头
异步验证令牌
令牌有效?
设置认证上下文
处理业务逻辑
返回401错误
响应客户端

图源

性能优化建议

  1. 缓存策略:对于频繁访问的客户端信息和令牌,使用内存缓存(如Redis)减少数据库查询
  2. 批量操作:在处理大量认证请求时,考虑批量验证和更新操作
  3. 连接池:使用数据库连接池避免频繁创建和销毁连接
  4. 异步I/O:确保所有I/O操作都是异步的,避免阻塞事件循环
  5. 监控和日志:添加适当的监控指标和日志记录,便于性能分析和问题排查

通过合理实现和优化,自定义认证提供者可以高效地支持各种认证需求,同时保持系统的可扩展性和安全性。

Logo

火山引擎开发者社区是火山引擎打造的AI技术生态平台,聚焦Agent与大模型开发,提供豆包系列模型(图像/视频/视觉)、智能分析与会话工具,并配套评测集、动手实验室及行业案例库。社区通过技术沙龙、挑战赛等活动促进开发者成长,新用户可领50万Tokens权益,助力构建智能应用。

更多推荐