构建极低延迟的实时语音识别系统:基于 Qwen3-ASR (vLLM) 与 Silero VAD 的 C/S 架构实战

在本地部署大语言模型和语音模型越来越流行的今天,想要在 Windows 下优雅地跑起一套**极低延迟的流式语音识别(Streaming ASR)**系统,往往会遇到一个非常头疼的底层硬件痛点:WSL2 环境默认无法直接访问宿主机的麦克风

虽然可以通过 usbipd 强行进行 USB 直通,但配置繁琐且极不稳定。为了绕开这个大坑,并最大化利用本地 GPU 资源,本文将带你从零构建一套优雅的 C/S(客户端/服务端)架构实时语音识别工程:

  • 客户端(Windows 宿主机):仅使用 CPU 进行轻量级的麦克风录音与 Silero-VAD 语音活动检测,通过 WebSocket 发送有效音频流。
  • 服务端(WSL2 容器):满血运行 Qwen3-ASR (vLLM 后端),纯异步接收音频,利用 GPU 算力进行流式转写并实时推回结果。

核心架构设计与亮点

  1. 算力解耦:客户端零 GPU 依赖,所有的重度推理全部交给服务端。
  2. 防吞字防切音(Ring Buffer):客户端引入了环形缓冲区,在 VAD 触发前预存约 480ms 的环境音“垫片”,在开始说话的瞬间与语音一起打包发送,彻底解决首字声母被吞的问题。
  3. 原生 vLLM 异步流式集成:服务端遵循 Qwen3 官方例程,通过 500ms 的音频缓冲池与 asyncio.to_thread 完美结合,既保证了推理效率,又彻底避免了密集计算对 WebSocket 网络的阻塞。

一、 服务端实现 (WSL2 环境)

1. 环境准备

在 WSL2 (Ubuntu) 环境中,确保你已经安装了支持 vLLM 后端的 qwen-asr 以及 websockets

pip install -U qwen-asr[vllm] websockets numpy

2. 避坑指南:vLLM 的三大天坑

在编写服务端代码时,我们曾踩过三个极其经典的坑,在这里提前排雷:

  • 相对路径加载失败:加载本地 Hugging Face 模型时,路径中不能带有 ../,这会触发库的命名规范校验。必须使用绝对路径
  • 多进程无限套娃 (RuntimeError: An attempt has been made to start a new process...):vLLM 在 Linux 下使用 spawn 模式启动子进程分配显存。如果在文件最外层实例化模型,会导致子进程无限重复导入脚本。必须将模型实例化代码放入 main() 内部,并被 if __name__ == "__main__": 保护。
  • KV Cache 显存溢出 (ValueError: To serve at least one request...):语音模型默认的最大序列长度 (max_model_len) 极长,导致 vLLM 初始化时预分配的 KV Cache 撑爆显存。对于流式语音片段,强制将 max_model_len 设为 4096 即可完美解决。

3. 服务端代码 (server_asr.py)

import asyncio
import websockets
import json
import logging
import numpy as np
from qwen_asr import Qwen3ASRModel

logging.basicConfig(level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s")

# 绝对路径配置,切忌使用相对路径
ASR_MODEL_PATH = "/home/你的用户名/models/Qwen3-ASR-1.7B" 
# 服务端缓冲大小:500ms (16000Hz * 2bytes * 0.5s)
CHUNK_BYTES_THRESHOLD = 16000 

async def main():
    # ================= 1. 模型初始化 (必须在 main 内部) =================
    logging.info(f"正在加载模型 {ASR_MODEL_PATH} ...")
    asr = Qwen3ASRModel.LLM(
        model=ASR_MODEL_PATH,
        gpu_memory_utilization=0.9,
        max_new_tokens=32,
        max_model_len=4096  # 【关键】强行限制上下文长度,极大节省 KV Cache 显存!
    )
    logging.info("✅ 模型加载完成。等待客户端连接...")

    # ================= 2. WebSocket 处理闭包 =================
    async def handle_client(websocket):
        client_ip = websocket.remote_address
        logging.info(f"🔗 客户端已连接: {client_ip}")
        
        state = None
        audio_buffer = bytearray()
        
        try:
            async for message in websocket:
                # ---------- 接收控制信令 (JSON) ----------
                if isinstance(message, str):
                    data = json.loads(message)
                    if data.get("event") == "start":
                        logging.info("\n[新会话] VAD 触发,初始化 Streaming State...")
                        # 初始化流式状态(放入后台线程防阻塞)
                        state = await asyncio.to_thread(
                            asr.init_streaming_state,
                            unfixed_chunk_num=2,
                            unfixed_token_num=5,
                            chunk_size_sec=2.0
                        )
                        audio_buffer.clear()
                        
                    elif data.get("event") == "stop":
                        if state:
                            # 处理不足 500ms 的尾部残余数据
                            if len(audio_buffer) > 0:
                                wav16k = np.frombuffer(audio_buffer, dtype=np.int16).astype(np.float32) / 32768.0
                                await asyncio.to_thread(asr.streaming_transcribe, wav16k, state)
                                audio_buffer.clear()
                            
                            # 获取最终断句结果
                            await asyncio.to_thread(asr.finish_streaming_transcribe, state)
                            final_text = state.text
                            logging.info(f"[最终] {final_text}")
                            await websocket.send(json.dumps({"type": "final", "text": final_text}))
                            state = None

                # ---------- 接收音频流 (Bytes) ----------
                elif isinstance(message, bytes):
                    if state:
                        audio_buffer.extend(message)
                        # 积攒够 500ms,送入模型推理一次
                        if len(audio_buffer) >= CHUNK_BYTES_THRESHOLD:
                            wav16k = np.frombuffer(audio_buffer, dtype=np.int16).astype(np.float32) / 32768.0
                            audio_buffer.clear()
                            
                            await asyncio.to_thread(asr.streaming_transcribe, wav16k, state)
                            
                            if state.text:
                                await websocket.send(json.dumps({"type": "partial", "text": state.text}))

        except websockets.exceptions.ConnectionClosed:
            logging.warning(f"❌ 客户端断开连接")
        finally:
            if state:
                await asyncio.to_thread(asr.finish_streaming_transcribe, state)

    # 关闭 ping_interval 确保流式空闲期不断连
    async with websockets.serve(handle_client, "0.0.0.0", 8765, ping_interval=None):
        logging.info("🚀 WSL2 ASR WebSocket 服务已启动: ws://0.0.0.0:8765")
        await asyncio.Future()

if __name__ == "__main__":
    asyncio.run(main())

二、 客户端实现 (Windows 环境)

1. 环境准备与 VAD 离线加载

在 Windows 端,我们只需安装基础的录音和网络库,以及 CPU 版的 PyTorch 即可:

pip install torch torchaudio --index-url https://download.pytorch.org/whl/cpu
pip install pyaudio numpy websockets

⚠️ 离线加载 VAD 模型避坑:
在国内网络环境下,torch.hub.load 自动下载 GitHub 资源经常会报 ssl.SSLEOFError。解决方案是去 snakers4/silero-vad 手动下载源码压缩包解压,并使用 source='local' 从本地加载。

2. 客户端代码 (client_vad.py)

import pyaudio
import numpy as np
import torch
import asyncio
import websockets
import json
import collections

# ================= 1. 配置参数 =================
WS_URL = "ws://127.0.0.1:8765"
SAMPLE_RATE = 16000
CHUNK_SIZE = 512
FORMAT = pyaudio.paInt16
CHANNELS = 1

# 环形缓冲:预存 15 个 Chunk (约 480ms),完美衔接服务端的 500ms 步长,防止吞掉首字
PRE_SPEECH_PAD_FRAMES = 15  

# ================= 2. 离线加载 VAD 模型 =================
print("⏳ 正在从本地加载 Silero VAD 模型...")
VAD_LOCAL_PATH = r"D:\你的路径\silero-vad-master"  # 替换为你解压的本地路径

vad_model, utils = torch.hub.load(
    repo_or_dir=VAD_LOCAL_PATH,
    model='silero_vad',
    source='local', # 【关键】指定本地加载
    force_reload=False,
    onnx=False
)
(get_speech_timestamps, save_audio, read_audio, VADIterator, collect_chunks) = utils
vad_iterator = VADIterator(vad_model)

# ================= 3. 核心客户端逻辑 =================
async def mic_client():
    try:
        async with websockets.connect(WS_URL, ping_interval=None) as websocket:
            print("✅ 成功连接到 WSL2 服务端!")
            
            audio_queue = asyncio.Queue()
            loop = asyncio.get_running_loop()
            ring_buffer = collections.deque(maxlen=PRE_SPEECH_PAD_FRAMES)
            
            # PyAudio 异步回调
            def audio_callback(in_data, frame_count, time_info, status):
                loop.call_soon_threadsafe(audio_queue.put_nowait, in_data)
                return (None, pyaudio.paContinue)

            p = pyaudio.PyAudio()
            stream = p.open(format=FORMAT, channels=CHANNELS, rate=SAMPLE_RATE, input=True,
                            frames_per_buffer=CHUNK_SIZE, stream_callback=audio_callback)
            stream.start_stream()
            print("🎤 麦克风已激活,可以开始说话了 (按 Ctrl+C 退出)...\n")

            async def capture_and_send():
                is_speaking = False
                while True:
                    data = await audio_queue.get()
                    
                    # 转换为 float32 给 VAD 判断
                    audio_float32 = torch.from_numpy(np.frombuffer(data, dtype=np.int16)).float() / 32768.0 
                    speech_dict = vad_iterator(audio_float32, return_seconds=False)
                    
                    if speech_dict and 'start' in speech_dict and not is_speaking:
                        is_speaking = True
                        await websocket.send(json.dumps({"event": "start"}))
                        # 【防吞字黑科技】:把说话前积攒的 480ms 静音垫片一口气发过去
                        for buffered_data in ring_buffer:
                            await websocket.send(buffered_data)
                        ring_buffer.clear()
                        
                    elif speech_dict and 'end' in speech_dict and is_speaking:
                        is_speaking = False
                        await websocket.send(data)
                        await websocket.send(json.dumps({"event": "stop"}))
                    
                    if is_speaking:
                        await websocket.send(data)
                    else:
                        ring_buffer.append(data)

            async def receive_results():
                async for message in websocket:
                    res = json.loads(message)
                    if res["type"] == "partial":
                        print(f"\r🗣️ [实时]: {res['text']}    ", end="", flush=True)
                    elif res["type"] == "final":
                        print(f"\n🎯 [断句]: {res['text']}\n", flush=True)

            await asyncio.gather(capture_and_send(), receive_results())

    except Exception as e:
         print(f"\n⚠️ 发生异常: {e}")
    finally:
        if 'stream' in locals() and stream.is_active():
            stream.stop_stream()
            stream.close()
        if 'p' in locals():
            p.terminate()

if __name__ == "__main__":
    try:
        asyncio.run(mic_client())
    except ConnectionRefusedError:
        print(f"❌ 无法连接到 {WS_URL},请确认 WSL2 服务端已开启。")
    except KeyboardInterrupt:
        print("\n👋 客户端已关闭。")

运行与测试

  1. 在 WSL2 终端中运行:python server_asr.py,等待模型加载至显存。
  2. 在 Windows 终端中运行:python client_vad.py
  3. 拿起麦克风说话,享受几乎无感的实时转写体验吧!

这种 C/S 架构不仅解决了系统级麦克风调用的痛点,后续还可以极度轻易地将其拓展:比如将 Windows 客户端替换为网页端、手机端,或者在服务端接入大语言模型,直接演进成一个全双工的语音对话助手(Voice Assistant)。

Logo

AtomGit 是由开放原子开源基金会联合 CSDN 等生态伙伴共同推出的新一代开源与人工智能协作平台。平台坚持“开放、中立、公益”的理念,把代码托管、模型共享、数据集托管、智能体开发体验和算力服务整合在一起,为开发者提供从开发、训练到部署的一站式体验。

更多推荐