记录uniapp websocket 适配器连接stomp订阅主题
不使用sockte.io的方法通过封装uni.connectSocket和适配器的方法,在小程序封装websocket。封装包括创建websocket连接 ,订阅消息,发送心跳机制,关闭websocket,以及添加消息监听器。在utils.js中封装websocket 因为涉及多人场景操作,所以添加了消息监听器。首先下载stompjs。
·
不使用sockte.io的方法通过封装uni.connectSocket和适配器的方法,在小程序封装websocket
首先下载stompjs
npm i stompjs -S
封装包括创建websocket连接 ,订阅消息,发送心跳机制,关闭websocket,以及添加消息监听器
在utils.js中封装websocket 因为涉及多人场景操作,所以添加了消息监听器
// utils/socket.js
import Stomp from 'stompjs'
import config from '@/config/env.js'
import store from '@/store'
const WS_URL = config.websocketURL
const HEARTBEAT_INTERVAL = 20000
const RECONNECT_INTERVAL = 5000
let stompClient = null
let reconnectTimer = null
let heartBeatTimer = null
let isConnected = false
let messageListeners = [] // 存储多个消息监听器
/**
* 创建 uni-app WebSocket 包装器
* @param {String} url WebSocket URL
* @returns {Object} WebSocket 包装对象
*/
function createUniSocket(url) {
const socketTask = uni.connectSocket({ url })
const ws = {
onopen: null,
onmessage: null,
onclose: null,
onerror: null,
send(data) {
if (socketTask && typeof socketTask.send === 'function') {
socketTask.send({ data })
} else {
uni.sendSocketMessage({ data })
}
},
close() {
if (socketTask && typeof socketTask.close === 'function') {
socketTask.close()
} else {
uni.closeSocket()
}
}
}
const hasPerTask = socketTask && typeof socketTask.onOpen === 'function'
if (hasPerTask) {
socketTask.onOpen((event) => ws.onopen && ws.onopen(event))
socketTask.onMessage((e) => ws.onmessage && ws.onmessage({ data: e.data }))
socketTask.onClose((event) => ws.onclose && ws.onclose(event))
socketTask.onError((event) => ws.onerror && ws.onerror(event))
} else {
uni.onSocketOpen((event) => ws.onopen && ws.onopen(event))
uni.onSocketMessage((e) => ws.onmessage && ws.onmessage({ data: e.data }))
uni.onSocketClose((event) => ws.onclose && ws.onclose(event))
uni.onSocketError((event) => ws.onerror && ws.onerror(event))
}
return ws
}
/**
* 连接 STOMP
* @param {String} userId 当前用户ID(用于订阅私有频道)
* @param {Function} onMessage 回调 (type, data)
* @param {Function} onConnected 连接成功回调
* @returns {Promise} 连接 Promise
*/
export function connectStomp(userId, onMessage, onConnected) {
// 如果已连接,添加新的消息监听器
if (isConnected) {
console.log('已连接 STOMP,添加消息监听器', onMessage)
if (onMessage && typeof onMessage === 'function') {
// 检查是否已存在相同的监听器,避免重复添加
if (!messageListeners.includes(onMessage)) {
messageListeners.push(onMessage)
console.log('消息监听器已添加,当前监听器数量:', messageListeners.length)
} else {
console.log('消息监听器已存在,跳过添加')
}
}
onConnected && onConnected()
return Promise.resolve()
}
// 清空之前的监听器列表
messageListeners = []
if (onMessage && typeof onMessage === 'function') {
messageListeners.push(onMessage)
}
console.log('尝试连接 STOMP...')
const ws = createUniSocket(WS_URL)
stompClient = Stomp.over(ws)
stompClient.debug = () => {}
// 获取设备ID用于连接headers
const deviceId = store.state.match?.device_id
const connectHeaders = {}
if (deviceId !== null && deviceId !== undefined) {
connectHeaders.deviceId = String(deviceId)
console.log('连接 headers 设置 deviceId:', connectHeaders.deviceId, typeof connectHeaders.deviceId)
}
return new Promise((resolve) => {
stompClient.connect(connectHeaders, (frame) => {
console.log('STOMP 已连接:', frame.headers)
isConnected = true
if (reconnectTimer) {
clearTimeout(reconnectTimer)
reconnectTimer = null
}
// 自动订阅各种消息
const subscriptions = [
{ path: '/user/queue/subscription', type: 'subscription' }, // 订阅确认
{ path: `/topic/device/${String(deviceId)}`, type: 'broadcast' }, // 广播消息
{ path: '/user/queue/private', type: 'private' }, // 单播消息
{ path: '/user/queue/reply', type: 'reply' } // 消息回复
]
subscriptions.forEach(({ path, type }) => {
stompClient.subscribe(path, (msg) => {
try {
const data = JSON.parse(msg.body)
console.log(`${type} 消息:`, data, '当前监听器数量:', messageListeners.length)
// 调用所有注册的监听器
messageListeners.forEach((listener, index) => {
try {
console.log(`调用监听器 ${index}:`, listener)
listener(type, data)
} catch (err) {
console.error(`监听器 ${index} 执行失败:`, err)
}
})
} catch (err) {
console.error('消息解析失败:', err)
}
})
})
// 发送订阅请求给服务端
const subscribeDeviceId = store.state.match?.device_id
if (subscribeDeviceId !== null && subscribeDeviceId !== undefined) {
const subscribeHeaders = {
deviceId: String(subscribeDeviceId)
}
sendStompMessage('/app/device.subscribe', {}, subscribeHeaders)
console.log('订阅请求已发送:', subscribeHeaders)
} else {
console.warn('未获取到设备ID,无法发送订阅请求')
}
startHeartbeat()
onConnected && onConnected()
resolve()
}, (err) => {
console.error('STOMP 连接失败:', err)
isConnected = false
tryReconnect(userId, onMessage)
})
})
}
/**
* 发送消息
* @param {String} destination 目标地址
* @param {Object} body 消息体
* @param {Object} headers 可选的消息头
*/
export function sendStompMessage(destination, body, headers = {}) {
if (!stompClient || !isConnected) {
console.warn('STOMP 未连接,发送失败')
return
}
stompClient.send(destination, headers, JSON.stringify(body))
}
/**
* 启动心跳机制
*/
function startHeartbeat() {
stopHeartbeat()
const sendPing = () => {
if (!stompClient || !isConnected) {
console.warn('STOMP 未连接,跳过发送心跳')
return
}
const deviceId = store.state.match?.device_id
if (!deviceId) {
console.warn('未获取到设备ID,跳过发送心跳')
return
}
const now = new Date();
const localTime = new Date(now.getTime() - now.getTimezoneOffset() * 60000);
const timestamp = localTime.toISOString().replace('T', ' ').substring(0, 19);
const heartbeatMsg = {
type: 'PING',
timestamp: timestamp
}
try {
const headers = {
deviceId: String(deviceId)
}
sendStompMessage('/app/device.message', heartbeatMsg, headers)
console.log('心跳包已发送:', { ...heartbeatMsg, deviceId })
} catch (error) {
console.error('发送心跳失败:', error)
}
}
// 立即发送一次心跳
sendPing()
// 设置定时器
heartBeatTimer = setInterval(sendPing, HEARTBEAT_INTERVAL)
}
/**
* 停止心跳
*/
function stopHeartbeat() {
if (heartBeatTimer) {
clearInterval(heartBeatTimer)
heartBeatTimer = null
}
}
/**
* 尝试重连
* @param {String} userId 用户ID
* @param {Function} onMessage 消息回调
*/
function tryReconnect(userId, onMessage) {
if (reconnectTimer) return
reconnectTimer = setTimeout(() => {
console.log('尝试重连中...')
// 重连时保留所有监听器
const listeners = [...messageListeners]
messageListeners = []
if (onMessage && typeof onMessage === 'function') {
messageListeners.push(onMessage)
}
// 重新添加之前的监听器
listeners.forEach(listener => {
if (listener !== onMessage && !messageListeners.includes(listener)) {
messageListeners.push(listener)
}
})
connectStomp(userId, onMessage)
}, RECONNECT_INTERVAL)
}
/**
* 移除消息监听器
* @param {Function} listener 要移除的监听器函数
*/
export function removeMessageListener(listener) {
const index = messageListeners.indexOf(listener)
if (index > -1) {
messageListeners.splice(index, 1)
console.log('已移除消息监听器')
}
}
/**
* 主动关闭连接
*/
export function closeStomp() {
if (stompClient && isConnected) {
stompClient.disconnect(() => {
console.log('STOMP 已断开')
})
}
stopHeartbeat()
stompClient = null
isConnected = false
messageListeners = [] // 清空所有监听器
}
/**
* 获取底层 stompClient
* @returns {Object} STOMP 客户端实例
*/
export function getStompClient() {
return stompClient
}
在页面中使用
import { connectStomp, closeStomp } from '@/utils/socket.js'
/**
* 初始化WebSocket连接
* @param {Object} qrData 二维码数据
*/
async initWebSocket(qrData) {
// 清理之前的连接和订阅
this.cleanup()
// 保存设备ID
this.deviceId = String(qrData.device_id)
const userId = this.$store.state.user?.userInfo?.id || ''
try {
// 连接STOMP
await connectStomp(userId, this.handleIncomingMessage)
return true
} catch (error) {
console.error('WebSocket初始化失败:', error)
this.cleanup()
throw error
}
},
/**
* 处理接收到的消息
* @param {String} type 消息类型
* @param {Object} msg 消息对象
*/
handleMessage(type, msg) {
try {
const data = JSON.parse(msg.body)
this.messages.unshift({
type,
body: JSON.stringify(data)
})
// 处理特定类型的消息
if (type === 'status') {
this.setMatchInfo({
device_id: this.deviceId,
status: data.status
})
}
// 处理PONG响应
if (type === 'reply' && data.type === 'ACK' && data.message === 'PONG') {
console.log('收到心跳响应:', data)
}
} catch (err) {
console.error(`解析${type}消息失败`, err)
}
},
/**
* 处理所有传入消息
* @param {String} type 消息类型
* @param {Object} data 消息数据
*/
handleIncomingMessage(type, data) {
console.log('[onMessage]', type, data)
this.messages.unshift({
type,
body: JSON.stringify(data)
})
// 处理特定类型的消息
if (type === 'status') {
this.setMatchInfo({
device_id: this.deviceId,
status: data.status
})
}
// 处理PONG响应
if (type === 'reply' && data.type === 'ACK' && data.message === 'PONG') {
console.log('收到心跳响应:', data)
}
},
/**
* 清理资源
*/
cleanup() {
// 关闭WebSocket连接
closeStomp()
// 清理超时定时器
if (this.loadTimeout) {
clearTimeout(this.loadTimeout)
this.loadTimeout = null
}
// 重置状态
this.deviceId = null
},
/**
* 关闭WebSocket连接
*/
closeWebSocket() {
this.cleanup()
},
如果只是接受websocket消息做逻辑处理如下
import { connectStomp, removeMessageListener, sendStompMessage } from '@/utils/socket.js'
wsMessageHandler: null, // 保存WebSocket消息处理器的引用
onShow(){
// 初始化webSocket连接
this.initWebSocket()
},
onHide() {
// 页面隐藏时不清理监听器,保持监听以便收到消息后能跳转
},
onUnload() {
// 清理WebSocket监听器
this.cleanupWebSocket()
},
methods:{
// * 初始化WebSocket连接
async initWebSocket(){
const { device_id } = this.$store.state.match
const userId = this.$store.state.user?.userInfo?.id || ''
try{
//创建消息处理函数
this.wsMessageHandler=(type,data)=>{
this.handleWebSocketMessage(type,data)
}
// 连接scoket
await connectStomp(userid,this.wsMessageHandler,()=>{// console.log('WebSocket连接成功,消息监听器已注册')})
}catch(error){
console.error('WebSocket连接失败:', error)
}
},
// 处理webSocket消息
handleWebSocketMessage(type,data){
// 处理 ORDER_STATUS 类型的消息
if(data.type==='ORDER_STATUS' && data.status==1){
//逻辑处理
}
},
// 清理socket连接
cleanupWebSocket() {
// 移除本页面的消息监听器,不关闭连接(因为其他页面也在使用)
if (this.wsMessageHandler) {
removeMessageListener(this.wsMessageHandler)
this.wsMessageHandler = null
// console.log('已移除WebSocket消息监听器')
}
}
}
如果需要发送消息 import {sendStompMessage} from '@utils/socket.js'
火山引擎开发者社区是火山引擎打造的AI技术生态平台,聚焦Agent与大模型开发,提供豆包系列模型(图像/视频/视觉)、智能分析与会话工具,并配套评测集、动手实验室及行业案例库。社区通过技术沙龙、挑战赛等活动促进开发者成长,新用户可领50万Tokens权益,助力构建智能应用。
更多推荐
所有评论(0)