不使用sockte.io的方法通过封装uni.connectSocket和适配器的方法,在小程序封装websocket

首先下载stompjs

npm i stompjs -S

封装包括创建websocket连接 ,订阅消息,发送心跳机制,关闭websocket,以及添加消息监听器

在utils.js中封装websocket 因为涉及多人场景操作,所以添加了消息监听器

// utils/socket.js
import Stomp from 'stompjs'
import config from '@/config/env.js'
import store from '@/store'

const WS_URL = config.websocketURL
const HEARTBEAT_INTERVAL = 20000
const RECONNECT_INTERVAL = 5000

let stompClient = null
let reconnectTimer = null
let heartBeatTimer = null
let isConnected = false
let messageListeners = [] // 存储多个消息监听器

/**
 * 创建 uni-app WebSocket 包装器
 * @param {String} url WebSocket URL
 * @returns {Object} WebSocket 包装对象
 */
function createUniSocket(url) {
  const socketTask = uni.connectSocket({ url })
  const ws = {
    onopen: null,
    onmessage: null,
    onclose: null,
    onerror: null,
    send(data) {
      if (socketTask && typeof socketTask.send === 'function') {
        socketTask.send({ data })
      } else {
        uni.sendSocketMessage({ data })
      }
    },
    close() {
      if (socketTask && typeof socketTask.close === 'function') {
        socketTask.close()
      } else {
        uni.closeSocket()
      }
    }
  }

  const hasPerTask = socketTask && typeof socketTask.onOpen === 'function'

  if (hasPerTask) {
    socketTask.onOpen((event) => ws.onopen && ws.onopen(event))
    socketTask.onMessage((e) => ws.onmessage && ws.onmessage({ data: e.data }))
    socketTask.onClose((event) => ws.onclose && ws.onclose(event))
    socketTask.onError((event) => ws.onerror && ws.onerror(event))
  } else {
    uni.onSocketOpen((event) => ws.onopen && ws.onopen(event))
    uni.onSocketMessage((e) => ws.onmessage && ws.onmessage({ data: e.data }))
    uni.onSocketClose((event) => ws.onclose && ws.onclose(event))
    uni.onSocketError((event) => ws.onerror && ws.onerror(event))
  }

  return ws
}

/**
 * 连接 STOMP
 * @param {String} userId 当前用户ID(用于订阅私有频道)
 * @param {Function} onMessage 回调 (type, data)
 * @param {Function} onConnected 连接成功回调
 * @returns {Promise} 连接 Promise
 */
export function connectStomp(userId, onMessage, onConnected) {
  // 如果已连接,添加新的消息监听器
  if (isConnected) {
    console.log('已连接 STOMP,添加消息监听器', onMessage)
    if (onMessage && typeof onMessage === 'function') {
      // 检查是否已存在相同的监听器,避免重复添加
      if (!messageListeners.includes(onMessage)) {
        messageListeners.push(onMessage)
        console.log('消息监听器已添加,当前监听器数量:', messageListeners.length)
      } else {
        console.log('消息监听器已存在,跳过添加')
      }
    }
    onConnected && onConnected()
    return Promise.resolve()
  }

  // 清空之前的监听器列表
  messageListeners = []
  if (onMessage && typeof onMessage === 'function') {
    messageListeners.push(onMessage)
  }

  console.log('尝试连接 STOMP...')
  const ws = createUniSocket(WS_URL)
  stompClient = Stomp.over(ws)
  stompClient.debug = () => {}

  // 获取设备ID用于连接headers
  const deviceId = store.state.match?.device_id
  const connectHeaders = {}
  if (deviceId !== null && deviceId !== undefined) {
    connectHeaders.deviceId = String(deviceId)
    console.log('连接 headers 设置 deviceId:', connectHeaders.deviceId, typeof connectHeaders.deviceId)
  }

  return new Promise((resolve) => {
    stompClient.connect(connectHeaders, (frame) => {
      console.log('STOMP 已连接:', frame.headers)
      isConnected = true

      if (reconnectTimer) {
        clearTimeout(reconnectTimer)
        reconnectTimer = null
      }

      // 自动订阅各种消息
      const subscriptions = [
        { path: '/user/queue/subscription', type: 'subscription' }, // 订阅确认
        { path: `/topic/device/${String(deviceId)}`, type: 'broadcast' }, // 广播消息
        { path: '/user/queue/private', type: 'private' }, // 单播消息
        { path: '/user/queue/reply', type: 'reply' } // 消息回复
      ]

      subscriptions.forEach(({ path, type }) => {
        stompClient.subscribe(path, (msg) => {
          try {
            const data = JSON.parse(msg.body)
            console.log(`${type} 消息:`, data, '当前监听器数量:', messageListeners.length)
            // 调用所有注册的监听器
            messageListeners.forEach((listener, index) => {
              try {
                console.log(`调用监听器 ${index}:`, listener)
                listener(type, data)
              } catch (err) {
                console.error(`监听器 ${index} 执行失败:`, err)
              }
            })
          } catch (err) {
            console.error('消息解析失败:', err)
          }
        })
      })

      // 发送订阅请求给服务端
      const subscribeDeviceId = store.state.match?.device_id
      if (subscribeDeviceId !== null && subscribeDeviceId !== undefined) {
        const subscribeHeaders = {
          deviceId: String(subscribeDeviceId)
        }
        sendStompMessage('/app/device.subscribe', {}, subscribeHeaders)
        console.log('订阅请求已发送:', subscribeHeaders)
      } else {
        console.warn('未获取到设备ID,无法发送订阅请求')
      }

      startHeartbeat()

      onConnected && onConnected()
      resolve()
    }, (err) => {
      console.error('STOMP 连接失败:', err)
      isConnected = false
      tryReconnect(userId, onMessage)
    })
  })
}

/**
 * 发送消息
 * @param {String} destination 目标地址
 * @param {Object} body 消息体
 * @param {Object} headers 可选的消息头
 */
export function sendStompMessage(destination, body, headers = {}) {
  if (!stompClient || !isConnected) {
    console.warn('STOMP 未连接,发送失败')
    return
  }
  stompClient.send(destination, headers, JSON.stringify(body))
}

/**
 * 启动心跳机制
 */
function startHeartbeat() {
  stopHeartbeat()

  const sendPing = () => {
    if (!stompClient || !isConnected) {
      console.warn('STOMP 未连接,跳过发送心跳')
      return
    }

    const deviceId = store.state.match?.device_id
    if (!deviceId) {
      console.warn('未获取到设备ID,跳过发送心跳')
      return 
    }

  const now = new Date();
const localTime = new Date(now.getTime() - now.getTimezoneOffset() * 60000);
const timestamp = localTime.toISOString().replace('T', ' ').substring(0, 19);
    const heartbeatMsg = {
      type: 'PING',
      timestamp: timestamp
    }

    try {
      const headers = {
        deviceId: String(deviceId)
      }
      sendStompMessage('/app/device.message', heartbeatMsg, headers)
      console.log('心跳包已发送:', { ...heartbeatMsg, deviceId })
    } catch (error) {
      console.error('发送心跳失败:', error)
    }
  }
   
 

  // 立即发送一次心跳
  sendPing()

  // 设置定时器
  heartBeatTimer = setInterval(sendPing, HEARTBEAT_INTERVAL)
}

/**
 * 停止心跳
 */
function stopHeartbeat() {
  if (heartBeatTimer) {
    clearInterval(heartBeatTimer)
    heartBeatTimer = null
  }
}

/**
 * 尝试重连
 * @param {String} userId 用户ID
 * @param {Function} onMessage 消息回调
 */
function tryReconnect(userId, onMessage) {
  if (reconnectTimer) return

  reconnectTimer = setTimeout(() => {
    console.log('尝试重连中...')
    // 重连时保留所有监听器
    const listeners = [...messageListeners]
    messageListeners = []
    if (onMessage && typeof onMessage === 'function') {
      messageListeners.push(onMessage)
    }
    // 重新添加之前的监听器
    listeners.forEach(listener => {
      if (listener !== onMessage && !messageListeners.includes(listener)) {
        messageListeners.push(listener)
      }
    })
    connectStomp(userId, onMessage)
  }, RECONNECT_INTERVAL)
}

/**
 * 移除消息监听器
 * @param {Function} listener 要移除的监听器函数
 */
export function removeMessageListener(listener) {
  const index = messageListeners.indexOf(listener)
  if (index > -1) {
    messageListeners.splice(index, 1)
    console.log('已移除消息监听器')
  }
}

/**
 * 主动关闭连接
 */
export function closeStomp() {
  if (stompClient && isConnected) {
    stompClient.disconnect(() => {
      console.log('STOMP 已断开')
    })
  }
  stopHeartbeat()
  stompClient = null
  isConnected = false
  messageListeners = [] // 清空所有监听器
}

/**
 * 获取底层 stompClient
 * @returns {Object} STOMP 客户端实例
 */
export function getStompClient() {
  return stompClient
}

在页面中使用

import { connectStomp, closeStomp } from '@/utils/socket.js'

  /**
     * 初始化WebSocket连接
     * @param {Object} qrData 二维码数据
     */
    async initWebSocket(qrData) {
      // 清理之前的连接和订阅
      this.cleanup()

      // 保存设备ID
      this.deviceId = String(qrData.device_id)
      const userId = this.$store.state.user?.userInfo?.id || ''

      try {
        // 连接STOMP
        await connectStomp(userId, this.handleIncomingMessage)

        return true
      } catch (error) {
        console.error('WebSocket初始化失败:', error)
        this.cleanup()
        throw error
      }
    },
    /**
     * 处理接收到的消息
     * @param {String} type 消息类型
     * @param {Object} msg 消息对象
     */
    handleMessage(type, msg) {
      try {
        const data = JSON.parse(msg.body)
        this.messages.unshift({
          type,
          body: JSON.stringify(data)
        })

        // 处理特定类型的消息
        if (type === 'status') {
          this.setMatchInfo({
            device_id: this.deviceId,
            status: data.status
          })
        }
	

        // 处理PONG响应
        if (type === 'reply' && data.type === 'ACK' && data.message === 'PONG') {
          console.log('收到心跳响应:', data)
        }
      } catch (err) {
        console.error(`解析${type}消息失败`, err)
      }
    },
    /**
     * 处理所有传入消息
     * @param {String} type 消息类型
     * @param {Object} data 消息数据
     */
    handleIncomingMessage(type, data) {
      console.log('[onMessage]', type, data)
      this.messages.unshift({
        type,
        body: JSON.stringify(data)
      })

      // 处理特定类型的消息
      if (type === 'status') {
        this.setMatchInfo({
          device_id: this.deviceId,
          status: data.status
        })
      } 

      // 处理PONG响应
      if (type === 'reply' && data.type === 'ACK' && data.message === 'PONG') {
        console.log('收到心跳响应:', data)
      }
    },
    
    /**
     * 清理资源
     */
    cleanup() {
      // 关闭WebSocket连接
      closeStomp()

      // 清理超时定时器
      if (this.loadTimeout) {
        clearTimeout(this.loadTimeout)
        this.loadTimeout = null
      }

      // 重置状态
      this.deviceId = null
    },
    /**
     * 关闭WebSocket连接
     */
    closeWebSocket() {
      this.cleanup()
    },

如果只是接受websocket消息做逻辑处理如下

import { connectStomp,  removeMessageListener, sendStompMessage } from '@/utils/socket.js'
wsMessageHandler: null, // 保存WebSocket消息处理器的引用
onShow(){
// 初始化webSocket连接
 this.initWebSocket()
},
 onHide() {
    // 页面隐藏时不清理监听器,保持监听以便收到消息后能跳转
  },

  onUnload() {
    // 清理WebSocket监听器
    this.cleanupWebSocket()
  },
methods:{
// * 初始化WebSocket连接
async initWebSocket(){
      const { device_id } = this.$store.state.match
 const userId = this.$store.state.user?.userInfo?.id || ''
try{
//创建消息处理函数
this.wsMessageHandler=(type,data)=>{

 this.handleWebSocketMessage(type,data)
 }
// 连接scoket
await connectStomp(userid,this.wsMessageHandler,()=>{// console.log('WebSocket连接成功,消息监听器已注册')})

}catch(error){
 console.error('WebSocket连接失败:', error)
}
  },

// 处理webSocket消息
  handleWebSocketMessage(type,data){
// 处理 ORDER_STATUS 类型的消息
if(data.type==='ORDER_STATUS' && data.status==1){
//逻辑处理
}
 },
// 清理socket连接
 cleanupWebSocket() {
      // 移除本页面的消息监听器,不关闭连接(因为其他页面也在使用)
      if (this.wsMessageHandler) {
        removeMessageListener(this.wsMessageHandler)
        this.wsMessageHandler = null
        // console.log('已移除WebSocket消息监听器')
      }
    }
}

如果需要发送消息 import {sendStompMessage} from '@utils/socket.js'

Logo

火山引擎开发者社区是火山引擎打造的AI技术生态平台,聚焦Agent与大模型开发,提供豆包系列模型(图像/视频/视觉)、智能分析与会话工具,并配套评测集、动手实验室及行业案例库。社区通过技术沙龙、挑战赛等活动促进开发者成长,新用户可领50万Tokens权益,助力构建智能应用。

更多推荐