Python服务器:xiaozhi-esp32官方后端实现

【免费下载链接】xiaozhi-esp32 Build your own AI friend　项目地址：https://gitcode.com/GitHub_Trending/xia/xiaozhi-esp32

概述

xiaozhi-esp32是一个基于ESP32的开源AI聊天机器人项目，支持语音交互、离线唤醒、MCP协议控制等功能。本文将详细介绍如何实现一个完整的Python后端服务器，用于与xiaozhi-esp32设备进行通信。

核心通信协议

WebSocket通信架构

xiaozhi-esp32设备与服务器之间主要通过WebSocket协议进行双向通信，支持文本JSON消息和二进制音频数据传输。

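（原文此处为一张 Mermaid 通信流程图，转载时已丢失。按下文代码实现，典型交互流程为：设备发送 hello → 服务器回复 hello → 设备发送 listen 并上传二进制 Opus 音频 → 服务器返回 stt 识别结果 → 服务器依次发送 tts start、二进制音频和 tts stop。）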

消息类型汇总表

| 消息类型 | 方向 | 描述 | 关键字段 |
| --- | --- | --- | --- |
| hello | 双向 | 连接握手消息 | type, version, features, audio_params |
| listen | 设备→服务器 | 开始/停止录音、上报唤醒词 | type, state, mode, session_id |
| stt | 服务器→设备 | 语音识别结果 | type, text, session_id |
| tts | 服务器→设备 | 文本转语音控制 | type, state, text, session_id |
| mcp | 双向 | MCP协议消息 | type, payload(JSON-RPC 2.0) |
| abort | 设备→服务器 | 终止当前操作 | type, reason, session_id |

Python服务器实现

基础WebSocket服务器

import asyncio
import websockets
import json
import logging
from dataclasses import dataclass
from typing import Dict, Optional

# Configure root logging at INFO so connection and session events are shown.
logging.basicConfig(level="INFO")
logger = logging.getLogger(name=__name__)

@dataclass
class DeviceSession:
    """Per-connection state recorded once a device completes its hello."""

    # Open connection used to send JSON messages and audio back to the device.
    websocket: websockets.WebSocketServerProtocol
    # Session identifier included in the messages the server sends.
    session_id: str
    # Identifier the device reported in its hello message.
    device_id: str
    # Audio parameters the device announced in its hello (stored unvalidated).
    audio_params: dict
    # Feature flags the device announced in its hello (stored unvalidated).
    features: dict

class XiaozhiServer:
    """WebSocket server speaking the xiaozhi-esp32 device protocol.

    A connection starts with a ``hello`` text message. After that the device
    may send JSON control messages (``listen``, ``mcp``, ``abort``) and binary
    audio frames. Sessions that completed the handshake are kept in
    ``self.sessions`` (keyed by session id) until their connection ends.
    """

    def __init__(self, host: str = "0.0.0.0", port: int = 8765):
        self.host = host
        self.port = port
        # session_id -> DeviceSession, one entry per handshaken connection.
        self.sessions: Dict[str, DeviceSession] = {}

    async def handle_hello(self, websocket, message: Dict) -> Dict:
        """Register a session for a device ``hello`` and build the reply.

        Reuses the device-supplied ``session_id`` when present, otherwise
        generates one. Returns the server ``hello`` message that advertises
        the audio format the server sends.
        """
        session_id = message.get("session_id") or self.generate_session_id()
        device_id = message.get("device_id", "unknown")

        # Capabilities reported by the device; stored as-is, not validated.
        features = message.get("features", {})
        audio_params = message.get("audio_params", {})

        session = DeviceSession(
            websocket=websocket,
            session_id=session_id,
            device_id=device_id,
            audio_params=audio_params,
            features=features
        )
        self.sessions[session_id] = session

        return {
            "type": "hello",
            "transport": "websocket",
            "session_id": session_id,
            "audio_params": {
                "format": "opus",
                "sample_rate": 24000,
                "channels": 1,
                "frame_duration": 60
            }
        }

    async def handle_listen(self, session: DeviceSession, message: Dict):
        """Log a ``listen`` state change (``start``, ``stop`` or ``detect``)."""
        state = message.get("state")
        mode = message.get("mode", "auto")

        if state == "start":
            logger.info(f"设备 {session.device_id} 开始录音,模式: {mode}")
            # Placeholder: prepare to receive audio frames here.
        elif state == "stop":
            logger.info(f"设备 {session.device_id} 停止录音")
        elif state == "detect":
            wake_word = message.get("text", "")
            logger.info(f"设备检测到唤醒词: {wake_word}")

    async def handle_audio_data(self, session: DeviceSession, audio_data: bytes):
        """Run one binary audio frame through STT -> reply generation -> TTS.

        Sends an ``stt`` message with the recognized text, then the TTS
        reply. Any error is logged with its traceback instead of propagating,
        so one bad frame does not drop the connection.
        """
        # NOTE(review): every binary frame is recognized on its own; a real
        # STT integration likely needs to buffer frames between listen
        # start/stop before recognizing.
        try:
            recognized_text = await self.speech_to_text(audio_data)

            if recognized_text:
                stt_message = {
                    "type": "stt",
                    "session_id": session.session_id,
                    "text": recognized_text
                }
                await session.websocket.send(json.dumps(stt_message))

                ai_response = await self.generate_ai_response(recognized_text)
                await self.send_tts_response(session, ai_response)

        except Exception as e:
            logger.exception(f"音频处理错误: {e}")

    async def speech_to_text(self, audio_data: bytes) -> Optional[str]:
        """Speech-recognition hook; currently returns fixed placeholder text."""
        # Integrate Whisper, Baidu ASR, etc. here.
        return "这是一段测试语音识别文本"

    async def generate_ai_response(self, text: str) -> str:
        """Reply-generation hook; currently echoes the input back."""
        # Integrate an LLM (ChatGPT, ERNIE Bot, ...) here.
        return f"你好,我听到你说:{text}"

    async def send_tts_response(self, session: DeviceSession, text: str):
        """Send ``tts`` start, the synthesized audio, then ``tts`` stop."""
        tts_start = {
            "type": "tts",
            "session_id": session.session_id,
            "state": "start"
        }
        await session.websocket.send(json.dumps(tts_start))

        tts_audio = await self.text_to_speech(text)

        # Skip empty payloads (such as the placeholder TTS below) instead of
        # sending a zero-length binary frame that carries no audio.
        if tts_audio:
            await session.websocket.send(tts_audio)

        tts_stop = {
            "type": "tts",
            "session_id": session.session_id,
            "state": "stop"
        }
        await session.websocket.send(json.dumps(tts_stop))

    async def text_to_speech(self, text: str) -> bytes:
        """TTS hook; should return Opus-encoded audio (currently empty)."""
        # Integrate Edge-TTS, Baidu TTS, etc. here.
        return b""

    def generate_session_id(self) -> str:
        """Return a new random UUID4 string for use as a session id."""
        import uuid
        return str(uuid.uuid4())

    async def handle_mcp_message(self, session: DeviceSession, message: Dict):
        """Dispatch an ``mcp`` message.

        ``connection_handler`` calls this method, but the MCP implementation
        is the module-level ``handle_mcp_message(self, session, message)``
        defined later in this file. This method forwards to it so ``mcp``
        messages do not fail with AttributeError.
        """
        # Inside a method body this name resolves to the module-level
        # function, not to this method.
        await handle_mcp_message(self, session, message)

    async def connection_handler(self, websocket):
        """Serve one device connection until it closes.

        Binary frames are treated as audio and ignored until a ``hello`` has
        established a session. Text frames must be JSON objects; malformed
        ones are logged and skipped instead of tearing down the connection.
        The connection's session is removed from ``self.sessions`` on exit.
        """
        session = None

        try:
            async for message in websocket:
                if isinstance(message, bytes):
                    if session:
                        await self.handle_audio_data(session, message)
                    continue

                try:
                    data = json.loads(message)
                except json.JSONDecodeError:
                    logger.warning(f"忽略无效的JSON消息: {message[:100]!r}")
                    continue
                if not isinstance(data, dict):
                    logger.warning(f"忽略非对象JSON消息: {message[:100]!r}")
                    continue

                msg_type = data.get("type")

                if msg_type == "hello":
                    # A repeated hello replaces the current session; drop the
                    # old entry so it does not linger in self.sessions.
                    if session:
                        self.sessions.pop(session.session_id, None)
                    response = await self.handle_hello(websocket, data)
                    await websocket.send(json.dumps(response))
                    session = self.sessions.get(response["session_id"])

                elif msg_type == "listen" and session:
                    await self.handle_listen(session, data)

                elif msg_type == "mcp" and session:
                    await self.handle_mcp_message(session, data)

                elif msg_type == "abort" and session:
                    logger.info(f"会话中止: {data.get('reason')}")

        except websockets.exceptions.ConnectionClosed:
            logger.info("WebSocket连接关闭")
        finally:
            if session:
                self.sessions.pop(session.session_id, None)

    async def start_server(self):
        """Start listening on ``host:port`` and run until the server closes."""
        server = await websockets.serve(
            self.connection_handler,
            self.host,
            self.port
        )
        logger.info(f"服务器启动在 {self.host}:{self.port}")
        await server.wait_closed()

# Script entry point: start the server. asyncio.run() blocks until the server
# closes, so if these snippets are combined into one module, keep this guard
# at the very end, after McpHandler and handle_mcp_message are defined.
if __name__ == "__main__":
    asyncio.run(XiaozhiServer().start_server())

MCP协议实现

MCP(Model Context Protocol，模型上下文协议)是一种基于JSON-RPC 2.0规范的协议；在本项目中，它被用作设备与服务器之间的控制协议。

class McpHandler:
    """Serve the MCP methods ``initialize``, ``tools/list`` and ``tools/call``
    for a fixed set of demo device tools."""

    def __init__(self):
        # tool name -> {"description": ..., "inputSchema": ...}
        self.tools = self._initialize_tools()

    def _initialize_tools(self) -> Dict:
        """Return the tool table, keyed by tool name."""
        return {
            "self.get_device_status": {
                "description": "获取设备状态信息",
                "inputSchema": {
                    "type": "object",
                    "properties": {}
                }
            },
            "self.audio_speaker.set_volume": {
                "description": "设置设备音量",
                "inputSchema": {
                    "type": "object",
                    "properties": {
                        "volume": {
                            "type": "integer",
                            "minimum": 0,
                            "maximum": 100
                        }
                    },
                    "required": ["volume"]
                }
            },
            "self.light.set_rgb": {
                "description": "设置RGB灯光",
                "inputSchema": {
                    "type": "object",
                    "properties": {
                        "r": {"type": "integer", "minimum": 0, "maximum": 255},
                        "g": {"type": "integer", "minimum": 0, "maximum": 255},
                        "b": {"type": "integer", "minimum": 0, "maximum": 255}
                    },
                    "required": ["r", "g", "b"]
                }
            }
        }

    async def handle_initialize(self, params: Dict) -> Dict:
        """Answer ``initialize`` with protocol version, capabilities and server info."""
        return {
            "protocolVersion": "2024-11-05",
            "capabilities": {"tools": {}},
            "serverInfo": {
                "name": "Xiaozhi Python Server",
                "version": "1.0.0"
            }
        }

    async def handle_tools_list(self, params: Dict) -> Dict:
        """Return every tool with its ``name``, ``description`` and ``inputSchema``.

        Tool names are the keys of ``self.tools``; they are merged back into
        each entry because a ``tools/list`` result must name each tool.
        Pagination is not supported: any ``cursor`` in ``params`` is ignored
        and ``nextCursor`` is always empty.
        """
        tools_list = [{"name": name, **spec} for name, spec in self.tools.items()]
        return {
            "tools": tools_list,
            "nextCursor": ""
        }

    async def handle_tools_call(self, params: Dict) -> Dict:
        """Validate the arguments against the tool's schema, then run the tool.

        Returns an MCP tool result whose text is the tool's return value,
        serialized as JSON unless it already is a string.

        Raises:
            ValueError: unknown tool, or arguments that violate its inputSchema.
        """
        params = params or {}
        tool_name = params.get("name")
        # "arguments" may be omitted or explicitly null.
        arguments = params.get("arguments") or {}

        if tool_name not in self.tools:
            raise ValueError(f"未知工具: {tool_name}")
        self._validate_arguments(tool_name, arguments)

        if tool_name == "self.get_device_status":
            result = await self._get_device_status()
        elif tool_name == "self.audio_speaker.set_volume":
            result = await self._set_volume(arguments)
        elif tool_name == "self.light.set_rgb":
            result = await self._set_rgb_light(arguments)
        else:
            # Declared in self.tools but no implementation wired up here.
            raise ValueError(f"未知工具: {tool_name}")

        # JSON (true, {"k": ...}) instead of Python repr (True, {'k': ...})
        # so the text is machine-readable.
        text = result if isinstance(result, str) else json.dumps(result, ensure_ascii=False)
        return {
            "content": [{"type": "text", "text": text}],
            "isError": False
        }

    def _validate_arguments(self, tool_name: str, arguments: Dict) -> None:
        """Check ``arguments`` against the ``inputSchema`` of ``tool_name``.

        Enforces only the schema features used in ``_initialize_tools``:
        ``required`` keys, and ``integer`` type with ``minimum``/``maximum``.

        Raises:
            ValueError: if the arguments do not satisfy the schema.
        """
        if not isinstance(arguments, dict):
            raise ValueError("参数必须是对象")
        schema = self.tools[tool_name]["inputSchema"]

        missing = [key for key in schema.get("required", []) if key not in arguments]
        if missing:
            raise ValueError(f"缺少必需参数: {', '.join(missing)}")

        for key, prop in schema.get("properties", {}).items():
            if key not in arguments or prop.get("type") != "integer":
                continue
            value = arguments[key]
            # bool is a subclass of int, but JSON true/false is not an integer.
            if isinstance(value, bool) or not isinstance(value, int):
                raise ValueError(f"参数 {key} 必须是整数")
            low = prop.get("minimum")
            high = prop.get("maximum")
            if (low is not None and value < low) or (high is not None and value > high):
                raise ValueError(f"参数 {key} 超出范围 [{low}, {high}]")

    async def _get_device_status(self) -> Dict:
        """Return placeholder device status values."""
        return {
            "battery_level": 85,
            "network_status": "connected",
            "memory_usage": "45%"
        }

    async def _set_volume(self, arguments: Dict) -> bool:
        """Log the requested volume (placeholder) and report success."""
        volume = arguments.get("volume", 50)
        logger.info(f"设置音量: {volume}")
        return True

    async def _set_rgb_light(self, arguments: Dict) -> bool:
        """Log the requested RGB color (placeholder) and report success."""
        r = arguments.get("r", 0)
        g = arguments.get("g", 0)
        b = arguments.get("b", 0)
        logger.info(f"设置RGB灯光: R={r}, G={g}, B={b}")
        return True

# 在XiaozhiServer中添加MCP处理
async def handle_mcp_message(self, session: DeviceSession, message: Dict):
    """Answer one ``mcp`` WebSocket message carrying a JSON-RPC 2.0 payload.

    Written as a XiaozhiServer method (it takes ``self``). ``initialize``,
    ``tools/list`` and ``tools/call`` requests are dispatched to a McpHandler
    and answered with ``result`` or ``error`` under the request ``id``.

    JSON-RPC 2.0 rules applied:
    * a payload without ``method`` is a response to a request we sent; it is
      logged and never answered;
    * a notification (no ``id`` member) is executed but never answered;
    * error codes: -32601 unknown method, -32602 invalid params (ValueError
      raised by the handler), -32603 any other failure.
    """
    payload = message.get("payload")
    if not isinstance(payload, dict):
        logger.warning(f"忽略无效的MCP负载: {payload!r}")
        return

    method = payload.get("method")
    if method is None:
        logger.info(f"收到MCP响应: {payload}")
        return

    request_id = payload.get("id")
    is_notification = "id" not in payload
    # "params" may be omitted or explicitly null.
    params = payload.get("params") or {}

    mcp_handler = McpHandler()
    handlers = {
        "initialize": mcp_handler.handle_initialize,
        "tools/list": mcp_handler.handle_tools_list,
        "tools/call": mcp_handler.handle_tools_call,
    }
    handler = handlers.get(method)

    if handler is None:
        if is_notification:
            # e.g. MCP "notifications/*" messages; nothing to reply to.
            logger.info(f"忽略MCP通知: {method}")
            return
        rpc_body = {"error": {"code": -32601, "message": f"未知MCP方法: {method}"}}
    else:
        try:
            rpc_body = {"result": await handler(params)}
        except ValueError as e:
            rpc_body = {"error": {"code": -32602, "message": str(e)}}
        except Exception as e:
            logger.exception(f"MCP方法 {method} 执行失败")
            rpc_body = {"error": {"code": -32603, "message": str(e)}}

    if is_notification:
        return

    # Sent outside the try above so a failed send is not answered with a
    # second (error) reply.
    response = {
        "session_id": session.session_id,
        "type": "mcp",
        "payload": {
            "jsonrpc": "2.0",
            "id": request_id,
            **rpc_body,
        },
    }
    await session.websocket.send(json.dumps(response))

音频处理集成

Opus编解码实现

import opuslib
import numpy as np
from io import BytesIO

class AudioProcessor:
    """Opus encode/decode and resampling helpers for 16-bit PCM audio."""

    def __init__(self, sample_rate=16000, channels=1):
        self.sample_rate = sample_rate
        self.channels = channels

        # One stateful Opus encoder/decoder pair per processor.
        self.encoder = opuslib.Encoder(sample_rate, channels, opuslib.APPLICATION_VOIP)
        self.decoder = opuslib.Decoder(sample_rate, channels)

    def encode_audio(self, pcm_data: bytes) -> bytes:
        """Encode exactly one frame of native-endian int16 PCM to an Opus packet.

        ``pcm_data`` must hold a single frame whose duration Opus accepts
        (2.5, 5, 10, 20, 40 or 60 ms at ``sample_rate``).
        """
        # opuslib takes raw bytes plus the frame size in samples *per channel*;
        # each int16 sample is 2 bytes.
        frame_size = len(pcm_data) // (2 * self.channels)
        return self.encoder.encode(bytes(pcm_data), frame_size)

    def decode_audio(self, opus_data: bytes, frame_size: Optional[int] = None) -> bytes:
        """Decode one Opus packet to native-endian int16 PCM bytes.

        Args:
            opus_data: a single Opus packet.
            frame_size: output capacity in samples per channel. Defaults to
                120 ms at ``sample_rate``, the longest duration an Opus packet
                can carry, so any valid packet fits.
        """
        if frame_size is None:
            frame_size = self.sample_rate * 120 // 1000
        # opuslib already returns the decoded PCM as bytes.
        return self.decoder.decode(bytes(opus_data), frame_size)

    def resample_audio(self, pcm_data: bytes, from_rate: int, to_rate: int) -> bytes:
        """Resample native-endian int16 PCM from ``from_rate`` to ``to_rate``.

        Uses FFT-based ``scipy.signal.resample``; the output has
        ``int(n_samples * to_rate / from_rate)`` samples. Results are rounded
        and clipped to the int16 range. Equal rates return the input unchanged;
        empty input (or a zero-length result) returns ``b""``.
        """
        from scipy import signal

        if from_rate == to_rate:
            return bytes(pcm_data)

        audio_array = np.frombuffer(pcm_data, dtype=np.int16)

        resample_ratio = to_rate / from_rate
        num_samples = int(len(audio_array) * resample_ratio)
        if num_samples == 0:
            return b""

        resampled = signal.resample(audio_array, num_samples)
        # FFT resampling rings around sharp edges and can exceed the int16
        # range; astype() alone would truncate and wrap those samples.
        clipped = np.clip(np.round(resampled), -32768, 32767)
        return clipped.astype(np.int16).tobytes()

语音服务集成示例

class SpeechServices:
    """语音服务集成"""
    
    @staticmethod

【免费下载链接】xiaozhi-esp32 Build your own AI friend　项目地址：https://gitcode.com/GitHub_Trending/xia/xiaozhi-esp32

Logo

火山引擎开发者社区是火山引擎打造的AI技术生态平台,聚焦Agent与大模型开发,提供豆包系列模型(图像/视频/视觉)、智能分析与会话工具,并配套评测集、动手实验室及行业案例库。社区通过技术沙龙、挑战赛等活动促进开发者成长,新用户可领50万Tokens权益,助力构建智能应用。

更多推荐