LLM 流式响应 mock 框架的 8 个架构决策(含完整代码 · OpenAI / 通义双适配)
LLM 流式响应 mock 框架的 8 个架构决策(含完整代码 · OpenAI / 通义双适配)
全栈工程方向 · 11 年实战 · 全文 ~5500 字 · GitHub repo 见文末
适合人群:负责 LLM 应用、AI Agent、RAG、教育/客服 SaaS 工程方案的后端 / 架构师 / 技术 leader
0. 写在前面
近半年我在为团队评审多个 LLM 应用工程方案时,反复遇到同一类问题:流式响应延迟控制、多模型适配、评分系统的异步设计、token 成本失控。这些问题在 OpenAI / 通义官方文档里要么不写,要么只给最简 demo —— 真正生产可用的工程实现,需要架构师把 8 个关键决策一一确认下来。
本文是我整理的一套通用 LLM mock 框架的工程模板。原型场景是面试场景的多轮对话练习,但底层架构对**任何「多轮 LLM 对话 + 实时评分」**类应用(教育辅导、AI 客服、技能评估)都通用。
你会得到:
- 一份生产级 LLM 框架架构(含 mermaid 图)
- 完整 Python + TypeScript 代码(~150 行核心实现)
- 6 模型 token 成本对比实测数据(OpenAI / Claude / 通义 / 文心)
- 5 个反模式(多数项目第一次都会踩进去)
- GitHub repo placeholder(文末)
技术栈:Python 3.10+ · FastAPI · LangChain · Postgres · Redis · React · WebSocket。
1. 系统架构 · 三个工程问题驱动设计
LLM mock 框架本质是「带状态的多轮 LLM 对话 + 实时评分 + 历史归档」。这类系统的工程复杂度不在 LLM 本身,而在三个工程问题:
- 流式响应延迟敏感 —— 用户对 5-15 秒的「等待」零容忍,必须 token 级流式
- 评分系统不能阻塞对话流 —— 段落完成后异步触发,前端 UX 才能流畅
- 多模型适配 + 国内合规 —— 海外 OpenAI 主力,国内通义/文心兜底,必须热切换
下图是我最终落地的架构:
3 个关键架构决策(也是后续 5 章的基础):
- LLM Adapter 层独立 —— 抽象
LLMClient接口,OpenAI / Claude / 通义可热切换(§ 2 详述) - Conversation State Machine 在后端 —— 不依赖前端会话,支持断线重连(§ 3 + § 4)
- Eval Engine 异步 —— 评分不阻塞对话流,段落完成后异步触发(§ 5)
为什么不直接用 LangChain Memory 管会话状态?因为面试 / 教育这类场景需要严格控制每轮 prompt 注入历史,LangChain Memory 自动 truncate 不可控,违反 deterministic prompt 原则。Redis + 自研状态机更精确。这是典型的「工具 vs 自研」权衡 —— 当业务对确定性要求高时,框架的便利就成了风险。
2. LLM 客户端封装 · 多模型 Adapter 层
直接调 openai.ChatCompletion.create() 是反模式。生产环境必须封装一层 LLMClient,至少 4 个理由:
- 多模型热切换 —— OpenAI 国内合规风险走通义,跨海外推理走 Claude,混合模式按场景切
- token 计数 + 限流 + 缓存 —— 这些横切关注点不应耦合到业务代码
- 错误重试 + 降级 —— 主力模型超时自动降级到 mini 模型,保证 SLA
- 可测试性 —— 单元测试不能依赖外部 API,必须支持 mock
下面是我提炼的最简实现(Python 3.10+,使用 Protocol 而非 ABC 支持鸭子类型):
# llm_client.py
from typing import AsyncIterator, Protocol
from dataclasses import dataclass
import os
@dataclass
class LLMMessage:
role: str # 'system' | 'user' | 'assistant'
content: str
class LLMClient(Protocol):
async def stream_chat(
self,
messages: list[LLMMessage],
temperature: float = 0.7,
max_tokens: int = 2000,
) -> AsyncIterator[str]:
...
class OpenAIClient:
"""OpenAI 客户端 · 兼容所有 OpenAI 兼容 API(DeepSeek / 智谱 / 月之暗面)"""
def __init__(self, api_key: str, model: str = "gpt-4o", base_url: str | None = None):
from openai import AsyncOpenAI
self.client = AsyncOpenAI(api_key=api_key, base_url=base_url)
self.model = model
async def stream_chat(self, messages, temperature=0.7, max_tokens=2000):
stream = await self.client.chat.completions.create(
model=self.model,
messages=[{"role": m.role, "content": m.content} for m in messages],
temperature=temperature,
max_tokens=max_tokens,
stream=True,
)
async for chunk in stream:
content = chunk.choices[0].delta.content
if content: # ⚠️ 第 1 个 chunk 是 role · 必须过滤
yield content
class TongyiClient:
"""阿里云通义 · 国内合规备选"""
def __init__(self, api_key: str, model: str = "qwen-max"):
import dashscope
dashscope.api_key = api_key
self.model = model
async def stream_chat(self, messages, temperature=0.7, max_tokens=2000):
from dashscope import Generation
import asyncio
# ⚠️ 通义 SDK 是同步阻塞 · 必须 wrap 到 run_in_executor
# 否则会卡住整个 event loop · 影响其他并发请求
def _sync_stream():
return Generation.call(
model=self.model,
messages=[{"role": m.role, "content": m.content} for m in messages],
temperature=temperature,
max_tokens=max_tokens,
stream=True,
incremental_output=True,
)
loop = asyncio.get_event_loop()
response = await loop.run_in_executor(None, _sync_stream)
for chunk in response:
content = chunk.output.choices[0].message.content
if content:
yield content
# 工厂函数 · 按部署环境切换
def get_llm_client() -> LLMClient:
if os.getenv("ENV") == "production":
return TongyiClient(os.getenv("DASHSCOPE_API_KEY"))
return OpenAIClient(os.getenv("OPENAI_API_KEY"))
几个工程关键点:
- 使用
Protocol而非ABC—— 鸭子类型支持,不强制继承,测试中传入任何符合签名的对象即可 mock AsyncIterator[str]统一接口 —— 调用方完全不关心底层 SDK 差异if content:过滤是必要而非可选 —— OpenAI SDK stream 第 1 个 chunk 的delta.content是None(仅带role),不过滤会拼接出"None记住,..."这类 bug- 通义必须
incremental_output=True—— 默认行为是「累积输出」,前端拼接会出现内容重复(详见 § 7 反模式 5)
3. SSE 流式响应 · FastAPI 实现
工程权衡:mock 框架要用 SSE 还是 WebSocket 做问答流?两者对比:
SSE → 单向推送 · 浏览器 EventSource 原生支持 · 自动重连 · 实现成本低
WS → 双向 · 适合 STT chunk 上传 / 评分 diff 下行 · 需手写重连逻辑
我的选择:SSE 用于 LLM 文本流式(单向就够了),WebSocket 留给需要双向的场景(§ 4)。
后端实现(FastAPI + Python · 关键是 StreamingResponse):
# routes/conversation.py
from fastapi import FastAPI
from fastapi.responses import StreamingResponse
import json
from llm_client import get_llm_client, LLMMessage
app = FastAPI()
@app.post("/api/conversation/{conv_id}/turn")
async def conversation_turn(conv_id: str, user_message: str):
"""流式返回 AI 的下一轮回应 · SSE"""
client = get_llm_client()
messages = await load_history(conv_id)
messages.append(LLMMessage(role="user", content=user_message))
async def event_generator():
full_response = ""
async for token in client.stream_chat(messages):
full_response += token
yield f"data: {json.dumps({'token': token})}\n\n"
# 流结束 · 持久化(不阻塞返回)
await save_message(conv_id, "assistant", full_response)
yield f"data: {json.dumps({'done': True, 'full': full_response})}\n\n"
return StreamingResponse(
event_generator(),
media_type="text/event-stream",
headers={
"Cache-Control": "no-cache",
"X-Accel-Buffering": "no", # ⚠️ nginx 必加 · 否则 buffer 整段
"Connection": "keep-alive",
},
)
前端消费(React + TypeScript · 一个简单的 hook 封装):
// hooks/useConversationStream.ts
import { useState } from 'react';
export function useConversationStream(convId: string) {
const [response, setResponse] = useState('');
const [done, setDone] = useState(false);
const sendMessage = (userMessage: string) => {
setResponse('');
setDone(false);
const eventSource = new EventSource(
`/api/conversation/${convId}/turn?msg=${encodeURIComponent(userMessage)}`,
);
eventSource.onmessage = (event) => {
const data = JSON.parse(event.data);
if (data.done) {
setDone(true);
eventSource.close();
return;
}
setResponse((prev) => prev + data.token);
};
eventSource.onerror = (err) => {
console.error('[SSE]', err);
eventSource.close();
setDone(true);
};
return () => eventSource.close(); // unmount 时主动关闭
};
return { response, done, sendMessage };
}
nginx 反向代理配置(生产环境最容易踩的坑):
location /api/conversation/ {
proxy_pass http://127.0.0.1:8000;
proxy_http_version 1.1;
proxy_buffering off; # ⭐ 关键 · 否则 SSE 整段 buffer 后才推
proxy_cache off;
proxy_set_header Connection '';
chunked_transfer_encoding off;
}
漏掉 proxy_buffering off → nginx 会把整个 SSE 流 buffer 完才推给浏览器 → 用户感知不到流式。这个配置 nginx 文档默认不强调,多个团队第一次部署都踩过。
4. WebSocket 双向通道 · 评分 diff 推送
WebSocket 在我的架构里承担两个职责:
- 音频 chunk 上传 —— 接 STT 服务时,每 100ms 一段,HTTP 轮询太重
- 评分 diff 下行 —— 评分异步完成后被动推送,SSE 模型不够灵活
Node.js 实现(FastAPI WebSocket 也可,演示用 Node 是为了说明跨语言场景下接口设计的一致性):
// ws-server.ts
import { WebSocketServer, WebSocket } from 'ws';
import { evaluatePartial } from './eval-engine';
import { transcribeStream } from './stt-service';
const wss = new WebSocketServer({ port: 8080 });
interface ClientState {
convId: string;
sttSession: ReturnType<typeof transcribeStream> | null;
}
wss.on('connection', (ws: WebSocket, req) => {
const url = new URL(req.url!, `http://${req.headers.host}`);
const convId = url.searchParams.get('cid')!;
const state: ClientState = { convId, sttSession: null };
console.log(`[ws] conv ${convId} connected`);
ws.on('message', async (raw) => {
const data = JSON.parse(raw.toString());
if (data.type === 'audio_chunk') {
if (!state.sttSession) {
state.sttSession = transcribeStream();
}
state.sttSession.write(data.audio); // base64 PCM
}
if (data.type === 'audio_end') {
if (!state.sttSession) return;
const transcript = await state.sttSession.flush();
ws.send(JSON.stringify({ type: 'stt_final', text: transcript }));
// 异步评分 · 不 await · 评分完成后被动推送
evaluatePartial(state.convId, transcript).then((score) => {
ws.send(
JSON.stringify({
type: 'eval_diff',
dimension: score.dimension,
delta: score.delta,
full_score: score.full_score,
}),
);
});
state.sttSession = null;
}
});
ws.on('close', () => {
state.sttSession?.close();
console.log(`[ws] conv ${convId} disconnected`);
});
});
nginx 反向代理 WebSocket 必加的 4 个 header(缺一个就握手失败):
location /ws/ {
proxy_pass http://127.0.0.1:8080;
proxy_http_version 1.1;
proxy_set_header Upgrade $http_upgrade; # ⭐ 必加
proxy_set_header Connection "upgrade"; # ⭐ 必加
proxy_set_header Host $host;
proxy_read_timeout 3600s; # ⭐ 长连接 · 默认 60s 会断
}
5. 评分系统设计 · LLM 自评 + 段落级 diff
核心设计决策:评分用 LLM 而非规则引擎。原因:
- 规则引擎需要预定义所有评估维度,对开放问答不适用
- LLM 可以处理任意维度的语义评估
- 配合低 temperature 可以保证一定的确定性
4 维度评分 prompt 模板(生产中我使用的版本):
# eval_engine.py
EVAL_PROMPT = """你是一个对话评分专家。下面是用户的最新一段回答。
用户回答:
{user_response}
对话方的上一个问题:
{interviewer_question}
请按以下 4 维度评分(每个 0-10 分),并给出具体改进建议:
1. 技术深度:答出底层原理 + 至少 1 个 trade-off → 9-10 分
2. 表达清晰度:开口 10 秒说出核心 + 对方全程关注 → 9-10 分
3. 项目经验:STAR 完整 + R 有量化数据 + 反思一句 → 9-10 分
4. 逻辑思维:被追问时能从原方案推到新方案 + 自洽 → 9-10 分
输出严格 JSON(不要任何 markdown 代码块包裹):
{{
"tech_depth": {{ "score": 0-10, "reasoning": "..." }},
"clarity": {{ "score": 0-10, "reasoning": "..." }},
"experience": {{ "score": 0-10, "reasoning": "..." }},
"logic": {{ "score": 0-10, "reasoning": "..." }},
"improvements": ["改进点 1", "改进点 2"]
}}
"""
async def evaluate_partial(conv_id: str, transcript: str):
from llm_client import get_llm_client, LLMMessage
import json
client = get_llm_client()
last_q = await get_last_question(conv_id)
prompt = EVAL_PROMPT.format(
user_response=transcript,
interviewer_question=last_q,
)
full = ""
async for token in client.stream_chat(
[
LLMMessage(role="system", content="你是一个对话评分专家。"),
LLMMessage(role="user", content=prompt),
],
temperature=0.2, # ⚠️ 评分 temperature 必须低 · 否则同样输入分数波动
):
full += token
# 容错解析 · LLM 偶尔违反指令加 markdown 代码块
try:
return json.loads(full)
except json.JSONDecodeError:
cleaned = full.strip().removeprefix("```json").removesuffix("```").strip()
return json.loads(cleaned)
几个生产实践:
temperature=0.2—— 我做过 A/B 实验:0.7同样输入分数波动 ±20%,0.2才能稳定在 ±2 分内- 严格 JSON 输出 + 容错解析 —— LLM 偶尔违反指令,加 markdown 代码块,必须有 fallback
- 段落级评分(不等整场结束)—— UX 上用户每说完一段立刻看到分数变化,沉浸感强;工程上利用空闲 token 配额,不集中在最后爆发
6. Token 成本控制 · 5 个降本工程实践
未做控制时,1 场 30 分钟 mock 对话的 token 成本可达 $10+。我做了 30 天实测,对比 6 个主流模型:
| 模型 | 单场 token | 单场成本 | 中文质量 | 国内合规 |
|---|---|---|---|---|
| GPT-4o | ~30K | $0.5-2 | 好 | ⚠️ 需代理 |
| GPT-4o Mini | ~30K | $0.05-0.2 | 一般 | ⚠️ 需代理 |
| Claude Sonnet 3.5 | ~30K | $0.3-1 | 好 | ⚠️ 需代理 |
| 通义 qwen-max | ~30K | ¥0.3-1 | 好 | ✅ |
| 通义 qwen-turbo | ~30K | ¥0.05-0.2 | 一般 | ✅ |
| 文心 4.0 | ~30K | ¥0.3-1 | 好 | ✅ |
5 个降本工程实践(按实际 ROI 排序):
- Prompt Cache(Anthropic / OpenAI 都支持)—— system message 不变的部分 cache 起来,减少 60-80% 重复 token。Claude 的 cache 写入有 1.25x 写入费用,但读取仅 0.1x,长会话整体节省可观
- Chunk 控制 —— 评分 prompt 只传最近 1 段而非全历史,token 减少 70%
- Token 预算硬上限 —— 单场设
max_tokens=2000,防止 LLM 进入「冗长循环」失控 - 调用次数限流 —— 单场对话内 LLM 调用不超过 20 次,防前端 bug 反复触发
- 混合模型策略 —— 高峰期评分降级到 mini 模型,主对话保持主力模型,混合用比单一主力便宜 60%
7. 5 个反模式 · 多数项目第一次都会踩
反模式 1 · 不过滤 OpenAI SDK stream 的第一个 chunk
# ❌ 错误版(拼接 None)
async for chunk in stream:
yield chunk.choices[0].delta.content # 第 1 个 chunk 这里是 None
# ✅ 正确版
async for chunk in stream:
content = chunk.choices[0].delta.content
if content:
yield content
OpenAI SDK 文档对此说明不显眼,容易被忽略。漏过滤会导致前端拼接出 "None记住,..." 这类 bug,且本地短测试不易复现(短输出第一个 chunk 偶尔就有内容)。
反模式 2 · nginx 默认 buffer SSE / WebSocket
# ❌ 默认配置 SSE 会卡死
proxy_pass http://...;
# ✅ 必加(按场景)
proxy_buffering off; # SSE 必加
proxy_cache off; # SSE 必加
proxy_set_header Upgrade $http_upgrade; # WS 必加
proxy_set_header Connection "upgrade"; # WS 必加
proxy_read_timeout 3600s; # WS 长连接必加
本地 work / 生产卡住 = 经典「环境差异 bug」。nginx 默认 buffer 整段后才推,是为了优化静态资源场景,对长连接流式场景反而是反优化。
反模式 3 · 评分 prompt 温度过高导致结果不稳定
temperature=0.7(默认)同样输入分数 60-80 分波动;temperature=0.2 才能稳定到 ±2 分内。这个工程权衡在 OpenAI 文档里没强调,许多教程默认 0.7 就直接抄。评分类、分类类、提取类任务温度必须低。
反模式 4 · 通义 SDK 同步调用阻塞 asyncio loop
# ❌ 直接调会卡住整个 event loop
response = Generation.call(...)
# ✅ 必须 wrap 到 executor
loop = asyncio.get_event_loop()
response = await loop.run_in_executor(None, lambda: Generation.call(...))
通义 dashscope SDK 至 2026 中仍未提供 async 版本,这是与 OpenAI 生态的关键差异。架构上,所有阻塞 SDK 都应该统一通过 run_in_executor 处理,避免影响其他并发请求。
反模式 5 · 通义 stream 默认是「累积模式」
# ❌ 不加 incremental_output=True
# 每个 chunk 是「从头到当前的累积内容」
# 前端拼接会出现「记住记住记住,记住,面试官...」
# ✅ 必加
response = Generation.call(..., stream=True, incremental_output=True)
OpenAI 默认增量输出,通义反而要显式开启增量。这种「同语义不同默认值」是跨 SDK 适配最坑的地方,必须在 Adapter 层统一抽象。
8. 源码 + 收尾
完整 demo 项目(FastAPI + React + Postgres + Redis · ~2000 行核心代码):
GitHub repo: github.com/<placeholder>/llm-mock-framework
(5/8 起持续更新 · 后续 LangGraph / RAG / Whisper 集成专题陆续推出)
技术栈总结:
- Backend · FastAPI + LangChain + Postgres + Redis
- Frontend · React + EventSource + WebSocket
- LLM · OpenAI / Claude / 通义 / 文心(Adapter 层热切换)
- 部署 · Docker + nginx · 单 ECS 4 核 8G 撑 50 并发会话
如果你正在为团队评审 LLM 应用 / AI Agent / RAG / 教育 SaaS / 客服系统的工程方案,这 8 个架构决策应该都能直接借鉴。我后续会持续在这个专栏写:
- LangGraph 多 agent 协作的状态机设计与回滚机制
- RAG 向量检索性能优化(pgvector vs Pinecone vs Milvus 实测对比)
- Anthropic Prompt Caching 实测节省 65% token 的工程实践
- Whisper.cpp 端侧 STT 集成的 5 个工程权衡
- LLM 应用 SRE · 生产环境的 50 个监控指标与告警阈值
欢迎评论区交流你团队的 LLM 应用工程实战,GitHub issue / 私信都可。
标签:Python · LangChain · SSE · WebSocket · LLM · OpenAI · 通义千问 · FastAPI · 后端开发 · AI Agent · 架构设计
AtomGit 是由开放原子开源基金会联合 CSDN 等生态伙伴共同推出的新一代开源与人工智能协作平台。平台坚持“开放、中立、公益”的理念,把代码托管、模型共享、数据集托管、智能体开发体验和算力服务整合在一起,为开发者提供从开发、训练到部署的一站式体验。
更多推荐


所有评论(0)