FastAPI WebSocket 进阶:实时消息推送与房间管理

核心概念
  1. 实时消息推送
    通过 WebSocket 双向通道实现服务器主动推送数据,无需客户端轮询。

  2. 房间管理
    将客户端分组到逻辑"房间",实现:

    • 定向广播(向特定房间发送消息)
    • 隔离通信(不同房间消息互不干扰)
    • 动态成员管理

实现方案
1. 房间管理结构
from collections import defaultdict
import asyncio

# 房间管理核心数据结构
class RoomManager:
    def __init__(self):
        self.rooms: dict[str, set[WebSocket]] = defaultdict(set)
        self.lock = asyncio.Lock()
    
    async def join(self, room_id: str, websocket: WebSocket):
        async with self.lock:
            self.rooms[room_id].add(websocket)
    
    async def exit(self, room_id: str, websocket: WebSocket):
        async with self.lock:
            self.rooms[room_id].discard(websocket)
            if not self.rooms[room_id]:
                del self.rooms[room_id]
    
    async def broadcast(self, room_id: str, message: str):
        async with self.lock:
            for ws in self.rooms[room_id].copy():  # 避免迭代时修改
                await ws.send_text(message)

2. WebSocket 路由实现
from fastapi import FastAPI, WebSocket, WebSocketDisconnect

app = FastAPI()
manager = RoomManager()

@app.websocket("/ws/{room_id}/{user_id}")
async def websocket_endpoint(
    websocket: WebSocket, 
    room_id: str, 
    user_id: str
):
    await websocket.accept()
    
    try:
        # 加入房间
        await manager.join(room_id, websocket)
        
        # 通知房间有新成员
        await manager.broadcast(
            room_id, 
            f"User {user_id} joined room {room_id}"
        )
        
        # 消息处理循环
        while True:
            data = await websocket.receive_text()
            # 广播消息格式: [用户ID] 消息内容
            await manager.broadcast(
                room_id, 
                f"[{user_id}] {data}"
            )
            
    except WebSocketDisconnect:
        # 退出处理
        await manager.exit(room_id, websocket)
        await manager.broadcast(
            room_id, 
            f"User {user_id} left room {room_id}"
        )


进阶功能实现
1. 跨房间消息路由
async def route_message(source_room: str, target_room: str, message: str):
    if target_room in manager.rooms:
        await manager.broadcast(target_room, f"[From {source_room}] {message}")

2. 房间状态查询
@app.get("/rooms/{room_id}/users")
async def list_users(room_id: str):
    return {
        "room": room_id,
        "user_count": len(manager.rooms.get(room_id, set())),
        "active_rooms": list(manager.rooms.keys())
    }

3. 异常处理增强
async def safe_broadcast(room_id: str, message: str):
    broken = []
    for ws in manager.rooms[room_id]:
        try:
            await ws.send_text(message)
        except:
            broken.append(ws)
    
    # 清理失效连接
    async with manager.lock:
        for ws in broken:
            manager.rooms[room_id].discard(ws)


性能优化建议
  1. 连接管理

    • 使用weakref避免内存泄漏
    • 设置心跳机制检测失效连接:
      async def heartbeat(websocket: WebSocket, interval: int = 30):
          while True:
              await asyncio.sleep(interval)
              await websocket.send_json({"type": "ping"})
      

  2. 消息压缩

    import zlib
    async def send_compressed(websocket: WebSocket, data: str):
        compressed = zlib.compress(data.encode())
        await websocket.send_bytes(compressed)
    

  3. 负载均衡

    • 使用 Redis Pub/Sub 扩展多实例:
    # 伪代码示例
    redis.publish(f"room:{room_id}", message)
    


使用场景示例
graph LR
A[客户端A] -->|加入| ROOM1
B[客户端B] -->|加入| ROOM1
C[客户端C] -->|加入| ROOM2
SERVER -->|广播| ROOM1
SERVER -->|定向推送| ROOM2

  1. 聊天室

    • 每个房间独立聊天组
    • 支持私聊(创建临时房间)
  2. 实时协作

    • 文档编辑房间同步光标位置
    • 设计工具实时预览更新
  3. 游戏大厅

    • 匹配房间管理
    • 实时状态推送

注意事项
  1. 连接限制
    使用websocket.max_message_size控制消息大小:

    @app.websocket("/ws", max_message_size=10 * 1024 * 1024)  # 10MB
    

  2. 认证集成

    @app.websocket("/ws")
    async def auth_endpoint(websocket: WebSocket, token: str = Query(...)):
        if not validate_token(token):
            await websocket.close(code=1008)
        # ...后续逻辑
    

  3. 监控指标
    使用 Prometheus 监控:

    • 房间数量
    • 活跃连接数
    • 消息吞吐量

通过此方案可构建高并发实时系统,支持数千并发连接(需配合异步消息队列优化广播性能)。

Logo

火山引擎开发者社区是火山引擎打造的AI技术生态平台,聚焦Agent与大模型开发,提供豆包系列模型(图像/视频/视觉)、智能分析与会话工具,并配套评测集、动手实验室及行业案例库。社区通过技术沙龙、挑战赛等活动促进开发者成长,新用户可领50万Tokens权益,助力构建智能应用。

更多推荐