构建极低延迟的实时语音识别系统:基于 Qwen3-ASR (vLLM) 与 Silero VAD 的 C/S 架构实战
构建极低延迟的实时语音识别系统:基于 Qwen3-ASR (vLLM) 与 Silero VAD 的 C/S 架构实战
在本地部署大语言模型和语音模型越来越流行的今天,想要在 Windows 下优雅地跑起一套**极低延迟的流式语音识别(Streaming ASR)**系统,往往会遇到一个非常头疼的底层硬件痛点:WSL2 环境默认无法直接访问宿主机的麦克风。
虽然可以通过 usbipd 强行进行 USB 直通,但配置繁琐且极不稳定。为了绕开这个大坑,并最大化利用本地 GPU 资源,本文将带你从零构建一套优雅的 C/S(客户端/服务端)架构实时语音识别工程:
- 客户端(Windows 宿主机):仅使用 CPU 进行轻量级的麦克风录音与 Silero-VAD 语音活动检测,通过 WebSocket 发送有效音频流。
- 服务端(WSL2 容器):满血运行 Qwen3-ASR (vLLM 后端),纯异步接收音频,利用 GPU 算力进行流式转写并实时推回结果。
核心架构设计与亮点
- 算力解耦:客户端零 GPU 依赖,所有的重度推理全部交给服务端。
- 防吞字防切音(Ring Buffer):客户端引入了环形缓冲区,在 VAD 触发前预存约 480ms 的环境音“垫片”,在开始说话的瞬间与语音一起打包发送,彻底解决首字声母被吞的问题。
- 原生 vLLM 异步流式集成:服务端遵循 Qwen3 官方例程,通过 500ms 的音频缓冲池与
asyncio.to_thread完美结合,既保证了推理效率,又彻底避免了密集计算对 WebSocket 网络的阻塞。
一、 服务端实现 (WSL2 环境)
1. 环境准备
在 WSL2 (Ubuntu) 环境中,确保你已经安装了支持 vLLM 后端的 qwen-asr 以及 websockets:
pip install -U qwen-asr[vllm] websockets numpy
2. 避坑指南:vLLM 的三大天坑
在编写服务端代码时,我们曾踩过三个极其经典的坑,在这里提前排雷:
- 相对路径加载失败:加载本地 Hugging Face 模型时,路径中不能带有
../,这会触发库的命名规范校验。必须使用绝对路径。 - 多进程无限套娃 (
RuntimeError: An attempt has been made to start a new process...):vLLM 在 Linux 下使用spawn模式启动子进程分配显存。如果在文件最外层实例化模型,会导致子进程无限重复导入脚本。必须将模型实例化代码放入main()内部,并被if __name__ == "__main__":保护。 - KV Cache 显存溢出 (
ValueError: To serve at least one request...):语音模型默认的最大序列长度 (max_model_len) 极长,导致 vLLM 初始化时预分配的 KV Cache 撑爆显存。对于流式语音片段,强制将max_model_len设为 4096 即可完美解决。
3. 服务端代码 (server_asr.py)
import asyncio
import websockets
import json
import logging
import numpy as np
from qwen_asr import Qwen3ASRModel
logging.basicConfig(level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s")
# 绝对路径配置,切忌使用相对路径
ASR_MODEL_PATH = "/home/你的用户名/models/Qwen3-ASR-1.7B"
# 服务端缓冲大小:500ms (16000Hz * 2bytes * 0.5s)
CHUNK_BYTES_THRESHOLD = 16000
async def main():
# ================= 1. 模型初始化 (必须在 main 内部) =================
logging.info(f"正在加载模型 {ASR_MODEL_PATH} ...")
asr = Qwen3ASRModel.LLM(
model=ASR_MODEL_PATH,
gpu_memory_utilization=0.9,
max_new_tokens=32,
max_model_len=4096 # 【关键】强行限制上下文长度,极大节省 KV Cache 显存!
)
logging.info("✅ 模型加载完成。等待客户端连接...")
# ================= 2. WebSocket 处理闭包 =================
async def handle_client(websocket):
client_ip = websocket.remote_address
logging.info(f"🔗 客户端已连接: {client_ip}")
state = None
audio_buffer = bytearray()
try:
async for message in websocket:
# ---------- 接收控制信令 (JSON) ----------
if isinstance(message, str):
data = json.loads(message)
if data.get("event") == "start":
logging.info("\n[新会话] VAD 触发,初始化 Streaming State...")
# 初始化流式状态(放入后台线程防阻塞)
state = await asyncio.to_thread(
asr.init_streaming_state,
unfixed_chunk_num=2,
unfixed_token_num=5,
chunk_size_sec=2.0
)
audio_buffer.clear()
elif data.get("event") == "stop":
if state:
# 处理不足 500ms 的尾部残余数据
if len(audio_buffer) > 0:
wav16k = np.frombuffer(audio_buffer, dtype=np.int16).astype(np.float32) / 32768.0
await asyncio.to_thread(asr.streaming_transcribe, wav16k, state)
audio_buffer.clear()
# 获取最终断句结果
await asyncio.to_thread(asr.finish_streaming_transcribe, state)
final_text = state.text
logging.info(f"[最终] {final_text}")
await websocket.send(json.dumps({"type": "final", "text": final_text}))
state = None
# ---------- 接收音频流 (Bytes) ----------
elif isinstance(message, bytes):
if state:
audio_buffer.extend(message)
# 积攒够 500ms,送入模型推理一次
if len(audio_buffer) >= CHUNK_BYTES_THRESHOLD:
wav16k = np.frombuffer(audio_buffer, dtype=np.int16).astype(np.float32) / 32768.0
audio_buffer.clear()
await asyncio.to_thread(asr.streaming_transcribe, wav16k, state)
if state.text:
await websocket.send(json.dumps({"type": "partial", "text": state.text}))
except websockets.exceptions.ConnectionClosed:
logging.warning(f"❌ 客户端断开连接")
finally:
if state:
await asyncio.to_thread(asr.finish_streaming_transcribe, state)
# 关闭 ping_interval 确保流式空闲期不断连
async with websockets.serve(handle_client, "0.0.0.0", 8765, ping_interval=None):
logging.info("🚀 WSL2 ASR WebSocket 服务已启动: ws://0.0.0.0:8765")
await asyncio.Future()
if __name__ == "__main__":
asyncio.run(main())
二、 客户端实现 (Windows 环境)
1. 环境准备与 VAD 离线加载
在 Windows 端,我们只需安装基础的录音和网络库,以及 CPU 版的 PyTorch 即可:
pip install torch torchaudio --index-url https://download.pytorch.org/whl/cpu
pip install pyaudio numpy websockets
⚠️ 离线加载 VAD 模型避坑:
在国内网络环境下,torch.hub.load 自动下载 GitHub 资源经常会报 ssl.SSLEOFError。解决方案是去 snakers4/silero-vad 手动下载源码压缩包解压,并使用 source='local' 从本地加载。
2. 客户端代码 (client_vad.py)
import pyaudio
import numpy as np
import torch
import asyncio
import websockets
import json
import collections
# ================= 1. 配置参数 =================
WS_URL = "ws://127.0.0.1:8765"
SAMPLE_RATE = 16000
CHUNK_SIZE = 512
FORMAT = pyaudio.paInt16
CHANNELS = 1
# 环形缓冲:预存 15 个 Chunk (约 480ms),完美衔接服务端的 500ms 步长,防止吞掉首字
PRE_SPEECH_PAD_FRAMES = 15
# ================= 2. 离线加载 VAD 模型 =================
print("⏳ 正在从本地加载 Silero VAD 模型...")
VAD_LOCAL_PATH = r"D:\你的路径\silero-vad-master" # 替换为你解压的本地路径
vad_model, utils = torch.hub.load(
repo_or_dir=VAD_LOCAL_PATH,
model='silero_vad',
source='local', # 【关键】指定本地加载
force_reload=False,
onnx=False
)
(get_speech_timestamps, save_audio, read_audio, VADIterator, collect_chunks) = utils
vad_iterator = VADIterator(vad_model)
# ================= 3. 核心客户端逻辑 =================
async def mic_client():
try:
async with websockets.connect(WS_URL, ping_interval=None) as websocket:
print("✅ 成功连接到 WSL2 服务端!")
audio_queue = asyncio.Queue()
loop = asyncio.get_running_loop()
ring_buffer = collections.deque(maxlen=PRE_SPEECH_PAD_FRAMES)
# PyAudio 异步回调
def audio_callback(in_data, frame_count, time_info, status):
loop.call_soon_threadsafe(audio_queue.put_nowait, in_data)
return (None, pyaudio.paContinue)
p = pyaudio.PyAudio()
stream = p.open(format=FORMAT, channels=CHANNELS, rate=SAMPLE_RATE, input=True,
frames_per_buffer=CHUNK_SIZE, stream_callback=audio_callback)
stream.start_stream()
print("🎤 麦克风已激活,可以开始说话了 (按 Ctrl+C 退出)...\n")
async def capture_and_send():
is_speaking = False
while True:
data = await audio_queue.get()
# 转换为 float32 给 VAD 判断
audio_float32 = torch.from_numpy(np.frombuffer(data, dtype=np.int16)).float() / 32768.0
speech_dict = vad_iterator(audio_float32, return_seconds=False)
if speech_dict and 'start' in speech_dict and not is_speaking:
is_speaking = True
await websocket.send(json.dumps({"event": "start"}))
# 【防吞字黑科技】:把说话前积攒的 480ms 静音垫片一口气发过去
for buffered_data in ring_buffer:
await websocket.send(buffered_data)
ring_buffer.clear()
elif speech_dict and 'end' in speech_dict and is_speaking:
is_speaking = False
await websocket.send(data)
await websocket.send(json.dumps({"event": "stop"}))
if is_speaking:
await websocket.send(data)
else:
ring_buffer.append(data)
async def receive_results():
async for message in websocket:
res = json.loads(message)
if res["type"] == "partial":
print(f"\r🗣️ [实时]: {res['text']} ", end="", flush=True)
elif res["type"] == "final":
print(f"\n🎯 [断句]: {res['text']}\n", flush=True)
await asyncio.gather(capture_and_send(), receive_results())
except Exception as e:
print(f"\n⚠️ 发生异常: {e}")
finally:
if 'stream' in locals() and stream.is_active():
stream.stop_stream()
stream.close()
if 'p' in locals():
p.terminate()
if __name__ == "__main__":
try:
asyncio.run(mic_client())
except ConnectionRefusedError:
print(f"❌ 无法连接到 {WS_URL},请确认 WSL2 服务端已开启。")
except KeyboardInterrupt:
print("\n👋 客户端已关闭。")
运行与测试
- 在 WSL2 终端中运行:
python server_asr.py,等待模型加载至显存。 - 在 Windows 终端中运行:
python client_vad.py。 - 拿起麦克风说话,享受几乎无感的实时转写体验吧!
这种 C/S 架构不仅解决了系统级麦克风调用的痛点,后续还可以极度轻易地将其拓展:比如将 Windows 客户端替换为网页端、手机端,或者在服务端接入大语言模型,直接演进成一个全双工的语音对话助手(Voice Assistant)。
AtomGit 是由开放原子开源基金会联合 CSDN 等生态伙伴共同推出的新一代开源与人工智能协作平台。平台坚持“开放、中立、公益”的理念,把代码托管、模型共享、数据集托管、智能体开发体验和算力服务整合在一起,为开发者提供从开发、训练到部署的一站式体验。
更多推荐



所有评论(0)