# Python Server: xiaozhi-esp32 Official Backend Implementation

## Overview

xiaozhi-esp32 is an open-source AI chatbot project built on the ESP32. It supports voice interaction, offline wake-word detection, and device control over the MCP protocol. This article walks through how to implement a complete Python backend server that communicates with xiaozhi-esp32 devices.
## Core Communication Protocol

### WebSocket Communication Architecture

The xiaozhi-esp32 device and the server communicate bidirectionally over a single WebSocket connection, which carries both text frames (JSON control messages) and binary frames (audio data). A representative handshake is sketched after the table below.
### Message Type Summary

| Message type | Direction | Description | Key fields |
|---|---|---|---|
| hello | Both | Connection handshake | type, version, features, audio_params |
| listen | Device → Server | Start/stop recording | type, state, mode, session_id |
| stt | Server → Device | Speech recognition result | type, text, session_id |
| tts | Server → Device | Text-to-speech control | type, state, text, session_id |
| mcp | Both | MCP protocol message | type, payload (JSON-RPC 2.0) |
| abort | Device → Server | Abort the current operation | type, reason, session_id |
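To make the handshake concrete, the following sketch shows what a typical `hello` exchange could look like, written as Python dictionaries before JSON serialization. The field set follows the table above; the concrete values (and the `features` payload) are illustrative assumptions, and real firmware may send additional fields.

```python
import json
import uuid

# Device -> server: the first text frame after the WebSocket is opened
device_hello = {
    "type": "hello",
    "version": 1,
    "features": {"mcp": True},  # illustrative
    "audio_params": {
        "format": "opus",
        "sample_rate": 16000,
        "channels": 1,
        "frame_duration": 60,
    },
}

# Server -> device: confirms the transport, assigns a session_id and
# announces the audio format it will send back
server_hello = {
    "type": "hello",
    "transport": "websocket",
    "session_id": str(uuid.uuid4()),
    "audio_params": {
        "format": "opus",
        "sample_rate": 24000,
        "channels": 1,
        "frame_duration": 60,
    },
}

# Both messages travel as JSON text frames
print(json.dumps(device_hello))
print(json.dumps(server_hello))
```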
## Python Server Implementation

### Basic WebSocket Server
```python
import asyncio
import websockets
import json
import logging
import uuid
from dataclasses import dataclass
from typing import Dict, Optional

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)


@dataclass
class DeviceSession:
    websocket: websockets.WebSocketServerProtocol
    session_id: str
    device_id: str
    audio_params: Dict
    features: Dict


class XiaozhiServer:
    def __init__(self, host: str = "0.0.0.0", port: int = 8765):
        self.host = host
        self.port = port
        self.sessions: Dict[str, DeviceSession] = {}

    async def handle_hello(self, websocket, message: Dict) -> Dict:
        """Handle the device's hello message."""
        session_id = message.get("session_id") or self.generate_session_id()
        device_id = message.get("device_id", "unknown")
        # Record the device's advertised capabilities
        features = message.get("features", {})
        audio_params = message.get("audio_params", {})
        # Create the session
        session = DeviceSession(
            websocket=websocket,
            session_id=session_id,
            device_id=device_id,
            audio_params=audio_params,
            features=features
        )
        self.sessions[session_id] = session
        # Return the server's hello response
        return {
            "type": "hello",
            "transport": "websocket",
            "session_id": session_id,
            "audio_params": {
                "format": "opus",
                "sample_rate": 24000,
                "channels": 1,
                "frame_duration": 60
            }
        }

    async def handle_listen(self, session: DeviceSession, message: Dict):
        """Handle listen state changes."""
        state = message.get("state")
        mode = message.get("mode", "auto")
        if state == "start":
            logger.info(f"Device {session.device_id} started recording, mode: {mode}")
            # Prepare to receive audio data
        elif state == "stop":
            logger.info(f"Device {session.device_id} stopped recording")
        elif state == "detect":
            wake_word = message.get("text", "")
            logger.info(f"Device detected wake word: {wake_word}")

    async def handle_audio_data(self, session: DeviceSession, audio_data: bytes):
        """Handle incoming audio data."""
        # Hook a speech recognition service in here
        try:
            # Simulated speech recognition
            recognized_text = await self.speech_to_text(audio_data)
            if recognized_text:
                # Send the recognition result back to the device
                stt_message = {
                    "type": "stt",
                    "session_id": session.session_id,
                    "text": recognized_text
                }
                await session.websocket.send(json.dumps(stt_message))
                # Generate the AI reply
                ai_response = await self.generate_ai_response(recognized_text)
                await self.send_tts_response(session, ai_response)
        except Exception as e:
            logger.error(f"Audio processing error: {e}")

    async def speech_to_text(self, audio_data: bytes) -> Optional[str]:
        """Speech-to-text interface."""
        # Integrate Whisper, Baidu ASR, or another STT service here
        # and return the recognized text
        return "This is a sample speech recognition result"

    async def generate_ai_response(self, text: str) -> str:
        """Generate an AI reply."""
        # Integrate ChatGPT, ERNIE Bot, or another LLM here
        return f"Hello, I heard you say: {text}"

    async def send_tts_response(self, session: DeviceSession, text: str):
        """Send a TTS reply."""
        # Notify the device that TTS playback is starting
        tts_start = {
            "type": "tts",
            "session_id": session.session_id,
            "state": "start"
        }
        await session.websocket.send(json.dumps(tts_start))
        # Generate the TTS audio (requires an integrated TTS service)
        tts_audio = await self.text_to_speech(text)
        # Send the audio data as a binary frame
        await session.websocket.send(tts_audio)
        # Notify the device that TTS playback has finished
        tts_stop = {
            "type": "tts",
            "session_id": session.session_id,
            "state": "stop"
        }
        await session.websocket.send(json.dumps(tts_stop))

    async def text_to_speech(self, text: str) -> bytes:
        """Text-to-speech interface."""
        # Integrate Edge-TTS, Baidu TTS, or another TTS service here
        # and return Opus-encoded audio data
        return b""  # actual audio data

    def generate_session_id(self) -> str:
        """Generate a unique session ID."""
        return str(uuid.uuid4())

    async def connection_handler(self, websocket):
        """Handle a WebSocket connection."""
        session = None
        try:
            async for message in websocket:
                if isinstance(message, bytes):
                    # Binary frames carry audio data
                    if session:
                        await self.handle_audio_data(session, message)
                else:
                    # Text frames carry JSON messages
                    data = json.loads(message)
                    msg_type = data.get("type")
                    if msg_type == "hello":
                        response = await self.handle_hello(websocket, data)
                        await websocket.send(json.dumps(response))
                        session = self.sessions.get(response["session_id"])
                    elif msg_type == "listen" and session:
                        await self.handle_listen(session, data)
                    elif msg_type == "mcp" and session:
                        await self.handle_mcp_message(session, data)
                    elif msg_type == "abort" and session:
                        logger.info(f"Session aborted: {data.get('reason')}")
        except websockets.exceptions.ConnectionClosed:
            logger.info("WebSocket connection closed")
        finally:
            if session:
                self.sessions.pop(session.session_id, None)

    async def start_server(self):
        """Start the WebSocket server."""
        server = await websockets.serve(
            self.connection_handler,
            self.host,
            self.port
        )
        logger.info(f"Server listening on {self.host}:{self.port}")
        await server.wait_closed()


# Start the server
if __name__ == "__main__":
    server = XiaozhiServer()
    asyncio.run(server.start_server())
```
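For a quick smoke test without real hardware, the sketch below (not part of the original project) connects to the server started above on `ws://localhost:8765`, performs the `hello` handshake, and pushes one placeholder binary frame; the stubbed pipeline then replies with `stt` and `tts` messages.

```python
import asyncio
import json

import websockets


async def test_client():
    async with websockets.connect("ws://localhost:8765") as ws:
        # 1. Perform the hello handshake
        await ws.send(json.dumps({
            "type": "hello",
            "version": 1,
            "device_id": "test-device",
            "features": {},
            "audio_params": {"format": "opus", "sample_rate": 16000,
                             "channels": 1, "frame_duration": 60},
        }))
        server_hello = json.loads(await ws.recv())
        session_id = server_hello["session_id"]
        print("server hello:", server_hello)

        # 2. Announce recording, then push one placeholder binary frame
        #    (not real Opus audio; the server's STT is stubbed anyway)
        await ws.send(json.dumps({"type": "listen", "state": "start",
                                  "mode": "manual", "session_id": session_id}))
        await ws.send(b"\x00" * 120)

        # 3. Read back what the server pushes (stt, tts, binary audio)
        for _ in range(3):
            reply = await ws.recv()
            print("reply:", reply if isinstance(reply, str) else f"<{len(reply)} bytes>")


if __name__ == "__main__":
    asyncio.run(test_client())
```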
## MCP Protocol Implementation

MCP (Model Context Protocol) is the control protocol between the device and the server. It follows the JSON-RPC 2.0 specification, with each request or response carried in the `payload` field of an `mcp` message.
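As a concrete illustration (the session ID and values are made up), an incoming `tools/call` request and the reply the server sends back look like this on the wire, matching the handler implemented below:

```python
# Device -> server: an "mcp" message whose payload is a JSON-RPC 2.0 request
mcp_request = {
    "session_id": "abc123",  # illustrative
    "type": "mcp",
    "payload": {
        "jsonrpc": "2.0",
        "id": 1,
        "method": "tools/call",
        "params": {
            "name": "self.audio_speaker.set_volume",
            "arguments": {"volume": 80},
        },
    },
}

# Server -> device: the matching JSON-RPC result wrapped in the same envelope
mcp_response = {
    "session_id": "abc123",
    "type": "mcp",
    "payload": {
        "jsonrpc": "2.0",
        "id": 1,
        "result": {
            "content": [{"type": "text", "text": "True"}],
            "isError": False,
        },
    },
}
```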
```python
class McpHandler:
    """MCP protocol handler."""

    def __init__(self):
        self.tools = self._initialize_tools()

    def _initialize_tools(self) -> Dict:
        """Initialize the device tool list."""
        return {
            "self.get_device_status": {
                "description": "Get device status information",
                "inputSchema": {
                    "type": "object",
                    "properties": {}
                }
            },
            "self.audio_speaker.set_volume": {
                "description": "Set the device volume",
                "inputSchema": {
                    "type": "object",
                    "properties": {
                        "volume": {
                            "type": "integer",
                            "minimum": 0,
                            "maximum": 100
                        }
                    },
                    "required": ["volume"]
                }
            },
            "self.light.set_rgb": {
                "description": "Set the RGB light",
                "inputSchema": {
                    "type": "object",
                    "properties": {
                        "r": {"type": "integer", "minimum": 0, "maximum": 255},
                        "g": {"type": "integer", "minimum": 0, "maximum": 255},
                        "b": {"type": "integer", "minimum": 0, "maximum": 255}
                    },
                    "required": ["r", "g", "b"]
                }
            }
        }

    async def handle_initialize(self, params: Dict) -> Dict:
        """Handle the MCP initialize request."""
        return {
            "protocolVersion": "2024-11-05",
            "capabilities": {"tools": {}},
            "serverInfo": {
                "name": "Xiaozhi Python Server",
                "version": "1.0.0"
            }
        }

    async def handle_tools_list(self, params: Dict) -> Dict:
        """Handle the tools/list request."""
        cursor = params.get("cursor", "")
        # Include each tool's name alongside its schema so the listing is well-formed
        tools_list = [{"name": name, **spec} for name, spec in self.tools.items()]
        return {
            "tools": tools_list,
            "nextCursor": ""  # simple implementation, no pagination
        }

    async def handle_tools_call(self, params: Dict) -> Dict:
        """Handle the tools/call request."""
        tool_name = params.get("name")
        arguments = params.get("arguments", {})
        if tool_name == "self.get_device_status":
            result = await self._get_device_status()
        elif tool_name == "self.audio_speaker.set_volume":
            result = await self._set_volume(arguments)
        elif tool_name == "self.light.set_rgb":
            result = await self._set_rgb_light(arguments)
        else:
            raise Exception(f"Unknown tool: {tool_name}")
        return {
            "content": [{"type": "text", "text": str(result)}],
            "isError": False
        }

    async def _get_device_status(self) -> Dict:
        """Get the device status."""
        return {
            "battery_level": 85,
            "network_status": "connected",
            "memory_usage": "45%"
        }

    async def _set_volume(self, arguments: Dict) -> bool:
        """Set the volume."""
        volume = arguments.get("volume", 50)
        logger.info(f"Setting volume: {volume}")
        return True

    async def _set_rgb_light(self, arguments: Dict) -> bool:
        """Set the RGB light."""
        r = arguments.get("r", 0)
        g = arguments.get("g", 0)
        b = arguments.get("b", 0)
        logger.info(f"Setting RGB light: R={r}, G={g}, B={b}")
        return True


# Add MCP handling to XiaozhiServer
async def handle_mcp_message(self, session: DeviceSession, message: Dict):
    """Handle MCP protocol messages."""
    mcp_handler = McpHandler()
    payload = message.get("payload", {})
    method = payload.get("method")
    params = payload.get("params", {})
    request_id = payload.get("id")
    try:
        if method == "initialize":
            result = await mcp_handler.handle_initialize(params)
        elif method == "tools/list":
            result = await mcp_handler.handle_tools_list(params)
        elif method == "tools/call":
            result = await mcp_handler.handle_tools_call(params)
        else:
            raise Exception(f"Unknown MCP method: {method}")
        # Send the success response
        response = {
            "session_id": session.session_id,
            "type": "mcp",
            "payload": {
                "jsonrpc": "2.0",
                "id": request_id,
                "result": result
            }
        }
        await session.websocket.send(json.dumps(response))
    except Exception as e:
        # Send the error response
        error_response = {
            "session_id": session.session_id,
            "type": "mcp",
            "payload": {
                "jsonrpc": "2.0",
                "id": request_id,
                "error": {
                    "code": -32601,
                    "message": str(e)
                }
            }
        }
        await session.websocket.send(json.dumps(error_response))
```
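The handler can also be exercised on its own, without a connected device, by feeding it JSON-RPC parameters directly. A minimal local check might look like this (it reuses the module-level `logger` from the server listing above):

```python
import asyncio


async def mcp_demo():
    handler = McpHandler()

    # List the tools the handler exposes
    tools = await handler.handle_tools_list({})
    print([tool["name"] for tool in tools["tools"]])

    # Call the volume tool with JSON-RPC style params
    result = await handler.handle_tools_call({
        "name": "self.audio_speaker.set_volume",
        "arguments": {"volume": 30},
    })
    print(result)


if __name__ == "__main__":
    asyncio.run(mcp_demo())
```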
## Audio Processing Integration

### Opus Codec Implementation
```python
import opuslib
import numpy as np


class AudioProcessor:
    """Audio processor for Opus encoding/decoding and resampling."""

    def __init__(self, sample_rate=16000, channels=1, frame_duration_ms=60):
        self.sample_rate = sample_rate
        self.channels = channels
        # Samples per channel in one frame (e.g. 960 for 60 ms at 16 kHz)
        self.frame_size = sample_rate * frame_duration_ms // 1000
        # Create the Opus encoder and decoder
        self.encoder = opuslib.Encoder(sample_rate, channels, opuslib.APPLICATION_VOIP)
        self.decoder = opuslib.Decoder(sample_rate, channels)

    def encode_audio(self, pcm_data: bytes) -> bytes:
        """Encode one frame of 16-bit PCM data to Opus."""
        # opuslib expects raw little-endian int16 PCM bytes plus the
        # per-channel frame size (number of samples per channel)
        frame_size = len(pcm_data) // 2 // self.channels
        return self.encoder.encode(pcm_data, frame_size)

    def decode_audio(self, opus_data: bytes) -> bytes:
        """Decode one Opus packet back to 16-bit PCM bytes."""
        # frame_size is the expected number of samples per channel in the frame
        return self.decoder.decode(opus_data, self.frame_size)

    def resample_audio(self, pcm_data: bytes, from_rate: int, to_rate: int) -> bytes:
        """Resample 16-bit PCM audio between sample rates."""
        from scipy import signal
        # Convert the byte buffer to a numpy array
        audio_array = np.frombuffer(pcm_data, dtype=np.int16)
        # Compute the resampling ratio
        resample_ratio = to_rate / from_rate
        num_samples = int(len(audio_array) * resample_ratio)
        # Perform the resampling
        resampled = signal.resample(audio_array, num_samples)
        return resampled.astype(np.int16).tobytes()
```
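A quick round-trip check of the codec, as a sketch that assumes `opuslib` (with the native libopus library) and `scipy` are installed. At 16 kHz, one 60 ms frame is 960 samples, i.e. 1920 bytes of 16-bit PCM:

```python
if __name__ == "__main__":
    processor = AudioProcessor(sample_rate=16000, channels=1)

    # One 60 ms frame of silence: 960 samples x 2 bytes per 16-bit sample
    pcm_frame = b"\x00\x00" * 960

    opus_packet = processor.encode_audio(pcm_frame)
    print(f"PCM {len(pcm_frame)} bytes -> Opus {len(opus_packet)} bytes")

    decoded = processor.decode_audio(opus_packet)
    print(f"Opus {len(opus_packet)} bytes -> PCM {len(decoded)} bytes")

    # Resample the decoded frame from 16 kHz up to the 24 kHz downlink rate
    resampled = processor.resample_audio(decoded, 16000, 24000)
    print(f"Resampled PCM: {len(resampled)} bytes")
```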
### Speech Service Integration Example

Real recognition and synthesis backends can be wrapped in a small `SpeechServices` helper class exposing static methods, which then back the `speech_to_text` / `text_to_speech` stubs in `XiaozhiServer`.
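One possible way to flesh out such a class, assuming the `edge-tts` package for synthesis and `openai-whisper` (which needs ffmpeg) for recognition. The method names, the chosen voice, and the file-based input are assumptions rather than part of the original project, and Edge-TTS returns MP3 audio that would still have to be transcoded to Opus before being streamed to the device:

```python
import edge_tts
import whisper  # openai-whisper


class SpeechServices:
    """Speech service integration (sketch; library choices are illustrative)."""

    # Assumption: a small local Whisper model is available on this machine
    _whisper_model = whisper.load_model("base")

    @staticmethod
    async def speech_to_text(wav_path: str) -> str:
        """Recognize speech from a WAV file with local Whisper (CPU-blocking call)."""
        result = SpeechServices._whisper_model.transcribe(wav_path)
        return result["text"]

    @staticmethod
    async def text_to_speech(text: str, voice: str = "zh-CN-XiaoxiaoNeural") -> bytes:
        """Synthesize speech with Edge-TTS; returns MP3 bytes (not yet Opus)."""
        communicate = edge_tts.Communicate(text, voice)
        audio = b""
        async for chunk in communicate.stream():
            if chunk["type"] == "audio":
                audio += chunk["data"]
        return audio
```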