FastAPI WebSocket 进阶:实时消息推送与房间管理
通过此方案可构建高并发实时系统,支持数千并发连接(需配合异步消息队列优化广播性能)。通过 WebSocket 双向通道实现服务器主动推送数据,无需客户端轮询。
·
FastAPI WebSocket 进阶:实时消息推送与房间管理
核心概念
-
实时消息推送
通过 WebSocket 双向通道实现服务器主动推送数据,无需客户端轮询。 -
房间管理
将客户端分组到逻辑"房间",实现:- 定向广播(向特定房间发送消息)
- 隔离通信(不同房间消息互不干扰)
- 动态成员管理
实现方案
1. 房间管理结构
from collections import defaultdict
import asyncio
# 房间管理核心数据结构
class RoomManager:
def __init__(self):
self.rooms: dict[str, set[WebSocket]] = defaultdict(set)
self.lock = asyncio.Lock()
async def join(self, room_id: str, websocket: WebSocket):
async with self.lock:
self.rooms[room_id].add(websocket)
async def exit(self, room_id: str, websocket: WebSocket):
async with self.lock:
self.rooms[room_id].discard(websocket)
if not self.rooms[room_id]:
del self.rooms[room_id]
async def broadcast(self, room_id: str, message: str):
async with self.lock:
for ws in self.rooms[room_id].copy(): # 避免迭代时修改
await ws.send_text(message)
2. WebSocket 路由实现
from fastapi import FastAPI, WebSocket, WebSocketDisconnect
app = FastAPI()
manager = RoomManager()
@app.websocket("/ws/{room_id}/{user_id}")
async def websocket_endpoint(
websocket: WebSocket,
room_id: str,
user_id: str
):
await websocket.accept()
try:
# 加入房间
await manager.join(room_id, websocket)
# 通知房间有新成员
await manager.broadcast(
room_id,
f"User {user_id} joined room {room_id}"
)
# 消息处理循环
while True:
data = await websocket.receive_text()
# 广播消息格式: [用户ID] 消息内容
await manager.broadcast(
room_id,
f"[{user_id}] {data}"
)
except WebSocketDisconnect:
# 退出处理
await manager.exit(room_id, websocket)
await manager.broadcast(
room_id,
f"User {user_id} left room {room_id}"
)
进阶功能实现
1. 跨房间消息路由
async def route_message(source_room: str, target_room: str, message: str):
if target_room in manager.rooms:
await manager.broadcast(target_room, f"[From {source_room}] {message}")
2. 房间状态查询
@app.get("/rooms/{room_id}/users")
async def list_users(room_id: str):
return {
"room": room_id,
"user_count": len(manager.rooms.get(room_id, set())),
"active_rooms": list(manager.rooms.keys())
}
3. 异常处理增强
async def safe_broadcast(room_id: str, message: str):
broken = []
for ws in manager.rooms[room_id]:
try:
await ws.send_text(message)
except:
broken.append(ws)
# 清理失效连接
async with manager.lock:
for ws in broken:
manager.rooms[room_id].discard(ws)
性能优化建议
-
连接管理
- 使用
weakref避免内存泄漏 - 设置心跳机制检测失效连接:
async def heartbeat(websocket: WebSocket, interval: int = 30): while True: await asyncio.sleep(interval) await websocket.send_json({"type": "ping"})
- 使用
-
消息压缩
import zlib async def send_compressed(websocket: WebSocket, data: str): compressed = zlib.compress(data.encode()) await websocket.send_bytes(compressed) -
负载均衡
- 使用 Redis Pub/Sub 扩展多实例:
# 伪代码示例 redis.publish(f"room:{room_id}", message)
使用场景示例
graph LR
A[客户端A] -->|加入| ROOM1
B[客户端B] -->|加入| ROOM1
C[客户端C] -->|加入| ROOM2
SERVER -->|广播| ROOM1
SERVER -->|定向推送| ROOM2
-
聊天室
- 每个房间独立聊天组
- 支持私聊(创建临时房间)
-
实时协作
- 文档编辑房间同步光标位置
- 设计工具实时预览更新
-
游戏大厅
- 匹配房间管理
- 实时状态推送
注意事项
-
连接限制
使用websocket.max_message_size控制消息大小:@app.websocket("/ws", max_message_size=10 * 1024 * 1024) # 10MB -
认证集成
@app.websocket("/ws") async def auth_endpoint(websocket: WebSocket, token: str = Query(...)): if not validate_token(token): await websocket.close(code=1008) # ...后续逻辑 -
监控指标
使用 Prometheus 监控:- 房间数量
- 活跃连接数
- 消息吞吐量
通过此方案可构建高并发实时系统,支持数千并发连接(需配合异步消息队列优化广播性能)。
火山引擎开发者社区是火山引擎打造的AI技术生态平台,聚焦Agent与大模型开发,提供豆包系列模型(图像/视频/视觉)、智能分析与会话工具,并配套评测集、动手实验室及行业案例库。社区通过技术沙龙、挑战赛等活动促进开发者成长,新用户可领50万Tokens权益,助力构建智能应用。
更多推荐
所有评论(0)