FastAPI WebSocket与实时通信应用开发
FastAPI WebSocket与实时通信应用开发【免费下载链接】fastapiFastAPI framework, high performance, easy to learn, fast to code, ready for production...
FastAPI WebSocket与实时通信应用开发
本文深入探讨了FastAPI框架中WebSocket的实现与应用,涵盖了WebSocket连接的建立流程、消息处理机制、连接状态管理、心跳检测技术以及与HTTP API的混合应用开发。文章通过详细的代码示例和架构图展示了如何在FastAPI中构建高效的实时通信系统,包括连接握手、消息类型处理、异常处理、性能优化策略和安全认证机制。
WebSocket连接建立与消息处理
FastAPI基于Starlette提供了强大的WebSocket支持,使得构建实时通信应用变得异常简单。WebSocket协议允许在客户端和服务器之间建立持久连接,实现双向实时数据传输,非常适合聊天应用、实时通知、在线游戏等场景。
WebSocket连接建立流程
在FastAPI中建立WebSocket连接需要遵循特定的流程,主要包括连接握手、连接接受和消息循环处理三个阶段。
连接握手与接受
WebSocket连接的建立始于客户端发起握手请求,服务器需要正确响应才能建立连接:
from fastapi import FastAPI, WebSocket
from fastapi.responses import HTMLResponse
app = FastAPI()
@app.websocket("/ws")
async def websocket_endpoint(websocket: WebSocket):
# 1. 接受WebSocket连接请求
await websocket.accept()
# 2. 进入消息处理循环
while True:
# 3. 接收消息
data = await websocket.receive_text()
# 4. 处理并发送响应
await websocket.send_text(f"Message text was: {data}")
上述代码展示了最基本的WebSocket端点实现。websocket.accept()方法是建立连接的关键步骤,必须在处理任何消息之前调用。
连接建立流程图
消息接收与处理
FastAPI的WebSocket支持多种消息类型,开发者可以根据需要选择合适的数据格式:
消息类型支持
| 消息类型 | 方法 | 描述 | 适用场景 |
|---|---|---|---|
| 文本消息 | receive_text() |
接收UTF-8编码的文本数据 | 聊天消息、JSON数据 |
| 二进制消息 | receive_bytes() |
接收二进制数据 | 文件传输、图片 |
| JSON消息 | receive_json() |
接收并解析JSON数据 | 结构化数据交换 |
消息处理示例
@app.websocket("/ws/advanced")
async def advanced_websocket_endpoint(websocket: WebSocket):
await websocket.accept()
try:
while True:
# 等待接收消息
message = await websocket.receive()
if "text" in message:
# 处理文本消息
text_data = message["text"]
await websocket.send_text(f"文本消息: {text_data}")
elif "bytes" in message:
# 处理二进制消息
binary_data = message["bytes"]
await websocket.send_bytes(binary_data)
elif "json" in message:
# 处理JSON消息
json_data = message["json"]
response = {"received": json_data, "status": "processed"}
await websocket.send_json(response)
except WebSocketDisconnect:
print("客户端断开连接")
连接状态管理
WebSocket连接有不同的状态,了解这些状态对于正确处理连接生命周期至关重要:
WebSocket状态常量
from starlette.websockets import WebSocketState
# 连接状态检查示例
@app.websocket("/ws/status")
async def status_aware_websocket(websocket: WebSocket):
await websocket.accept()
# 检查连接状态
if websocket.client_state == WebSocketState.CONNECTING:
print("连接正在建立")
elif websocket.client_state == WebSocketState.CONNECTED:
print("连接已建立")
elif websocket.client_state == WebSocketState.DISCONNECTED:
print("连接已断开")
连接状态转换图
错误处理与连接关闭
健壮的WebSocket应用需要妥善处理异常情况和连接关闭:
异常处理模式
@app.websocket("/ws/robust")
async def robust_websocket_endpoint(websocket: WebSocket):
try:
await websocket.accept()
while True:
try:
data = await websocket.receive_text()
# 业务逻辑处理
if data == "close":
# 正常关闭连接
await websocket.close(code=1000, reason="正常关闭")
break
await websocket.send_text(f"处理: {data}")
except ValueError as e:
# 处理数据格式错误
await websocket.send_text(f"错误: {str(e)}")
except RuntimeError as e:
# 处理运行时错误
await websocket.close(code=1011, reason=f"服务器错误: {str(e)}")
break
except WebSocketDisconnect as e:
print(f"客户端断开,代码: {e.code}, 原因: {e.reason}")
finally:
# 清理资源
print("连接清理完成")
WebSocket关闭代码说明
| 关闭代码 | 常量 | 描述 | 适用场景 |
|---|---|---|---|
| 1000 | NORMAL_CLOSURE | 正常关闭 | 正常终止连接 |
| 1001 | GOING_AWAY | 端点离开 | 服务器关闭或浏览器导航 |
| 1002 | PROTOCOL_ERROR | 协议错误 | 协议违反 |
| 1003 | UNSUPPORTED_DATA | 不支持的数据类型 | 接收到不支持的数据 |
| 1008 | POLICY_VIOLATION | 策略违反 | 身份验证失败等 |
| 1011 | INTERNAL_ERROR | 服务器内部错误 | 服务器处理异常 |
性能优化建议
对于生产环境的WebSocket应用,需要考虑以下性能优化措施:
- 连接池管理: 使用连接管理器维护活跃连接
- 消息批处理: 对高频消息进行批量处理减少IO操作
- 心跳机制: 实现心跳包检测保持连接活跃
- 负载均衡: 在多个服务器实例间分配连接负载
# 简单的连接管理器示例
class ConnectionManager:
def __init__(self):
self.active_connections: List[WebSocket] = []
async def connect(self, websocket: WebSocket):
await websocket.accept()
self.active_connections.append(websocket)
def disconnect(self, websocket: WebSocket):
self.active_connections.remove(websocket)
async def broadcast(self, message: str):
for connection in self.active_connections:
await connection.send_text(message)
manager = ConnectionManager()
@app.websocket("/ws/chat")
async def websocket_chat(websocket: WebSocket):
await manager.connect(websocket)
try:
while True:
data = await websocket.receive_text()
await manager.broadcast(f"用户消息: {data}")
except WebSocketDisconnect:
manager.disconnect(websocket)
通过上述模式,开发者可以构建出稳定、高效的WebSocket实时通信应用,充分利用FastAPI提供的WebSocket支持能力。
实时数据推送与双向通信机制
在FastAPI的WebSocket实现中,实时数据推送与双向通信机制是其最强大的特性之一。通过WebSocket协议,服务器能够主动向客户端推送数据,同时客户端也可以实时向服务器发送消息,实现了真正的双向实时通信。
WebSocket连接的生命周期管理
WebSocket连接的生命周期包括连接建立、消息传输和连接关闭三个阶段。FastAPI通过@app.websocket()装饰器提供了简洁的API来处理这些生命周期事件。
from fastapi import FastAPI, WebSocket
from fastapi.responses import HTMLResponse
app = FastAPI()
@app.websocket("/ws")
async def websocket_endpoint(websocket: WebSocket):
# 1. 连接建立阶段
await websocket.accept()
try:
# 2. 消息传输阶段
while True:
# 接收客户端消息
data = await websocket.receive_text()
# 处理消息并发送响应
await websocket.send_text(f"Echo: {data}")
except Exception as e:
# 3. 连接关闭阶段
print(f"Connection closed: {e}")
消息接收与发送机制
FastAPI提供了多种消息接收和发送方法,支持文本、二进制和JSON格式的数据传输:
| 方法 | 描述 | 返回值类型 |
|---|---|---|
websocket.receive_text() |
接收文本消息 | str |
websocket.receive_bytes() |
接收二进制消息 | bytes |
websocket.receive_json() |
接收JSON消息 | dict/list |
websocket.send_text() |
发送文本消息 | None |
websocket.send_bytes() |
发送二进制消息 | None |
websocket.send_json() |
发送JSON消息 | None |
@app.websocket("/ws/data")
async def data_websocket(websocket: WebSocket):
await websocket.accept()
while True:
# 接收JSON格式的数据
data = await websocket.receive_json()
# 处理数据
processed_data = process_data(data)
# 发送处理后的JSON响应
await websocket.send_json({
"status": "success",
"processed_data": processed_data,
"timestamp": datetime.now().isoformat()
})
连接状态管理与异常处理
在实时通信中,连接状态的稳定性至关重要。FastAPI提供了完善的异常处理机制:
from fastapi import WebSocketDisconnect
import asyncio
@app.websocket("/ws/robust")
async def robust_websocket(websocket: WebSocket):
await websocket.accept()
try:
while True:
try:
data = await asyncio.wait_for(
websocket.receive_text(),
timeout=30.0
)
await websocket.send_text(f"Processed: {data.upper()}")
except asyncio.TimeoutError:
# 发送心跳包保持连接
await websocket.send_text("ping")
continue
except WebSocketDisconnect:
print("Client disconnected normally")
except Exception as e:
print(f"Unexpected error: {e}")
# 尝试发送错误信息
try:
await websocket.send_json({
"error": "internal_error",
"message": str(e)
})
except:
pass
多客户端连接管理
对于需要处理多个客户端连接的场景,可以使用连接管理器来维护活跃连接:
from typing import List, Dict
import json
class ConnectionManager:
def __init__(self):
self.active_connections: Dict[str, WebSocket] = {}
self.connection_data: Dict[str, dict] = {}
async def connect(self, websocket: WebSocket, client_id: str):
await websocket.accept()
self.active_connections[client_id] = websocket
self.connection_data[client_id] = {
"connected_at": datetime.now(),
"message_count": 0
}
def disconnect(self, client_id: str):
if client_id in self.active_connections:
del self.active_connections[client_id]
if client_id in self.connection_data:
del self.connection_data[client_id]
async def send_personal_message(self, message: str, client_id: str):
if client_id in self.active_connections:
await self.active_connections[client_id].send_text(message)
self.connection_data[client_id]["message_count"] += 1
async def broadcast(self, message: str):
for connection in self.active_connections.values():
await connection.send_text(message)
for data in self.connection_data.values():
data["message_count"] += 1
def get_connection_stats(self):
return {
"total_connections": len(self.active_connections),
"connections": self.connection_data
}
manager = ConnectionManager()
实时数据推送模式
在实时应用中,常见的数据推送模式包括:
- 定时推送模式:定期向客户端发送数据更新
- 事件驱动模式:在特定事件发生时立即推送数据
- 请求-响应模式:客户端请求数据,服务器立即响应
- 广播模式:向所有连接的客户端发送相同数据
import asyncio
from datetime import datetime, timedelta
@app.websocket("/ws/realtime")
async def realtime_updates(websocket: WebSocket):
await websocket.accept()
# 定时推送示例:每秒发送一次时间更新
async def send_time_updates():
while True:
try:
current_time = datetime.now().isoformat()
await websocket.send_json({
"type": "time_update",
"data": current_time
})
await asyncio.sleep(1)
except:
break
# 启动定时任务
update_task = asyncio.create_task(send_time_updates())
try:
# 处理客户端消息
while True:
data = await websocket.receive_json()
if data.get("type") == "request_data":
# 立即响应客户端请求
await websocket.send_json({
"type": "data_response",
"data": fetch_requested_data(data["request"]),
"timestamp": datetime.now().isoformat()
})
except WebSocketDisconnect:
update_task.cancel()
except Exception as e:
update_task.cancel()
print(f"Error in realtime updates: {e}")
性能优化与最佳实践
为了确保实时通信的性能和可靠性,建议采用以下最佳实践:
- 连接心跳机制:定期发送心跳包保持连接活跃
- 消息压缩:对大型消息进行压缩处理
- 连接池管理:合理管理连接资源,避免内存泄漏
- 错误重试机制:实现自动重连和错误恢复
- 消息队列:使用消息队列处理高并发消息
from typing import Optional
import zlib
import json
class OptimizedConnectionManager:
def __init__(self):
self.connections = {}
self.heartbeat_interval = 30 # 30秒心跳间隔
async def send_compressed_json(self, websocket: WebSocket, data: dict):
"""发送压缩的JSON数据"""
json_str = json.dumps(data)
compressed_data = zlib.compress(json_str.encode())
await websocket.send_bytes(compressed_data)
async def receive_compressed_json(self, websocket: WebSocket) -> Optional[dict]:
"""接收压缩的JSON数据"""
try:
compressed_data = await websocket.receive_bytes()
json_str = zlib.decompress(compressed_data).decode()
return json.loads(json_str)
except:
return None
async def maintain_connection(self, websocket: WebSocket, client_id: str):
"""维护连接,包括心跳和错误处理"""
await websocket.accept()
self.connections[client_id] = websocket
try:
while True:
# 等待消息或超时
try:
data = await asyncio.wait_for(
self.receive_compressed_json(websocket),
timeout=self.heartbeat_interval
)
if data and data.get("type") == "heartbeat":
continue # 忽略心跳包
# 处理业务消息
await self.handle_message(data, client_id)
except asyncio.TimeoutError:
# 发送心跳包
await self.send_compressed_json(websocket, {
"type": "heartbeat",
"timestamp": datetime.now().isoformat()
})
except WebSocketDisconnect:
self.disconnect(client_id)
except Exception as e:
print(f"Connection error for {client_id}: {e}")
self.disconnect(client_id)
def disconnect(self, client_id: str):
if client_id in self.connections:
del self.connections[client_id]
安全考虑与认证机制
在实时通信中,安全性同样重要。FastAPI支持WebSocket连接的认证和授权:
from fastapi import WebSocketException, status
from fastapi.security import OAuth2PasswordBearer
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token")
async def get_websocket_user(websocket: WebSocket):
"""验证WebSocket连接的用户身份"""
token = websocket.query_params.get("token")
if not token:
raise WebSocketException(
code=status.WS_1008_POLICY_VIOLATION,
reason="Authentication required"
)
user = await verify_token(token)
if not user:
raise WebSocketException(
code=status.WS_1008_POLICY_VIOLATION,
reason="Invalid authentication credentials"
)
return user
@app.websocket("/ws/secure")
async def secure_websocket(websocket: WebSocket):
# 身份验证
user = await get_websocket_user(websocket)
await websocket.accept()
try:
while True:
data = await websocket.receive_json()
# 基于用户权限处理消息
if not has_permission(user, data.get("action")):
await websocket.send_json({
"error": "permission_denied",
"message": "Insufficient permissions"
})
continue
# 处理授权后的消息
result = await process_authorized_message(user, data)
await websocket.send_json(result)
except WebSocketDisconnect:
print(f"User {user.username} disconnected")
通过上述机制,FastAPI提供了强大而灵活的实时数据推送与双向通信能力,能够满足各种实时应用场景的需求,从简单的聊天应用到复杂的实时数据监控系统。
连接状态管理与心跳检测
在FastAPI WebSocket实时通信应用中,连接状态管理和心跳检测是确保系统稳定性和可靠性的关键技术。WebSocket连接虽然是持久化的,但在复杂的网络环境中,连接可能会因为各种原因意外断开,因此需要一套完善的机制来监控连接状态并及时处理异常情况。
连接状态管理
FastAPI基于Starlette框架构建,其WebSocket实现提供了完整的连接状态管理机制。每个WebSocket连接都有明确的状态生命周期,我们可以通过状态机来跟踪和管理连接。
WebSocket状态机
FastAPI WebSocket连接包含以下核心状态:
| 状态 | 描述 | 触发条件 |
|---|---|---|
CONNECTING |
连接建立中 | WebSocket握手阶段 |
CONNECTED |
连接已建立 | 握手成功,可进行数据传输 |
DISCONNECTED |
连接已断开 | 主动关闭或异常断开 |
状态监控实现
在FastAPI中,我们可以通过WebSocketState来监控连接状态:
from fastapi import WebSocket, WebSocketState
from fastapi.websockets import WebSocketDisconnect
class ConnectionManager:
def __init__(self):
self.active_connections: dict[str, WebSocket] = {}
self.connection_states: dict[str, WebSocketState] = {}
async def connect(self, websocket: WebSocket, client_id: str):
await websocket.accept()
self.active_connections[client_id] = websocket
self.connection_states[client_id] = websocket.client_state
def get_connection_state(self, client_id: str) -> WebSocketState:
return self.connection_states.get(client_id, WebSocketState.DISCONNECTED)
async def disconnect(self, client_id: str):
if client_id in self.active_connections:
del self.active_connections[client_id]
self.connection_states[client_id] = WebSocketState.DISCONNECTED
心跳检测机制
心跳检测是维持WebSocket连接活跃性的重要技术,通过定期发送小数据包来确认连接是否仍然有效。
心跳协议设计
实现方案对比
下表展示了不同心跳检测方案的优缺点:
| 方案类型 | 实现复杂度 | 可靠性 | 资源消耗 | 适用场景 |
|---|---|---|---|---|
| 客户端发起 | 低 | 中 | 低 | 简单应用 |
| 服务端发起 | 中 | 高 | 中 | 大多数场景 |
| 双向心跳 | 高 | 最高 | 高 | 关键业务 |
完整的心跳检测实现
import asyncio
import time
from typing import Dict
from fastapi import WebSocket, WebSocketDisconnect
class HeartbeatManager:
def __init__(self, timeout: int = 30, interval: int = 10):
self.timeout = timeout # 心跳超时时间(秒)
self.interval = interval # 心跳间隔(秒)
self.last_heartbeats: Dict[str, float] = {}
self.heartbeat_tasks: Dict[str, asyncio.Task] = {}
async def start_heartbeat(self, websocket: WebSocket, client_id: str):
"""启动心跳检测任务"""
self.last_heartbeats[client_id] = time.time()
# 创建心跳发送任务
heartbeat_task = asyncio.create_task(
self._send_heartbeats(websocket, client_id)
)
self.heartbeat_tasks[client_id] = heartbeat_task
# 创建心跳检测任务
check_task = asyncio.create_task(
self._check_heartbeats(websocket, client_id)
)
self.heartbeat_tasks[f"{client_id}_check"] = check_task
async def _send_heartbeats(self, websocket: WebSocket, client_id: str):
"""定期发送心跳包"""
try:
while True:
await asyncio.sleep(self.interval)
if websocket.client_state == WebSocketState.CONNECTED:
await websocket.send_json({
"type": "heartbeat",
"timestamp": time.time()
})
except WebSocketDisconnect:
await self.cleanup(client_id)
except Exception as e:
print(f"Heartbeat send error for {client_id}: {e}")
await self.cleanup(client_id)
async def _check_heartbeats(self, websocket: WebSocket, client_id: str):
"""检查心跳响应"""
try:
while True:
await asyncio.sleep(5) # 每5秒检查一次
last_beat = self.last_heartbeats.get(client_id)
if last_beat and time.time() - last_beat > self.timeout:
print(f"Heartbeat timeout for {client_id}")
await self.cleanup(client_id)
break
except Exception as e:
print(f"Heartbeat check error for {client_id}: {e}")
await self.cleanup(client_id)
async def handle_heartbeat_response(self, client_id: str):
"""处理心跳响应"""
self.last_heartbeats[client_id] = time.time()
async def cleanup(self, client_id: str):
"""清理资源"""
for task_key in [client_id, f"{client_id}_check"]:
if task_key in self.heartbeat_tasks:
self.heartbeat_tasks[task_key].cancel()
del self.heartbeat_tasks[task_key]
if client_id in self.last_heartbeats:
del self.last_heartbeats[client_id]
集成到WebSocket处理器
将状态管理和心跳检测集成到实际的WebSocket处理器中:
from fastapi import FastAPI, WebSocket, WebSocketDisconnect
from fastapi.responses import HTMLResponse
app = FastAPI()
connection_manager = ConnectionManager()
heartbeat_manager = HeartbeatManager(timeout=30, interval=15)
@app.websocket("/ws/{client_id}")
async def websocket_endpoint(websocket: WebSocket, client_id: str):
await connection_manager.connect(websocket, client_id)
await heartbeat_manager.start_heartbeat(websocket, client_id)
try:
while True:
data = await websocket.receive_json()
# 处理心跳响应
if data.get("type") == "heartbeat_ack":
await heartbeat_manager.handle_heartbeat_response(client_id)
continue
# 处理业务消息
if data.get("type") == "message":
await connection_manager.broadcast_message(
f"Client {client_id} says: {data['content']}"
)
except WebSocketDisconnect:
await connection_manager.disconnect(client_id)
await heartbeat_manager.cleanup(client_id)
except Exception as e:
print(f"WebSocket error for {client_id}: {e}")
await connection_manager.disconnect(client_id)
await heartbeat_manager.cleanup(client_id)
客户端心跳处理
完整的解决方案还需要客户端的配合:
// 客户端JavaScript示例
let heartbeatInterval;
let lastHeartbeatTime = 0;
const HEARTBEAT_TIMEOUT = 35000; // 35秒超时
function connectWebSocket() {
const ws = new WebSocket("ws://localhost:8000/ws/client123");
ws.onopen = function() {
console.log("WebSocket连接已建立");
// 启动心跳检测
heartbeatInterval = setInterval(() => {
if (Date.now() - lastHeartbeatTime > HEARTBEAT_TIMEOUT) {
console.log("心跳超时,重新连接...");
ws.close();
reconnect();
return;
}
ws.send(JSON.stringify({
type: "heartbeat",
timestamp: Date.now()
}));
}, 15000); // 每15秒发送一次心跳
};
ws.onmessage = function(event) {
const data = JSON.parse(event.data);
if (data.type === "heartbeat") {
// 响应服务器心跳
ws.send(JSON.stringify({
type: "heartbeat_ack",
timestamp: data.timestamp
}));
lastHeartbeatTime = Date.now();
}
};
ws.onclose = function() {
clearInterval(heartbeatInterval);
console.log("WebSocket连接已关闭");
};
}
function reconnect() {
setTimeout(connectWebSocket, 5000); // 5秒后重连
}
性能优化建议
在实际生产环境中,心跳检测需要考虑性能优化:
- 连接池管理:使用连接池来管理大量WebSocket连接
- 批量处理:对心跳检测进行批量处理,减少系统调用
- 自适应间隔:根据网络状况动态调整心跳间隔
- 优雅降级:在系统负载高时适当降低心跳频率
class AdaptiveHeartbeatManager(HeartbeatManager):
def __init__(self, base_interval: int = 10, max_interval: int = 60):
super().__init__()
self.base_interval = base_interval
self.max_interval = max_interval
self.network_quality = 1.0 # 网络质量系数(0.0-1.0)
def calculate_interval(self) -> int:
"""根据网络质量计算自适应间隔"""
# 网络质量越好,间隔可以越长
interval = self.base_interval + int(
(self.max_interval - self.base_interval) * (1 - self.network_quality)
)
return max(self.base_interval, min(interval, self.max_interval))
async def update_network_quality(self, success_rate: float):
"""更新网络质量评估"""
self.network_quality = success_rate
通过完善的连接状态管理和智能心跳检测机制,可以显著提升FastAPI WebSocket应用的稳定性和可靠性,确保实时通信系统在各种网络环境下都能正常工作。
WebSocket与HTTP API的混合应用
在现代Web应用开发中,实时通信功能已成为不可或缺的需求。FastAPI作为高性能的Python Web框架,不仅提供了强大的HTTP API支持,还内置了完整的WebSocket功能。本文将深入探讨如何在FastAPI应用中同时使用WebSocket和HTTP API,构建功能丰富的混合应用。
混合应用架构设计
在FastAPI中,WebSocket和HTTP API可以完美共存于同一个应用中。这种混合架构允许我们:
- 使用HTTP API处理传统的请求-响应交互
- 利用WebSocket实现实时双向通信
- 共享相同的依赖注入系统
- 统一的路由管理和中间件处理
基础混合应用示例
下面是一个简单的聊天应用示例,同时包含HTTP API和WebSocket功能:
from fastapi import FastAPI, WebSocket, WebSocketDisconnect, HTTPException
from fastapi.responses import HTMLResponse
from pydantic import BaseModel
from typing import List, Dict
import uuid
app = FastAPI()
# 数据模型
class ChatMessage(BaseModel):
message: str
user_id: str
class UserConnection:
def __init__(self):
self.active_connections: Dict[str, WebSocket] = {}
self.user_rooms: Dict[str, List[str]] = {}
# 全局连接管理器
connection_manager = UserConnection()
# HTTP API端点
@app.get("/")
async def get():
return HTMLResponse("""
<html>
<body>
<h1>聊天应用</h1>
<form action="/send-message" method="post">
<input type="text" name="message" placeholder="输入消息">
<input type="text" name="user_id" placeholder="用户ID">
<button type="submit">发送</button>
</form>
<div id="messages"></div>
<script>
const ws = new WebSocket(`ws://localhost:8000/ws`);
ws.onmessage = function(event) {
const messages = document.getElementById('messages');
const message = document.createElement('div');
message.textContent = event.data;
messages.appendChild(message);
};
</script>
</body>
</html>
""")
@app.post("/send-message")
async def send_message(message: ChatMessage):
"""通过HTTP API发送消息到WebSocket连接"""
if message.user_id not in connection_manager.active_connections:
raise HTTPException(status_code=404, detail="用户未连接")
# 这里可以添加消息处理逻辑
await connection_manager.active_connections[message.user_id].send_text(
f"API消息: {message.message}"
)
return {"status": "消息已发送"}
# WebSocket端点
@app.websocket("/ws/{user_id}")
async def websocket_endpoint(websocket: WebSocket, user_id: str):
await websocket.accept()
connection_manager.active_connections[user_id] = websocket
try:
while True:
data = await websocket.receive_text()
# 广播消息给所有连接的用户
for connection in connection_manager.active_connections.values():
await connection.send_text(f"用户{user_id}: {data}")
except WebSocketDisconnect:
del connection_manager.active_connections[user_id]
# 通知其他用户该用户已断开连接
for connection in connection_manager.active_connections.values():
await connection.send_text(f"用户{user_id}已离开聊天")
高级混合模式:实时通知系统
下面是一个更复杂的示例,展示如何构建一个实时通知系统,其中HTTP API用于创建通知,WebSocket用于实时推送:
from fastapi import FastAPI, WebSocket, Depends, HTTPException
from fastapi.responses import JSONResponse
from pydantic import BaseModel
from typing import List, Optional
import asyncio
import json
from datetime import datetime
app = FastAPI()
# 数据模型
class Notification(BaseModel):
id: str
title: str
message: str
user_id: str
created_at: datetime
read: bool = False
class NotificationCreate(BaseModel):
title: str
message: str
user_id: str
# 模拟数据库
notifications_db = {}
connected_clients = {}
# 依赖注入
async def get_user_notifications(user_id: str) -> List[Notification]:
return [n for n in notifications_db.values() if n.user_id == user_id]
# HTTP API端点
@app.post("/notifications", response_model=Notification)
async def create_notification(notification: NotificationCreate):
"""创建新通知"""
notification_id = str(len(notifications_db) + 1)
new_notification = Notification(
id=notification_id,
**notification.dict(),
created_at=datetime.now(),
read=False
)
notifications_db[notification_id] = new_notification
# 实时推送给相应用户
if notification.user_id in connected_clients:
await connected_clients[notification.user_id].send_text(
json.dumps({
"type": "new_notification",
"data": new_notification.dict()
})
)
return new_notification
@app.get("/notifications/{user_id}", response_model=List[Notification])
async def get_notifications(user_id: str):
"""获取用户通知"""
return await get_user_notifications(user_id)
# WebSocket端点
@app.websocket("/ws/notifications/{user_id}")
async def websocket_notifications(websocket: WebSocket, user_id: str):
await websocket.accept()
connected_clients[user_id] = websocket
try:
# 发送现有通知
user_notifications = await get_user_notifications(user_id)
await websocket.send_text(json.dumps({
"type": "initial_data",
"data": [n.dict() for n in user_notifications]
}))
while True:
# 保持连接活跃,处理可能的客户端消息
data = await websocket.receive_text()
message = json.loads(data)
if message.get("type") == "mark_as_read":
# 处理标记为已读
notification_id = message.get("notification_id")
if notification_id in notifications_db:
notifications_db[notification_id].read = True
except Exception as e:
print(f"WebSocket错误: {e}")
finally:
if user_id in connected_clients:
del connected_clients[user_id]
性能优化与最佳实践
在构建混合应用时,需要考虑以下性能优化策略:
连接管理优化
from fastapi import FastAPI, WebSocket
import asyncio
from collections import defaultdict
app = FastAPI()
class ConnectionManager:
def __init__(self):
self.active_connections = defaultdict(dict)
self.connection_lock = asyncio.Lock()
async def connect(self, websocket: WebSocket, user_id: str, room: str = "default"):
async with self.connection_lock:
self.active_connections[room][user_id] = websocket
async def disconnect(self, user_id: str, room: str = "default"):
async with self.connection_lock:
if room in self.active_connections and user_id in self.active_connections[room]:
del self.active_connections[room][user_id]
async def broadcast(self, message: str, room: str = "default"):
async with self.connection_lock:
disconnected = []
for user_id, websocket in self.active_connections[room].items():
try:
await websocket.send_text(message)
except Exception:
disconnected.append(user_id)
for user_id in disconnected:
del self.active_connections[room][user_id]
manager = ConnectionManager()
消息序列化优化
import orjson
from fastapi.encoders import jsonable_encoder
async def send_json_message(websocket: WebSocket, data: dict):
"""优化JSON消息发送"""
try:
# 使用orjson获得更好的性能
message = orjson.dumps(
data,
option=orjson.OPT_SERIALIZE_NUMPY | orjson.OPT_UTC_Z
)
await websocket.send_bytes(message)
except Exception as e:
print(f"消息发送失败: {e}")
安全考虑
在混合应用中,安全性至关重要:
from fastapi import WebSocket, WebSocketDisconnect, status
from fastapi.security import OAuth2PasswordBearer
import jwt
from jwt import PyJWTError
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token")
async def get_current_user_websocket(websocket: WebSocket, token: str):
"""WebSocket连接认证"""
try:
payload = jwt.decode(token, "SECRET_KEY", algorithms=["HS256"])
return payload.get("sub")
except PyJWTError:
await websocket.close(code=status.WS_1008_POLICY_VIOLATION)
raise WebSocketDisconnect()
@app.websocket("/ws/secure/{token}")
async def secure_websocket_endpoint(websocket: WebSocket, token: str):
await websocket.accept()
try:
user_id = await get_current_user_websocket(websocket, token)
# 安全连接建立成功
await websocket.send_text(f"欢迎用户 {user_id}")
while True:
data = await websocket.receive_text()
# 处理安全消息
await websocket.send_text(f"收到安全消息: {data}")
except WebSocketDisconnect:
print("安全连接已关闭")
监控和日志记录
为了确保混合应用的稳定性,需要完善的监控系统:
总结
FastAPI提供了强大而灵活的WebSocket支持,使其成为构建实时通信应用的理想选择。通过本文的探讨,我们了解了WebSocket连接的生命周期管理、消息处理机制、心跳检测的重要性以及与HTTP API的混合应用模式。这些技术结合FastAPI的高性能和易用性,能够帮助开发者构建出稳定、高效、安全的实时应用系统,满足从简单聊天应用到复杂实时数据推送的各种场景需求。
火山引擎开发者社区是火山引擎打造的AI技术生态平台,聚焦Agent与大模型开发,提供豆包系列模型(图像/视频/视觉)、智能分析与会话工具,并配套评测集、动手实验室及行业案例库。社区通过技术沙龙、挑战赛等活动促进开发者成长,新用户可领50万Tokens权益,助力构建智能应用。
更多推荐
所有评论(0)