FastAPI WebSocket与实时通信应用开发

【免费下载链接】fastapi FastAPI framework, high performance, easy to learn, fast to code, ready for production 【免费下载链接】fastapi 项目地址: https://gitcode.com/gh_mirrors/fa/fastapi

本文深入探讨了FastAPI框架中WebSocket的实现与应用,涵盖了WebSocket连接的建立流程、消息处理机制、连接状态管理、心跳检测技术以及与HTTP API的混合应用开发。文章通过详细的代码示例和架构图展示了如何在FastAPI中构建高效的实时通信系统,包括连接握手、消息类型处理、异常处理、性能优化策略和安全认证机制。

WebSocket连接建立与消息处理

FastAPI基于Starlette提供了强大的WebSocket支持,使得构建实时通信应用变得异常简单。WebSocket协议允许在客户端和服务器之间建立持久连接,实现双向实时数据传输,非常适合聊天应用、实时通知、在线游戏等场景。

WebSocket连接建立流程

在FastAPI中建立WebSocket连接需要遵循特定的流程,主要包括连接握手、连接接受和消息循环处理三个阶段。

连接握手与接受

WebSocket连接的建立始于客户端发起握手请求,服务器需要正确响应才能建立连接:

from fastapi import FastAPI, WebSocket
from fastapi.responses import HTMLResponse

app = FastAPI()

@app.websocket("/ws")
async def websocket_endpoint(websocket: WebSocket):
    # 1. 接受WebSocket连接请求
    await websocket.accept()
    
    # 2. 进入消息处理循环
    while True:
        # 3. 接收消息
        data = await websocket.receive_text()
        
        # 4. 处理并发送响应
        await websocket.send_text(f"Message text was: {data}")

上述代码展示了最基本的WebSocket端点实现。websocket.accept()方法是建立连接的关键步骤,必须在处理任何消息之前调用。

连接建立流程图

mermaid

消息接收与处理

FastAPI的WebSocket支持多种消息类型,开发者可以根据需要选择合适的数据格式:

消息类型支持
消息类型 方法 描述 适用场景
文本消息 receive_text() 接收UTF-8编码的文本数据 聊天消息、JSON数据
二进制消息 receive_bytes() 接收二进制数据 文件传输、图片
JSON消息 receive_json() 接收并解析JSON数据 结构化数据交换
消息处理示例
@app.websocket("/ws/advanced")
async def advanced_websocket_endpoint(websocket: WebSocket):
    await websocket.accept()
    
    try:
        while True:
            # 等待接收消息
            message = await websocket.receive()
            
            if "text" in message:
                # 处理文本消息
                text_data = message["text"]
                await websocket.send_text(f"文本消息: {text_data}")
                
            elif "bytes" in message:
                # 处理二进制消息
                binary_data = message["bytes"]
                await websocket.send_bytes(binary_data)
                
            elif "json" in message:
                # 处理JSON消息
                json_data = message["json"]
                response = {"received": json_data, "status": "processed"}
                await websocket.send_json(response)
                
    except WebSocketDisconnect:
        print("客户端断开连接")

连接状态管理

WebSocket连接有不同的状态,了解这些状态对于正确处理连接生命周期至关重要:

WebSocket状态常量
from starlette.websockets import WebSocketState

# 连接状态检查示例
@app.websocket("/ws/status")
async def status_aware_websocket(websocket: WebSocket):
    await websocket.accept()
    
    # 检查连接状态
    if websocket.client_state == WebSocketState.CONNECTING:
        print("连接正在建立")
    elif websocket.client_state == WebSocketState.CONNECTED:
        print("连接已建立")
    elif websocket.client_state == WebSocketState.DISCONNECTED:
        print("连接已断开")
连接状态转换图

mermaid

错误处理与连接关闭

健壮的WebSocket应用需要妥善处理异常情况和连接关闭:

异常处理模式
@app.websocket("/ws/robust")
async def robust_websocket_endpoint(websocket: WebSocket):
    try:
        await websocket.accept()
        
        while True:
            try:
                data = await websocket.receive_text()
                
                # 业务逻辑处理
                if data == "close":
                    # 正常关闭连接
                    await websocket.close(code=1000, reason="正常关闭")
                    break
                    
                await websocket.send_text(f"处理: {data}")
                
            except ValueError as e:
                # 处理数据格式错误
                await websocket.send_text(f"错误: {str(e)}")
                
            except RuntimeError as e:
                # 处理运行时错误
                await websocket.close(code=1011, reason=f"服务器错误: {str(e)}")
                break
                
    except WebSocketDisconnect as e:
        print(f"客户端断开,代码: {e.code}, 原因: {e.reason}")
        
    finally:
        # 清理资源
        print("连接清理完成")
WebSocket关闭代码说明
关闭代码 常量 描述 适用场景
1000 NORMAL_CLOSURE 正常关闭 正常终止连接
1001 GOING_AWAY 端点离开 服务器关闭或浏览器导航
1002 PROTOCOL_ERROR 协议错误 协议违反
1003 UNSUPPORTED_DATA 不支持的数据类型 接收到不支持的数据
1008 POLICY_VIOLATION 策略违反 身份验证失败等
1011 INTERNAL_ERROR 服务器内部错误 服务器处理异常

性能优化建议

对于生产环境的WebSocket应用,需要考虑以下性能优化措施:

  1. 连接池管理: 使用连接管理器维护活跃连接
  2. 消息批处理: 对高频消息进行批量处理减少IO操作
  3. 心跳机制: 实现心跳包检测保持连接活跃
  4. 负载均衡: 在多个服务器实例间分配连接负载
# 简单的连接管理器示例
class ConnectionManager:
    def __init__(self):
        self.active_connections: List[WebSocket] = []
    
    async def connect(self, websocket: WebSocket):
        await websocket.accept()
        self.active_connections.append(websocket)
    
    def disconnect(self, websocket: WebSocket):
        self.active_connections.remove(websocket)
    
    async def broadcast(self, message: str):
        for connection in self.active_connections:
            await connection.send_text(message)

manager = ConnectionManager()

@app.websocket("/ws/chat")
async def websocket_chat(websocket: WebSocket):
    await manager.connect(websocket)
    try:
        while True:
            data = await websocket.receive_text()
            await manager.broadcast(f"用户消息: {data}")
    except WebSocketDisconnect:
        manager.disconnect(websocket)

通过上述模式,开发者可以构建出稳定、高效的WebSocket实时通信应用,充分利用FastAPI提供的WebSocket支持能力。

实时数据推送与双向通信机制

在FastAPI的WebSocket实现中,实时数据推送与双向通信机制是其最强大的特性之一。通过WebSocket协议,服务器能够主动向客户端推送数据,同时客户端也可以实时向服务器发送消息,实现了真正的双向实时通信。

WebSocket连接的生命周期管理

WebSocket连接的生命周期包括连接建立、消息传输和连接关闭三个阶段。FastAPI通过@app.websocket()装饰器提供了简洁的API来处理这些生命周期事件。

from fastapi import FastAPI, WebSocket
from fastapi.responses import HTMLResponse

app = FastAPI()

@app.websocket("/ws")
async def websocket_endpoint(websocket: WebSocket):
    # 1. 连接建立阶段
    await websocket.accept()
    
    try:
        # 2. 消息传输阶段
        while True:
            # 接收客户端消息
            data = await websocket.receive_text()
            
            # 处理消息并发送响应
            await websocket.send_text(f"Echo: {data}")
            
    except Exception as e:
        # 3. 连接关闭阶段
        print(f"Connection closed: {e}")

消息接收与发送机制

FastAPI提供了多种消息接收和发送方法,支持文本、二进制和JSON格式的数据传输:

方法 描述 返回值类型
websocket.receive_text() 接收文本消息 str
websocket.receive_bytes() 接收二进制消息 bytes
websocket.receive_json() 接收JSON消息 dict/list
websocket.send_text() 发送文本消息 None
websocket.send_bytes() 发送二进制消息 None
websocket.send_json() 发送JSON消息 None
@app.websocket("/ws/data")
async def data_websocket(websocket: WebSocket):
    await websocket.accept()
    
    while True:
        # 接收JSON格式的数据
        data = await websocket.receive_json()
        
        # 处理数据
        processed_data = process_data(data)
        
        # 发送处理后的JSON响应
        await websocket.send_json({
            "status": "success",
            "processed_data": processed_data,
            "timestamp": datetime.now().isoformat()
        })

连接状态管理与异常处理

在实时通信中,连接状态的稳定性至关重要。FastAPI提供了完善的异常处理机制:

from fastapi import WebSocketDisconnect
import asyncio

@app.websocket("/ws/robust")
async def robust_websocket(websocket: WebSocket):
    await websocket.accept()
    
    try:
        while True:
            try:
                data = await asyncio.wait_for(
                    websocket.receive_text(), 
                    timeout=30.0
                )
                await websocket.send_text(f"Processed: {data.upper()}")
                
            except asyncio.TimeoutError:
                # 发送心跳包保持连接
                await websocket.send_text("ping")
                continue
                
    except WebSocketDisconnect:
        print("Client disconnected normally")
    except Exception as e:
        print(f"Unexpected error: {e}")
        # 尝试发送错误信息
        try:
            await websocket.send_json({
                "error": "internal_error",
                "message": str(e)
            })
        except:
            pass

多客户端连接管理

对于需要处理多个客户端连接的场景,可以使用连接管理器来维护活跃连接:

from typing import List, Dict
import json

class ConnectionManager:
    def __init__(self):
        self.active_connections: Dict[str, WebSocket] = {}
        self.connection_data: Dict[str, dict] = {}

    async def connect(self, websocket: WebSocket, client_id: str):
        await websocket.accept()
        self.active_connections[client_id] = websocket
        self.connection_data[client_id] = {
            "connected_at": datetime.now(),
            "message_count": 0
        }

    def disconnect(self, client_id: str):
        if client_id in self.active_connections:
            del self.active_connections[client_id]
        if client_id in self.connection_data:
            del self.connection_data[client_id]

    async def send_personal_message(self, message: str, client_id: str):
        if client_id in self.active_connections:
            await self.active_connections[client_id].send_text(message)
            self.connection_data[client_id]["message_count"] += 1

    async def broadcast(self, message: str):
        for connection in self.active_connections.values():
            await connection.send_text(message)
        for data in self.connection_data.values():
            data["message_count"] += 1

    def get_connection_stats(self):
        return {
            "total_connections": len(self.active_connections),
            "connections": self.connection_data
        }

manager = ConnectionManager()

实时数据推送模式

在实时应用中,常见的数据推送模式包括:

  1. 定时推送模式:定期向客户端发送数据更新
  2. 事件驱动模式:在特定事件发生时立即推送数据
  3. 请求-响应模式:客户端请求数据,服务器立即响应
  4. 广播模式:向所有连接的客户端发送相同数据
import asyncio
from datetime import datetime, timedelta

@app.websocket("/ws/realtime")
async def realtime_updates(websocket: WebSocket):
    await websocket.accept()
    
    # 定时推送示例:每秒发送一次时间更新
    async def send_time_updates():
        while True:
            try:
                current_time = datetime.now().isoformat()
                await websocket.send_json({
                    "type": "time_update",
                    "data": current_time
                })
                await asyncio.sleep(1)
            except:
                break
    
    # 启动定时任务
    update_task = asyncio.create_task(send_time_updates())
    
    try:
        # 处理客户端消息
        while True:
            data = await websocket.receive_json()
            
            if data.get("type") == "request_data":
                # 立即响应客户端请求
                await websocket.send_json({
                    "type": "data_response",
                    "data": fetch_requested_data(data["request"]),
                    "timestamp": datetime.now().isoformat()
                })
                
    except WebSocketDisconnect:
        update_task.cancel()
    except Exception as e:
        update_task.cancel()
        print(f"Error in realtime updates: {e}")

性能优化与最佳实践

为了确保实时通信的性能和可靠性,建议采用以下最佳实践:

  1. 连接心跳机制:定期发送心跳包保持连接活跃
  2. 消息压缩:对大型消息进行压缩处理
  3. 连接池管理:合理管理连接资源,避免内存泄漏
  4. 错误重试机制:实现自动重连和错误恢复
  5. 消息队列:使用消息队列处理高并发消息
from typing import Optional
import zlib
import json

class OptimizedConnectionManager:
    def __init__(self):
        self.connections = {}
        self.heartbeat_interval = 30  # 30秒心跳间隔

    async def send_compressed_json(self, websocket: WebSocket, data: dict):
        """发送压缩的JSON数据"""
        json_str = json.dumps(data)
        compressed_data = zlib.compress(json_str.encode())
        await websocket.send_bytes(compressed_data)

    async def receive_compressed_json(self, websocket: WebSocket) -> Optional[dict]:
        """接收压缩的JSON数据"""
        try:
            compressed_data = await websocket.receive_bytes()
            json_str = zlib.decompress(compressed_data).decode()
            return json.loads(json_str)
        except:
            return None

    async def maintain_connection(self, websocket: WebSocket, client_id: str):
        """维护连接,包括心跳和错误处理"""
        await websocket.accept()
        self.connections[client_id] = websocket
        
        try:
            while True:
                # 等待消息或超时
                try:
                    data = await asyncio.wait_for(
                        self.receive_compressed_json(websocket),
                        timeout=self.heartbeat_interval
                    )
                    if data and data.get("type") == "heartbeat":
                        continue  # 忽略心跳包
                    
                    # 处理业务消息
                    await self.handle_message(data, client_id)
                    
                except asyncio.TimeoutError:
                    # 发送心跳包
                    await self.send_compressed_json(websocket, {
                        "type": "heartbeat",
                        "timestamp": datetime.now().isoformat()
                    })
                    
        except WebSocketDisconnect:
            self.disconnect(client_id)
        except Exception as e:
            print(f"Connection error for {client_id}: {e}")
            self.disconnect(client_id)

    def disconnect(self, client_id: str):
        if client_id in self.connections:
            del self.connections[client_id]

安全考虑与认证机制

在实时通信中,安全性同样重要。FastAPI支持WebSocket连接的认证和授权:

from fastapi import WebSocketException, status
from fastapi.security import OAuth2PasswordBearer

oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token")

async def get_websocket_user(websocket: WebSocket):
    """验证WebSocket连接的用户身份"""
    token = websocket.query_params.get("token")
    if not token:
        raise WebSocketException(
            code=status.WS_1008_POLICY_VIOLATION,
            reason="Authentication required"
        )
    
    user = await verify_token(token)
    if not user:
        raise WebSocketException(
            code=status.WS_1008_POLICY_VIOLATION,
            reason="Invalid authentication credentials"
        )
    
    return user

@app.websocket("/ws/secure")
async def secure_websocket(websocket: WebSocket):
    # 身份验证
    user = await get_websocket_user(websocket)
    await websocket.accept()
    
    try:
        while True:
            data = await websocket.receive_json()
            
            # 基于用户权限处理消息
            if not has_permission(user, data.get("action")):
                await websocket.send_json({
                    "error": "permission_denied",
                    "message": "Insufficient permissions"
                })
                continue
                
            # 处理授权后的消息
            result = await process_authorized_message(user, data)
            await websocket.send_json(result)
            
    except WebSocketDisconnect:
        print(f"User {user.username} disconnected")

通过上述机制,FastAPI提供了强大而灵活的实时数据推送与双向通信能力,能够满足各种实时应用场景的需求,从简单的聊天应用到复杂的实时数据监控系统。

连接状态管理与心跳检测

在FastAPI WebSocket实时通信应用中,连接状态管理和心跳检测是确保系统稳定性和可靠性的关键技术。WebSocket连接虽然是持久化的,但在复杂的网络环境中,连接可能会因为各种原因意外断开,因此需要一套完善的机制来监控连接状态并及时处理异常情况。

连接状态管理

FastAPI基于Starlette框架构建,其WebSocket实现提供了完整的连接状态管理机制。每个WebSocket连接都有明确的状态生命周期,我们可以通过状态机来跟踪和管理连接。

WebSocket状态机

mermaid

FastAPI WebSocket连接包含以下核心状态:

状态 描述 触发条件
CONNECTING 连接建立中 WebSocket握手阶段
CONNECTED 连接已建立 握手成功,可进行数据传输
DISCONNECTED 连接已断开 主动关闭或异常断开
状态监控实现

在FastAPI中,我们可以通过WebSocketState来监控连接状态:

from fastapi import WebSocket, WebSocketState
from fastapi.websockets import WebSocketDisconnect

class ConnectionManager:
    def __init__(self):
        self.active_connections: dict[str, WebSocket] = {}
        self.connection_states: dict[str, WebSocketState] = {}

    async def connect(self, websocket: WebSocket, client_id: str):
        await websocket.accept()
        self.active_connections[client_id] = websocket
        self.connection_states[client_id] = websocket.client_state
        
    def get_connection_state(self, client_id: str) -> WebSocketState:
        return self.connection_states.get(client_id, WebSocketState.DISCONNECTED)

    async def disconnect(self, client_id: str):
        if client_id in self.active_connections:
            del self.active_connections[client_id]
            self.connection_states[client_id] = WebSocketState.DISCONNECTED

心跳检测机制

心跳检测是维持WebSocket连接活跃性的重要技术,通过定期发送小数据包来确认连接是否仍然有效。

心跳协议设计

mermaid

实现方案对比

下表展示了不同心跳检测方案的优缺点:

方案类型 实现复杂度 可靠性 资源消耗 适用场景
客户端发起 简单应用
服务端发起 大多数场景
双向心跳 最高 关键业务
完整的心跳检测实现
import asyncio
import time
from typing import Dict
from fastapi import WebSocket, WebSocketDisconnect

class HeartbeatManager:
    def __init__(self, timeout: int = 30, interval: int = 10):
        self.timeout = timeout  # 心跳超时时间(秒)
        self.interval = interval  # 心跳间隔(秒)
        self.last_heartbeats: Dict[str, float] = {}
        self.heartbeat_tasks: Dict[str, asyncio.Task] = {}

    async def start_heartbeat(self, websocket: WebSocket, client_id: str):
        """启动心跳检测任务"""
        self.last_heartbeats[client_id] = time.time()
        
        # 创建心跳发送任务
        heartbeat_task = asyncio.create_task(
            self._send_heartbeats(websocket, client_id)
        )
        self.heartbeat_tasks[client_id] = heartbeat_task

        # 创建心跳检测任务
        check_task = asyncio.create_task(
            self._check_heartbeats(websocket, client_id)
        )
        self.heartbeat_tasks[f"{client_id}_check"] = check_task

    async def _send_heartbeats(self, websocket: WebSocket, client_id: str):
        """定期发送心跳包"""
        try:
            while True:
                await asyncio.sleep(self.interval)
                if websocket.client_state == WebSocketState.CONNECTED:
                    await websocket.send_json({
                        "type": "heartbeat",
                        "timestamp": time.time()
                    })
        except WebSocketDisconnect:
            await self.cleanup(client_id)
        except Exception as e:
            print(f"Heartbeat send error for {client_id}: {e}")
            await self.cleanup(client_id)

    async def _check_heartbeats(self, websocket: WebSocket, client_id: str):
        """检查心跳响应"""
        try:
            while True:
                await asyncio.sleep(5)  # 每5秒检查一次
                last_beat = self.last_heartbeats.get(client_id)
                if last_beat and time.time() - last_beat > self.timeout:
                    print(f"Heartbeat timeout for {client_id}")
                    await self.cleanup(client_id)
                    break
        except Exception as e:
            print(f"Heartbeat check error for {client_id}: {e}")
            await self.cleanup(client_id)

    async def handle_heartbeat_response(self, client_id: str):
        """处理心跳响应"""
        self.last_heartbeats[client_id] = time.time()

    async def cleanup(self, client_id: str):
        """清理资源"""
        for task_key in [client_id, f"{client_id}_check"]:
            if task_key in self.heartbeat_tasks:
                self.heartbeat_tasks[task_key].cancel()
                del self.heartbeat_tasks[task_key]
        
        if client_id in self.last_heartbeats:
            del self.last_heartbeats[client_id]

集成到WebSocket处理器

将状态管理和心跳检测集成到实际的WebSocket处理器中:

from fastapi import FastAPI, WebSocket, WebSocketDisconnect
from fastapi.responses import HTMLResponse

app = FastAPI()
connection_manager = ConnectionManager()
heartbeat_manager = HeartbeatManager(timeout=30, interval=15)

@app.websocket("/ws/{client_id}")
async def websocket_endpoint(websocket: WebSocket, client_id: str):
    await connection_manager.connect(websocket, client_id)
    await heartbeat_manager.start_heartbeat(websocket, client_id)
    
    try:
        while True:
            data = await websocket.receive_json()
            
            # 处理心跳响应
            if data.get("type") == "heartbeat_ack":
                await heartbeat_manager.handle_heartbeat_response(client_id)
                continue
                
            # 处理业务消息
            if data.get("type") == "message":
                await connection_manager.broadcast_message(
                    f"Client {client_id} says: {data['content']}"
                )
                
    except WebSocketDisconnect:
        await connection_manager.disconnect(client_id)
        await heartbeat_manager.cleanup(client_id)
    except Exception as e:
        print(f"WebSocket error for {client_id}: {e}")
        await connection_manager.disconnect(client_id)
        await heartbeat_manager.cleanup(client_id)

客户端心跳处理

完整的解决方案还需要客户端的配合:

// 客户端JavaScript示例
let heartbeatInterval;
let lastHeartbeatTime = 0;
const HEARTBEAT_TIMEOUT = 35000; // 35秒超时

function connectWebSocket() {
    const ws = new WebSocket("ws://localhost:8000/ws/client123");
    
    ws.onopen = function() {
        console.log("WebSocket连接已建立");
        
        // 启动心跳检测
        heartbeatInterval = setInterval(() => {
            if (Date.now() - lastHeartbeatTime > HEARTBEAT_TIMEOUT) {
                console.log("心跳超时,重新连接...");
                ws.close();
                reconnect();
                return;
            }
            
            ws.send(JSON.stringify({
                type: "heartbeat",
                timestamp: Date.now()
            }));
        }, 15000); // 每15秒发送一次心跳
    };
    
    ws.onmessage = function(event) {
        const data = JSON.parse(event.data);
        
        if (data.type === "heartbeat") {
            // 响应服务器心跳
            ws.send(JSON.stringify({
                type: "heartbeat_ack",
                timestamp: data.timestamp
            }));
            lastHeartbeatTime = Date.now();
        }
    };
    
    ws.onclose = function() {
        clearInterval(heartbeatInterval);
        console.log("WebSocket连接已关闭");
    };
}

function reconnect() {
    setTimeout(connectWebSocket, 5000); // 5秒后重连
}

性能优化建议

在实际生产环境中,心跳检测需要考虑性能优化:

  1. 连接池管理:使用连接池来管理大量WebSocket连接
  2. 批量处理:对心跳检测进行批量处理,减少系统调用
  3. 自适应间隔:根据网络状况动态调整心跳间隔
  4. 优雅降级:在系统负载高时适当降低心跳频率
class AdaptiveHeartbeatManager(HeartbeatManager):
    def __init__(self, base_interval: int = 10, max_interval: int = 60):
        super().__init__()
        self.base_interval = base_interval
        self.max_interval = max_interval
        self.network_quality = 1.0  # 网络质量系数(0.0-1.0)
        
    def calculate_interval(self) -> int:
        """根据网络质量计算自适应间隔"""
        # 网络质量越好,间隔可以越长
        interval = self.base_interval + int(
            (self.max_interval - self.base_interval) * (1 - self.network_quality)
        )
        return max(self.base_interval, min(interval, self.max_interval))
    
    async def update_network_quality(self, success_rate: float):
        """更新网络质量评估"""
        self.network_quality = success_rate

通过完善的连接状态管理和智能心跳检测机制,可以显著提升FastAPI WebSocket应用的稳定性和可靠性,确保实时通信系统在各种网络环境下都能正常工作。

WebSocket与HTTP API的混合应用

在现代Web应用开发中,实时通信功能已成为不可或缺的需求。FastAPI作为高性能的Python Web框架,不仅提供了强大的HTTP API支持,还内置了完整的WebSocket功能。本文将深入探讨如何在FastAPI应用中同时使用WebSocket和HTTP API,构建功能丰富的混合应用。

混合应用架构设计

在FastAPI中,WebSocket和HTTP API可以完美共存于同一个应用中。这种混合架构允许我们:

  • 使用HTTP API处理传统的请求-响应交互
  • 利用WebSocket实现实时双向通信
  • 共享相同的依赖注入系统
  • 统一的路由管理和中间件处理

mermaid

基础混合应用示例

下面是一个简单的聊天应用示例,同时包含HTTP API和WebSocket功能:

from fastapi import FastAPI, WebSocket, WebSocketDisconnect, HTTPException
from fastapi.responses import HTMLResponse
from pydantic import BaseModel
from typing import List, Dict
import uuid

app = FastAPI()

# 数据模型
class ChatMessage(BaseModel):
    message: str
    user_id: str

class UserConnection:
    def __init__(self):
        self.active_connections: Dict[str, WebSocket] = {}
        self.user_rooms: Dict[str, List[str]] = {}

# 全局连接管理器
connection_manager = UserConnection()

# HTTP API端点
@app.get("/")
async def get():
    return HTMLResponse("""
    <html>
        <body>
            <h1>聊天应用</h1>
            <form action="/send-message" method="post">
                <input type="text" name="message" placeholder="输入消息">
                <input type="text" name="user_id" placeholder="用户ID">
                <button type="submit">发送</button>
            </form>
            <div id="messages"></div>
            <script>
                const ws = new WebSocket(`ws://localhost:8000/ws`);
                ws.onmessage = function(event) {
                    const messages = document.getElementById('messages');
                    const message = document.createElement('div');
                    message.textContent = event.data;
                    messages.appendChild(message);
                };
            </script>
        </body>
    </html>
    """)

@app.post("/send-message")
async def send_message(message: ChatMessage):
    """通过HTTP API发送消息到WebSocket连接"""
    if message.user_id not in connection_manager.active_connections:
        raise HTTPException(status_code=404, detail="用户未连接")
    
    # 这里可以添加消息处理逻辑
    await connection_manager.active_connections[message.user_id].send_text(
        f"API消息: {message.message}"
    )
    return {"status": "消息已发送"}

# WebSocket端点
@app.websocket("/ws/{user_id}")
async def websocket_endpoint(websocket: WebSocket, user_id: str):
    await websocket.accept()
    connection_manager.active_connections[user_id] = websocket
    
    try:
        while True:
            data = await websocket.receive_text()
            # 广播消息给所有连接的用户
            for connection in connection_manager.active_connections.values():
                await connection.send_text(f"用户{user_id}: {data}")
    except WebSocketDisconnect:
        del connection_manager.active_connections[user_id]
        # 通知其他用户该用户已断开连接
        for connection in connection_manager.active_connections.values():
            await connection.send_text(f"用户{user_id}已离开聊天")

高级混合模式:实时通知系统

下面是一个更复杂的示例,展示如何构建一个实时通知系统,其中HTTP API用于创建通知,WebSocket用于实时推送:

from fastapi import FastAPI, WebSocket, Depends, HTTPException
from fastapi.responses import JSONResponse
from pydantic import BaseModel
from typing import List, Optional
import asyncio
import json
from datetime import datetime

app = FastAPI()

# 数据模型
class Notification(BaseModel):
    id: str
    title: str
    message: str
    user_id: str
    created_at: datetime
    read: bool = False

class NotificationCreate(BaseModel):
    title: str
    message: str
    user_id: str

# 模拟数据库
notifications_db = {}
connected_clients = {}

# 依赖注入
async def get_user_notifications(user_id: str) -> List[Notification]:
    return [n for n in notifications_db.values() if n.user_id == user_id]

# HTTP API端点
@app.post("/notifications", response_model=Notification)
async def create_notification(notification: NotificationCreate):
    """创建新通知"""
    notification_id = str(len(notifications_db) + 1)
    new_notification = Notification(
        id=notification_id,
        **notification.dict(),
        created_at=datetime.now(),
        read=False
    )
    notifications_db[notification_id] = new_notification
    
    # 实时推送给相应用户
    if notification.user_id in connected_clients:
        await connected_clients[notification.user_id].send_text(
            json.dumps({
                "type": "new_notification",
                "data": new_notification.dict()
            })
        )
    
    return new_notification

@app.get("/notifications/{user_id}", response_model=List[Notification])
async def get_notifications(user_id: str):
    """获取用户通知"""
    return await get_user_notifications(user_id)

# WebSocket端点
@app.websocket("/ws/notifications/{user_id}")
async def websocket_notifications(websocket: WebSocket, user_id: str):
    await websocket.accept()
    connected_clients[user_id] = websocket
    
    try:
        # 发送现有通知
        user_notifications = await get_user_notifications(user_id)
        await websocket.send_text(json.dumps({
            "type": "initial_data",
            "data": [n.dict() for n in user_notifications]
        }))
        
        while True:
            # 保持连接活跃,处理可能的客户端消息
            data = await websocket.receive_text()
            message = json.loads(data)
            
            if message.get("type") == "mark_as_read":
                # 处理标记为已读
                notification_id = message.get("notification_id")
                if notification_id in notifications_db:
                    notifications_db[notification_id].read = True
                    
    except Exception as e:
        print(f"WebSocket错误: {e}")
    finally:
        if user_id in connected_clients:
            del connected_clients[user_id]

性能优化与最佳实践

在构建混合应用时,需要考虑以下性能优化策略:

连接管理优化
from fastapi import FastAPI, WebSocket
import asyncio
from collections import defaultdict

app = FastAPI()

class ConnectionManager:
    def __init__(self):
        self.active_connections = defaultdict(dict)
        self.connection_lock = asyncio.Lock()
    
    async def connect(self, websocket: WebSocket, user_id: str, room: str = "default"):
        async with self.connection_lock:
            self.active_connections[room][user_id] = websocket
    
    async def disconnect(self, user_id: str, room: str = "default"):
        async with self.connection_lock:
            if room in self.active_connections and user_id in self.active_connections[room]:
                del self.active_connections[room][user_id]
    
    async def broadcast(self, message: str, room: str = "default"):
        async with self.connection_lock:
            disconnected = []
            for user_id, websocket in self.active_connections[room].items():
                try:
                    await websocket.send_text(message)
                except Exception:
                    disconnected.append(user_id)
            
            for user_id in disconnected:
                del self.active_connections[room][user_id]

manager = ConnectionManager()
消息序列化优化
import orjson
from fastapi.encoders import jsonable_encoder

async def send_json_message(websocket: WebSocket, data: dict):
    """优化JSON消息发送"""
    try:
        # 使用orjson获得更好的性能
        message = orjson.dumps(
            data,
            option=orjson.OPT_SERIALIZE_NUMPY | orjson.OPT_UTC_Z
        )
        await websocket.send_bytes(message)
    except Exception as e:
        print(f"消息发送失败: {e}")

安全考虑

在混合应用中,安全性至关重要:

from fastapi import WebSocket, WebSocketDisconnect, status
from fastapi.security import OAuth2PasswordBearer
import jwt
from jwt import PyJWTError

oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token")

async def get_current_user_websocket(websocket: WebSocket, token: str):
    """WebSocket连接认证"""
    try:
        payload = jwt.decode(token, "SECRET_KEY", algorithms=["HS256"])
        return payload.get("sub")
    except PyJWTError:
        await websocket.close(code=status.WS_1008_POLICY_VIOLATION)
        raise WebSocketDisconnect()

@app.websocket("/ws/secure/{token}")
async def secure_websocket_endpoint(websocket: WebSocket, token: str):
    await websocket.accept()
    try:
        user_id = await get_current_user_websocket(websocket, token)
        # 安全连接建立成功
        await websocket.send_text(f"欢迎用户 {user_id}")
        
        while True:
            data = await websocket.receive_text()
            # 处理安全消息
            await websocket.send_text(f"收到安全消息: {data}")
            
    except WebSocketDisconnect:
        print("安全连接已关闭")

监控和日志记录

为了确保混合应用的稳定性,需要完善的监控系统:

总结

FastAPI提供了强大而灵活的WebSocket支持,使其成为构建实时通信应用的理想选择。通过本文的探讨,我们了解了WebSocket连接的生命周期管理、消息处理机制、心跳检测的重要性以及与HTTP API的混合应用模式。这些技术结合FastAPI的高性能和易用性,能够帮助开发者构建出稳定、高效、安全的实时应用系统,满足从简单聊天应用到复杂实时数据推送的各种场景需求。

【免费下载链接】fastapi FastAPI framework, high performance, easy to learn, fast to code, ready for production 【免费下载链接】fastapi 项目地址: https://gitcode.com/gh_mirrors/fa/fastapi

Logo

火山引擎开发者社区是火山引擎打造的AI技术生态平台,聚焦Agent与大模型开发,提供豆包系列模型(图像/视频/视觉)、智能分析与会话工具,并配套评测集、动手实验室及行业案例库。社区通过技术沙龙、挑战赛等活动促进开发者成长,新用户可领50万Tokens权益,助力构建智能应用。

更多推荐