【语音相关ASR】FunASR 实时流式语音识别之paraformer-zh-streaming
FunASR 实时流式语音识别:从原理到实践代码实现
提示: 跳过原理快速查看完整代码请直接跳转到第四节
1. FunASR 简介
1.1 什么是 FunASR
FunASR 是阿里巴巴达摩院开源的语音识别工具包,旨在 bridging 学术研究与工业应用之间的鸿沟。它基于 PyTorch 构建,提供从模型训练到推理部署的完整解决方案。
FunASR 的核心优势:
- 丰富的预训练模型:涵盖语音识别(ASR)、语音活动检测(VAD)、语音分离、标点恢复等多种任务
- 工业级性能:经过阿里巴巴集团大规模生产环境验证
- 灵活的架构:支持自定义模型、数据集和训练流程
- 中文优化:对中文语音识别场景做了深度优化
1.2 FunASR 核心模型
FunASR 提供了多种预训练模型,其中最常用的包括:
| 模型类型 | 模型名称 | 用途 |
|---|---|---|
| ASR | paraformer-zh | 高精度离线中文语音识别 |
| ASR | paraformer-zh-streaming | 流式中文语音识别 |
| VAD | fsmn-vad | 语音活动检测 |
| Punctuation | ct-punc | 标点符号恢复 |
| Speaker | cam++ | 说话人识别 |
本文将重点介绍 paraformer-zh-streaming 流式识别模型的原理与应用。
2. 实时推理 vs 离线推理
2.1 离线推理(Offline Inference)
定义:离线推理指在用户说完一整段话后,系统才开始进行语音识别处理。
工作流程:
用户说话 → 录制完整音频 → 上传完整音频 → 模型处理 → 返回完整文本
特点:
-
优点:
- 识别精度高,模型可以利用完整上下文信息
- 实现简单,无需考虑状态管理
- 适合批量处理场景(如视频字幕生成、会议录音转写)
-
缺点:
- 响应延迟高,用户体验差
- 无法边说边出结果
- 不适合实时交互场景
典型应用场景:
- 录音文件转写
- 视频字幕生成
- 会议记录整理
2.2 实时推理(Real-time Inference)
定义:实时推理指用户说话的同时,系统持续进行语音识别并返回识别结果。
工作流程:
用户说话 → 音频流分块 → 每块实时处理 → 增量返回文本 → 用户边说边看结果
特点:
-
优点:
- 低延迟,用户体验好
- 边说边出结果,所见即所得
- 适合实时交互场景
-
缺点:
- 实现复杂,需要状态管理
- 可能存在识别修正(后续块可能影响前面的结果)
- 对系统稳定性要求高
典型应用场景:
- 实时语音转文字
- 语音助手交互
- 直播字幕
- 会议实时字幕
2.3 对比总结
| 维度 | 离线推理 | 实时推理 |
|---|---|---|
| 延迟 | 高(等待完整音频) | 低(毫秒级响应) |
| 精度 | 较高(完整上下文) | 略低(有限上下文) |
| 实现复杂度 | 简单 | 复杂 |
| 状态管理 | 无需 | 需要 |
| 用户体验 | 一般 | 优秀 |
| 适用场景 | 批量处理 | 实时交互 |
3. 流式推理 vs 非流式推理
3.1 核心概念辨析
在讨论实时语音识别时,容易混淆"实时"和"流式"两个概念:
- 实时(Real-time):描述的是用户体验层面的特性,指系统能够快速响应用户输入,边说边出结果。
- 流式(Streaming):描述的是模型推理的技术实现方式,指模型以增量方式处理音频数据。
关系:流式推理是实现实时推理的主流技术手段,但实时推理也可以通过其他方式实现(如短音频分片离线处理)。
3.2 非流式推理(Non-streaming)
原理:模型一次性接收完整音频,进行整体编码和解码。
处理过程:
完整音频 → 编码器(整体编码) → 解码器(整体解码) → 完整文本
特点:
- 编码器可以看到整个音频的上下文信息
- 解码器可以基于完整编码序列进行解码
- 识别精度通常更高
- 无法在说话过程中给出结果
Paraformer 非流式模型结构:
音频特征 → Encoder → Predictor → Decoder → 文本
(整体) (整体) (整体)
3.3 流式推理(Streaming)
原理:模型增量接收音频块(chunk),维护中间状态(cache),逐步输出结果。
处理过程:
音频块1 → 编码器(块1, cache) → 解码器(块1, cache) → 文本片段1
音频块2 → 编码器(块2, cache) → 解码器(块2, cache) → 文本片段2
...
音频块N → 编码器(块N, cache) → 解码器(块N, cache) → 文本片段N
核心机制:
- Chunk 分块:将连续音频流切分为固定大小的块
- Cache 状态缓存:保存跨 chunk 的中间状态,确保上下文连续性
- Look-back 机制:编码器和解码器可以"回头看"之前的几个 chunk
Paraformer 流式模型结构:
音频块 → Streaming Encoder → Streaming Decoder → 文本片段
(带cache) (带cache)
↓ ↓
encoder_look_back decoder_look_back
3.4 流式推理的关键参数
3.4.1 Chunk Size(块大小)
chunk_size = [0, 10, 5] # [左看, 当前块, 右看]
- 左看(chunk_size[0]):编码时可以访问之前多少个块的信息
- 当前块(chunk_size[1]):当前处理的音频块大小
- 右看(chunk_size[2]):编码时可以访问之后多少个块的信息
示例解释:
chunk_size = [0, 10, 5]表示当前处理第 10 个块时,可以"右看" 5 个块- 右看越多,精度越高,但延迟越大
- 左看用于保持上下文连续性
3.4.2 Look-back 参数
encoder_chunk_look_back = 4 # 编码器回看块数
decoder_chunk_look_back = 1 # 解码器回看块数
这些参数控制模型在处理当前块时,需要参考之前多少个块的状态信息,用于保持识别的连续性和一致性。
3.4.3 Cache 管理
asr_cache = {} # 存储模型中间状态
# 每次推理时传入 cache
result = model.generate(
input=audio_chunk,
cache=asr_cache, # 传入当前 cache
is_final=False, # 是否为最后一块
chunk_size=chunk_size,
encoder_chunk_look_back=encoder_look_back,
decoder_chunk_look_back=decoder_look_back
)
# cache 会被模型内部更新,下次推理继续使用
3.5 流式 vs 非流式对比
| 维度 | 非流式推理 | 流式推理 |
|---|---|---|
| 输入 | 完整音频 | 音频块序列 |
| 状态管理 | 无需 | 需要 cache |
| 延迟 | 高(等完整音频) | 低(逐块处理) |
| 精度 | 高(完整上下文) | 略低(有限上下文) |
| 内存占用 | 高(完整音频) | 低(当前块) |
| 实现复杂度 | 简单 | 复杂 |
3.6 如何选择
选择非流式场景:
- 离线批量处理
- 对精度要求极高
- 不需要实时响应
选择流式场景:
- 实时语音识别
- 低延迟要求
- 资源受限环境
- 交互式应用
4. FunASR 流式推理实战之paraformer-zh-streaming
运行界面

4.1 环境准备
pyproject.toml 文件
[project]
name = "blog-workspace"
version = "0.1.0"
description = "Add your description here"
readme = "README.md"
requires-python = ">=3.13"
dependencies = [
"dotenv>=0.9.9",
"flask>=3.1.3",
"funasr>=1.3.1",
"numpy>=2.4.4",
"opuslib-next>=1.1.6",
"torch>=2.11.0",
"torchaudio>=2.11.0",
]
4.2 模型加载
hugginface模型下载方法: https://huggingface.co/funasr/paraformer-zh-streaming
modelscope模型下载: https://www.modelscope.cn/models/shuai1618/paraformer-zh-streaming
paraformer-zh-streaming
print("⏳ 加载 ASR 模型...")
asr_model = AutoModel(
model=ASR_MODEL_PATH,
model_revision="v2.0.4",
disable_update=True,
)
print("✅ ASR 模型加载完成")
print("⏳ 加载 VAD 模型...")
vad_model = AutoModel(
model=VAD_MODEL_PATH,
model_revision="v2.0.4",
disable_update=True,
)
print("✅ VAD 模型加载完成\n")
4.3 核心推理逻辑
暂无
4.4 完整示例(可直接运行)
"""
基于 paraformer-zh-streaming + fsmn-vad 的实时流式中文语音识别 Web 版本
通过 Web Audio API 获取麦克风音频,通过 HTTP 流式发送到后端
Install:
pip install flask
Run:
python web_asr_realtime.py
Open:
http://127.0.0.1:8000
"""
import os
import queue
import time
import uuid
import numpy as np
import pyaudio
import threading
from dataclasses import dataclass
from funasr import AutoModel
from dotenv import load_dotenv
from typing import Dict, Optional
from flask import Flask, Response, jsonify, request
load_dotenv()
root_path = os.environ.get("model_root_dir", "")
print(f"root_path: {root_path}")
# ─── 模型路径 ────────────────────────────────────────────────────
ASR_MODEL_PATH = root_path + "paraformer-zh-streaming"
VAD_MODEL_PATH = root_path + "fsmn-vad"
# ─── 音频参数 ────────────────────────────────────────────────────
SAMPLE_RATE = 16000
CHANNELS = 1
VAD_CHUNK_MS = 200
VAD_CHUNK_SAMPLES = int(SAMPLE_RATE * VAD_CHUNK_MS / 1000) # 3200
ASR_CHUNK_MS = 600
ASR_CHUNK_SAMPLES = int(SAMPLE_RATE * ASR_CHUNK_MS / 1000) # 9600
CHUNK_SIZE_CFG = [0, 10, 5]
ENCODER_LOOK_BACK = 4
DECODER_LOOK_BACK = 1
# ─── Flask 应用 ────────────────────────────────────────────────────
app = Flask(__name__)
# ════════════════════════════════════════════════════════════════
print("⏳ 加载 ASR 模型...")
asr_model = AutoModel(
model=ASR_MODEL_PATH,
model_revision="v2.0.4",
disable_update=True,
)
print("✅ ASR 模型加载完成")
print("⏳ 加载 VAD 模型...")
vad_model = AutoModel(
model=VAD_MODEL_PATH,
model_revision="v2.0.4",
disable_update=True,
)
print("✅ VAD 模型加载完成\n")
# ─── 会话管理 ──────────────────────────────────────────────────────
@dataclass
class Session:
vad_cache: dict
is_speaking: bool
silence_start: float
asr_cache: dict
asr_pending: list
sentence_text: str
sentence_start_time: float
created_at: float
last_seen: float
SESSIONS: Dict[str, Session] = {}
SESSION_TTL_SEC = 10 * 60
def _gc_sessions():
"""清理过期会话"""
now = time.time()
dead = [sid for sid, s in SESSIONS.items() if now - s.last_seen > SESSION_TTL_SEC]
for sid in dead:
SESSIONS.pop(sid, None)
def _get_session(session_id: str) -> Optional[Session]:
"""获取会话并更新最后访问时间"""
_gc_sessions()
s = SESSIONS.get(session_id)
if s:
s.last_seen = time.time()
return s
# ─── ASR 处理函数 ─────────────────────────────────────────────────
def _feed_asr(chunk: np.ndarray, asr_cache: dict, is_final: bool) -> str:
"""送入 ASR,返回识别的文字片段"""
try:
result = asr_model.generate(
input=chunk,
cache=asr_cache,
is_final=is_final,
chunk_size=CHUNK_SIZE_CFG,
encoder_chunk_look_back=ENCODER_LOOK_BACK,
decoder_chunk_look_back=DECODER_LOOK_BACK,
disable_pbar=True,
)
if result:
return result[0].get("text", "").strip()
except Exception as e:
print(f"\n⚠️ ASR 异常: {e}")
return ""
def _flush_pending(session: Session, is_final: bool) -> str:
"""处理待识别的音频,返回整句累积文字"""
if is_final:
if session.asr_pending:
chunk = np.array(session.asr_pending, dtype=np.float32)
session.asr_pending = []
new_piece = _feed_asr(chunk, session.asr_cache, is_final=True)
else:
new_piece = _feed_asr(
np.zeros(160, dtype=np.float32), session.asr_cache, is_final=True
)
if new_piece:
session.sentence_text += new_piece
else:
while len(session.asr_pending) >= ASR_CHUNK_SAMPLES:
chunk = np.array(
session.asr_pending[:ASR_CHUNK_SAMPLES], dtype=np.float32
)
session.asr_pending = session.asr_pending[ASR_CHUNK_SAMPLES:]
new_piece = _feed_asr(chunk, session.asr_cache, is_final=False)
if new_piece:
session.sentence_text += new_piece
return session.sentence_text
def _end_sentence(session: Session) -> dict:
"""结束当前句子"""
text = _flush_pending(session, is_final=True)
duration = time.time() - session.sentence_start_time
# 重置句子状态
session.asr_cache = {}
session.asr_pending = []
session.sentence_text = ""
session.is_speaking = False
session.silence_start = 0.0
session.sentence_start_time = 0.0
return {
"text": text,
"duration": duration,
"is_sentence_end": True
}
# ─── HTML 页面 ─────────────────────────────────────────────────────
INDEX_HTML = r"""<!doctype html>
<html lang="zh-CN">
<head>
<meta charset="utf-8" />
<meta name="viewport" content="width=device-width,initial-scale=1" />
<title>Paraformer 实时语音识别</title>
<style>
:root{
--bg:#ffffff;
--card:#ffffff;
--muted:#5b6472;
--text:#0f172a;
--border:#e5e7eb;
--ok:#059669;
--warn:#d97706;
--danger:#e11d48;
}
html, body { height: 100%; }
body{
margin:0;
font-family: ui-sans-serif, system-ui, -apple-system, Segoe UI, Roboto, Helvetica, Arial, "Noto Sans";
background: var(--bg);
color:var(--text);
}
.wrap{
height: 100vh;
max-width: none;
margin: 0;
padding: 16px;
box-sizing: border-box;
display: flex;
}
.card{
width: 100%;
height: 100%;
background: var(--card);
border:1px solid var(--border);
border-radius: 14px;
padding: 16px;
box-sizing: border-box;
box-shadow: 0 10px 30px rgba(0,0,0,.06);
display: flex;
flex-direction: column;
gap: 12px;
min-height: 0;
}
h1{ font-size: 18px; margin: 0; letter-spacing:.2px; font-weight: 600;}
.row{ display:flex; gap:12px; align-items:center; flex-wrap: wrap; }
button{
border:1px solid var(--border); border-radius: 12px;
padding: 10px 16px; cursor:pointer; color:var(--text);
background: #f8fafc;
transition: transform .05s ease, background .15s ease, border-color .15s ease;
font-weight: 600;
font-size: 14px;
}
button:hover{ background: #f1f5f9; border-color:#cbd5e1; }
button:active{ transform: translateY(1px); }
button.primary{ border-color: rgba(5,150,105,.35); background: rgba(5,150,105,.10); }
button.danger{ border-color: rgba(225,29,72,.35); background: rgba(225,29,72,.10); }
button:disabled{ opacity:.5; cursor:not-allowed; }
.pill{
font-size: 12px; padding: 6px 10px; border-radius: 999px;
border:1px solid var(--border); color: var(--muted);
background: #f8fafc;
user-select:none;
}
.pill.ok{ color: #065f46; border-color: rgba(5,150,105,.35); background: rgba(5,150,105,.10); }
.pill.warn{ color: #92400e; border-color: rgba(217,119,6,.35); background: rgba(217,119,6,.10); }
.pill.err{ color: #9f1239; border-color: rgba(225,29,72,.35); background: rgba(225,29,72,.10); }
.panel{
border:1px solid var(--border);
border-radius: 12px;
background: #ffffff;
padding: 12px;
}
.panel.textpanel{
flex: 1;
display: flex;
flex-direction: column;
min-height: 0;
}
.label{ color:var(--muted); font-size: 12px; margin-bottom: 6px; }
.mono{ font-family: ui-monospace, SFMono-Regular, Menlo, Monaco, Consolas, "Liberation Mono", "Courier New"; }
#text{
flex: 1;
min-height: 0;
white-space: pre-wrap;
line-height: 1.6;
font-size: 15px;
padding: 12px;
border-radius: 12px;
border: 1px solid var(--border);
background: #f8fafc;
overflow: auto;
}
a{ color: #2563eb; text-decoration: none; font-size: 13px; font-weight: 600; }
.info{ font-size: 13px; color: var(--muted); }
</style>
</head>
<body>
<div class="wrap">
<div class="card">
<h1>Paraformer 实时语音识别</h1>
<div class="row">
<button id="btnStart" class="primary">开始识别</button>
<button id="btnStop" class="danger" disabled>停止识别</button>
<span id="status" class="pill warn">未开始</span>
<a href="javascript:void(0)" id="btnClear" style="margin-left:auto;">清空文本</a>
</div>
<div class="panel">
<div class="row">
<span class="info">模型: paraformer-zh-streaming + fsmn-vad</span>
</div>
</div>
<div class="panel textpanel">
<div class="label">识别文本</div>
<div id="text"></div>
</div>
</div>
</div>
<script>
(() => {
const $ = (id) => document.getElementById(id);
const btnStart = $("btnStart");
const btnStop = $("btnStop");
const btnClear = $("btnClear");
const statusEl = $("status");
const textEl = $("text");
// VAD 块大小 200ms,与后端一致
const VAD_CHUNK_MS = 200;
const TARGET_SR = 16000;
let audioCtx = null;
let processor = null;
let source = null;
let mediaStream = null;
let sessionId = null;
let running = false;
let buf = new Float32Array(0);
let pushing = false;
// 当前句子的文本
let currentSentence = "";
let allText = [];
function setStatus(text, cls){
statusEl.textContent = text;
statusEl.className = "pill " + (cls || "");
}
function lockUI(on){
btnStart.disabled = on;
btnStop.disabled = !on;
}
function concatFloat32(a, b){
const out = new Float32Array(a.length + b.length);
out.set(a, 0);
out.set(b, a.length);
return out;
}
function resampleLinear(input, srcSr, dstSr){
if (srcSr === dstSr) return input;
const ratio = dstSr / srcSr;
const outLen = Math.max(0, Math.round(input.length * ratio));
const out = new Float32Array(outLen);
for (let i = 0; i < outLen; i++){
const x = i / ratio;
const x0 = Math.floor(x);
const x1 = Math.min(x0 + 1, input.length - 1);
const t = x - x0;
out[i] = input[x0] * (1 - t) + input[x1] * t;
}
return out;
}
function updateTextDisplay(){
// 显示所有已完成句子 + 当前句子
textEl.textContent = allText.join("") + currentSentence;
// 滚动到底部
textEl.scrollTop = textEl.scrollHeight;
}
async function apiStart(){
const r = await fetch("/api/start", {method:"POST"});
if(!r.ok) throw new Error(await r.text());
const j = await r.json();
sessionId = j.session_id;
}
async function apiPushChunk(float32_16k){
const r = await fetch("/api/chunk?session_id=" + encodeURIComponent(sessionId), {
method: "POST",
headers: {"Content-Type":"application/octet-stream"},
body: float32_16k.buffer
});
if(!r.ok) throw new Error(await r.text());
return await r.json();
}
async function apiFinish(){
const r = await fetch("/api/finish?session_id=" + encodeURIComponent(sessionId), {method:"POST"});
if(!r.ok) throw new Error(await r.text());
return await r.json();
}
btnClear.onclick = () => {
allText = [];
currentSentence = "";
updateTextDisplay();
};
async function stopAudioPipeline(){
try{
if (processor){ processor.disconnect(); processor.onaudioprocess = null; }
if (source) source.disconnect();
if (audioCtx) await audioCtx.close();
if (mediaStream) mediaStream.getTracks().forEach(t => t.stop());
}catch(e){}
processor = null; source = null; audioCtx = null; mediaStream = null;
}
btnStart.onclick = async () => {
if (running) return;
allText = [];
currentSentence = "";
updateTextDisplay();
buf = new Float32Array(0);
try{
setStatus("启动中...", "warn");
lockUI(true);
await apiStart();
mediaStream = await navigator.mediaDevices.getUserMedia({
audio: {
channelCount: 1,
echoCancellation: true,
noiseSuppression: true,
autoGainControl: true
},
video: false
});
audioCtx = new (window.AudioContext || window.webkitAudioContext)();
source = audioCtx.createMediaStreamSource(mediaStream);
processor = audioCtx.createScriptProcessor(4096, 1, 1);
const chunkSamples = Math.round(TARGET_SR * (VAD_CHUNK_MS / 1000));
processor.onaudioprocess = (e) => {
if (!running) return;
const input = e.inputBuffer.getChannelData(0);
const resampled = resampleLinear(input, audioCtx.sampleRate, TARGET_SR);
buf = concatFloat32(buf, resampled);
if (!pushing) pump();
};
source.connect(processor);
processor.connect(audioCtx.destination);
running = true;
setStatus("识别中...", "ok");
}catch(err){
console.error(err);
setStatus("启动失败: " + err.message, "err");
lockUI(false);
running = false;
sessionId = null;
await stopAudioPipeline();
}
};
async function pump(){
if (pushing) return;
pushing = true;
const chunkSamples = Math.round(TARGET_SR * (VAD_CHUNK_MS / 1000));
try{
while (running && buf.length >= chunkSamples){
const chunk = buf.slice(0, chunkSamples);
buf = buf.slice(chunkSamples);
const j = await apiPushChunk(chunk);
if (j.is_sentence_end && j.text) {
// 句子结束,保存到历史
allText.push(j.text);
currentSentence = "";
} else if (j.text) {
// 句子进行中,更新当前句子
currentSentence = j.text;
}
updateTextDisplay();
if (running) setStatus("识别中...", "ok");
}
}catch(err){
console.error(err);
if (running) setStatus("后端错误: " + err.message, "err");
}finally{
pushing = false;
}
}
btnStop.onclick = async () => {
if (!running) return;
running = false;
setStatus("停止中...", "warn");
lockUI(false);
await stopAudioPipeline();
try{
if (sessionId){
const j = await apiFinish();
if (j.is_sentence_end && j.text) {
allText.push(j.text);
currentSentence = "";
} else if (j.text) {
currentSentence = j.text;
}
updateTextDisplay();
}
setStatus("已停止", "");
}catch(err){
console.error(err);
setStatus("停止失败: " + err.message, "err");
}finally{
sessionId = null;
buf = new Float32Array(0);
pushing = false;
}
};
})();
</script>
</body>
</html>
"""
# ─── Flask 路由 ───────────────────────────────────────────────────
@app.get("/")
def index():
return Response(INDEX_HTML, mimetype="text/html; charset=utf-8")
@app.post("/api/start")
def api_start():
"""启动一个新的识别会话"""
session_id = uuid.uuid4().hex
now = time.time()
SESSIONS[session_id] = Session(
vad_cache={},
is_speaking=False,
silence_start=0.0,
asr_cache={},
asr_pending=[],
sentence_text="",
sentence_start_time=0.0,
created_at=now,
last_seen=now
)
print(f"✅ 新会话 {session_id}")
return jsonify({"session_id": session_id})
@app.post("/api/chunk")
def api_chunk():
"""处理音频块"""
session_id = request.args.get("session_id", "")
s = _get_session(session_id)
if not s:
return jsonify({"error": "invalid session_id"}), 400
if request.mimetype != "application/octet-stream":
return jsonify({"error": "expect application/octet-stream"}), 400
raw = request.get_data(cache=False)
if len(raw) % 4 != 0:
return jsonify({"error": "float32 bytes length not multiple of 4"}), 400
wav = np.frombuffer(raw, dtype=np.float32).reshape(-1)
# VAD 检测
vad_speech_start = False
vad_speech_end = False
try:
vad_res = vad_model.generate(
input=wav,
cache=s.vad_cache,
is_final=False,
chunk_size=VAD_CHUNK_MS,
disable_pbar=True,
)
if vad_res and vad_res[0].get("value"):
for seg in vad_res[0]["value"]:
s_start, s_end = seg[0], seg[1]
if s_start >= 0:
vad_speech_start = True
if s_end >= 0:
vad_speech_end = True
except Exception as ex:
print(f"\n⚠️ VAD 异常: {ex}")
# 语音开始
if vad_speech_start and not s.is_speaking:
s.is_speaking = True
s.silence_start = 0.0
s.sentence_start_time = time.time()
s.sentence_text = ""
print(f"🎤 语音开始")
if s.is_speaking:
s.asr_pending.extend(wav.tolist())
if vad_speech_end:
# VAD 给出自然断句
print(f"⏹️ 语音结束")
result = _end_sentence(s)
return jsonify(result)
else:
# 流式识别
text = _flush_pending(s, is_final=False)
return jsonify({
"text": text,
"is_sentence_end": False
})
return jsonify({"text": "", "is_sentence_end": False})
@app.post("/api/finish")
def api_finish():
"""结束识别会话"""
session_id = request.args.get("session_id", "")
s = _get_session(session_id)
if not s:
return jsonify({"error": "invalid session_id"}), 400
# 如果正在说话,结束当前句子
if s.is_speaking:
result = _end_sentence(s)
SESSIONS.pop(session_id, None)
return jsonify(result)
SESSIONS.pop(session_id, None)
return jsonify({"text": "", "is_sentence_end": False})
if __name__ == "__main__":
print("=" * 60)
print(" Paraformer 实时语音识别 Web 服务")
print(" 模型: paraformer-zh-streaming + fsmn-vad")
print(" 访问: http://127.0.0.1:8000")
print(" 按 Ctrl+C 停止")
print("=" * 60 + "\n")
app.run(host="0.0.0.0", port=8000, debug=False, threaded=True)
5. 最佳实践与注意事项
5.1 音频预处理
- 采样率:确保音频采样率为 16kHz,与模型训练一致
- 声道:转换为单声道
- 降噪:建议启用 Web Audio API 的降噪、回声消除功能
5.2 Chunk 大小选择
| Chunk 大小 | 延迟 | 精度 | 适用场景 |
|---|---|---|---|
| 小(200-400ms) | 低 | 较低 | 高实时性要求 |
| 中(600-800ms) | 中 | 中 | 平衡场景 |
| 大(1000ms+) | 高 | 较高 | 精度优先场景 |
5.3 Cache 管理
- 每个会话(session)需要独立的 cache
- 会话结束时调用
is_final=True清理状态 - 注意及时清理过期会话,避免内存泄漏
5.4 VAD 配合使用
流式 ASR 通常配合 VAD(语音活动检测)使用:
- VAD 检测语音开始:用户开始说话
- VAD 检测语音结束:自然断句点
- ASR 流式处理:在语音段内进行流式识别
6. 总结
本文详细介绍了 FunASR 实时流式语音识别的核心概念:
- FunASR 是功能强大的开源语音识别工具包,提供丰富的预训练模型
- 实时推理 vs 离线推理:从用户体验角度区分,实时推理实现边说边出结果
- 流式推理 vs 非流式推理:从技术实现角度区分,流式推理通过 cache 和 chunk 机制实现增量处理
理解这些概念的区别和联系,有助于我们在实际项目中做出正确的技术选型。流式推理是构建实时语音识别系统的核心技术,掌握其原理和实践方法对于开发高质量语音应用至关重要。
参考资料
AtomGit 是由开放原子开源基金会联合 CSDN 等生态伙伴共同推出的新一代开源与人工智能协作平台。平台坚持“开放、中立、公益”的理念,把代码托管、模型共享、数据集托管、智能体开发体验和算力服务整合在一起,为开发者提供从开发、训练到部署的一站式体验。
更多推荐



所有评论(0)