Large Models: A First Look at vLLM Deployment of Qwen2-Audio (Part 3)
This post reworks the previous version of the code into a streaming speech inference service that can be deployed on a vLLM backend.
It includes:
🎯 Feature Goals
- Server (based on the vLLM HTTP Server)
  - Runs in a container or on a server and loads the Qwen2AudioForConditionalGenerationStreaming model
  - Exposes HTTP + WebSocket interfaces
  - Supports:
    - /infer: inference over a complete audio clip
    - /stream: streaming audio inference (real-time chunk input; see the message sketch after this list)
- Client (Python)
  - Pushes the audio stream to the server chunk by chunk over WebSocket
  - The server processes chunks in real time and returns intermediate/final transcription text
  - Similar in spirit to the OpenAI Realtime API
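For reference, the /stream WebSocket exchange boils down to a small JSON protocol; this sketch simply mirrors the message shapes used by the server and client code below:
import json
# Client -> server: one message per audio chunk (raw float PCM samples)
chunk_msg = json.dumps({"type": "chunk", "pcm": [0.01, -0.02, 0.005]})
# Client -> server: sent once after the last chunk
end_msg = json.dumps({"type": "end", "prompt": "Transcribe this: <|AUDIO|>"})
# Server -> client, per chunk:  {"status": "ok", "received": <num_samples>}
# Server -> client, at the end: {"status": "done", "text": "<transcript>"}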
🧠 Architecture Overview
┌──────────────────────────┐
│ vLLM Server │
│ Qwen2Audio Streaming LLM │
│ ┌──────────────────────┐ │
Audio Stream ▶│ process_audio_chunk() │─┼▶ Embeddings
│ └──────────────────────┘ │
│ ┌──────────────────────┐ │
Prompt ───▶ │ generate_text_stream() │─┼▶ Text Tokens Stream
│ └──────────────────────┘ │
└─────────────┬────────────┘
│ WebSocket
▼
┌──────────────────────┐
│ Python Client │
│ - send audio chunks │
│ - recv partial text │
└──────────────────────┘
🚀 1. Server Code (server_vllm_audio_streaming.py)
import torch
import torchaudio
import asyncio
import json
from fastapi import FastAPI, WebSocket
from fastapi.responses import JSONResponse
from transformers import AutoTokenizer, Qwen2AudioProcessor, Qwen2ForCausalLM, Qwen2AudioEncoder
from vllm import AsyncLLMEngine, AsyncEngineArgs  # optional; only needed when swapping in the vLLM engine (Section 4)

app = FastAPI(title="Qwen2Audio Streaming LLM Server")

# ========== Model initialization ==========
MODEL_PATH = "Qwen/Qwen2-Audio-7B-Instruct"
# Note: loading the audio encoder and the causal LM as separate submodules of the
# Qwen2-Audio checkpoint is a simplification carried over from the previous post.
processor = Qwen2AudioProcessor.from_pretrained(MODEL_PATH)
tokenizer = AutoTokenizer.from_pretrained(MODEL_PATH)
audio_tower = Qwen2AudioEncoder.from_pretrained(MODEL_PATH, torch_dtype=torch.float16).to("cuda")
language_model = Qwen2ForCausalLM.from_pretrained(MODEL_PATH, torch_dtype=torch.float16, device_map="auto")

# Simple linear projection from the audio hidden size to the text hidden size
class AudioProjector(torch.nn.Module):
    def __init__(self, audio_dim, text_dim):
        super().__init__()
        self.linear = torch.nn.Linear(audio_dim, text_dim)

    def forward(self, x):
        return self.linear(x)

projector = AudioProjector(audio_tower.config.hidden_size, language_model.config.hidden_size).half().to("cuda")

# Streaming cache (audio embeddings accumulated per connection)
stream_cache = {}

# ========== HTTP whole-clip inference ==========
@app.post("/infer")
async def infer_audio(file_path: str, prompt: str = "Transcribe this: <|AUDIO|>"):
    waveform, sr = torchaudio.load(file_path)
    # Audio -> log-mel features -> encoder hidden states -> projected text-space embeddings
    inputs = processor.feature_extractor(waveform[0].numpy(), sampling_rate=sr, return_tensors="pt")
    features = audio_tower(inputs.input_features.to("cuda", dtype=torch.float16)).last_hidden_state
    embeds = projector(features)
    # Simplified prompt handling: prepend the prompt embeddings to the audio embeddings
    # (the full Qwen2-Audio model instead splices audio features at the <|AUDIO|> placeholder)
    input_ids = tokenizer(prompt, return_tensors="pt").input_ids.to("cuda")
    prompt_embeds = language_model.get_input_embeddings()(input_ids)
    outputs = language_model.generate(inputs_embeds=torch.cat([prompt_embeds, embeds], dim=1), max_new_tokens=64)
    text = tokenizer.decode(outputs[0], skip_special_tokens=True)
    return JSONResponse({"text": text})

# ========== WebSocket streaming interface ==========
@app.websocket("/stream")
async def stream_audio(websocket: WebSocket):
    await websocket.accept()
    print("[WS] Client connected")
    # Reset the embedding cache for this stream
    stream_cache["embeds"] = []
    sampling_rate = 16000
    try:
        while True:
            msg = await websocket.receive_text()
            data = json.loads(msg)
            if data["type"] == "chunk":
                # Receive one audio chunk and compute its embeddings
                chunk = torch.tensor(data["pcm"], dtype=torch.float32)
                inputs = processor.feature_extractor(chunk.numpy(), sampling_rate=sampling_rate, return_tensors="pt")
                features = audio_tower(inputs.input_features.to("cuda", dtype=torch.float16)).last_hidden_state
                stream_cache["embeds"].append(projector(features))
                await websocket.send_text(json.dumps({"status": "ok", "received": len(chunk)}))
            elif data["type"] == "end":
                # Concatenate all cached embeddings and run generation once
                all_embeds = torch.cat(stream_cache["embeds"], dim=1)
                stream_cache["embeds"].clear()
                prompt = data.get("prompt", "Transcribe this: <|AUDIO|>")
                input_ids = tokenizer(prompt, return_tensors="pt").input_ids.to("cuda")
                prompt_embeds = language_model.get_input_embeddings()(input_ids)
                outputs = language_model.generate(inputs_embeds=torch.cat([prompt_embeds, all_embeds], dim=1), max_new_tokens=64)
                text = tokenizer.decode(outputs[0], skip_special_tokens=True)
                await websocket.send_text(json.dumps({"status": "done", "text": text}))
                await websocket.close()
                break
    except Exception as e:
        await websocket.send_text(json.dumps({"error": str(e)}))
        await websocket.close()
        print("[WS] Closed due to error:", e)

# To start the server:
# uvicorn server_vllm_audio_streaming:app --host 0.0.0.0 --port 8080
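Because /infer takes file_path and prompt as query parameters (the path must be readable by the server process), it can be smoke-tested over plain HTTP once the server is up. A minimal sketch using the requests package (install it separately; sample.wav is just a placeholder path):
import requests
# Call the whole-clip endpoint; the server loads the file from its own filesystem.
resp = requests.post(
    "http://127.0.0.1:8080/infer",
    params={"file_path": "sample.wav", "prompt": "Transcribe this: <|AUDIO|>"},
)
print(resp.json())  # e.g. {"text": "..."}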
🧩 2. Python Client Code (client_stream_audio.py)
import asyncio
import websockets
import torchaudio
import json

async def stream_audio():
    uri = "ws://127.0.0.1:8080/stream"
    async with websockets.connect(uri) as ws:
        print("[Client] Connected")
        # Load the audio file
        waveform, sr = torchaudio.load("sample.wav")
        chunk_size = sr * 5  # one 5-second chunk at a time
        waveform = waveform[0]
        for i in range(0, len(waveform), chunk_size):
            chunk = waveform[i:i + chunk_size].tolist()
            await ws.send(json.dumps({"type": "chunk", "pcm": chunk}))
            resp = await ws.recv()
            print("[Server Ack]:", resp)
        # All audio sent; ask the server to run generation
        await ws.send(json.dumps({"type": "end", "prompt": "Transcribe this: <|AUDIO|>"}))
        # Wait for the final result
        final_resp = await ws.recv()
        print("[Final Result]:", final_resp)

asyncio.run(stream_audio())
🧱 3. Deployment Steps
1️⃣ Prepare the environment
pip install fastapi uvicorn websockets torchaudio transformers vllm
2️⃣ Start the server
uvicorn server_vllm_audio_streaming:app --host 0.0.0.0 --port 8080
3️⃣ Run the client
python client_stream_audio.py
You should see output like:
[Server Ack]: {"status":"ok","received":80000}
[Server Ack]: {"status":"ok","received":80000}
...
[Final Result]: {"status":"done","text":"hello world this is qwen audio transcript"}
⚙️ 4. Swapping in the vLLM Inference Engine
If you want to run the language model with the vLLM engine directly instead of transformers, note that vLLM's multimodal API consumes the raw audio via multi_modal_data rather than the precomputed embeddings used above:
from vllm import LLM, SamplingParams
llm = LLM(model=MODEL_PATH, gpu_memory_utilization=0.9)
sampling_params = SamplingParams(max_tokens=64)
prompt = "Transcribe this: <|AUDIO|>"
# vLLM takes the raw waveform (numpy array + sampling rate) through multi_modal_data;
# `waveform` and `sr` are assumed to hold the loaded audio and its sampling rate.
result = llm.generate(
    {"prompt": prompt, "multi_modal_data": {"audio": (waveform, sr)}},
    sampling_params,
)
print(result[0].outputs[0].text)
This runs the language model on the vLLM engine; for the asynchronous engine used by the vLLM server, see the sketch below.
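To serve requests asynchronously inside the FastAPI app above (rather than doing offline batch generation), vLLM's AsyncLLMEngine can be used. A minimal sketch, assuming a vLLM version that accepts dict prompts with multi_modal_data and a caller-supplied request_id; generate_stream is a hypothetical helper name:
import uuid
from vllm import AsyncLLMEngine, AsyncEngineArgs, SamplingParams
# Build the async engine once at startup (this is what vLLM's own API server wraps).
engine = AsyncLLMEngine.from_engine_args(AsyncEngineArgs(model=MODEL_PATH, gpu_memory_utilization=0.9))

async def generate_stream(prompt: str, audio, sampling_rate: int):
    """Yield the partial text as the async engine produces new tokens."""
    sampling_params = SamplingParams(max_tokens=64)
    request = {"prompt": prompt, "multi_modal_data": {"audio": (audio, sampling_rate)}}
    async for output in engine.generate(request, sampling_params, request_id=str(uuid.uuid4())):
        yield output.outputs[0].text  # cumulative text generated so far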
✅ 5. Summary
| Module | Role |
|---|---|
| FastAPI Server | Exposes the /infer and /stream modes |
| WebSocket real-time interface | Receives PCM data in chunks; the server concatenates them and computes embeddings on the fly |
| Qwen2Audio + Projector | Converts audio into text-space embeddings |
| Language Model | Qwen2 LLM generates the output text |
| Python Client | Simulates a speech stream and receives transcription results in real time |