第六章 认证与安全之自定义认证提供者
MCP框架认证提供者设计与实现详解:本文档系统介绍了MCP框架中AuthProvider的抽象接口设计、核心方法实现和扩展机制。通过继承OAuthAuthorizationServerProvider协议类,开发者可自定义JWT、API Key等认证方式,文档包含客户端管理、授权流程、令牌管理等核心方法实现示例,并详细说明了认证上下文注入机制和API Key认证的具体实现,同时提供线程安全、异步支
目录
简介
本文档深入探讨了MCP(Model Context Protocol)框架中认证提供者(AuthProvider)的设计与扩展机制。通过分析OAuthAuthorizationServerProvider协议接口,详细说明如何继承和实现认证逻辑以支持JWT、API Key、LDAP等多种认证方式。文档还介绍了AuthContext如何在请求处理链中注入认证结果,并提供了自定义API Key认证的实现示例。
本节来源
AuthProvider抽象接口设计
OAuthAuthorizationServerProvider是一个泛型协议类,定义了OAuth授权服务器所需的核心方法。该接口采用类型变量AuthorizationCodeT、RefreshTokenT和AccessTokenT来支持不同类型的数据结构,增强了接口的灵活性和可扩展性。
接口主要包含以下方法类别:
- 客户端管理:
get_client和register_client用于客户端信息的查询与注册 - 授权流程:
authorize、load_authorization_code和exchange_authorization_code处理授权码流程 - 令牌管理:
load_access_token、exchange_refresh_token和revoke_token负责令牌的生命周期管理
图源
本节来源
核心方法实现
继承AuthProvider类
要创建自定义认证提供者,需要继承OAuthAuthorizationServerProvider并实现其抽象方法。以SimpleOAuthProvider为例,该类实现了基本的OAuth流程:
图源
authenticate方法实现
虽然接口中没有直接的authenticate方法,但认证逻辑主要在handle_simple_callback方法中实现。该方法验证用户名和密码是否与配置的演示凭据匹配:
async def handle_simple_callback(self, username: str, password: str, state: str) -> str:
# 获取状态数据
state_data = self.state_mapping.get(state)
if not state_data:
raise HTTPException(400, "Invalid state parameter")
# 验证演示凭据
if username != self.settings.demo_username or password != self.settings.demo_password:
raise HTTPException(401, "Invalid credentials")
# 创建授权码并存储
new_code = f"mcp_{secrets.token_hex(16)}"
auth_code = AuthorizationCode(
code=new_code,
client_id=client_id,
redirect_uri=AnyHttpUrl(redirect_uri),
expires_at=time.time() + 300,
scopes=[self.settings.mcp_scope],
code_challenge=code_challenge,
)
self.auth_codes[new_code] = auth_code
# 清理状态映射
del self.state_mapping[state]
return construct_redirect_uri(redirect_uri, code=new_code, state=state)
本节来源
get_client_info方法实现
get_client方法用于根据客户端ID检索客户端信息,这是OAuth流程中的重要环节:
async def get_client(self, client_id: str) -> OAuthClientInformationFull | None:
"""Get OAuth client information."""
return self.clients.get(client_id)
该方法简单地从内存字典中查找客户端信息,实际生产环境中可能需要从数据库或其他持久化存储中查询。
本节来源
AuthContext注入机制
AuthContextMiddleware负责在请求处理链中注入认证结果。该中间件从请求scope中提取经过身份验证的用户,并将其存储在上下文变量中,以便在整个请求生命周期内访问。
get_access_token()函数提供了便捷的方式来获取当前上下文中的访问令牌:
def get_access_token() -> AccessToken | None:
"""
从当前上下文中获取访问令牌。
返回:
如果存在经过身份验证的用户,则返回访问令牌,否则返回None。
"""
auth_user = auth_context_var.get()
return auth_user.access_token if auth_user else None
本节来源
API Key认证示例
以下是一个自定义API Key认证提供者的实现示例:
class APIKeyAuthProvider(OAuthAuthorizationServerProvider[AuthorizationCode, RefreshToken, AccessToken]):
def __init__(self, api_keys: dict[str, dict], settings: AuthSettings):
self.api_keys = api_keys
self.settings = settings
self.tokens: dict[str, AccessToken] = {}
async def authenticate(self, api_key: str) -> bool:
"""验证API Key是否有效"""
return api_key in self.api_keys and self.api_keys[api_key].get("active", True)
async def get_client_info(self, api_key: str) -> dict:
"""获取API Key关联的客户端信息"""
if api_key in self.api_keys:
return self.api_keys[api_key]
raise HTTPException(401, "Invalid API Key")
async def authorize(self, client: OAuthClientInformationFull, params: AuthorizationParams) -> str:
# 对于API Key认证,可以直接生成令牌
if "api_key" in params.scopes:
token = f"mcp_{secrets.token_hex(32)}"
self.tokens[token] = AccessToken(
token=token,
client_id=client.client_id,
scopes=params.scopes,
expires_at=int(time.time()) + 3600,
)
return f"{params.redirect_uri}?code={token}&state={params.state}"
raise AuthorizeError("invalid_scope", "API Key scope required")
async def load_access_token(self, token: str) -> AccessToken | None:
access_token = self.tokens.get(token)
if access_token and access_token.expires_at and access_token.expires_at < time.time():
del self.tokens[token]
return None
return access_token
async def revoke_token(self, token: AccessToken | RefreshToken) -> None:
if isinstance(token, AccessToken) and token.token in self.tokens:
del self.tokens[token.token]
本节来源
认证配置项说明
settings.py文件定义了认证相关的配置选项,主要包括:
class ClientRegistrationOptions(BaseModel):
enabled: bool = False
client_secret_expiry_seconds: int | None = None
valid_scopes: list[str] | None = None
default_scopes: list[str] | None = None
class RevocationOptions(BaseModel):
enabled: bool = False
class AuthSettings(BaseModel):
issuer_url: AnyHttpUrl = Field(
...,
description="颁发令牌的OAuth授权服务器URL。",
)
service_documentation_url: AnyHttpUrl | None = None
client_registration_options: ClientRegistrationOptions | None = None
revocation_options: RevocationOptions | None = None
required_scopes: list[str] | None = None
resource_server_url: AnyHttpUrl | None = Field(
...,
description="用作资源标识符的MCP服务器URL。",
)
关键配置项解释:
- issuer_url:授权服务器的URL,用于签发令牌
- client_registration_options:控制客户端注册功能的启用状态和相关选项
- revocation_options:控制令牌撤销功能的启用状态
- required_scopes:请求资源时必需的权限范围列表
- resource_server_url:资源服务器的URL,用作资源标识符
本节来源
线程安全与异步支持
认证提供者的设计充分考虑了线程安全和异步支持:
- 异步方法:所有核心方法都声明为
async,确保非阻塞执行 - 上下文变量:使用
contextvars.ContextVar来存储每个请求的认证上下文,保证线程安全 - 数据结构:内存存储使用字典等线程安全的数据结构
在高并发场景下,建议:
- 使用异步数据库连接池替代内存存储
- 实现分布式缓存来共享令牌状态
- 使用连接池管理数据库连接
图源
性能优化建议
- 缓存策略:对于频繁访问的客户端信息和令牌,使用内存缓存(如Redis)减少数据库查询
- 批量操作:在处理大量认证请求时,考虑批量验证和更新操作
- 连接池:使用数据库连接池避免频繁创建和销毁连接
- 异步I/O:确保所有I/O操作都是异步的,避免阻塞事件循环
- 监控和日志:添加适当的监控指标和日志记录,便于性能分析和问题排查
通过合理实现和优化,自定义认证提供者可以高效地支持各种认证需求,同时保持系统的可扩展性和安全性。
火山引擎开发者社区是火山引擎打造的AI技术生态平台,聚焦Agent与大模型开发,提供豆包系列模型(图像/视频/视觉)、智能分析与会话工具,并配套评测集、动手实验室及行业案例库。社区通过技术沙龙、挑战赛等活动促进开发者成长,新用户可领50万Tokens权益,助力构建智能应用。
更多推荐
所有评论(0)