Open WebUI实时更新:WebSocket推送

【免费下载链接】open-webui Open WebUI 是一个可扩展、功能丰富且用户友好的自托管 WebUI,设计用于完全离线操作,支持各种大型语言模型(LLM)运行器,包括Ollama和兼容OpenAI的API。 【免费下载链接】open-webui 项目地址: https://gitcode.com/GitHub_Trending/op/open-webui

1. WebSocket在Open WebUI中的应用价值

Open WebUI作为自托管的大型语言模型(LLM)Web界面,其核心用户体验依赖于实时交互能力。传统HTTP请求-响应模式无法满足聊天消息实时推送、模型状态动态更新等需求,而WebSocket(套接字)技术通过在客户端与服务器之间建立持久连接,实现全双工通信,为Open WebUI提供了低延迟、高并发的实时数据传输能力。

WebSocket推送功能主要应用于以下场景:

  • 实时聊天消息同步(单聊/群聊)
  • 模型加载状态更新
  • 用户在线状态显示
  • 打字状态实时反馈
  • 任务进度推送(如文档处理、模型训练)

2. 技术架构与实现原理

2.1 整体架构

Open WebUI的WebSocket服务基于Python的socketio库实现,采用ASGI(异步服务器网关接口)架构,支持Redis分布式部署。核心模块位于backend/open_webui/socket/目录,包含连接管理、事件处理和消息推送三大功能。

WebSocket架构图

注:实际项目中若无架构图,可通过以下mermaid图表理解组件关系

mermaid

2.2 核心配置

WebSocket服务配置位于backend/open_webui/socket/main.py,支持两种部署模式:

# Redis分布式模式
if WEBSOCKET_MANAGER == "redis":
    mgr = socketio.AsyncRedisManager(WEBSOCKET_REDIS_URL)
    sio = socketio.AsyncServer(
        cors_allowed_origins=[],
        async_mode="asgi",
        transports=(["websocket"] if ENABLE_WEBSOCKET_SUPPORT else ["polling"]),
        allow_upgrades=ENABLE_WEBSOCKET_SUPPORT,
        always_connect=True,
        client_manager=mgr,
    )
else:
    # 单节点模式
    sio = socketio.AsyncServer(
        cors_allowed_origins=[],
        async_mode="asgi",
        transports=(["websocket"] if ENABLE_WEBSOCKET_SUPPORT else ["polling"]),
        allow_upgrades=ENABLE_WEBSOCKET_SUPPORT,
        always_connect=True,
    )

关键环境变量:

  • WEBSOCKET_MANAGER: 连接管理器类型(redismemory
  • WEBSOCKET_REDIS_URL: Redis连接地址(分布式部署必填)
  • ENABLE_WEBSOCKET_SUPPORT: 是否启用WebSocket传输(默认true

3. 连接管理机制

3.1 会话池设计

系统通过三个核心数据结构维护WebSocket连接状态:

# 会话池 - 存储当前连接的会话信息
SESSION_POOL = RedisDict("open-webui:session_pool", redis_url=WEBSOCKET_REDIS_URL)
# 用户池 - 映射用户ID到会话ID列表
USER_POOL = RedisDict("open-webui:user_pool", redis_url=WEBSOCKET_REDIS_URL)
# 使用池 - 跟踪模型使用状态
USAGE_POOL = RedisDict("open-webui:usage_pool", redis_url=WEBSOCKET_REDIS_URL)

数据结构实现位于backend/open_webui/socket/utils.py,采用Redis哈希表存储,支持分布式环境下的共享访问:

class RedisDict:
    def __init__(self, name, redis_url):
        self.name = name
        self.redis = redis.Redis.from_url(redis_url, decode_responses=True)
    
    def __setitem__(self, key, value):
        serialized_value = json.dumps(value)
        self.redis.hset(self.name, key, serialized_value)
    
    def __getitem__(self, key):
        value = self.redis.hget(self.name, key)
        if value is None:
            raise KeyError(key)
        return json.loads(value)
    # ... 其他字典方法实现

3.2 连接生命周期

连接建立流程

mermaid

核心连接处理代码:

@sio.event
async def connect(sid, environ, auth):
    user = None
    if auth and "token" in auth:
        data = decode_token(auth["token"])
        if data is not None and "id" in data:
            user = Users.get_user_by_id(data["id"])
        
        if user:
            SESSION_POOL[sid] = user.model_dump()
            if user.id in USER_POOL:
                USER_POOL[user.id] = USER_POOL[user.id] + [sid]
            else:
                USER_POOL[user.id] = [sid]
            
            await sio.emit("user-list", {"user_ids": list(USER_POOL.keys())})
            await sio.emit("usage", {"models": get_models_in_use()})

连接断开流程

@sio.event
async def disconnect(sid):
    if sid in SESSION_POOL:
        user = SESSION_POOL[sid]
        del SESSION_POOL[sid]
        
        user_id = user["id"]
        USER_POOL[user_id] = [_sid for _sid in USER_POOL[user_id] if _sid != sid]
        
        if len(USER_POOL[user_id]) == 0:
            del USER_POOL[user_id]
        
        await sio.emit("user-list", {"user_ids": list(USER_POOL.keys())})

4. 实时消息推送机制

4.1 频道消息系统

Open WebUI通过"房间"(Room)机制实现多用户实时通信,每个频道对应一个独立房间:

@sio.on("join-channels")
async def join_channel(sid, data):
    # 验证用户身份...
    channels = Channels.get_channels_by_user_id(user.id)
    for channel in channels:
        await sio.enter_room(sid, f"channel:{channel.id}")

用户发送消息时,服务器将消息广播到对应频道的所有在线用户:

@sio.on("channel-events")
async def channel_events(sid, data):
    room = f"channel:{data['channel_id']}"
    participants = sio.manager.get_participants(namespace="/", room=room)
    
    sids = [sid for sid, _ in participants]
    if sid not in sids:
        return
    
    event_data = data["data"]
    event_type = event_data["type"]
    
    if event_type == "typing":
        await sio.emit(
            "channel-events",
            {
                "channel_id": data["channel_id"],
                "message_id": data.get("message_id", None),
                "data": event_data,
                "user": UserNameResponse(**SESSION_POOL[sid]).model_dump(),
            },
            room=room,
        )

4.2 模型使用状态推送

系统会周期性清理超时连接并推送当前活跃模型使用状态:

async def periodic_usage_pool_cleanup():
    if not aquire_func():
        log.debug("Usage pool cleanup lock already exists. Not running it.")
        return
    log.debug("Running periodic_usage_pool_cleanup")
    try:
        while True:
            if not renew_func():
                log.error(f"Unable to renew cleanup lock. Exiting usage pool cleanup.")
                raise Exception("Unable to renew usage pool cleanup lock.")
            
            now = int(time.time())
            send_usage = False
            for model_id, connections in list(USAGE_POOL.items()):
                # 清理超时连接
                expired_sids = [
                    sid
                    for sid, details in connections.items()
                    if now - details["updated_at"] > TIMEOUT_DURATION
                ]
                
                for sid in expired_sids:
                    del connections[sid]
                
                if not connections:
                    log.debug(f"Cleaning up model {model_id} from usage pool")
                    del USAGE_POOL[model_id]
                else:
                    USAGE_POOL[model_id] = connections
                
                send_usage = True
            
            if send_usage:
                # 推送更新后的使用状态
                await sio.emit("usage", {"models": get_models_in_use()})
            
            await asyncio.sleep(TIMEOUT_DURATION)
    finally:
        release_func()

5. 实战应用示例

5.1 前端连接示例

虽然前端代码未在项目文件列表中完全展示,但通常客户端会使用socket.io-client建立连接:

import io from 'socket.io-client';

// 建立WebSocket连接
const socket = io('/ws', {
  auth: {
    token: localStorage.getItem('token')
  },
  transports: ['websocket']
});

// 监听连接成功事件
socket.on('connect', () => {
  console.log('WebSocket连接成功');
  socket.emit('user-join', { auth: { token: localStorage.getItem('token') } });
});

// 监听用户列表更新
socket.on('user-list', (data) => {
  console.log('当前在线用户:', data.user_ids);
  // 更新UI显示在线用户
});

// 发送打字状态
function sendTypingStatus(channelId) {
  socket.emit('channel-events', {
    channel_id: channelId,
    data: {
      type: 'typing',
      status: true
    }
  });
}

5.2 后端消息推送API

Open WebUI提供了便捷的消息推送接口,可在其他模块中调用:

# 获取事件推送器
def get_event_emitter(request_info):
    async def __event_emitter__(event_data):
        user_id = request_info["user_id"]
        session_ids = list(
            set(USER_POOL.get(user_id, []) + [request_info["session_id"]])
        )
        
        for session_id in session_ids:
            await sio.emit(
                "chat-events",
                {
                    "chat_id": request_info["chat_id"],
                    "message_id": request_info["message_id"],
                    "data": event_data,
                },
                to=session_id,
            )
    return __event_emitter__

# 使用示例
emitter = get_event_emitter({
    "user_id": "123",
    "session_id": "abc123",
    "chat_id": "chat_456",
    "message_id": "msg_789"
})
await emitter({
    "type": "message",
    "data": {
        "content": "模型生成的消息内容..."
    }
})

6. 配置与扩展

6.1 环境变量配置

WebSocket功能可通过环境变量进行配置,主要参数如下:

参数名 说明 默认值
WEBSOCKET_MANAGER 连接管理器类型,可选redismemory memory
WEBSOCKET_REDIS_URL Redis连接地址,分布式部署必填 redis://localhost:6379/0
ENABLE_WEBSOCKET_SUPPORT 是否启用WebSocket传输 true
TIMEOUT_DURATION 连接超时时间(秒) 3

6.2 水平扩展

当需要部署多个Open WebUI实例以支持高并发时,需使用Redis作为连接管理器:

  1. 确保Redis服务正常运行
  2. 配置环境变量:
    export WEBSOCKET_MANAGER=redis
    export WEBSOCKET_REDIS_URL=redis://your-redis-server:6379/0
    
  3. 启动多个Open WebUI实例,所有实例将通过Redis共享连接状态

7. 故障排除与最佳实践

7.1 常见问题解决

连接失败问题排查

  1. 检查JWT token是否有效:backend/open_webui/utils/auth.py
  2. 确认Redis服务状态(分布式模式)
  3. 验证跨域配置是否正确:backend/open_webui/main.py中的CORS设置

消息延迟问题

  • 检查服务器负载,确保CPU/内存资源充足
  • 优化Redis网络连接,减少延迟
  • 调整超时清理周期TIMEOUT_DURATION

7.2 性能优化建议

  1. 减少广播范围:精确指定消息接收房间,避免全服广播

    # 推荐:只向特定房间广播
    await sio.emit("message", data, room=f"channel:{channel_id}")
    
    # 避免:向所有用户广播
    # await sio.emit("message", data)
    
  2. 批量处理消息:对高频事件进行节流处理

  3. 合理设置Redis过期时间:避免连接信息长期占用内存

  4. 监控连接状态:通过USER_POOLSESSION_POOL监控在线用户数

8. 总结与展望

WebSocket技术为Open WebUI提供了高效的实时通信能力,通过socketio库与Redis分布式管理,实现了用户连接、消息推送和状态同步等核心功能。当前实现支持单节点和分布式部署,可满足不同规模的使用需求。

未来可能的优化方向:

  • 实现消息持久化,确保断线重连后消息不丢失
  • 添加WebSocket连接加密,增强安全性
  • 支持消息优先级,优化关键消息传输效率
  • 提供更详细的监控指标,便于问题排查

WebSocket模块作为Open WebUI的重要组成部分,其稳定性和性能直接影响用户体验,建议开发者在使用过程中持续关注相关改进和更新。

官方文档:docs/README.md 核心源码:backend/open_webui/socket/ 配置示例:docker-compose.yaml

【免费下载链接】open-webui Open WebUI 是一个可扩展、功能丰富且用户友好的自托管 WebUI,设计用于完全离线操作,支持各种大型语言模型(LLM)运行器,包括Ollama和兼容OpenAI的API。 【免费下载链接】open-webui 项目地址: https://gitcode.com/GitHub_Trending/op/open-webui

Logo

中国智能体开发者社区,聚焦智能体与大模型开发,提供前沿资讯、实用工具链、开源项目及行业案例。通过技术沙龙、开发者大赛等活动,促进经验交流与协作,助力开发者快速构建创新智能应用。

更多推荐