Digital Human Livestreaming in Practice: From Getting Started to Deployment

Part 1: Quick Start - A 2D Digital Human Stream Based on SadTalker

1.1 Environment Setup

# Create a virtual environment
conda create -n digital_human python=3.8
conda activate digital_human

# Install base dependencies
pip install torch torchvision torchaudio --index-url https://download.pytorch.org/whl/cu118
pip install opencv-python pillow numpy scipy tqdm

# Clone SadTalker
git clone https://github.com/OpenTalker/SadTalker.git
cd SadTalker
pip install -r requirements.txt
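
SadTalker also needs its pretrained checkpoints before any of the code below will run. The repository ships a download helper script at the time of writing; verify the checkpoint list against the current SadTalker README:

# Download the pretrained weights into ./checkpoints (run inside the SadTalker directory)
bash scripts/download_models.sh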

1.2 Core Driver Code

# sadtalker_driver.py
import os
import cv2
import torch
import numpy as np
from gtts import gTTS
from pydub import AudioSegment
from src.audio2pose_models.audio2pose import Audio2Pose
from src.facerender.animate import AnimateFromCoeff

class SimpleDigitalHuman:
    def __init__(self, source_image_path, checkpoint_dir="checkpoints"):
        """
        初始化数字人驱动
        Args:
            source_image_path: 源图片路径(数字人形象)
            checkpoint_dir: 模型权重目录
        """
        self.source_image = cv2.imread(source_image_path)
        self.device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
        
        # 加载预训练模型
        self.load_models(checkpoint_dir)
        
    def load_models(self, checkpoint_dir):
        """Load the required models"""
        # Audio-to-pose/expression coefficient model
        # ("auido" is the actual, misspelled filename shipped with the official checkpoints)
        self.audio2pose = Audio2Pose()
        pose_checkpoint = os.path.join(checkpoint_dir, "auido2pose_00140-model.pth")
        self.audio2pose.load_state_dict(torch.load(pose_checkpoint, map_location=self.device))
        self.audio2pose.to(self.device).eval()

        # Coefficients-to-animation model
        self.animate_model = AnimateFromCoeff()
        animate_checkpoint = os.path.join(checkpoint_dir, "epoch_20.pth")
        self.animate_model.load_state_dict(torch.load(animate_checkpoint, map_location=self.device))
        self.animate_model.to(self.device).eval()
        
    def text_to_speech(self, text, output_audio="temp_audio.wav"):
        """Text-to-speech via gTTS"""
        tts = gTTS(text=text, lang='zh-CN', slow=False)
        tts.save("temp_audio.mp3")

        # gTTS outputs MP3; convert to WAV for the animation pipeline
        audio = AudioSegment.from_mp3("temp_audio.mp3")
        audio.export(output_audio, format="wav")
        return output_audio
    
    def generate_video(self, audio_path, output_video="output.mp4"):
        """Generate a talking-head video for the given audio"""
        # Extract audio features (mel spectrogram chunks)
        from src.audio2exp.get_mel import get_mel_from_audio
        mel_chunks = get_mel_from_audio(audio_path)

        # Generate expression/pose coefficients
        with torch.no_grad():
            batch = torch.from_numpy(mel_chunks).float().to(self.device)
            coeffs = self.audio2pose(batch)

        # Render the animation frames
        frames = []
        for i in range(len(coeffs)):
            coeff = coeffs[i:i+1]
            frame = self.animate_model(self.source_image, coeff)
            frames.append(frame)

        # Save the video
        self.save_video(frames, audio_path, output_video)
        return output_video
    
    def save_video(self, frames, audio_path, output_path, fps=25):
        """Save the frames as a video and mux in the audio track"""
        height, width = frames[0].shape[:2]
        fourcc = cv2.VideoWriter_fourcc(*'mp4v')
        out = cv2.VideoWriter("temp_video.mp4", fourcc, fps, (width, height))

        for frame in frames:
            out.write(frame)
        out.release()

        # Mux the audio with ffmpeg (list form avoids shell-quoting issues;
        # -strict experimental is no longer needed for the native AAC encoder)
        import subprocess
        cmd = [
            "ffmpeg", "-y",
            "-i", "temp_video.mp4",
            "-i", audio_path,
            "-c:v", "copy", "-c:a", "aac",
            output_path,
        ]
        subprocess.run(cmd, check=True)

        # Clean up the temp file
        os.remove("temp_video.mp4")
    
    def live_stream_setup(self, rtmp_url="rtmp://localhost/live/stream"):
        """Set up RTMP push streaming"""
        import subprocess

        # FFmpeg push command; -f image2pipe is required because stream_frame
        # writes a sequence of JPEG-encoded frames to stdin
        ffmpeg_cmd = [
            'ffmpeg',
            '-re',  # read input at native frame rate
            '-f', 'image2pipe',
            '-i', 'pipe:0',  # read video from stdin
            '-c:v', 'libx264',
            '-preset', 'veryfast',
            '-tune', 'zerolatency',
            '-f', 'flv',
            rtmp_url
        ]

        self.stream_process = subprocess.Popen(
            ffmpeg_cmd,
            stdin=subprocess.PIPE
        )

    def stream_frame(self, frame):
        """Push a single frame into the live stream"""
        if hasattr(self, 'stream_process'):
            # JPEG-encode the frame and write it to ffmpeg's stdin
            _, buffer = cv2.imencode('.jpg', frame)
            self.stream_process.stdin.write(buffer.tobytes())

# Usage example
if __name__ == "__main__":
    # 1. Initialize the digital human
    dh = SimpleDigitalHuman("source_image.jpg")

    # 2. Generate speech
    # "Hello everyone, I'm a digital human host, welcome to my livestream!"
    text = "大家好,我是数字人主播,欢迎来到我的直播间!"
    audio_file = dh.text_to_speech(text)

    # 3. Generate the video
    video_file = dh.generate_video(audio_file, "first_video.mp4")
    print(f"Video generated: {video_file}")

    # 4. Minimal live demo (requires OBS or a streaming server)
    # dh.live_stream_setup()
    # Generate and push frames in real time (illustrative loop):
    # for frame in generated_frames:
    #     dh.stream_frame(frame)

1.3 Simple Livestream Integration Script

# simple_live_bot.py
import time
import queue
import threading
from sadtalker_driver import SimpleDigitalHuman

class LiveStreamBot:
    def __init__(self, image_path):
        self.dh = SimpleDigitalHuman(image_path)
        self.message_queue = queue.Queue()
        self.is_streaming = False
        
    def add_message(self, text):
        """Queue a message for the stream"""
        self.message_queue.put(text)

    def process_queue(self):
        """Process the message queue"""
        while self.is_streaming:
            try:
                text = self.message_queue.get(timeout=1)
                print(f"Processing message: {text}")

                # Generate speech
                audio_file = self.dh.text_to_speech(text, f"temp_{int(time.time())}.wav")

                # Generate video (simplified: production code should render and
                # push frames incrementally rather than writing a full file;
                # see the sketch after this script)

            except queue.Empty:
                continue
                
    def start_stream(self, duration=3600):
        """Start streaming"""
        self.is_streaming = True

        # Start the message-processing thread
        processor = threading.Thread(target=self.process_queue)
        processor.daemon = True
        processor.start()

        # Simulated streaming loop
        start_time = time.time()
        while time.time() - start_time < duration and self.is_streaming:
            # Real-time frame generation and pushing would go here
            time.sleep(0.1)

    def stop_stream(self):
        """Stop streaming"""
        self.is_streaming = False

# Usage example
if __name__ == "__main__":
    bot = LiveStreamBot("avatar.jpg")

    # Preset livestream script (Chinese lines to be spoken by the zh-CN TTS)
    script = [
        "欢迎来到数字人直播间!",
        "今天给大家介绍AI技术的最新进展。",
        "数字人直播正在改变内容创作方式。",
        "感谢大家的观看,记得关注哦!"
    ]

    # Queue the messages
    for line in script:
        bot.add_message(line)
        time.sleep(5)  # simulate pacing between messages

    # Start streaming
    bot.start_stream(duration=30)
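
To close the loop between generating a clip and delivering it, the sketch below ties SimpleDigitalHuman's video generation to its RTMP push from section 1.2. It is a minimal sketch assuming the class as defined above; frame pacing is crude and error handling is omitted.

# stream_reply.py - render a reply, then push its frames to the RTMP stream
import time
import cv2
from sadtalker_driver import SimpleDigitalHuman

def speak_on_stream(dh: SimpleDigitalHuman, text: str, fps: int = 25):
    """Generate a clip for `text` and push its frames to the live stream."""
    audio_file = dh.text_to_speech(text)
    video_file = dh.generate_video(audio_file, "reply.mp4")

    cap = cv2.VideoCapture(video_file)
    frame_interval = 1.0 / fps
    while True:
        ok, frame = cap.read()
        if not ok:
            break
        dh.stream_frame(frame)
        time.sleep(frame_interval)  # crude pacing; ffmpeg's -re also throttles input
    cap.release()

if __name__ == "__main__":
    dh = SimpleDigitalHuman("avatar.jpg")
    dh.live_stream_setup("rtmp://localhost/live/stream")
    speak_on_stream(dh, "欢迎来到数字人直播间!")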

Part 2: Going Deeper - Intelligent Dialogue and 3D Digital Human Integration

2.1 Integrating an Intelligent Dialogue Engine

# intelligent_dialogue.py
import json
import requests
import asyncio
import websockets
from typing import Dict, List, Optional
from dataclasses import dataclass

@dataclass
class DialogueState:
    """对话状态管理"""
    context: List[Dict]
    user_profile: Dict
    session_id: str
    max_history: int = 10

class LLMDialogueEngine:
    """LLM-backed dialogue engine"""
    
    def __init__(self, api_key: str, model: str = "gpt-4"):
        self.api_key = api_key
        self.model = model
        self.base_url = "https://api.openai.com/v1/chat/completions"
        self.sessions: Dict[str, DialogueState] = {}
        
    def create_session(self, session_id: str, initial_context: List[Dict] = None):
        """Create a new dialogue session"""
        if initial_context is None:
            # System prompt: "You are a professional digital human host -
            # warm, professional, and good at engaging the audience."
            initial_context = [
                {"role": "system", "content": "你是一个专业的数字人主播,热情、专业、善于互动。"}
            ]
        
        self.sessions[session_id] = DialogueState(
            context=initial_context,
            user_profile={},
            session_id=session_id
        )
        return session_id
    
    async def generate_response(self, session_id: str, user_input: str) -> Dict:
        """Generate a reply"""
        if session_id not in self.sessions:
            self.create_session(session_id)
            
        session = self.sessions[session_id]
        
        # Append the user turn to the context
        session.context.append({"role": "user", "content": user_input})
        
        # Cap the context length
        if len(session.context) > session.max_history:
            session.context = session.context[-session.max_history:]
        
        # Call the LLM API
        headers = {
            "Authorization": f"Bearer {self.api_key}",
            "Content-Type": "application/json"
        }
        
        payload = {
            "model": self.model,
            "messages": session.context,
            "temperature": 0.7,
            "max_tokens": 500
        }
        
        try:
            # requests is blocking; run it in a worker thread so the event
            # loop stays responsive (or swap in an async client such as aiohttp)
            loop = asyncio.get_running_loop()
            response = await loop.run_in_executor(
                None,
                lambda: requests.post(
                    self.base_url,
                    headers=headers,
                    json=payload,
                    timeout=30
                )
            )
            
            if response.status_code == 200:
                result = response.json()
                assistant_reply = result["choices"][0]["message"]["content"]
                
                # Update the context with the assistant turn
                session.context.append({"role": "assistant", "content": assistant_reply})
                
                return {
                    "text": assistant_reply,
                    "emotion": self.analyze_emotion(assistant_reply),
                    "actions": self.extract_actions(assistant_reply)
                }
            else:
                return {"text": "抱歉,我暂时无法回答这个问题。", "emotion": "neutral"}
                
        except Exception as e:
            print(f"API call failed: {e}")
            return {"text": "网络连接出现问题,请稍后再试。", "emotion": "neutral"}
    
    def analyze_emotion(self, text: str) -> str:
        """Naive keyword-based sentiment analysis (over the Chinese reply text)"""
        positive_words = ["开心", "高兴", "很棒", "优秀", "感谢", "欢迎"]
        negative_words = ["抱歉", "遗憾", "问题", "困难"]
        
        if any(word in text for word in positive_words):
            return "happy"
        elif any(word in text for word in negative_words):
            return "sad"
        else:
            return "neutral"
    
    def extract_actions(self, text: str) -> List[str]:
        """Extract action cues from the reply text"""
        actions = []
        
        # Simple rule matching on Chinese keywords
        # ("挥手"/"打招呼" = wave/greet, "点头" = nod, "展示"/"请看" = show/look)
        if "挥手" in text or "打招呼" in text:
            actions.append("wave")
        if "点头" in text:
            actions.append("nod")
        if "展示" in text or "请看" in text:
            actions.append("point")
            
        return actions

class RealTimeChatProcessor:
    """Real-time chat processor"""
    
    def __init__(self, llm_engine: LLMDialogueEngine):
        self.llm_engine = llm_engine
        self.chat_queue = asyncio.Queue()
        self.response_callbacks = []
        
    async def process_chat_messages(self, platform: str = "douyin"):
        """Process chat messages from a given platform"""
        print(f"Processing chat messages from {platform}...")
        
        while True:
            try:
                # Simulated message intake (a real deployment would consume
                # the platform's WebSocket feed or API)
                message = await self.simulate_receive_message(platform)
                
                if message:
                    # Generate a reply
                    session_id = message.get("user_id", "default")
                    response = await self.llm_engine.generate_response(
                        session_id, 
                        message["content"]
                    )
                    
                    # Fire the registered callbacks
                    for callback in self.response_callbacks:
                        await callback(response)
                        
                    # Simulated reply delivery (a real deployment would call
                    # the platform API)
                    await self.simulate_send_response(platform, message, response)
                    
            except Exception as e:
                print(f"Error while processing a message: {e}")
                
            await asyncio.sleep(0.1)
    
    async def simulate_receive_message(self, platform: str) -> Optional[Dict]:
        """Simulated message intake (for testing)"""
        # A real deployment would consume the platform's WebSocket feed or API
        return None
    
    async def simulate_send_response(self, platform: str, original_msg: Dict, response: Dict):
        """Simulated reply delivery"""
        print(f"[{platform}] Reply to user {original_msg.get('user_name')}: {response['text']}")
    
    def register_callback(self, callback):
        """Register a response callback"""
        self.response_callbacks.append(callback)

# Usage example
async def main_dialogue():
    # Initialize the dialogue engine
    llm_engine = LLMDialogueEngine(api_key="your-api-key")
    
    # Initialize the chat processor
    chat_processor = RealTimeChatProcessor(llm_engine)
    
    # Register a callback that drives the digital human
    async def digital_human_callback(response: Dict):
        print(f"Digital human actions: {response['actions']}, emotion: {response['emotion']}")
        # This is where the avatar's animation and speech generation would be triggered
        
    chat_processor.register_callback(digital_human_callback)
    
    # Start processing messages
    await chat_processor.process_chat_messages("douyin")

if __name__ == "__main__":
    asyncio.run(main_dialogue())
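
As written, simulate_receive_message always returns None, so main_dialogue idles forever. For local testing, canned messages can be fed through the same hook; the subclass below is a hypothetical test harness, not a platform integration.

# mock_chat.py - hypothetical test harness for the chat pipeline
import asyncio
from intelligent_dialogue import LLMDialogueEngine, RealTimeChatProcessor

class MockChatProcessor(RealTimeChatProcessor):
    def __init__(self, llm_engine, scripted_messages):
        super().__init__(llm_engine)
        self._scripted = list(scripted_messages)

    async def simulate_receive_message(self, platform):
        # Pop one scripted message per call; None once the script is exhausted
        if not self._scripted:
            return None
        await asyncio.sleep(1.0)  # pretend a viewer just typed
        return self._scripted.pop(0)

if __name__ == "__main__":
    engine = LLMDialogueEngine(api_key="your-api-key")
    processor = MockChatProcessor(engine, [
        {"user_id": "u1", "user_name": "viewer1", "content": "主播好!"},
        {"user_id": "u2", "user_name": "viewer2", "content": "介绍一下数字人吧"},
    ])
    asyncio.run(processor.process_chat_messages("douyin"))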

2.2 Unity 3D Digital Human Integration

// Unity C# script: DigitalHumanController.cs
using UnityEngine;
using System;
using System.Collections;
using System.Net.WebSockets;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using Newtonsoft.Json;

public class DigitalHumanController : MonoBehaviour
{
    [Header("网络设置")]
    public string serverURL = "ws://localhost:8765";
    
    [Header("动画控制")]
    public Animator digitalHumanAnimator;
    public SkinnedMeshRenderer faceRenderer;
    
    [Header("混合形状索引")]
    public int blinkLeftIndex = 0;
    public int blinkRightIndex = 1;
    public int smileIndex = 2;
    public int mouthOpenIndex = 3;
    
    // WebSocket客户端
    private ClientWebSocket webSocket;
    private CancellationTokenSource cancellationTokenSource;
    
    // 动画状态
    private float[] blendShapeWeights = new float[50];
    private Vector3 targetHeadRotation;
    private string currentSpeechText = "";
    
    void Start()
    {
        InitializeWebSocket();
        InitializeAnimationParameters();
    }
    
    void InitializeAnimationParameters()
    {
        // Zero out the blend shape weights
        for (int i = 0; i < blendShapeWeights.Length; i++)
        {
            blendShapeWeights[i] = 0f;
        }
        
        // Set the animator parameters
        if (digitalHumanAnimator != null)
        {
            digitalHumanAnimator.SetBool("IsTalking", false);
            digitalHumanAnimator.SetFloat("TalkSpeed", 1.0f);
        }
    }
    
    async void InitializeWebSocket()
    {
        try
        {
            webSocket = new ClientWebSocket();
            cancellationTokenSource = new CancellationTokenSource();
            
            await webSocket.ConnectAsync(new System.Uri(serverURL), cancellationTokenSource.Token);
            Debug.Log("WebSocket连接成功");
            
            // 启动接收消息协程
            StartCoroutine(ReceiveMessages());
        }
        catch (System.Exception e)
        {
            Debug.LogError($"WebSocket连接失败: {e.Message}");
        }
    }
    
    IEnumerator ReceiveMessages()
    {
        var buffer = new byte[4096];
        
        while (webSocket.State == WebSocketState.Open)
        {
            var segment = new ArraySegment<byte>(buffer);
            var receiveTask = webSocket.ReceiveAsync(segment, cancellationTokenSource.Token);
            
            // Poll for completion without blocking the main thread
            while (!receiveTask.IsCompleted)
            {
                yield return null;
            }
            
            if (receiveTask.IsCompletedSuccessfully)
            {
                var result = receiveTask.Result;
                if (result.MessageType == WebSocketMessageType.Text)
                {
                    string message = Encoding.UTF8.GetString(buffer, 0, result.Count);
                    ProcessControlMessage(message);
                }
            }
        }
    }
    
    void ProcessControlMessage(string jsonMessage)
    {
        try
        {
            var controlData = JsonConvert.DeserializeObject<ControlData>(jsonMessage);
            
            // Facial animation
            if (controlData.blendShapes != null)
            {
                for (int i = 0; i < Mathf.Min(controlData.blendShapes.Length, blendShapeWeights.Length); i++)
                {
                    blendShapeWeights[i] = Mathf.Lerp(
                        blendShapeWeights[i], 
                        controlData.blendShapes[i], 
                        Time.deltaTime * 0.3f
                    );
                    
                    if (faceRenderer != null && i < faceRenderer.sharedMesh.blendShapeCount)
                    {
                        faceRenderer.SetBlendShapeWeight(i, blendShapeWeights[i] * 100);
                    }
                }
            }
            
            // Head rotation
            if (controlData.headRotation != null)
            {
                targetHeadRotation = new Vector3(
                    controlData.headRotation[0],
                    controlData.headRotation[1],
                    controlData.headRotation[2]
                );
                
                // Smooth the rotation
                transform.localRotation = Quaternion.Slerp(
                    transform.localRotation,
                    Quaternion.Euler(targetHeadRotation),
                    Time.deltaTime * 5f
                );
            }
            
            // Body animation triggers
            if (digitalHumanAnimator != null && !string.IsNullOrEmpty(controlData.animationTrigger))
            {
                digitalHumanAnimator.SetTrigger(controlData.animationTrigger);
            }
            
            // Speech
            if (!string.IsNullOrEmpty(controlData.speechText))
            {
                currentSpeechText = controlData.speechText;
                StartCoroutine(PlaySpeechAnimation(controlData.speechText));
            }
        }
        catch (System.Exception e)
        {
            Debug.LogError($"处理控制消息失败: {e.Message}");
        }
    }
    
    IEnumerator PlaySpeechAnimation(string text)
    {
        if (digitalHumanAnimator != null)
        {
            digitalHumanAnimator.SetBool("IsTalking", true);
            
            // Fake lip sync (a real system would derive mouth shapes from audio analysis)
            float duration = text.Length * 0.1f; // rough estimate
            float elapsed = 0f;
            
            while (elapsed < duration)
            {
                // Oscillating mouth shape (audio-driven in a real system)
                float mouthOpen = Mathf.Sin(elapsed * 20f) * 0.5f + 0.5f;
                faceRenderer.SetBlendShapeWeight(mouthOpenIndex, mouthOpen * 100);
                
                elapsed += Time.deltaTime;
                yield return null;
            }
            
            digitalHumanAnimator.SetBool("IsTalking", false);
            faceRenderer.SetBlendShapeWeight(mouthOpenIndex, 0);
        }
    }
    
    async void SendStatusUpdate()
    {
        if (webSocket != null && webSocket.State == WebSocketState.Open)
        {
            // Send plain float arrays: Newtonsoft.Json cannot serialize
            // UnityEngine.Vector3 directly (its `normalized` property self-references)
            var status = new
            {
                type = "status",
                timestamp = System.DateTime.UtcNow.Ticks,
                position = new float[] { transform.position.x, transform.position.y, transform.position.z },
                rotation = new float[] { transform.eulerAngles.x, transform.eulerAngles.y, transform.eulerAngles.z },
                isTalking = digitalHumanAnimator != null && digitalHumanAnimator.GetBool("IsTalking")
            };
            
            string json = JsonConvert.SerializeObject(status);
            byte[] buffer = Encoding.UTF8.GetBytes(json);
            
            await webSocket.SendAsync(
                new ArraySegment<byte>(buffer),
                WebSocketMessageType.Text,
                true,
                cancellationTokenSource.Token
            );
        }
    }
    
    void Update()
    {
        // Periodic status update (~once per second at 60 fps)
        if (Time.frameCount % 60 == 0)
        {
            SendStatusUpdate();
        }
        
        // Automatic blinking (~every 5 seconds at 60 fps)
        if (Time.frameCount % 300 == 0)
        {
            StartCoroutine(BlinkAnimation());
        }
    }
    
    IEnumerator BlinkAnimation()
    {
        float blinkDuration = 0.2f;
        float elapsed = 0f;
        
        while (elapsed < blinkDuration)
        {
            float weight = Mathf.Sin((elapsed / blinkDuration) * Mathf.PI) * 100;
            faceRenderer.SetBlendShapeWeight(blinkLeftIndex, weight);
            faceRenderer.SetBlendShapeWeight(blinkRightIndex, weight);
            
            elapsed += Time.deltaTime;
            yield return null;
        }
        
        faceRenderer.SetBlendShapeWeight(blinkLeftIndex, 0);
        faceRenderer.SetBlendShapeWeight(blinkRightIndex, 0);
    }
    
    void OnDestroy()
    {
        if (webSocket != null)
        {
            webSocket.CloseAsync(WebSocketCloseStatus.NormalClosure, "closing", CancellationToken.None);
        }
        
        if (cancellationTokenSource != null)
        {
            cancellationTokenSource.Cancel();
        }
    }
    
    [System.Serializable]
    public class ControlData
    {
        public float[] blendShapes;
        public float[] headRotation;
        public string animationTrigger;
        public string speechText;
    }
}

2.3 Python Server-Side Control Code

# unity_controller_server.py
import asyncio
import websockets
import json
import numpy as np
from typing import Dict, List, Optional
from dataclasses import dataclass
from datetime import datetime

@dataclass
class AnimationState:
    """动画状态管理"""
    blend_shapes: List[float]
    head_rotation: List[float]
    current_action: str
    emotion: str = "neutral"

class UnityWebSocketServer:
    """WebSocket control server for the Unity client"""
    
    def __init__(self, host: str = "localhost", port: int = 8765):
        self.host = host
        self.port = port
        self.clients = set()
        self.animation_state = AnimationState(
            blend_shapes=[0.0] * 52,  # the standard 52 ARKit-style blendshapes
            head_rotation=[0.0, 0.0, 0.0],
            current_action="idle"
        )
        
    async def handle_client(self, websocket, path=None):
        """Handle a client connection (the `path` argument is passed by
        websockets < 11 and dropped in newer releases, hence the default)"""
        self.clients.add(websocket)
        print(f"New client connected, total connections: {len(self.clients)}")
        
        try:
            # Send the initial state
            await self.send_control_data(websocket)
            
            async for message in websocket:
                await self.process_client_message(websocket, message)
                
        except websockets.exceptions.ConnectionClosed:
            print("Client disconnected")
        finally:
            self.clients.discard(websocket)
            
    async def process_client_message(self, websocket, message: str):
        """Process a client message"""
        try:
            data = json.loads(message)
            message_type = data.get("type")
            
            if message_type == "status":
                # Status update from the client
                print(f"Status update received: {data}")
                
            elif message_type == "request":
                # Control request
                await self.handle_control_request(websocket, data)
                
        except json.JSONDecodeError as e:
            print(f"JSON parse error: {e}")
            
    async def handle_control_request(self, websocket, data: Dict):
        """Handle a control request"""
        request_type = data.get("request_type")
        
        if request_type == "speak":
            text = data.get("text", "")
            emotion = data.get("emotion", "neutral")
            await self.trigger_speech(text, emotion)
            
        elif request_type == "action":
            action = data.get("action", "")
            await self.trigger_action(action)
            
        elif request_type == "emotion":
            emotion = data.get("emotion", "neutral")
            await self.set_emotion(emotion)
            
    async def trigger_speech(self, text: str, emotion: str = "neutral"):
        """Trigger the speech animation"""
        print(f"Speech triggered: {text} (emotion: {emotion})")
        
        # Use the emotion pose as the base expression for the speech frames
        self.animation_state.blend_shapes = self.get_emotion_shapes(emotion)
        
        # Generate the lip-sync frame sequence (simplified; a production system
        # would derive precise mouth shapes from the audio, see the
        # audio-driven variant below)
        speech_frames = self.generate_speech_animation(text)
        
        # Send the frames to all clients
        for frame in speech_frames:
            control_data = {
                "type": "control",
                "blendShapes": frame["blend_shapes"],
                "headRotation": frame["head_rotation"],
                "speechText": text,
                "timestamp": datetime.now().isoformat()
            }
            
            await self.broadcast(json.dumps(control_data))
            await asyncio.sleep(0.033)  # 30fps
            
    async def trigger_action(self, action: str):
        """Trigger a specific action"""
        print(f"Action triggered: {action}")
        
        action_map = {
            "wave": {"animationTrigger": "Wave", "blendShapes": self.get_wave_shapes()},
            "nod": {"animationTrigger": "Nod", "blendShapes": self.get_nod_shapes()},
            "point": {"animationTrigger": "Point", "blendShapes": self.get_point_shapes()},
        }
        
        if action in action_map:
            control_data = {
                "type": "control",
                **action_map[action],
                "timestamp": datetime.now().isoformat()
            }
            
            await self.broadcast(json.dumps(control_data))
            
    async def set_emotion(self, emotion: str):
        """Set the emotional state"""
        print(f"Emotion set: {emotion}")
        
        blend_shapes = self.get_emotion_shapes(emotion)
        
        control_data = {
            "type": "control",
            "blendShapes": blend_shapes,
            "emotion": emotion,
            "timestamp": datetime.now().isoformat()
        }
        
        await self.broadcast(json.dumps(control_data))
        
    def generate_speech_animation(self, text: str) -> List[Dict]:
        """Generate a lip-sync frame sequence (simplified)"""
        frames = []
        duration = len(text) * 0.1  # rough estimate
        
        for i in range(int(duration * 30)):  # 30fps
            # Basic mouth shape (a real system would derive visemes from phonemes)
            time_point = i / 30.0
            mouth_open = np.sin(time_point * 10) * 0.3 + 0.3
            
            # Copy the current blend shapes and set the mouth
            blend_shapes = self.animation_state.blend_shapes.copy()
            blend_shapes[3] = mouth_open  # index of mouthOpen
            
            # Slight head motion
            head_rotation = [
                np.sin(time_point * 2) * 5,  # pitch
                np.sin(time_point * 1.5) * 3,  # yaw
                0  # roll
            ]
            
            frames.append({
                "blend_shapes": blend_shapes,
                "head_rotation": head_rotation
            })
            
        return frames
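
    def generate_speech_animation_from_audio(self, audio_path: str, fps: int = 30) -> List[Dict]:
        """Sketch of an audio-driven variant: map the audio's RMS energy to the
        mouthOpen weight. Assumes librosa is installed (not a dependency of the
        original listing); real lip sync would map phonemes to visemes instead."""
        import librosa

        y, sr = librosa.load(audio_path, sr=16000)
        hop_length = sr // fps  # one RMS value per video frame
        rms = librosa.feature.rms(y=y, hop_length=hop_length)[0]
        rms = rms / (rms.max() + 1e-8)  # normalize energy to [0, 1]

        frames = []
        for energy in rms:
            blend_shapes = self.animation_state.blend_shapes.copy()
            blend_shapes[3] = float(energy) * 0.8  # louder audio opens the mouth wider
            frames.append({
                "blend_shapes": blend_shapes,
                "head_rotation": [0.0, 0.0, 0.0]
            })
        return frames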
    
    def get_emotion_shapes(self, emotion: str) -> List[float]:
        """Get the blend shapes for an emotion"""
        shapes = [0.0] * 52
        
        emotion_map = {
            "happy": {"smile": 0.8, "eyes_squint": 0.3},
            "sad": {"frown": 0.6, "inner_brow_raiser": 0.4},
            "angry": {"brow_lowerer": 0.7, "lip_tightener": 0.5},
            "surprised": {"jaw_drop": 0.4, "brow_raiser": 0.6},
            "neutral": {}  # default
        }
        
        if emotion in emotion_map:
            for shape_name, weight in emotion_map[emotion].items():
                shape_index = self.get_blendshape_index(shape_name)
                if shape_index is not None:
                    shapes[shape_index] = weight
                    
        return shapes
    
    def get_blendshape_index(self, shape_name: str) -> Optional[int]:
        """Get a blend shape index (simplified mapping); returns None for
        unknown names so callers can skip them"""
        shape_map = {
            "smile": 2,
            "frown": 4,
            "mouthOpen": 3,
            "blink_left": 0,
            "blink_right": 1,
            "brow_raiser": 6,
            "brow_lowerer": 7,
            "eyes_squint": 8,
            "jaw_drop": 9,
            "lip_tightener": 10,
            "inner_brow_raiser": 11
        }
        return shape_map.get(shape_name)
    
    def get_wave_shapes(self) -> List[float]:
        """Blend shapes accompanying the wave action"""
        shapes = [0.0] * 52
        shapes[self.get_blendshape_index("smile")] = 0.5
        return shapes
    
    def get_nod_shapes(self) -> List[float]:
        """Blend shapes accompanying the nod action"""
        shapes = [0.0] * 52
        shapes[self.get_blendshape_index("eyes_squint")] = 0.3
        return shapes
    
    def get_point_shapes(self) -> List[float]:
        """Blend shapes accompanying the point action"""
        shapes = [0.0] * 52
        shapes[self.get_blendshape_index("lip_tightener")] = 0.2
        return shapes
    
    async def send_control_data(self, websocket):
        """Send the current control state to one client"""
        control_data = {
            "type": "control",
            "blendShapes": self.animation_state.blend_shapes,
            "headRotation": self.animation_state.head_rotation,
            "currentAction": self.animation_state.current_action,
            "emotion": self.animation_state.emotion,
            "timestamp": datetime.now().isoformat()
        }
        
        await websocket.send(json.dumps(control_data))
        
    async def broadcast(self, message: str):
        """Broadcast a message to all clients"""
        if self.clients:
            await asyncio.gather(
                *[client.send(message) for client in self.clients],
                return_exceptions=True
            )
    
    async def run(self):
        """Start the server"""
        print(f"Starting WebSocket server on {self.host}:{self.port}")
        async with websockets.serve(
            self.handle_client, 
            self.host, 
            self.port
        ):
            await asyncio.Future()  # run forever

# Usage example
if __name__ == "__main__":
    server = UnityWebSocketServer()
    
    # Start the server
    asyncio.run(server.run())
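
With the server running, the control protocol can be exercised without Unity at all. The short client below is a test sketch that assumes the request format handled by handle_control_request above: it sends a speak request and prints the first few control frames broadcast back.

# test_client.py - drive the control server without Unity
import asyncio
import json
import websockets

async def main():
    async with websockets.connect("ws://localhost:8765") as ws:
        # Ask the server to play a speech animation
        await ws.send(json.dumps({
            "type": "request",
            "request_type": "speak",
            "text": "欢迎来到直播间",
            "emotion": "happy"
        }))
        # Print the first few control frames the server broadcasts back
        for _ in range(5):
            frame = json.loads(await ws.recv())
            print(frame["type"], frame.get("speechText"), frame["timestamp"])

if __name__ == "__main__":
    asyncio.run(main())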

Part 3: Production Deployment - Microservice Architecture and Cloud-Native Deployment

3.1 Docker Containerization

# Dockerfile
# Base image
FROM nvidia/cuda:11.8.0-cudnn8-runtime-ubuntu22.04

# Environment variables
ENV DEBIAN_FRONTEND=noninteractive
ENV PYTHONUNBUFFERED=1
ENV TZ=Asia/Shanghai

# System dependencies
RUN apt-get update && apt-get install -y \
    python3.10 \
    python3-pip \
    python3-dev \
    git \
    wget \
    curl \
    ffmpeg \
    libsm6 \
    libxext6 \
    libxrender-dev \
    libgl1-mesa-glx \
    && rm -rf /var/lib/apt/lists/*

# Working directory
WORKDIR /app

# Copy the dependency manifest
COPY requirements.txt .

# Install Python dependencies
RUN pip3 install --no-cache-dir -r requirements.txt \
    && pip3 install torch torchvision torchaudio --index-url https://download.pytorch.org/whl/cu118

# Copy the application code
COPY . .

# Create a non-root user
RUN useradd -m -u 1000 appuser && chown -R appuser:appuser /app
USER appuser

# Expose ports (HTTP API, WebSocket control, RTMP)
EXPOSE 8000 8765 1935

# Health check (assumes main.py serves /health on port 8000; see the stub below)
HEALTHCHECK --interval=30s --timeout=3s --start-period=5s --retries=3 \
    CMD curl -f http://localhost:8000/health || exit 1

# Entrypoint
CMD ["python3", "main.py"]

# docker-compose.yml
version: '3.8'

services:
  # API gateway (no GPU reservation: the GPU-bound services declare their own)
  api-gateway:
    build: ./api-gateway
    ports:
      - "8000:8000"
    environment:
      - NODE_ENV=production
      - REDIS_URL=redis://redis:6379
    depends_on:
      - redis
      - tts-service
      - animation-service
    networks:
      - digital-human-network

  # TTS service
  tts-service:
    build: ./services/tts
    environment:
      - MODEL_PATH=/models/tts
      - CACHE_SIZE=1000
    volumes:
      - tts-models:/models/tts
      - tts-cache:/cache
    networks:
      - digital-human-network
    deploy:
      resources:
        reservations:
          devices:
            - driver: nvidia
              count: 1
              capabilities: [gpu]

  # Animation generation service
  animation-service:
    build: ./services/animation
    environment:
      - UNITY_WS_URL=ws://unity-controller:8765
      - MAX_CONCURRENT=10
    networks:
      - digital-human-network
    depends_on:
      - unity-controller

  # Unity WebSocket controller
  unity-controller:
    build: ./services/unity-controller
    ports:
      - "8765:8765"
    networks:
      - digital-human-network

  # Live streaming service
  stream-service:
    build: ./services/stream
    ports:
      - "1935:1935"  # RTMP
      - "8080:8080"  # HTTP-FLV
    environment:
      - RTMP_URL=rtmp://stream-service/live
    volumes:
      - stream-data:/data
    networks:
      - digital-human-network

  # Redis cache
  redis:
    image: redis:7-alpine
    command: redis-server --appendonly yes
    volumes:
      - redis-data:/data
    networks:
      - digital-human-network

  # PostgreSQL database
  postgres:
    image: postgres:15-alpine
    environment:
      - POSTGRES_DB=digitalhuman
      - POSTGRES_USER=admin
      - POSTGRES_PASSWORD=${DB_PASSWORD}
    volumes:
      - postgres-data:/var/lib/postgresql/data
      - ./init.sql:/docker-entrypoint-initdb.d/init.sql
    networks:
      - digital-human-network

  # Monitoring
  monitoring:
    image: prom/prometheus:latest
    volumes:
      - ./prometheus.yml:/etc/prometheus/prometheus.yml
      - prometheus-data:/prometheus
    ports:
      - "9090:9090"
    networks:
      - digital-human-network

  # Log aggregation
  loki:
    image: grafana/loki:latest
    ports:
      - "3100:3100"
    volumes:
      - loki-data:/loki
    command: -config.file=/etc/loki/local-config.yaml
    networks:
      - digital-human-network

volumes:
  tts-models:
  tts-cache:
  stream-data:
  redis-data:
  postgres-data:
  prometheus-data:
  loki-data:

networks:
  digital-human-network:
    driver: bridge

3.2 Kubernetes Deployment Configuration

# k8s-deployment.yaml
apiVersion: v1
kind: Namespace
metadata:
  name: digital-human
---
# ConfigMap
apiVersion: v1
kind: ConfigMap
metadata:
  name: digital-human-config
  namespace: digital-human
data:
  app-config.yaml: |
    services:
      tts:
        endpoint: "http://tts-service.digital-human.svc.cluster.local:8001"
        model: "fastspeech2"
        language: "zh-cn"
      
      animation:
        endpoint: "http://animation-service.digital-human.svc.cluster.local:8002"
        max_concurrent: 20
      
      unity:
        websocket: "ws://unity-controller.digital-human.svc.cluster.local:8765"
      
      streaming:
        rtmp_endpoint: "rtmp://stream-service.digital-human.svc.cluster.local:1935/live"
        hls_endpoint: "http://stream-service.digital-human.svc.cluster.local:8080/live"
    
    monitoring:
      prometheus_endpoint: "http://prometheus.digital-human.svc.cluster.local:9090"
      metrics_port: 9091
    
    cache:
      redis_host: "redis-master.digital-human.svc.cluster.local"
      redis_port: 6379
      ttl: 3600
---
# Redis StatefulSet
apiVersion: apps/v1
kind: StatefulSet
metadata:
  name: redis
  namespace: digital-human
spec:
  serviceName: redis
  replicas: 3
  selector:
    matchLabels:
      app: redis
  template:
    metadata:
      labels:
        app: redis
        role: master
    spec:
      containers:
      - name: redis
        image: redis:7-alpine
        ports:
        - containerPort: 6379
        resources:
          requests:
            memory: "256Mi"
            cpu: "250m"
          limits:
            memory: "512Mi"
            cpu: "500m"
        volumeMounts:
        - name: redis-data
          mountPath: /data
        command: ["redis-server", "--appendonly", "yes", "--cluster-enabled", "yes"]
        readinessProbe:
          tcpSocket:
            port: 6379
          initialDelaySeconds: 5
          periodSeconds: 10
        livenessProbe:
          tcpSocket:
            port: 6379
          initialDelaySeconds: 15
          periodSeconds: 20
  volumeClaimTemplates:
  - metadata:
      name: redis-data
    spec:
      accessModes: [ "ReadWriteOnce" ]
      resources:
        requests:
          storage: 10Gi
---
# TTS service deployment (GPU nodes)
apiVersion: apps/v1
kind: Deployment
metadata:
  name: tts-service
  namespace: digital-human
  labels:
    app: tts-service
spec:
  replicas: 2
  selector:
    matchLabels:
      app: tts-service
  template:
    metadata:
      labels:
        app: tts-service
    spec:
      nodeSelector:
        accelerator: nvidia-tesla-t4
      containers:
      - name: tts
        image: digitalhuman/tts-service:1.0.0
        ports:
        - containerPort: 8001
        env:
        - name: MODEL_PATH
          value: "/models/tts"
        - name: GPU_DEVICE
          value: "0"
        resources:
          requests:
            memory: "4Gi"
            cpu: "2"
            nvidia.com/gpu: 1
          limits:
            memory: "8Gi"
            cpu: "4"
            nvidia.com/gpu: 1
        volumeMounts:
        - name: tts-models
          mountPath: /models/tts
        - name: config
          mountPath: /app/config
        readinessProbe:
          httpGet:
            path: /health
            port: 8001
          initialDelaySeconds: 30
          periodSeconds: 10
        livenessProbe:
          httpGet:
            path: /health
            port: 8001
          initialDelaySeconds: 60
          periodSeconds: 30
      volumes:
      - name: tts-models
        persistentVolumeClaim:
          claimName: tts-models-pvc
      - name: config
        configMap:
          name: digital-human-config
---
# Service for the TTS deployment
apiVersion: v1
kind: Service
metadata:
  name: tts-service
  namespace: digital-human
spec:
  selector:
    app: tts-service
  ports:
  - port: 8001
    targetPort: 8001
  type: ClusterIP
---
# HPA for the animation service
apiVersion: autoscaling/v2
kind: HorizontalPodAutoscaler
metadata:
  name: animation-service-hpa
  namespace: digital-human
spec:
  scaleTargetRef:
    apiVersion: apps/v1
    kind: Deployment
    name: animation-service
  minReplicas: 2
  maxReplicas: 10
  metrics:
  - type: Resource
    resource:
      name: cpu
      target:
        type: Utilization
        averageUtilization: 70
  - type: Resource
    resource:
      name: memory
      target:
        type: Utilization
        averageUtilization: 80
  behavior:
    scaleDown:
      stabilizationWindowSeconds: 300
      policies:
      - type: Percent
        value: 50
        periodSeconds: 60
    scaleUp:
      stabilizationWindowSeconds: 60
      policies:
      - type: Percent
        value: 100
        periodSeconds: 60
---
# Ingress for the API gateway
apiVersion: networking.k8s.io/v1
kind: Ingress
metadata:
  name: digital-human-ingress
  namespace: digital-human
  annotations:
    nginx.ingress.kubernetes.io/rewrite-target: /
    nginx.ingress.kubernetes.io/ssl-redirect: "true"
    cert-manager.io/cluster-issuer: "letsencrypt-prod"
spec:
  tls:
  - hosts:
    - api.digitalhuman.com
    secretName: digital-human-tls
  rules:
  - host: api.digitalhuman.com
    http:
      paths:
      - path: /
        pathType: Prefix
        backend:
          service:
            name: api-gateway
            port:
              number: 8000
---
# ServiceMonitor (Prometheus Operator)
apiVersion: monitoring.coreos.com/v1
kind: ServiceMonitor
metadata:
  name: digital-human-monitor
  namespace: digital-human
  labels:
    release: prometheus
spec:
  selector:
    matchLabels:
      app.kubernetes.io/part-of: digital-human
  endpoints:
  - port: metrics
    interval: 30s
    path: /metrics
  namespaceSelector:
    matchNames:
    - digital-human

3.3 Production-Grade API Gateway and Monitoring

# api_gateway.py
from fastapi import FastAPI, HTTPException, Depends, status
from fastapi.middleware.cors import CORSMiddleware
from fastapi.middleware.trustedhost import TrustedHostMiddleware
from fastapi.middleware.gzip import GZipMiddleware
from fastapi.responses import JSONResponse
from fastapi.security import APIKeyHeader
from contextlib import asynccontextmanager
import asyncio
import json
import time
import logging
import aioredis  # on redis-py >= 4.2, use: from redis import asyncio as aioredis
from typing import Dict, List, Optional
from prometheus_client import Counter, Gauge, Histogram, generate_latest, CONTENT_TYPE_LATEST
from starlette.requests import Request
from starlette.responses import Response
import uvicorn

# Logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# Prometheus metrics
REQUEST_COUNT = Counter(
    'http_requests_total',
    'Total HTTP Requests',
    ['method', 'endpoint', 'status']
)

REQUEST_LATENCY = Histogram(
    'http_request_duration_seconds',
    'HTTP request latency',
    ['method', 'endpoint']
)

# A Gauge, not a Counter: the session count goes down as well as up
ACTIVE_SESSIONS = Gauge(
    'active_sessions',
    'Currently active streaming sessions'
)

# API key auth
API_KEY_NAME = "X-API-Key"
api_key_header = APIKeyHeader(name=API_KEY_NAME, auto_error=False)

class RateLimiter:
    """Sliding-window rate limiter backed by a Redis sorted set"""
    
    def __init__(self, redis_pool, limit: int = 100, window: int = 60):
        self.redis = redis_pool
        self.limit = limit
        self.window = window
        
    async def is_allowed(self, key: str) -> bool:
        """Check whether the request is allowed"""
        now = time.time()
        window_start = now - self.window
        
        # Batch the round trips with a Redis pipeline
        pipe = self.redis.pipeline()
        pipe.zremrangebyscore(key, 0, window_start)  # drop expired entries
        pipe.zadd(key, {str(time.time_ns()): now})   # unique member per request
        pipe.zcard(key)                              # count requests in the window
        pipe.expire(key, self.window)
        
        results = await pipe.execute()
        request_count = results[2]
        
        return request_count <= self.limit

@asynccontextmanager
async def lifespan(app: FastAPI):
    """Application lifecycle management"""
    # Startup
    logger.info("Starting API gateway...")
    
    # Redis connection pool
    app.state.redis = await aioredis.from_url(
        "redis://redis-master.digital-human.svc.cluster.local:6379",
        encoding="utf-8",
        decode_responses=True
    )
    
    # Rate limiter
    app.state.rate_limiter = RateLimiter(app.state.redis)
    
    # Service registry ("unity" is required by the /action route below)
    app.state.service_registry = {
        "tts": "http://tts-service:8001",
        "animation": "http://animation-service:8002",
        "streaming": "http://stream-service:8080",
        "unity": "http://unity-controller:8765"
    }
    
    # Health-check state
    app.state.healthy_services = set()
    
    # Background health-check task
    app.state.health_check_task = asyncio.create_task(
        health_check_services(app)
    )
    
    yield
    
    # Shutdown
    logger.info("Shutting down API gateway...")
    app.state.health_check_task.cancel()
    await app.state.redis.close()

app = FastAPI(
    title="Digital Human Livestreaming API Gateway",
    description="API gateway for the production digital human livestreaming system",
    version="1.0.0",
    lifespan=lifespan
)

# Middleware
app.add_middleware(
    CORSMiddleware,
    allow_origins=["https://digitalhuman.com", "https://admin.digitalhuman.com"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)

app.add_middleware(
    TrustedHostMiddleware,
    allowed_hosts=["api.digitalhuman.com", "localhost"]
)

app.add_middleware(GZipMiddleware, minimum_size=1000)

# Dependencies
async def verify_api_key(api_key: str = Depends(api_key_header)):
    """Validate the API key"""
    if not api_key:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Missing API key"
        )
    
    # Validate the key against Redis
    redis = app.state.redis
    key_valid = await redis.get(f"api_key:{api_key}")
    
    if not key_valid:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Invalid API key"
        )
    
    return api_key

async def rate_limit(request: Request, api_key: str = Depends(verify_api_key)):
    """Rate limiting"""
    client_ip = request.client.host
    endpoint = request.url.path
    
    # Limit per IP and endpoint
    limit_key = f"rate_limit:{client_ip}:{endpoint}"
    
    if not await app.state.rate_limiter.is_allowed(limit_key):
        raise HTTPException(
            status_code=status.HTTP_429_TOO_MANY_REQUESTS,
            detail="Too many requests, please retry later"
        )
    
    return True

# Middleware: monitoring and logging
@app.middleware("http")
async def monitor_requests(request: Request, call_next):
    """Monitor HTTP requests"""
    start_time = time.time()
    method = request.method
    endpoint = request.url.path
    
    try:
        response = await call_next(request)
        
        # Record metrics
        REQUEST_COUNT.labels(
            method=method,
            endpoint=endpoint,
            status=response.status_code
        ).inc()
        
        REQUEST_LATENCY.labels(
            method=method,
            endpoint=endpoint
        ).observe(time.time() - start_time)
        
        # Access log
        logger.info(
            f"{method} {endpoint} {response.status_code} "
            f"{time.time() - start_time:.3f}s"
        )
        
        return response
        
    except Exception as e:
        logger.error(f"Request handling error: {e}")
        REQUEST_COUNT.labels(
            method=method,
            endpoint=endpoint,
            status=500
        ).inc()
        raise

# Health-check task
async def health_check_services(app: FastAPI):
    """Periodically health-check the backing services"""
    import aiohttp
    
    while True:
        try:
            async with aiohttp.ClientSession() as session:
                for service_name, url in app.state.service_registry.items():
                    try:
                        health_url = f"{url}/health"
                        async with session.get(health_url, timeout=5) as response:
                            if response.status == 200:
                                app.state.healthy_services.add(service_name)
                            else:
                                app.state.healthy_services.discard(service_name)
                    except Exception as e:
                        logger.warning(f"Health check for {service_name} failed: {e}")
                        app.state.healthy_services.discard(service_name)
            
            # Publish service health to Redis
            await app.state.redis.set(
                "service_health",
                ",".join(app.state.healthy_services),
                ex=30
            )
            
        except Exception as e:
            logger.error(f"Health-check task error: {e}")
        
        await asyncio.sleep(30)  # check every 30 seconds

# Routes
@app.get("/health")
async def health_check():
    """Health-check endpoint"""
    return {
        "status": "healthy",
        "timestamp": time.time(),
        "services": list(app.state.healthy_services)
    }

@app.get("/metrics")
async def metrics():
    """Prometheus指标端点"""
    return Response(
        content=generate_latest(),
        media_type=CONTENT_TYPE_LATEST
    )

@app.post("/api/v1/live/start")
async def start_live_session(
    request: Request,
    config: Dict,
    rate_limited: bool = Depends(rate_limit)
):
    """开始直播会话"""
    # 验证服务健康状态
    required_services = {"tts", "animation", "streaming"}
    if not required_services.issubset(app.state.healthy_services):
        raise HTTPException(
            status_code=status.HTTP_503_SERVICE_UNAVAILABLE,
            detail="依赖服务不可用"
        )
    
    # 生成会话ID
    import uuid
    session_id = str(uuid.uuid4())
    
    # 存储会话配置
    redis = app.state.redis
    session_key = f"session:{session_id}"
    
    await redis.hset(session_key, mapping={
        "config": str(config),
        "status": "initializing",
        "created_at": str(time.time()),
        "client_ip": request.client.host
    })
    
    await redis.expire(session_key, 3600)  # 1小时过期
    
    # 更新活跃会话计数
    ACTIVE_SESSIONS.inc()
    
    return {
        "session_id": session_id,
        "status": "created",
        "stream_url": f"rtmp://stream-service/live/{session_id}"
    }

@app.post("/api/v1/live/{session_id}/speak")
async def trigger_speech(
    session_id: str,
    text: str,
    emotion: Optional[str] = "neutral",
    rate_limited: bool = Depends(rate_limit)
):
    """触发数字人语音"""
    import aiohttp
    
    # 验证会话
    redis = app.state.redis
    session_key = f"session:{session_id}"
    session_exists = await redis.exists(session_key)
    
    if not session_exists:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail="会话不存在或已过期"
        )
    
    # 调用TTS服务
    tts_url = f"{app.state.service_registry['tts']}/synthesize"
    
    async with aiohttp.ClientSession() as session:
        try:
            async with session.post(tts_url, json={
                "text": text,
                "emotion": emotion,
                "session_id": session_id
            }, timeout=10) as response:
                
                if response.status != 200:
                    raise HTTPException(
                        status_code=response.status,```python
                    tts_result = await response.json()
                    
        except aiohttp.ClientError as e:
            raise HTTPException(
                status_code=status.HTTP_503_SERVICE_UNAVAILABLE,
                detail=f"TTS服务调用失败: {str(e)}"
            )
    
    # 调用动画服务
    animation_url = f"{app.state.service_registry['animation']}/animate"
    
    async with aiohttp.ClientSession() as session:
        try:
            async with session.post(animation_url, json={
                "audio_url": tts_result["audio_url"],
                "text": text,
                "emotion": emotion,
                "session_id": session_id
            }, timeout=15) as response:
                
                if response.status != 200:
                    raise HTTPException(
                        status_code=response.status,
                        detail="动画生成失败"
                    )
                    
                animation_result = await response.json()
                
        except aiohttp.ClientError as e:
            raise HTTPException(
                status_code=status.HTTP_503_SERVICE_UNAVAILABLE,
                detail=f"动画服务调用失败: {str(e)}"
            )
    
    # 更新会话状态
    await redis.hset(session_key, "last_action", "speak")
    await redis.hset(session_key, "last_action_time", str(time.time()))
    
    return {
        "status": "success",
        "audio_url": tts_result["audio_url"],
        "video_url": animation_result["video_url"],
        "duration": animation_result["duration"]
    }

@app.post("/api/v1/live/{session_id}/action")
async def trigger_action(
    session_id: str,
    action: str,
    rate_limited: bool = Depends(rate_limit)
):
    """触发数字人动作"""
    import aiohttp
    
    # 验证会话
    redis = app.state.redis
    session_key = f"session:{session_id}"
    session_exists = await redis.exists(session_key)
    
    if not session_exists:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail="会话不存在或已过期"
        )
    
    # 调用Unity控制器
    unity_url = f"{app.state.service_registry['unity']}/action"
    
    async with aiohttp.ClientSession() as session:
        try:
            async with session.post(unity_url, json={
                "action": action,
                "session_id": session_id
            }, timeout=5) as response:
                
                if response.status != 200:
                    raise HTTPException(
                        status_code=response.status,
                        detail="动作执行失败"
                    )
                    
        except aiohttp.ClientError as e:
            raise HTTPException(
                status_code=status.HTTP_503_SERVICE_UNAVAILABLE,
                detail=f"Unity服务调用失败: {str(e)}"
            )
    
    # 更新会话状态
    await redis.hset(session_key, "last_action", f"action:{action}")
    await redis.hset(session_key, "last_action_time", str(time.time()))
    
    return {"status": "success", "action": action}

@app.delete("/api/v1/live/{session_id}")
async def end_live_session(
    session_id: str,
    rate_limited: bool = Depends(rate_limit)
):
    """结束直播会话"""
    redis = app.state.redis
    session_key = f"session:{session_id}"
    
    # 删除会话数据
    deleted = await redis.delete(session_key)
    
    if deleted:
        # 减少活跃会话计数
        ACTIVE_SESSIONS.dec()
        
        # 清理相关资源
        await redis.delete(f"session:{session_id}:chat")
        await redis.delete(f"session:{session_id}:metrics")
        
        return {"status": "success", "message": "会话已结束"}
    else:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail="会话不存在"
        )

@app.get("/api/v1/live/{session_id}/stats")
async def get_session_stats(session_id: str):
    """获取会话统计信息"""
    redis = app.state.redis
    
    # 获取基础会话信息
    session_key = f"session:{session_id}"
    session_info = await redis.hgetall(session_key)
    
    if not session_info:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail="会话不存在"
        )
    
    # 获取聊天统计
    chat_key = f"session:{session_id}:chat"
    chat_count = await redis.llen(chat_key)
    
    # 获取性能指标
    metrics_key = f"session:{session_id}:metrics"
    metrics = await redis.hgetall(metrics_key)
    
    return {
        "session_id": session_id,
        "status": session_info.get("status", "unknown"),
        "created_at": session_info.get("created_at"),
        "last_action": session_info.get("last_action"),
        "last_action_time": session_info.get("last_action_time"),
        "chat_messages": chat_count,
        "metrics": metrics
    }

# WebSocket real-time chat
from fastapi import WebSocket, WebSocketDisconnect

class ConnectionManager:
    """WebSocket connection manager"""
    
    def __init__(self):
        self.active_connections: Dict[str, List[WebSocket]] = {}
        
    async def connect(self, websocket: WebSocket, session_id: str):
        await websocket.accept()
        if session_id not in self.active_connections:
            self.active_connections[session_id] = []
        self.active_connections[session_id].append(websocket)
        
    def disconnect(self, websocket: WebSocket, session_id: str):
        if session_id in self.active_connections:
            self.active_connections[session_id].remove(websocket)
            if not self.active_connections[session_id]:
                del self.active_connections[session_id]
                
    async def broadcast(self, message: str, session_id: str):
        """Broadcast a message to every connection in a session"""
        if session_id in self.active_connections:
            for connection in self.active_connections[session_id]:
                try:
                    await connection.send_text(message)
                except Exception:
                    # Ignore dead connections; they are removed on disconnect
                    pass

manager = ConnectionManager()

@app.websocket("/ws/live/{session_id}/chat")
async def websocket_chat(websocket: WebSocket, session_id: str):
    """WebSocket聊天接口"""
    await manager.connect(websocket, session_id)
    
    redis = app.state.redis
    chat_key = f"session:{session_id}:chat"
    
    try:
        while True:
            # 接收消息
            data = await websocket.receive_text()
            message_data = json.loads(data)
            
            # 验证消息格式
            if "user_id" not in message_data or "content" not in message_data:
                await websocket.send_text(json.dumps({
                    "error": "Invalid message format"
                }))
                continue
            
            # 存储到Redis
            message_id = await redis.incr(f"message_id:{session_id}")
            message_data["id"] = message_id
            message_data["timestamp"] = time.time()
            
            await redis.rpush(chat_key, json.dumps(message_data))
            await redis.ltrim(chat_key, -100, -1)  # 只保留最近100条
            
            # 广播消息
            broadcast_msg = json.dumps({
                "type": "chat_message",
                "data": message_data
            })
            await manager.broadcast(broadcast_msg, session_id)
            
            # 触发数字人回复(可选)
            if message_data.get("require_reply", False):
                # 这里可以集成对话引擎
                pass
                
    except WebSocketDisconnect:
        manager.disconnect(websocket, session_id)
    except Exception as e:
        logger.error(f"WebSocket错误: {e}")
        manager.disconnect(websocket, session_id)

# Error handling
@app.exception_handler(HTTPException)
async def http_exception_handler(request, exc):
    """HTTP exception handler"""
    return JSONResponse(
        status_code=exc.status_code,
        content={
            "error": exc.detail,
            "path": request.url.path,
            "timestamp": time.time()
        }
    )

@app.exception_handler(Exception)
async def general_exception_handler(request, exc):
    """Catch-all exception handler"""
    logger.error(f"Unhandled exception: {exc}", exc_info=True)
    return JSONResponse(
        status_code=500,
        content={
            "error": "Internal server error",
            "path": request.url.path,
            "timestamp": time.time()
        }
    )

if __name__ == "__main__":
    uvicorn.run(
        app,
        host="0.0.0.0",
        port=8000,
        log_level="info",
        access_log=True,
        timeout_keep_alive=30
    )
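
End to end, a client drives the gateway with three calls: start a session, make the avatar speak, end the session. The sketch below uses requests; the host and API key are placeholders, and the key must already exist in Redis under api_key:<key> for verify_api_key to accept it.

# client_demo.py - exercise the gateway API
import requests

BASE = "http://localhost:8000"
HEADERS = {"X-API-Key": "your-api-key"}  # must be pre-registered in Redis

# 1. Start a session (config goes in the JSON body)
resp = requests.post(f"{BASE}/api/v1/live/start", json={"avatar": "default"}, headers=HEADERS)
session = resp.json()
print("stream URL:", session["stream_url"])

# 2. Make the avatar speak (text/emotion are query parameters)
sid = session["session_id"]
requests.post(
    f"{BASE}/api/v1/live/{sid}/speak",
    params={"text": "欢迎来到直播间", "emotion": "happy"},
    headers=HEADERS,
)

# 3. End the session
requests.delete(f"{BASE}/api/v1/live/{sid}", headers=HEADERS)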

3.4 Monitoring and Alerting Configuration

# prometheus.yml
global:
  scrape_interval: 15s
  evaluation_interval: 15s

alerting:
  alertmanagers:
    - static_configs:
        - targets:
          - alertmanager:9093

rule_files:
  - "alerts.yml"

scrape_configs:
  - job_name: 'digital-human-api'
    static_configs:
      - targets: ['api-gateway:9091']
    metrics_path: /metrics
    scrape_interval: 10s

  - job_name: 'tts-service'
    static_configs:
      - targets: ['tts-service:9091']
    scrape_interval: 10s

  - job_name: 'animation-service'
    static_configs:
      - targets: ['animation-service:9091']
    scrape_interval: 10s

  - job_name: 'redis'
    static_configs:
      - targets: ['redis-master:9121']
    scrape_interval: 15s

  - job_name: 'postgres'
    static_configs:
      - targets: ['postgres:9187']
    scrape_interval: 30s

  - job_name: 'kubernetes-pods'
    kubernetes_sd_configs:
      - role: pod
    relabel_configs:
      - source_labels: [__meta_kubernetes_pod_annotation_prometheus_io_scrape]
        action: keep
        regex: true
      - source_labels: [__meta_kubernetes_pod_annotation_prometheus_io_path]
        action: replace
        target_label: __metrics_path__
        regex: (.+)
      - source_labels: [__address__, __meta_kubernetes_pod_annotation_prometheus_io_port]
        action: replace
        regex: ([^:]+)(?::\d+)?;(\d+)
        replacement: $1:$2
        target_label: __address__

# alerts.yml
groups:
  - name: digital-human-alerts
    rules:
      # API gateway alerts
      - alert: HighErrorRate
        expr: rate(http_requests_total{status=~"5.."}[5m]) / rate(http_requests_total[5m]) > 0.05
        for: 2m
        labels:
          severity: critical
          service: api-gateway
        annotations:
          summary: "High error rate detected"
          description: "API gateway error rate above 5% (current: {{ $value }})"
          
      - alert: HighLatency
        expr: histogram_quantile(0.95, rate(http_request_duration_seconds_bucket[5m])) > 2
        for: 2m
        labels:
          severity: warning
          service: api-gateway
        annotations:
          summary: "High latency detected"
          description: "API gateway p95 latency above 2s (current: {{ $value }}s)"
          
      # TTS service alerts
      - alert: TTSServiceDown
        expr: up{job="tts-service"} == 0
        for: 1m
        labels:
          severity: critical
          service: tts
        annotations:
          summary: "TTS service down"
          description: "The TTS service has been down for more than 1 minute"
          
      - alert: TTSHighGPUUsage
        expr: DCGM_FI_DEV_GPU_UTIL{job="tts-service"} > 90
        for: 5m
        labels:
          severity: warning
          service: tts
        annotations:
          summary: "High GPU utilization on the TTS service"
          description: "TTS service GPU utilization above 90% (current: {{ $value }}%)"
          
      # Animation service alerts
      - alert: AnimationQueueFull
        expr: animation_queue_size > 50
        for: 2m
        labels:
          severity: warning
          service: animation
        annotations:
          summary: "Animation queue backlog"
          description: "More than 50 tasks queued for animation generation (current: {{ $value }})"
          
      # Redis alerts
      - alert: RedisMemoryHigh
        expr: redis_memory_used_bytes / redis_memory_max_bytes > 0.8
        for: 5m
        labels:
          severity: warning
          service: redis
        annotations:
          summary: "High Redis memory usage"
          description: "Redis memory usage above 80% (current: {{ $value }}%)"
          
      - alert: RedisDown
        expr: redis_up == 0
        for: 1m
        labels:
          severity: critical
          service: redis
        annotations:
          summary: "Redis down"
          description: "Redis has been down for more than 1 minute"
          
      # Session monitoring
      - alert: TooManyActiveSessions
        expr: active_sessions > 1000
        for: 5m
        labels:
          severity: warning
        annotations:
          summary: "Too many active sessions"
          description: "More than 1000 active sessions (current: {{ $value }})"
          
      - alert: SessionCreationFailed
        expr: rate(session_creation_failed_total[5m]) > 10
        for: 2m
        labels:
          severity: warning
        annotations:
          summary: "High session-creation failure rate"
          description: "Session creation is failing at a high rate (current: {{ $value }})"