LLM 流式响应 mock 框架的 8 个架构决策(含完整代码 · OpenAI / 通义双适配)

全栈工程方向 · 11 年实战 · 全文 ~5500 字 · GitHub repo 见文末

适合人群:负责 LLM 应用、AI Agent、RAG、教育/客服 SaaS 工程方案的后端 / 架构师 / 技术 leader


0. 写在前面

近半年我在为团队评审多个 LLM 应用工程方案时,反复遇到同一类问题:流式响应延迟控制、多模型适配、评分系统的异步设计、token 成本失控。这些问题在 OpenAI / 通义官方文档里要么不写,要么只给最简 demo —— 真正生产可用的工程实现,需要架构师把 8 个关键决策一一确认下来。

本文是我整理的一套通用 LLM mock 框架的工程模板。原型场景是面试场景的多轮对话练习,但底层架构对**任何「多轮 LLM 对话 + 实时评分」**类应用(教育辅导、AI 客服、技能评估)都通用。

你会得到:

  • 一份生产级 LLM 框架架构(含 mermaid 图)
  • 完整 Python + TypeScript 代码(~150 行核心实现)
  • 6 模型 token 成本对比实测数据(OpenAI / Claude / 通义 / 文心)
  • 5 个反模式(多数项目第一次都会踩进去)
  • GitHub repo placeholder(文末)

技术栈:Python 3.10+ · FastAPI · LangChain · Postgres · Redis · React · WebSocket


1. 系统架构 · 三个工程问题驱动设计

LLM mock 框架本质是「带状态的多轮 LLM 对话 + 实时评分 + 历史归档」。这类系统的工程复杂度不在 LLM 本身,而在三个工程问题:

  1. 流式响应延迟敏感 —— 用户对 5-15 秒的「等待」零容忍,必须 token 级流式
  2. 评分系统不能阻塞对话流 —— 段落完成后异步触发,前端 UX 才能流畅
  3. 多模型适配 + 国内合规 —— 海外 OpenAI 主力,国内通义/文心兜底,必须热切换

下图是我最终落地的架构:

LLM 后端

后端 FastAPI

前端

React UI

SSE Consumer

WebSocket Client

Auth Middleware

Conversation State Machine

LLM Adapter Layer

Eval Engine 异步

Postgres + Redis

OpenAI / Claude / 通义

STT 服务 可选

3 个关键架构决策(也是后续 5 章的基础):

  1. LLM Adapter 层独立 —— 抽象 LLMClient 接口,OpenAI / Claude / 通义可热切换(§ 2 详述)
  2. Conversation State Machine 在后端 —— 不依赖前端会话,支持断线重连(§ 3 + § 4)
  3. Eval Engine 异步 —— 评分不阻塞对话流,段落完成后异步触发(§ 5)

为什么不直接用 LangChain Memory 管会话状态?因为面试 / 教育这类场景需要严格控制每轮 prompt 注入历史,LangChain Memory 自动 truncate 不可控,违反 deterministic prompt 原则。Redis + 自研状态机更精确。这是典型的「工具 vs 自研」权衡 —— 当业务对确定性要求高时,框架的便利就成了风险。


2. LLM 客户端封装 · 多模型 Adapter 层

直接调 openai.ChatCompletion.create() 是反模式。生产环境必须封装一层 LLMClient,至少 4 个理由:

  1. 多模型热切换 —— OpenAI 国内合规风险走通义,跨海外推理走 Claude,混合模式按场景切
  2. token 计数 + 限流 + 缓存 —— 这些横切关注点不应耦合到业务代码
  3. 错误重试 + 降级 —— 主力模型超时自动降级到 mini 模型,保证 SLA
  4. 可测试性 —— 单元测试不能依赖外部 API,必须支持 mock

下面是我提炼的最简实现(Python 3.10+,使用 Protocol 而非 ABC 支持鸭子类型):

# llm_client.py
from typing import AsyncIterator, Protocol
from dataclasses import dataclass
import os

@dataclass
class LLMMessage:
    role: str  # 'system' | 'user' | 'assistant'
    content: str

class LLMClient(Protocol):
    async def stream_chat(
        self,
        messages: list[LLMMessage],
        temperature: float = 0.7,
        max_tokens: int = 2000,
    ) -> AsyncIterator[str]:
        ...

class OpenAIClient:
    """OpenAI 客户端 · 兼容所有 OpenAI 兼容 API(DeepSeek / 智谱 / 月之暗面)"""
    def __init__(self, api_key: str, model: str = "gpt-4o", base_url: str | None = None):
        from openai import AsyncOpenAI
        self.client = AsyncOpenAI(api_key=api_key, base_url=base_url)
        self.model = model

    async def stream_chat(self, messages, temperature=0.7, max_tokens=2000):
        stream = await self.client.chat.completions.create(
            model=self.model,
            messages=[{"role": m.role, "content": m.content} for m in messages],
            temperature=temperature,
            max_tokens=max_tokens,
            stream=True,
        )
        async for chunk in stream:
            content = chunk.choices[0].delta.content
            if content:  # ⚠️ 第 1 个 chunk 是 role · 必须过滤
                yield content

class TongyiClient:
    """阿里云通义 · 国内合规备选"""
    def __init__(self, api_key: str, model: str = "qwen-max"):
        import dashscope
        dashscope.api_key = api_key
        self.model = model

    async def stream_chat(self, messages, temperature=0.7, max_tokens=2000):
        from dashscope import Generation
        import asyncio

        # ⚠️ 通义 SDK 是同步阻塞 · 必须 wrap 到 run_in_executor
        # 否则会卡住整个 event loop · 影响其他并发请求
        def _sync_stream():
            return Generation.call(
                model=self.model,
                messages=[{"role": m.role, "content": m.content} for m in messages],
                temperature=temperature,
                max_tokens=max_tokens,
                stream=True,
                incremental_output=True,
            )

        loop = asyncio.get_event_loop()
        response = await loop.run_in_executor(None, _sync_stream)
        for chunk in response:
            content = chunk.output.choices[0].message.content
            if content:
                yield content

# 工厂函数 · 按部署环境切换
def get_llm_client() -> LLMClient:
    if os.getenv("ENV") == "production":
        return TongyiClient(os.getenv("DASHSCOPE_API_KEY"))
    return OpenAIClient(os.getenv("OPENAI_API_KEY"))

几个工程关键点

  • 使用 Protocol 而非 ABC —— 鸭子类型支持,不强制继承,测试中传入任何符合签名的对象即可 mock
  • AsyncIterator[str] 统一接口 —— 调用方完全不关心底层 SDK 差异
  • if content: 过滤是必要而非可选 —— OpenAI SDK stream 第 1 个 chunk 的 delta.contentNone(仅带 role),不过滤会拼接出 "None记住,..." 这类 bug
  • 通义必须 incremental_output=True —— 默认行为是「累积输出」,前端拼接会出现内容重复(详见 § 7 反模式 5)

3. SSE 流式响应 · FastAPI 实现

工程权衡:mock 框架要用 SSE 还是 WebSocket 做问答流?两者对比:

SSE  → 单向推送 · 浏览器 EventSource 原生支持 · 自动重连 · 实现成本低
WS   → 双向 · 适合 STT chunk 上传 / 评分 diff 下行 · 需手写重连逻辑

我的选择:SSE 用于 LLM 文本流式(单向就够了),WebSocket 留给需要双向的场景(§ 4)。

后端实现(FastAPI + Python · 关键是 StreamingResponse):

# routes/conversation.py
from fastapi import FastAPI
from fastapi.responses import StreamingResponse
import json
from llm_client import get_llm_client, LLMMessage

app = FastAPI()

@app.post("/api/conversation/{conv_id}/turn")
async def conversation_turn(conv_id: str, user_message: str):
    """流式返回 AI 的下一轮回应 · SSE"""
    client = get_llm_client()
    messages = await load_history(conv_id)
    messages.append(LLMMessage(role="user", content=user_message))

    async def event_generator():
        full_response = ""
        async for token in client.stream_chat(messages):
            full_response += token
            yield f"data: {json.dumps({'token': token})}\n\n"

        # 流结束 · 持久化(不阻塞返回)
        await save_message(conv_id, "assistant", full_response)
        yield f"data: {json.dumps({'done': True, 'full': full_response})}\n\n"

    return StreamingResponse(
        event_generator(),
        media_type="text/event-stream",
        headers={
            "Cache-Control": "no-cache",
            "X-Accel-Buffering": "no",  # ⚠️ nginx 必加 · 否则 buffer 整段
            "Connection": "keep-alive",
        },
    )

前端消费(React + TypeScript · 一个简单的 hook 封装):

// hooks/useConversationStream.ts
import { useState } from 'react';

export function useConversationStream(convId: string) {
  const [response, setResponse] = useState('');
  const [done, setDone] = useState(false);

  const sendMessage = (userMessage: string) => {
    setResponse('');
    setDone(false);

    const eventSource = new EventSource(
      `/api/conversation/${convId}/turn?msg=${encodeURIComponent(userMessage)}`,
    );

    eventSource.onmessage = (event) => {
      const data = JSON.parse(event.data);
      if (data.done) {
        setDone(true);
        eventSource.close();
        return;
      }
      setResponse((prev) => prev + data.token);
    };

    eventSource.onerror = (err) => {
      console.error('[SSE]', err);
      eventSource.close();
      setDone(true);
    };

    return () => eventSource.close(); // unmount 时主动关闭
  };

  return { response, done, sendMessage };
}

nginx 反向代理配置(生产环境最容易踩的坑):

location /api/conversation/ {
    proxy_pass http://127.0.0.1:8000;
    proxy_http_version 1.1;
    proxy_buffering off;          # ⭐ 关键 · 否则 SSE 整段 buffer 后才推
    proxy_cache off;
    proxy_set_header Connection '';
    chunked_transfer_encoding off;
}

漏掉 proxy_buffering off → nginx 会把整个 SSE 流 buffer 完才推给浏览器 → 用户感知不到流式。这个配置 nginx 文档默认不强调,多个团队第一次部署都踩过。


4. WebSocket 双向通道 · 评分 diff 推送

WebSocket 在我的架构里承担两个职责:

  1. 音频 chunk 上传 —— 接 STT 服务时,每 100ms 一段,HTTP 轮询太重
  2. 评分 diff 下行 —— 评分异步完成后被动推送,SSE 模型不够灵活

Node.js 实现(FastAPI WebSocket 也可,演示用 Node 是为了说明跨语言场景下接口设计的一致性):

// ws-server.ts
import { WebSocketServer, WebSocket } from 'ws';
import { evaluatePartial } from './eval-engine';
import { transcribeStream } from './stt-service';

const wss = new WebSocketServer({ port: 8080 });

interface ClientState {
  convId: string;
  sttSession: ReturnType<typeof transcribeStream> | null;
}

wss.on('connection', (ws: WebSocket, req) => {
  const url = new URL(req.url!, `http://${req.headers.host}`);
  const convId = url.searchParams.get('cid')!;
  const state: ClientState = { convId, sttSession: null };

  console.log(`[ws] conv ${convId} connected`);

  ws.on('message', async (raw) => {
    const data = JSON.parse(raw.toString());

    if (data.type === 'audio_chunk') {
      if (!state.sttSession) {
        state.sttSession = transcribeStream();
      }
      state.sttSession.write(data.audio); // base64 PCM
    }

    if (data.type === 'audio_end') {
      if (!state.sttSession) return;
      const transcript = await state.sttSession.flush();
      ws.send(JSON.stringify({ type: 'stt_final', text: transcript }));

      // 异步评分 · 不 await · 评分完成后被动推送
      evaluatePartial(state.convId, transcript).then((score) => {
        ws.send(
          JSON.stringify({
            type: 'eval_diff',
            dimension: score.dimension,
            delta: score.delta,
            full_score: score.full_score,
          }),
        );
      });

      state.sttSession = null;
    }
  });

  ws.on('close', () => {
    state.sttSession?.close();
    console.log(`[ws] conv ${convId} disconnected`);
  });
});

nginx 反向代理 WebSocket 必加的 4 个 header(缺一个就握手失败):

location /ws/ {
    proxy_pass http://127.0.0.1:8080;
    proxy_http_version 1.1;
    proxy_set_header Upgrade $http_upgrade;     # ⭐ 必加
    proxy_set_header Connection "upgrade";       # ⭐ 必加
    proxy_set_header Host $host;
    proxy_read_timeout 3600s;                    # ⭐ 长连接 · 默认 60s 会断
}

5. 评分系统设计 · LLM 自评 + 段落级 diff

核心设计决策:评分用 LLM 而非规则引擎。原因:

  • 规则引擎需要预定义所有评估维度,对开放问答不适用
  • LLM 可以处理任意维度的语义评估
  • 配合低 temperature 可以保证一定的确定性

4 维度评分 prompt 模板(生产中我使用的版本):

# eval_engine.py
EVAL_PROMPT = """你是一个对话评分专家。下面是用户的最新一段回答。

用户回答:
{user_response}

对话方的上一个问题:
{interviewer_question}

请按以下 4 维度评分(每个 0-10 分),并给出具体改进建议:

1. 技术深度:答出底层原理 + 至少 1 个 trade-off → 9-10 分
2. 表达清晰度:开口 10 秒说出核心 + 对方全程关注 → 9-10 分
3. 项目经验:STAR 完整 + R 有量化数据 + 反思一句 → 9-10 分
4. 逻辑思维:被追问时能从原方案推到新方案 + 自洽 → 9-10 分

输出严格 JSON(不要任何 markdown 代码块包裹):
{{
  "tech_depth": {{ "score": 0-10, "reasoning": "..." }},
  "clarity": {{ "score": 0-10, "reasoning": "..." }},
  "experience": {{ "score": 0-10, "reasoning": "..." }},
  "logic": {{ "score": 0-10, "reasoning": "..." }},
  "improvements": ["改进点 1", "改进点 2"]
}}
"""

async def evaluate_partial(conv_id: str, transcript: str):
    from llm_client import get_llm_client, LLMMessage
    import json

    client = get_llm_client()
    last_q = await get_last_question(conv_id)

    prompt = EVAL_PROMPT.format(
        user_response=transcript,
        interviewer_question=last_q,
    )

    full = ""
    async for token in client.stream_chat(
        [
            LLMMessage(role="system", content="你是一个对话评分专家。"),
            LLMMessage(role="user", content=prompt),
        ],
        temperature=0.2,  # ⚠️ 评分 temperature 必须低 · 否则同样输入分数波动
    ):
        full += token

    # 容错解析 · LLM 偶尔违反指令加 markdown 代码块
    try:
        return json.loads(full)
    except json.JSONDecodeError:
        cleaned = full.strip().removeprefix("```json").removesuffix("```").strip()
        return json.loads(cleaned)

几个生产实践

  • temperature=0.2 —— 我做过 A/B 实验:0.7 同样输入分数波动 ±20%,0.2 才能稳定在 ±2 分内
  • 严格 JSON 输出 + 容错解析 —— LLM 偶尔违反指令,加 markdown 代码块,必须有 fallback
  • 段落级评分(不等整场结束)—— UX 上用户每说完一段立刻看到分数变化,沉浸感强;工程上利用空闲 token 配额,不集中在最后爆发

6. Token 成本控制 · 5 个降本工程实践

未做控制时,1 场 30 分钟 mock 对话的 token 成本可达 $10+。我做了 30 天实测,对比 6 个主流模型:

模型 单场 token 单场成本 中文质量 国内合规
GPT-4o ~30K $0.5-2 ⚠️ 需代理
GPT-4o Mini ~30K $0.05-0.2 一般 ⚠️ 需代理
Claude Sonnet 3.5 ~30K $0.3-1 ⚠️ 需代理
通义 qwen-max ~30K ¥0.3-1
通义 qwen-turbo ~30K ¥0.05-0.2 一般
文心 4.0 ~30K ¥0.3-1

5 个降本工程实践(按实际 ROI 排序):

  1. Prompt Cache(Anthropic / OpenAI 都支持)—— system message 不变的部分 cache 起来,减少 60-80% 重复 token。Claude 的 cache 写入有 1.25x 写入费用,但读取仅 0.1x,长会话整体节省可观
  2. Chunk 控制 —— 评分 prompt 只传最近 1 段而非全历史,token 减少 70%
  3. Token 预算硬上限 —— 单场设 max_tokens=2000,防止 LLM 进入「冗长循环」失控
  4. 调用次数限流 —— 单场对话内 LLM 调用不超过 20 次,防前端 bug 反复触发
  5. 混合模型策略 —— 高峰期评分降级到 mini 模型,主对话保持主力模型,混合用比单一主力便宜 60%

7. 5 个反模式 · 多数项目第一次都会踩

反模式 1 · 不过滤 OpenAI SDK stream 的第一个 chunk

# ❌ 错误版(拼接 None)
async for chunk in stream:
    yield chunk.choices[0].delta.content  # 第 1 个 chunk 这里是 None

# ✅ 正确版
async for chunk in stream:
    content = chunk.choices[0].delta.content
    if content:
        yield content

OpenAI SDK 文档对此说明不显眼,容易被忽略。漏过滤会导致前端拼接出 "None记住,..." 这类 bug,且本地短测试不易复现(短输出第一个 chunk 偶尔就有内容)。

反模式 2 · nginx 默认 buffer SSE / WebSocket

# ❌ 默认配置 SSE 会卡死
proxy_pass http://...;

# ✅ 必加(按场景)
proxy_buffering off;                           # SSE 必加
proxy_cache off;                                # SSE 必加
proxy_set_header Upgrade $http_upgrade;        # WS 必加
proxy_set_header Connection "upgrade";          # WS 必加
proxy_read_timeout 3600s;                       # WS 长连接必加

本地 work / 生产卡住 = 经典「环境差异 bug」。nginx 默认 buffer 整段后才推,是为了优化静态资源场景,对长连接流式场景反而是反优化。

反模式 3 · 评分 prompt 温度过高导致结果不稳定

temperature=0.7(默认)同样输入分数 60-80 分波动;temperature=0.2 才能稳定到 ±2 分内。这个工程权衡在 OpenAI 文档里没强调,许多教程默认 0.7 就直接抄。评分类、分类类、提取类任务温度必须低

反模式 4 · 通义 SDK 同步调用阻塞 asyncio loop

# ❌ 直接调会卡住整个 event loop
response = Generation.call(...)

# ✅ 必须 wrap 到 executor
loop = asyncio.get_event_loop()
response = await loop.run_in_executor(None, lambda: Generation.call(...))

通义 dashscope SDK 至 2026 中仍未提供 async 版本,这是与 OpenAI 生态的关键差异。架构上,所有阻塞 SDK 都应该统一通过 run_in_executor 处理,避免影响其他并发请求。

反模式 5 · 通义 stream 默认是「累积模式」

# ❌ 不加 incremental_output=True
# 每个 chunk 是「从头到当前的累积内容」
# 前端拼接会出现「记住记住记住,记住,面试官...」

# ✅ 必加
response = Generation.call(..., stream=True, incremental_output=True)

OpenAI 默认增量输出,通义反而要显式开启增量。这种「同语义不同默认值」是跨 SDK 适配最坑的地方,必须在 Adapter 层统一抽象。


8. 源码 + 收尾

完整 demo 项目(FastAPI + React + Postgres + Redis · ~2000 行核心代码):

GitHub repo: github.com/<placeholder>/llm-mock-framework
(5/8 起持续更新 · 后续 LangGraph / RAG / Whisper 集成专题陆续推出)

技术栈总结:

  • Backend · FastAPI + LangChain + Postgres + Redis
  • Frontend · React + EventSource + WebSocket
  • LLM · OpenAI / Claude / 通义 / 文心(Adapter 层热切换)
  • 部署 · Docker + nginx · 单 ECS 4 核 8G 撑 50 并发会话

如果你正在为团队评审 LLM 应用 / AI Agent / RAG / 教育 SaaS / 客服系统的工程方案,这 8 个架构决策应该都能直接借鉴。我后续会持续在这个专栏写:

  • LangGraph 多 agent 协作的状态机设计与回滚机制
  • RAG 向量检索性能优化(pgvector vs Pinecone vs Milvus 实测对比)
  • Anthropic Prompt Caching 实测节省 65% token 的工程实践
  • Whisper.cpp 端侧 STT 集成的 5 个工程权衡
  • LLM 应用 SRE · 生产环境的 50 个监控指标与告警阈值

欢迎评论区交流你团队的 LLM 应用工程实战,GitHub issue / 私信都可。


标签:Python · LangChain · SSE · WebSocket · LLM · OpenAI · 通义千问 · FastAPI · 后端开发 · AI Agent · 架构设计

Logo

AtomGit 是由开放原子开源基金会联合 CSDN 等生态伙伴共同推出的新一代开源与人工智能协作平台。平台坚持“开放、中立、公益”的理念,把代码托管、模型共享、数据集托管、智能体开发体验和算力服务整合在一起,为开发者提供从开发、训练到部署的一站式体验。

更多推荐