Rework the previous version of the code into a "streaming speech inference service" that can be deployed on a vLLM backend.
It will include:


🎯 Feature Goals

  1. Server (based on the vLLM HTTP Server)

    • Runs in a container or on a server and loads the Qwen2AudioForConditionalGenerationStreaming model

    • Exposes HTTP + WebSocket interfaces

    • Supports:

      • /infer: inference over a complete audio clip

      • /stream: streaming audio inference (real-time chunk input; see the message sketch after this list)

  2. Client (Python)

    • Pushes the audio stream to the server chunk by chunk over a WebSocket

    • The server processes it in real time and returns intermediate/final transcription text

    • Similar in spirit to the OpenAI Realtime API
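
For concreteness, the JSON messages exchanged over /stream (matching the handlers implemented in the server code below) look like this:

Client → server, one message per audio chunk:   {"type": "chunk", "pcm": [<float PCM samples>]}
Client → server, end of audio (prompt optional): {"type": "end", "prompt": "Transcribe this: <|AUDIO|>"}
Server → client, per-chunk acknowledgement:      {"status": "ok", "received": <sample count>}
Server → client, final transcription:            {"status": "done", "text": "<text>"}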


🧠 Architecture Overview

               ┌───────────────────────────┐
               │        vLLM Server        │
               │  Qwen2Audio Streaming LLM │
               │ ┌───────────────────────┐ │
Audio Stream ─▶│ │ process_audio_chunk() │─┼▶ Embeddings
               │ └───────────────────────┘ │
               │ ┌───────────────────────┐ │
Prompt ───────▶│ │ generate_text_stream()│─┼▶ Text Tokens Stream
               │ └───────────────────────┘ │
               └─────────────┬─────────────┘
                             │ WebSocket
                             ▼
                 ┌──────────────────────┐
                 │  Python Client       │
                 │  - send audio chunks │
                 │  - recv partial text │
                 └──────────────────────┘
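
The key idea in the diagram is that process_audio_chunk() produces embeddings in the LLM's hidden space, which then have to be placed into the prompt's embedding sequence where the <|AUDIO|> placeholder sits. A minimal sketch of that splice; all names here (splice_audio_embeds, embed_layer, audio_token_id) are illustrative and not taken from the server code:

import torch

# Illustrative helper: replace the single <|AUDIO|> placeholder token in the prompt
# with the projected audio embeddings, producing the inputs_embeds fed to generate().
def splice_audio_embeds(prompt_ids, audio_embeds, embed_layer, audio_token_id):
    prompt_embeds = embed_layer(prompt_ids)                       # (1, T_text, D)
    pos = (prompt_ids[0] == audio_token_id).nonzero()[0].item()   # index of <|AUDIO|>
    return torch.cat(
        [prompt_embeds[:, :pos], audio_embeds, prompt_embeds[:, pos + 1:]], dim=1
    )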

🚀 1. Server Code (server_vllm_audio_streaming.py)

import torch
import torchaudio
import asyncio
import json
from fastapi import FastAPI, WebSocket
from fastapi.responses import JSONResponse
from transformers import AutoTokenizer, Qwen2AudioProcessor, Qwen2ForCausalLM, Qwen2AudioEncoder
from vllm import AsyncLLMEngine, AsyncEngineArgs

app = FastAPI(title="Qwen2Audio Streaming LLM Server")

# ========== Model initialization ==========
MODEL_PATH = "Qwen/Qwen2-Audio-7B-Instruct"

# Note: loading the audio encoder and the language model as separate modules from the
# composite Qwen2-Audio checkpoint is a simplification; in practice you would load
# Qwen2AudioForConditionalGeneration once and reuse its audio_tower / multi_modal_projector.
processor = Qwen2AudioProcessor.from_pretrained(MODEL_PATH)
tokenizer = AutoTokenizer.from_pretrained(MODEL_PATH)
audio_tower = Qwen2AudioEncoder.from_pretrained(MODEL_PATH).to("cuda")
language_model = Qwen2ForCausalLM.from_pretrained(MODEL_PATH, torch_dtype=torch.float16, device_map="auto")

# Simple linear projection layer (randomly initialized here; a trained projector would be loaded in practice)
class AudioProjector(torch.nn.Module):
    def __init__(self, audio_dim, text_dim):
        super().__init__()
        self.linear = torch.nn.Linear(audio_dim, text_dim)
    def forward(self, x): return self.linear(x)

projector = AudioProjector(audio_tower.config.hidden_size, language_model.config.hidden_size).to("cuda")

# Streaming cache (kept global for simplicity; assumes one client connection at a time)
stream_cache = {}

# ========== HTTP endpoint: whole-clip inference ==========
@app.post("/infer")
async def infer_audio(file_path: str, prompt: str = "Transcribe this: <|AUDIO|>"):
    # Load the audio and resample to the 16 kHz expected by the feature extractor
    waveform, sr = torchaudio.load(file_path)
    if sr != 16000:
        waveform = torchaudio.functional.resample(waveform, sr, 16000)
    # Audio-only preprocessing goes through the Whisper-style feature extractor
    inputs = processor.feature_extractor(waveform[0].numpy(), sampling_rate=16000, return_tensors="pt").to("cuda")
    features = audio_tower(**inputs).last_hidden_state
    embeds = projector(features)

    # Simplification: prefix the prompt embeddings to the audio embeddings
    prompt_ids = tokenizer(prompt, return_tensors="pt").input_ids.to("cuda")
    prompt_embeds = language_model.get_input_embeddings()(prompt_ids)
    full_embeds = torch.cat([prompt_embeds, embeds.to(prompt_embeds.dtype)], dim=1)

    outputs = language_model.generate(inputs_embeds=full_embeds, max_new_tokens=64)
    text = tokenizer.decode(outputs[0], skip_special_tokens=True)
    return JSONResponse({"text": text})

# ========== WebSocket streaming endpoint ==========
@app.websocket("/stream")
async def stream_audio(websocket: WebSocket):
    await websocket.accept()
    print("[WS] Client connected")

    # Initialize the per-connection cache
    stream_cache["embeds"] = []
    sampling_rate = 16000

    try:
        while True:
            msg = await websocket.receive_text()
            data = json.loads(msg)

            if data["type"] == "chunk":
                # Receive an audio chunk and compute its embedding
                chunk = torch.tensor(data["pcm"], dtype=torch.float32)
                inputs = processor.feature_extractor(chunk.numpy(), sampling_rate=sampling_rate, return_tensors="pt").to("cuda")
                features = audio_tower(**inputs).last_hidden_state
                projected = projector(features)
                stream_cache["embeds"].append(projected)

                await websocket.send_text(json.dumps({"status": "ok", "received": len(chunk)}))

            elif data["type"] == "end":
                # Concatenate all cached embeddings along the sequence dimension
                all_embeds = torch.cat(stream_cache["embeds"], dim=1)
                stream_cache["embeds"].clear()
                prompt = data.get("prompt", "Transcribe this: <|AUDIO|>")

                # Same simplification as /infer: prefix the prompt embeddings to the audio embeddings
                prompt_ids = tokenizer(prompt, return_tensors="pt").input_ids.to("cuda")
                prompt_embeds = language_model.get_input_embeddings()(prompt_ids)
                full_embeds = torch.cat([prompt_embeds, all_embeds.to(prompt_embeds.dtype)], dim=1)

                outputs = language_model.generate(inputs_embeds=full_embeds, max_new_tokens=64)
                text = tokenizer.decode(outputs[0], skip_special_tokens=True)

                await websocket.send_text(json.dumps({"status": "done", "text": text}))
                await websocket.close()
                break

    except Exception as e:
        await websocket.send_text(json.dumps({"error": str(e)}))
        await websocket.close()
        print("[WS] Closed due to error:", e)

# Launch with:
# uvicorn server_vllm_audio_streaming:app --host 0.0.0.0 --port 8080
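
Since /infer declares file_path and prompt as plain query parameters, it can be exercised with a short Python sketch like the one below (it assumes the requests package is installed, and file_path must point to a file readable by the server process):

import requests

resp = requests.post(
    "http://127.0.0.1:8080/infer",
    params={"file_path": "sample.wav", "prompt": "Transcribe this: <|AUDIO|>"},
)
print(resp.json())  # {"text": "..."}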

🧩 2. Python Client Code (client_stream_audio.py)

import asyncio
import websockets
import torchaudio
import json

async def stream_audio():
    uri = "ws://127.0.0.1:8080/stream"
    async with websockets.connect(uri) as ws:
        print("[Client] Connected")

        # Load the audio file and resample to the 16 kHz the server assumes
        waveform, sr = torchaudio.load("sample.wav")
        waveform = torchaudio.functional.resample(waveform[0], sr, 16000)
        chunk_size = 16000 * 5  # one chunk per 5 seconds of audio

        for i in range(0, len(waveform), chunk_size):
            chunk = waveform[i:i+chunk_size].tolist()
            await ws.send(json.dumps({"type": "chunk", "pcm": chunk}))
            resp = await ws.recv()
            print("[Server Ack]:", resp)

        # All audio sent; signal the end of the stream
        await ws.send(json.dumps({"type": "end", "prompt": "Transcribe this: <|AUDIO|>"}))

        # Wait for the final result
        final_resp = await ws.recv()
        print("[Final Result]:", final_resp)

asyncio.run(stream_audio())

🧱 3. Deployment Workflow

1️⃣ Environment setup

pip install fastapi uvicorn websockets torchaudio transformers vllm

2️⃣ Start the server

uvicorn server_vllm_audio_streaming:app --host 0.0.0.0 --port 8080

3️⃣ Start the client

python client_stream_audio.py

You should see output like:

[Server Ack]: {"status":"ok","received":80000}
[Server Ack]: {"status":"ok","received":80000}
...
[Final Result]: {"status":"done","text":"hello world this is qwen audio transcript"}

⚙️ 4. Swapping in the vLLM Inference Engine

If you want to run the language model with the vLLM engine directly instead of transformers, the snippet below follows vLLM's multimodal offline-inference interface, which takes raw audio rather than precomputed embeddings (the exact input format can vary slightly across vLLM versions):

from vllm import LLM, SamplingParams

llm = LLM(model=MODEL_PATH, gpu_memory_utilization=0.9)  # loads the full Qwen2-Audio checkpoint
sampling_params = SamplingParams(max_tokens=64)
prompt = "Transcribe this: <|AUDIO|>"

# vLLM takes the raw audio plus its sampling rate as multi-modal data;
# waveform here is the 1-D 16 kHz audio array (loaded as in the client code)
result = llm.generate(
    {"prompt": prompt, "multi_modal_data": {"audio": (waveform, 16000)}},
    sampling_params,
)
print(result[0].outputs[0].text)

This plugs the service into vLLM's inference engine; for the streaming /stream path you would drive it through vLLM's async engine, as sketched below.
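
A hedged sketch of token-by-token generation with vLLM's AsyncLLMEngine, which is what the WebSocket handler would call to push partial text to the client (the generate() signature and output fields may differ across vLLM versions):

from vllm import AsyncLLMEngine, AsyncEngineArgs, SamplingParams

MODEL_PATH = "Qwen/Qwen2-Audio-7B-Instruct"
engine = AsyncLLMEngine.from_engine_args(AsyncEngineArgs(model=MODEL_PATH))

async def stream_text(prompt: str, request_id: str):
    params = SamplingParams(max_tokens=64)
    # generate() is an async generator that yields a RequestOutput every time new
    # tokens are produced; each one carries the text generated so far
    async for output in engine.generate(prompt, params, request_id):
        yield output.outputs[0].text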


✅ 5. Summary

Module                   Function
FastAPI Server           Provides the /infer and /stream modes
WebSocket realtime API   Receives PCM data in chunks; the server accumulates and computes embeddings in real time
Qwen2Audio + Projector   Converts audio into text-space embeddings
Language Model           Qwen2 LLM generates the output text
Python Client            Simulates a pushed audio stream and receives transcription results in real time