FunASR 实时流式语音识别:从原理到实践代码实现

提示: 跳过原理快速查看完整代码请直接跳转到第四节

1. FunASR 简介

1.1 什么是 FunASR

FunASR 是阿里巴巴达摩院开源的语音识别工具包,旨在 bridging 学术研究与工业应用之间的鸿沟。它基于 PyTorch 构建,提供从模型训练到推理部署的完整解决方案。

FunASR 的核心优势:

  • 丰富的预训练模型:涵盖语音识别(ASR)、语音活动检测(VAD)、语音分离、标点恢复等多种任务
  • 工业级性能:经过阿里巴巴集团大规模生产环境验证
  • 灵活的架构:支持自定义模型、数据集和训练流程
  • 中文优化:对中文语音识别场景做了深度优化

1.2 FunASR 核心模型

FunASR 提供了多种预训练模型,其中最常用的包括:

模型类型 模型名称 用途
ASR paraformer-zh 高精度离线中文语音识别
ASR paraformer-zh-streaming 流式中文语音识别
VAD fsmn-vad 语音活动检测
Punctuation ct-punc 标点符号恢复
Speaker cam++ 说话人识别

本文将重点介绍 paraformer-zh-streaming 流式识别模型的原理与应用。


2. 实时推理 vs 离线推理

2.1 离线推理(Offline Inference)

定义:离线推理指在用户说完一整段话后,系统才开始进行语音识别处理。

工作流程

用户说话 → 录制完整音频 → 上传完整音频 → 模型处理 → 返回完整文本

特点

  • 优点

    • 识别精度高,模型可以利用完整上下文信息
    • 实现简单,无需考虑状态管理
    • 适合批量处理场景(如视频字幕生成、会议录音转写)
  • 缺点

    • 响应延迟高,用户体验差
    • 无法边说边出结果
    • 不适合实时交互场景

典型应用场景

  • 录音文件转写
  • 视频字幕生成
  • 会议记录整理

2.2 实时推理(Real-time Inference)

定义:实时推理指用户说话的同时,系统持续进行语音识别并返回识别结果。

工作流程

用户说话 → 音频流分块 → 每块实时处理 → 增量返回文本 → 用户边说边看结果

特点

  • 优点

    • 低延迟,用户体验好
    • 边说边出结果,所见即所得
    • 适合实时交互场景
  • 缺点

    • 实现复杂,需要状态管理
    • 可能存在识别修正(后续块可能影响前面的结果)
    • 对系统稳定性要求高

典型应用场景

  • 实时语音转文字
  • 语音助手交互
  • 直播字幕
  • 会议实时字幕

2.3 对比总结

维度 离线推理 实时推理
延迟 高(等待完整音频) 低(毫秒级响应)
精度 较高(完整上下文) 略低(有限上下文)
实现复杂度 简单 复杂
状态管理 无需 需要
用户体验 一般 优秀
适用场景 批量处理 实时交互

3. 流式推理 vs 非流式推理

3.1 核心概念辨析

在讨论实时语音识别时,容易混淆"实时"和"流式"两个概念:

  • 实时(Real-time):描述的是用户体验层面的特性,指系统能够快速响应用户输入,边说边出结果。
  • 流式(Streaming):描述的是模型推理的技术实现方式,指模型以增量方式处理音频数据。

关系:流式推理是实现实时推理的主流技术手段,但实时推理也可以通过其他方式实现(如短音频分片离线处理)。

3.2 非流式推理(Non-streaming)

原理:模型一次性接收完整音频,进行整体编码和解码。

处理过程

完整音频 → 编码器(整体编码) → 解码器(整体解码) → 完整文本

特点

  • 编码器可以看到整个音频的上下文信息
  • 解码器可以基于完整编码序列进行解码
  • 识别精度通常更高
  • 无法在说话过程中给出结果

Paraformer 非流式模型结构

音频特征 → Encoder → Predictor → Decoder → 文本
          (整体)     (整体)      (整体)

3.3 流式推理(Streaming)

原理:模型增量接收音频块(chunk),维护中间状态(cache),逐步输出结果。

处理过程

音频块1 → 编码器(块1, cache) → 解码器(块1, cache) → 文本片段1
音频块2 → 编码器(块2, cache) → 解码器(块2, cache) → 文本片段2
...
音频块N → 编码器(块N, cache) → 解码器(块N, cache) → 文本片段N

核心机制

  1. Chunk 分块:将连续音频流切分为固定大小的块
  2. Cache 状态缓存:保存跨 chunk 的中间状态,确保上下文连续性
  3. Look-back 机制:编码器和解码器可以"回头看"之前的几个 chunk

Paraformer 流式模型结构

音频块 → Streaming Encoder → Streaming Decoder → 文本片段
         (带cache)            (带cache)
              ↓                    ↓
         encoder_look_back    decoder_look_back

3.4 流式推理的关键参数

3.4.1 Chunk Size(块大小)
chunk_size = [0, 10, 5]  # [左看, 当前块, 右看]
  • 左看(chunk_size[0]):编码时可以访问之前多少个块的信息
  • 当前块(chunk_size[1]):当前处理的音频块大小
  • 右看(chunk_size[2]):编码时可以访问之后多少个块的信息

示例解释:

  • chunk_size = [0, 10, 5] 表示当前处理第 10 个块时,可以"右看" 5 个块
  • 右看越多,精度越高,但延迟越大
  • 左看用于保持上下文连续性
3.4.2 Look-back 参数
encoder_chunk_look_back = 4  # 编码器回看块数
decoder_chunk_look_back = 1  # 解码器回看块数

这些参数控制模型在处理当前块时,需要参考之前多少个块的状态信息,用于保持识别的连续性和一致性。

3.4.3 Cache 管理
asr_cache = {}  # 存储模型中间状态

# 每次推理时传入 cache
result = model.generate(
    input=audio_chunk,
    cache=asr_cache,    # 传入当前 cache
    is_final=False,      # 是否为最后一块
    chunk_size=chunk_size,
    encoder_chunk_look_back=encoder_look_back,
    decoder_chunk_look_back=decoder_look_back
)
# cache 会被模型内部更新,下次推理继续使用

3.5 流式 vs 非流式对比

维度 非流式推理 流式推理
输入 完整音频 音频块序列
状态管理 无需 需要 cache
延迟 高(等完整音频) 低(逐块处理)
精度 高(完整上下文) 略低(有限上下文)
内存占用 高(完整音频) 低(当前块)
实现复杂度 简单 复杂

3.6 如何选择

选择非流式场景

  • 离线批量处理
  • 对精度要求极高
  • 不需要实时响应

选择流式场景

  • 实时语音识别
  • 低延迟要求
  • 资源受限环境
  • 交互式应用

4. FunASR 流式推理实战之paraformer-zh-streaming

运行界面

在这里插入图片描述

4.1 环境准备

pyproject.toml 文件

[project]
name = "blog-workspace"
version = "0.1.0"
description = "Add your description here"
readme = "README.md"
requires-python = ">=3.13"
dependencies = [
    "dotenv>=0.9.9",
    "flask>=3.1.3",
    "funasr>=1.3.1",
    "numpy>=2.4.4",
    "opuslib-next>=1.1.6",
    "torch>=2.11.0",
    "torchaudio>=2.11.0",
]

4.2 模型加载

hugginface模型下载方法: https://huggingface.co/funasr/paraformer-zh-streaming
modelscope模型下载: https://www.modelscope.cn/models/shuai1618/paraformer-zh-streaming

paraformer-zh-streaming

print("⏳ 加载 ASR 模型...")
asr_model = AutoModel(
    model=ASR_MODEL_PATH,
    model_revision="v2.0.4",
    disable_update=True,
)
print("✅ ASR 模型加载完成")


print("⏳ 加载 VAD 模型...")
vad_model = AutoModel(
    model=VAD_MODEL_PATH,
    model_revision="v2.0.4",
    disable_update=True,
)
print("✅ VAD 模型加载完成\n")

4.3 核心推理逻辑

暂无

4.4 完整示例(可直接运行)

"""
基于 paraformer-zh-streaming + fsmn-vad 的实时流式中文语音识别 Web 版本
通过 Web Audio API 获取麦克风音频,通过 HTTP 流式发送到后端

Install:
  pip install flask

Run:
  python web_asr_realtime.py
Open:
  http://127.0.0.1:8000
"""
import os
import queue
import time
import uuid
import numpy as np
import pyaudio
import threading

from dataclasses  import dataclass
from funasr       import AutoModel
from dotenv       import load_dotenv
from typing       import Dict, Optional
from flask        import Flask, Response, jsonify, request

load_dotenv()
root_path = os.environ.get("model_root_dir", "")
print(f"root_path: {root_path}")

# ─── 模型路径 ────────────────────────────────────────────────────
ASR_MODEL_PATH = root_path + "paraformer-zh-streaming"
VAD_MODEL_PATH = root_path + "fsmn-vad"

# ─── 音频参数 ────────────────────────────────────────────────────
SAMPLE_RATE = 16000
CHANNELS = 1

VAD_CHUNK_MS = 200
VAD_CHUNK_SAMPLES = int(SAMPLE_RATE * VAD_CHUNK_MS / 1000)  # 3200

ASR_CHUNK_MS = 600
ASR_CHUNK_SAMPLES = int(SAMPLE_RATE * ASR_CHUNK_MS / 1000)  # 9600

CHUNK_SIZE_CFG = [0, 10, 5]
ENCODER_LOOK_BACK = 4
DECODER_LOOK_BACK = 1

# ─── Flask 应用 ────────────────────────────────────────────────────
app = Flask(__name__)

# ════════════════════════════════════════════════════════════════
print("⏳ 加载 ASR 模型...")
asr_model = AutoModel(
    model=ASR_MODEL_PATH,
    model_revision="v2.0.4",
    disable_update=True,
)
print("✅ ASR 模型加载完成")

print("⏳ 加载 VAD 模型...")
vad_model = AutoModel(
    model=VAD_MODEL_PATH,
    model_revision="v2.0.4",
    disable_update=True,
)
print("✅ VAD 模型加载完成\n")


# ─── 会话管理 ──────────────────────────────────────────────────────
@dataclass
class Session:
    vad_cache: dict
    is_speaking: bool
    silence_start: float
    asr_cache: dict
    asr_pending: list
    sentence_text: str
    sentence_start_time: float
    created_at: float
    last_seen: float


SESSIONS: Dict[str, Session] = {}
SESSION_TTL_SEC = 10 * 60


def _gc_sessions():
    """清理过期会话"""
    now = time.time()
    dead = [sid for sid, s in SESSIONS.items() if now - s.last_seen > SESSION_TTL_SEC]
    for sid in dead:
        SESSIONS.pop(sid, None)


def _get_session(session_id: str) -> Optional[Session]:
    """获取会话并更新最后访问时间"""
    _gc_sessions()
    s = SESSIONS.get(session_id)
    if s:
        s.last_seen = time.time()
    return s


# ─── ASR 处理函数 ─────────────────────────────────────────────────
def _feed_asr(chunk: np.ndarray, asr_cache: dict, is_final: bool) -> str:
    """送入 ASR,返回识别的文字片段"""
    try:
        result = asr_model.generate(
            input=chunk,
            cache=asr_cache,
            is_final=is_final,
            chunk_size=CHUNK_SIZE_CFG,
            encoder_chunk_look_back=ENCODER_LOOK_BACK,
            decoder_chunk_look_back=DECODER_LOOK_BACK,
            disable_pbar=True,
        )
        if result:
            return result[0].get("text", "").strip()
    except Exception as e:
        print(f"\n⚠️  ASR 异常: {e}")
    return ""


def _flush_pending(session: Session, is_final: bool) -> str:
    """处理待识别的音频,返回整句累积文字"""
    if is_final:
        if session.asr_pending:
            chunk = np.array(session.asr_pending, dtype=np.float32)
            session.asr_pending = []
            new_piece = _feed_asr(chunk, session.asr_cache, is_final=True)
        else:
            new_piece = _feed_asr(
                np.zeros(160, dtype=np.float32), session.asr_cache, is_final=True
            )
        if new_piece:
            session.sentence_text += new_piece
    else:
        while len(session.asr_pending) >= ASR_CHUNK_SAMPLES:
            chunk = np.array(
                session.asr_pending[:ASR_CHUNK_SAMPLES], dtype=np.float32
            )
            session.asr_pending = session.asr_pending[ASR_CHUNK_SAMPLES:]
            new_piece = _feed_asr(chunk, session.asr_cache, is_final=False)
            if new_piece:
                session.sentence_text += new_piece

    return session.sentence_text


def _end_sentence(session: Session) -> dict:
    """结束当前句子"""
    text = _flush_pending(session, is_final=True)
    duration = time.time() - session.sentence_start_time

    # 重置句子状态
    session.asr_cache = {}
    session.asr_pending = []
    session.sentence_text = ""
    session.is_speaking = False
    session.silence_start = 0.0
    session.sentence_start_time = 0.0

    return {
        "text": text,
        "duration": duration,
        "is_sentence_end": True
    }


# ─── HTML 页面 ─────────────────────────────────────────────────────
INDEX_HTML = r"""<!doctype html>
<html lang="zh-CN">
<head>
  <meta charset="utf-8" />
  <meta name="viewport" content="width=device-width,initial-scale=1" />
  <title>Paraformer 实时语音识别</title>
  <style>
    :root{
      --bg:#ffffff;
      --card:#ffffff;
      --muted:#5b6472;
      --text:#0f172a;
      --border:#e5e7eb;
      --ok:#059669;
      --warn:#d97706;
      --danger:#e11d48;
    }

    html, body { height: 100%; }

    body{
      margin:0;
      font-family: ui-sans-serif, system-ui, -apple-system, Segoe UI, Roboto, Helvetica, Arial, "Noto Sans";
      background: var(--bg);
      color:var(--text);
    }

    .wrap{
      height: 100vh;
      max-width: none;
      margin: 0;
      padding: 16px;
      box-sizing: border-box;
      display: flex;
    }

    .card{
      width: 100%;
      height: 100%;
      background: var(--card);
      border:1px solid var(--border);
      border-radius: 14px;
      padding: 16px;
      box-sizing: border-box;
      box-shadow: 0 10px 30px rgba(0,0,0,.06);
      display: flex;
      flex-direction: column;
      gap: 12px;
      min-height: 0;
    }

    h1{ font-size: 18px; margin: 0; letter-spacing:.2px; font-weight: 600;}

    .row{ display:flex; gap:12px; align-items:center; flex-wrap: wrap; }

    button{
      border:1px solid var(--border); border-radius: 12px;
      padding: 10px 16px; cursor:pointer; color:var(--text);
      background: #f8fafc;
      transition: transform .05s ease, background .15s ease, border-color .15s ease;
      font-weight: 600;
      font-size: 14px;
    }
    button:hover{ background: #f1f5f9; border-color:#cbd5e1; }
    button:active{ transform: translateY(1px); }
    button.primary{ border-color: rgba(5,150,105,.35); background: rgba(5,150,105,.10); }
    button.danger{ border-color: rgba(225,29,72,.35); background: rgba(225,29,72,.10); }
    button:disabled{ opacity:.5; cursor:not-allowed; }

    .pill{
      font-size: 12px; padding: 6px 10px; border-radius: 999px;
      border:1px solid var(--border); color: var(--muted);
      background: #f8fafc;
      user-select:none;
    }
    .pill.ok{ color: #065f46; border-color: rgba(5,150,105,.35); background: rgba(5,150,105,.10); }
    .pill.warn{ color: #92400e; border-color: rgba(217,119,6,.35); background: rgba(217,119,6,.10); }
    .pill.err{ color: #9f1239; border-color: rgba(225,29,72,.35); background: rgba(225,29,72,.10); }

    .panel{
      border:1px solid var(--border);
      border-radius: 12px;
      background: #ffffff;
      padding: 12px;
    }

    .panel.textpanel{
      flex: 1;
      display: flex;
      flex-direction: column;
      min-height: 0;
    }

    .label{ color:var(--muted); font-size: 12px; margin-bottom: 6px; }
    .mono{ font-family: ui-monospace, SFMono-Regular, Menlo, Monaco, Consolas, "Liberation Mono", "Courier New"; }

    #text{
      flex: 1;
      min-height: 0;
      white-space: pre-wrap;
      line-height: 1.6;
      font-size: 15px;
      padding: 12px;
      border-radius: 12px;
      border: 1px solid var(--border);
      background: #f8fafc;
      overflow: auto;
    }

    a{ color: #2563eb; text-decoration: none; font-size: 13px; font-weight: 600; }

    .info{ font-size: 13px; color: var(--muted); }
  </style>
</head>
<body>
  <div class="wrap">
    <div class="card">
      <h1>Paraformer 实时语音识别</h1>

      <div class="row">
        <button id="btnStart" class="primary">开始识别</button>
        <button id="btnStop" class="danger" disabled>停止识别</button>
        <span id="status" class="pill warn">未开始</span>
        <a href="javascript:void(0)" id="btnClear" style="margin-left:auto;">清空文本</a>
      </div>

      <div class="panel">
        <div class="row">
          <span class="info">模型: paraformer-zh-streaming + fsmn-vad</span>
        </div>
      </div>

      <div class="panel textpanel">
        <div class="label">识别文本</div>
        <div id="text"></div>
      </div>
    </div>
  </div>

<script>
(() => {
  const $ = (id) => document.getElementById(id);

  const btnStart = $("btnStart");
  const btnStop  = $("btnStop");
  const btnClear = $("btnClear");
  const statusEl = $("status");
  const textEl   = $("text");

  // VAD 块大小 200ms,与后端一致
  const VAD_CHUNK_MS = 200;
  const TARGET_SR = 16000;

  let audioCtx = null;
  let processor = null;
  let source = null;
  let mediaStream = null;

  let sessionId = null;
  let running = false;

  let buf = new Float32Array(0);
  let pushing = false;

  // 当前句子的文本
  let currentSentence = "";
  let allText = [];

  function setStatus(text, cls){
    statusEl.textContent = text;
    statusEl.className = "pill " + (cls || "");
  }

  function lockUI(on){
    btnStart.disabled = on;
    btnStop.disabled = !on;
  }

  function concatFloat32(a, b){
    const out = new Float32Array(a.length + b.length);
    out.set(a, 0);
    out.set(b, a.length);
    return out;
  }

  function resampleLinear(input, srcSr, dstSr){
    if (srcSr === dstSr) return input;
    const ratio = dstSr / srcSr;
    const outLen = Math.max(0, Math.round(input.length * ratio));
    const out = new Float32Array(outLen);
    for (let i = 0; i < outLen; i++){
      const x = i / ratio;
      const x0 = Math.floor(x);
      const x1 = Math.min(x0 + 1, input.length - 1);
      const t = x - x0;
      out[i] = input[x0] * (1 - t) + input[x1] * t;
    }
    return out;
  }

  function updateTextDisplay(){
    // 显示所有已完成句子 + 当前句子
    textEl.textContent = allText.join("") + currentSentence;
    // 滚动到底部
    textEl.scrollTop = textEl.scrollHeight;
  }

  async function apiStart(){
    const r = await fetch("/api/start", {method:"POST"});
    if(!r.ok) throw new Error(await r.text());
    const j = await r.json();
    sessionId = j.session_id;
  }

  async function apiPushChunk(float32_16k){
    const r = await fetch("/api/chunk?session_id=" + encodeURIComponent(sessionId), {
      method: "POST",
      headers: {"Content-Type":"application/octet-stream"},
      body: float32_16k.buffer
    });
    if(!r.ok) throw new Error(await r.text());
    return await r.json();
  }

  async function apiFinish(){
    const r = await fetch("/api/finish?session_id=" + encodeURIComponent(sessionId), {method:"POST"});
    if(!r.ok) throw new Error(await r.text());
    return await r.json();
  }

  btnClear.onclick = () => {
    allText = [];
    currentSentence = "";
    updateTextDisplay();
  };

  async function stopAudioPipeline(){
    try{
      if (processor){ processor.disconnect(); processor.onaudioprocess = null; }
      if (source) source.disconnect();
      if (audioCtx) await audioCtx.close();
      if (mediaStream) mediaStream.getTracks().forEach(t => t.stop());
    }catch(e){}
    processor = null; source = null; audioCtx = null; mediaStream = null;
  }

  btnStart.onclick = async () => {
    if (running) return;

    allText = [];
    currentSentence = "";
    updateTextDisplay();

    buf = new Float32Array(0);

    try{
      setStatus("启动中...", "warn");
      lockUI(true);

      await apiStart();

      mediaStream = await navigator.mediaDevices.getUserMedia({
        audio: {
          channelCount: 1,
          echoCancellation: true,
          noiseSuppression: true,
          autoGainControl: true
        },
        video: false
      });

      audioCtx = new (window.AudioContext || window.webkitAudioContext)();
      source = audioCtx.createMediaStreamSource(mediaStream);

      processor = audioCtx.createScriptProcessor(4096, 1, 1);
      const chunkSamples = Math.round(TARGET_SR * (VAD_CHUNK_MS / 1000));

      processor.onaudioprocess = (e) => {
        if (!running) return;
        const input = e.inputBuffer.getChannelData(0);
        const resampled = resampleLinear(input, audioCtx.sampleRate, TARGET_SR);
        buf = concatFloat32(buf, resampled);
        if (!pushing) pump();
      };

      source.connect(processor);
      processor.connect(audioCtx.destination);

      running = true;
      setStatus("识别中...", "ok");

    }catch(err){
      console.error(err);
      setStatus("启动失败: " + err.message, "err");
      lockUI(false);
      running = false;
      sessionId = null;
      await stopAudioPipeline();
    }
  };

  async function pump(){
    if (pushing) return;
    pushing = true;

    const chunkSamples = Math.round(TARGET_SR * (VAD_CHUNK_MS / 1000));

    try{
      while (running && buf.length >= chunkSamples){
        const chunk = buf.slice(0, chunkSamples);
        buf = buf.slice(chunkSamples);

        const j = await apiPushChunk(chunk);

        if (j.is_sentence_end && j.text) {
          // 句子结束,保存到历史
          allText.push(j.text);
          currentSentence = "";
        } else if (j.text) {
          // 句子进行中,更新当前句子
          currentSentence = j.text;
        }

        updateTextDisplay();

        if (running) setStatus("识别中...", "ok");
      }
    }catch(err){
      console.error(err);
      if (running) setStatus("后端错误: " + err.message, "err");
    }finally{
      pushing = false;
    }
  }

  btnStop.onclick = async () => {
    if (!running) return;

    running = false;
    setStatus("停止中...", "warn");
    lockUI(false);

    await stopAudioPipeline();

    try{
      if (sessionId){
        const j = await apiFinish();
        if (j.is_sentence_end && j.text) {
          allText.push(j.text);
          currentSentence = "";
        } else if (j.text) {
          currentSentence = j.text;
        }
        updateTextDisplay();
      }
      setStatus("已停止", "");
    }catch(err){
      console.error(err);
      setStatus("停止失败: " + err.message, "err");
    }finally{
      sessionId = null;
      buf = new Float32Array(0);
      pushing = false;
    }
  };
})();
</script>
</body>
</html>
"""


# ─── Flask 路由 ───────────────────────────────────────────────────
@app.get("/")
def index():
    return Response(INDEX_HTML, mimetype="text/html; charset=utf-8")


@app.post("/api/start")
def api_start():
    """启动一个新的识别会话"""
    session_id = uuid.uuid4().hex
    now = time.time()
    SESSIONS[session_id] = Session(
        vad_cache={},
        is_speaking=False,
        silence_start=0.0,
        asr_cache={},
        asr_pending=[],
        sentence_text="",
        sentence_start_time=0.0,
        created_at=now,
        last_seen=now
    )
    print(f"✅ 新会话 {session_id}")
    return jsonify({"session_id": session_id})


@app.post("/api/chunk")
def api_chunk():
    """处理音频块"""
    session_id = request.args.get("session_id", "")
    s = _get_session(session_id)
    if not s:
        return jsonify({"error": "invalid session_id"}), 400

    if request.mimetype != "application/octet-stream":
        return jsonify({"error": "expect application/octet-stream"}), 400

    raw = request.get_data(cache=False)
    if len(raw) % 4 != 0:
        return jsonify({"error": "float32 bytes length not multiple of 4"}), 400

    wav = np.frombuffer(raw, dtype=np.float32).reshape(-1)

    # VAD 检测
    vad_speech_start = False
    vad_speech_end = False
    try:
        vad_res = vad_model.generate(
            input=wav,
            cache=s.vad_cache,
            is_final=False,
            chunk_size=VAD_CHUNK_MS,
            disable_pbar=True,
        )
        if vad_res and vad_res[0].get("value"):
            for seg in vad_res[0]["value"]:
                s_start, s_end = seg[0], seg[1]
                if s_start >= 0:
                    vad_speech_start = True
                if s_end >= 0:
                    vad_speech_end = True
    except Exception as ex:
        print(f"\n⚠️  VAD 异常: {ex}")

    # 语音开始
    if vad_speech_start and not s.is_speaking:
        s.is_speaking = True
        s.silence_start = 0.0
        s.sentence_start_time = time.time()
        s.sentence_text = ""
        print(f"🎤 语音开始")

    if s.is_speaking:
        s.asr_pending.extend(wav.tolist())

        if vad_speech_end:
            # VAD 给出自然断句
            print(f"⏹️ 语音结束")
            result = _end_sentence(s)
            return jsonify(result)
        else:
            # 流式识别
            text = _flush_pending(s, is_final=False)
            return jsonify({
                "text": text,
                "is_sentence_end": False
            })

    return jsonify({"text": "", "is_sentence_end": False})


@app.post("/api/finish")
def api_finish():
    """结束识别会话"""
    session_id = request.args.get("session_id", "")
    s = _get_session(session_id)
    if not s:
        return jsonify({"error": "invalid session_id"}), 400

    # 如果正在说话,结束当前句子
    if s.is_speaking:
        result = _end_sentence(s)
        SESSIONS.pop(session_id, None)
        return jsonify(result)

    SESSIONS.pop(session_id, None)
    return jsonify({"text": "", "is_sentence_end": False})


if __name__ == "__main__":
    print("=" * 60)
    print("  Paraformer 实时语音识别 Web 服务")
    print("  模型: paraformer-zh-streaming + fsmn-vad")
    print("  访问: http://127.0.0.1:8000")
    print("  按 Ctrl+C 停止")
    print("=" * 60 + "\n")

    app.run(host="0.0.0.0", port=8000, debug=False, threaded=True)


5. 最佳实践与注意事项

5.1 音频预处理

  1. 采样率:确保音频采样率为 16kHz,与模型训练一致
  2. 声道:转换为单声道
  3. 降噪:建议启用 Web Audio API 的降噪、回声消除功能

5.2 Chunk 大小选择

Chunk 大小 延迟 精度 适用场景
小(200-400ms) 较低 高实时性要求
中(600-800ms) 平衡场景
大(1000ms+) 较高 精度优先场景

5.3 Cache 管理

  • 每个会话(session)需要独立的 cache
  • 会话结束时调用 is_final=True 清理状态
  • 注意及时清理过期会话,避免内存泄漏

5.4 VAD 配合使用

流式 ASR 通常配合 VAD(语音活动检测)使用:

  1. VAD 检测语音开始:用户开始说话
  2. VAD 检测语音结束:自然断句点
  3. ASR 流式处理:在语音段内进行流式识别

6. 总结

本文详细介绍了 FunASR 实时流式语音识别的核心概念:

  1. FunASR 是功能强大的开源语音识别工具包,提供丰富的预训练模型
  2. 实时推理 vs 离线推理:从用户体验角度区分,实时推理实现边说边出结果
  3. 流式推理 vs 非流式推理:从技术实现角度区分,流式推理通过 cache 和 chunk 机制实现增量处理

理解这些概念的区别和联系,有助于我们在实际项目中做出正确的技术选型。流式推理是构建实时语音识别系统的核心技术,掌握其原理和实践方法对于开发高质量语音应用至关重要。


参考资料

Logo

AtomGit 是由开放原子开源基金会联合 CSDN 等生态伙伴共同推出的新一代开源与人工智能协作平台。平台坚持“开放、中立、公益”的理念,把代码托管、模型共享、数据集托管、智能体开发体验和算力服务整合在一起,为开发者提供从开发、训练到部署的一站式体验。

更多推荐