玄同 765

大语言模型 (LLM) 开发工程师 | 中国传媒大学 · 数字媒体技术(智能交互与游戏设计)

CSDN · 个人主页 | GitHub · Follow


关于作者

  • 深耕领域:大语言模型开发 / RAG 知识库 / AI Agent 落地 / 模型微调
  • 技术栈:Python | RAG (LangChain / Dify + Milvus) | FastAPI + Docker
  • 工程能力:专注模型工程化部署、知识库构建与优化,擅长全流程解决方案

「让 AI 交互更智能,让技术落地更高效」
欢迎技术探讨与项目合作,解锁大模型与智能交互的无限可能!


深度剖析 FastAPI 在 LLM 应用中的通讯架构:SSE、WebSocket、HTTP 及更多协议的实战指南

引言:一个真实的困扰

你有没有遇到过这样的场景?你信心满满地搭建了一个基于大语言模型的智能对话系统,用户体验却总是差了那么一口气。当用户在等待模型生成回答时,页面就像卡住了一样,没有任何反馈——直到最后突然“蹦”出一大段完整文字。用户开始抱怨:“怎么这么慢?”“是不是卡住了?”“能不能像 ChatGPT 那样一个字一个字地出来?”

这些问题,本质上都指向了一个核心挑战:如何让 LLM 应用与用户之间建立高效、实时、低延迟的通讯?

传统的 Web 应用走的是“请求-响应”模式:客户端发一个请求,服务器处理完,返回一个完整的响应,交易就结束了。但 LLM 应用完全不同——模型生成内容是一个“流式”的过程,token 是一个一个吐出来的。在这种情况下,如果我们还是沿用传统 Web 的思路,用户体验就会非常糟糕。

这就是为什么我们在 LLM 应用中需要引入多种通讯协议:SSE、WebSocket、HTTP Streaming 等等。每种协议都有自己的适用场景,选对了协议,用户体验飞升;选错了,轻则体验糟糕,重则系统崩溃。

这篇文章,我会结合我在实际项目中踩过的坑,系统性地讲解 FastAPI 支持的各种通讯协议,帮你彻底搞懂它们的原理、适用场景,以及最关键的——如何在实际项目中正确选型和实现。


一、为什么 LLM 应用需要特殊的通讯协议?

1.1 传统 Web 通讯的困境

在深入讨论之前,我们先来看看传统 Web 通讯模式遇到了什么问题。

想象一下这样的场景:用户在智能辅导系统中输入了一道数学题,系统需要调用 LLM 来分析这道题涉及哪些知识点,给出解题思路,并推荐类似的练习题。这是一个典型的 LLM 应用场景。

如果我们用传统的 HTTP 通讯模式,整个过程是这样的:

LLM 服务 FastAPI 服务器 用户浏览器 LLM 服务 FastAPI 服务器 用户浏览器 等待模型生成完整回答... POST /analyze - 发送题目 调用模型分析 返回完整回答(可能几千字) HTTP 200 - 返回完整响应

问题在哪里?你发现了吗——用户需要等待模型把每一个 token 都生成完毕,才能看到任何内容。如果模型生成一段 500 字的分析需要 10 秒,那用户就得在屏幕上盯着加载动画整整 10 秒。

这在用户体验上是不可接受的。ChatGPT 之所以用户体验好,正是因为它采用流式输出——模型每生成一个词,用户就能看到,延迟感知从 10 秒变成了几十毫秒。

1.2 LLM 应用的通讯需求图谱

LLM 应用的通讯需求远比传统 Web 应用复杂。让我用一张图来概括:

LLM 应用通讯需求

流式文本生成

单向 Server→Client

低延迟

OpenAI 兼容格式

实时对话

双向 Client↔Server

低延迟

状态保持

工具调用

请求-响应

离散事件

可能需要等待

多模态交互

高带宽

二进制数据

低延迟

多人协作

广播

状态同步

从这张图可以看出,LLM 应用需要支持的通讯场景是多样化的,没有任何单一协议能够完美覆盖所有场景。这就是为什么我们需要在 FastAPI 中灵活运用多种协议。


二、HTTP 协议在 LLM 应用中的深度应用

2.1 HTTP/1.1 到 HTTP/3 的演进

在讨论具体的协议之前,我们需要理解 HTTP 协议本身的演进,因为不同的 HTTP 版本对通讯性能有根本性影响。

HTTP/1.1 是我们最熟悉的版本,它引入了 Keep-Alive 机制,允许在同一个 TCP 连接上发送多个请求/响应。但这远远不够——因为 HTTP/1.1 仍然是“请求-响应”模式,同一个连接上只能有一个请求在处理(队头阻塞)。

HTTP/2 解决了这个问题。它引入了多路复用(Multiplexing),允许在同一个 TCP 连接上同时传输多个请求和响应。想象一下,你在一个网页上同时加载图片、脚本、样式表——HTTP/2 之前浏览器需要为每个资源建立独立的连接,HTTP/2 之后一个连接就够了。

HTTP/3 更进一步,它不再使用 TCP 作为传输层,而是使用了 QUIC 协议。QUIC 本身支持多路复用,而且解决了 TCP 的队头阻塞问题——即使一个数据包丢失了,其他数据包仍然可以继续传输。

对于 LLM 应用来说,这个演进意味着:

  • HTTP/1.1:基本不用考虑,不支持流式响应
  • HTTP/2:可以支持流式,但需要特殊的帧机制
  • HTTP/3:最佳选择,特别是对于移动网络环境

不过,FastAPI 默认运行在 HTTP/1.1 上,我们需要通过配置来启用 HTTP/2。更重要的是,真正影响 LLM 通讯体验的,是 SSE 和 WebSocket 这两个建立在 HTTP 之上的“子协议”。

2.2 FastAPI 中的 HTTP 请求处理机制

FastAPI 基于 Starletter 构建,继承了 Python ASGI 模型的强大能力。理解 FastAPI 如何处理 HTTP 请求,是掌握其他协议的基础。

当你定义这样一个端点时:

from fastapi import FastAPI, Request
from fastapi.responses import JSONResponse
import time

app = FastAPI()

@app.post("/llm/analyze")
async def analyze_question(request: Request):
    """
    分析用户问题的 LLM 端点

    这个端点展示了 FastAPI 处理 HTTP 请求的基本流程:
    1. 接收请求体
    2. 解析 JSON 数据
    3. 调用 LLM 处理
    4. 返回响应

    Args:
        request: FastAPI 请求对象,包含请求头、查询参数、正文等

    Returns:
        JSON 格式的分析结果
    """
    # 第一步:解析请求体
    body = await request.json()
    question = body.get("question", "")

    # 第二步:调用 LLM(这里用模拟数据演示)
    # 实际项目中这里会调用 OpenAI、Claude 等 API
    await time.sleep(0.1)  # 模拟 LLM 调用延迟

    # 第三步:构建响应
    result = {
        "question": question,
        "knowledge_points": ["函数", "参数", "默认值"],
        "difficulty": "中等",
        "analysis": "这道题考查的是 Python 函数的默认参数特性..."
    }

    return JSONResponse(content=result)

FastAPI 内部发生了什么?让我用流程图来解释:

FastAPI 应用

客户端

ASGI 服务器 (Uvicorn/Hypercorn)

ASGI Scope

receive()

send()

HTTP 请求

路由匹配

依赖注入

业务逻辑

响应序列化

关键点在于 asyncawait。FastAPI 是基于 asyncio 的异步框架,这意味着在等待 LLM 响应的时候,服务器可以释放出线程去处理其他请求。这就是为什么我们在定义 LLM 调用时,应该使用 async def

# 推荐的写法:使用 async def
@app.post("/llm/analyze")
async def analyze_question(request: Request):
    body = await request.json()
    # 使用 await 调用异步 LLM 客户端
    result = await llm_client.agenerate(prompt)
    return result

# 不推荐的写法:使用同步 def(会阻塞事件循环)
@app.post("/llm/analyze")
def analyze_question_sync(request: Request):
    body = request.json()  # 同步读取
    result = llm_client.generate(prompt)  # 同步调用
    return result

2.3 流式响应与 Server-Sent Events 的区别

这里有一个常见的误区:很多人把“流式响应”和 Server-Sent Events(SSE)混为一谈。让我解释它们的区别。

HTTP Streaming(HTTP 流式响应)是 HTTP/1.1 和 HTTP/2 的原生特性。它的核心原理是:服务器在收到完整响应之前,就可以开始向客户端发送数据。HTTP 分块传输编码(Chunked Transfer Encoding)是实现这个功能的关键机制。

from fastapi import FastAPI
from fastapi.responses import StreamingResponse
import asyncio

app = FastAPI()

@app.get("/http-stream")
async def http_stream():
    """
    HTTP Streaming 示例

    使用分块传输编码,服务器可以逐步发送响应体。
    客户端不需要等待完整响应,就可以开始处理接收到的数据块。

    这种方式适用于:
    - 大文件下载
    - 长计算任务
    - LLM 流式输出

    Returns:
        流式响应,Content-Type 取决于具体实现
    """
    async def generate():
        # 分块发送数据,每块之间可以有延迟
        for i in range(10):
            # 模拟数据生成
            data = f"Chunk {i}: 数据内容...\n"
            yield data
            await asyncio.sleep(0.5)  # 模拟处理延迟

    return StreamingResponse(
        generate(),
        media_type="text/plain"  # 普通文本
    )

Server-Sent Events (SSE) 是建立在 HTTP Streaming 之上的一个更具体的协议。它的设计目标是:让服务器能够单向地向客户端推送数据。SSE 有一些重要的特性:

  1. Content-Type 必须是 text/event-stream
  2. 自动重连:浏览器会在连接断开后自动重新建立连接
  3. 事件格式:支持命名事件,便于客户端区分不同类型的推送
from fastapi import FastAPI
from fastapi.responses import StreamingResponse
import asyncio
import json
import time

app = FastAPI()

@app.get("/sse-stream")
async def sse_stream():
    """
    Server-Sent Events 示例

    SSE 是 HTTP Streaming 的一种具体实现,专为服务器向客户端
    单向推送数据而设计。

    SSE 的关键特性:
    1. 自动重连:浏览器原生支持
    2. 事件类型:可以通过 "event:" 前缀命名不同事件
    3. ID 追踪:可以通过 "id:" 追踪消息序号
    4. 重试间隔:可以通过 "retry:" 设置重试间隔

    适用于:
    - LLM 流式输出
    - 实时进度通知
    - 股票行情推送
    - 服务器监控数据

    Returns:
        text/event-stream 格式的流式响应
    """
    async def generate():
        # SSE 格式:每个事件以 "data: " 开头,以空行结束
        for i in range(10):
            # 构造 SSE 格式的数据
            data = json.dumps({
                "chunk": i,
                "content": f"这是第 {i} 段内容",
                "timestamp": time.time()
            })
            yield f"data: {data}\n\n"
            await asyncio.sleep(0.5)

    return StreamingResponse(
        generate(),
        media_type="text/event-stream",  # 关键:必须是这个 MIME 类型
        headers={
            # 缓存控制:禁止缓存
            "Cache-Control": "no-cache",
            # 连接保持:不关闭连接
            "Connection": "keep-alive",
            # 禁用 Nginx 缓冲:确保实时推送
            "X-Accel-Buffering": "no"
        }
    )

总结一下两者的关系

Server-Sent Events

HTTP 协议

特性

特性

特性

HTTP Streaming\n(分块传输)

SSE 协议\n(建立在 HTTP Streaming 之上)

自动重连

事件格式

ID 追踪


三、Server-Sent Events 深度剖析

3.1 SSE 的技术原理

SSE 的原理其实并不复杂。服务器在响应头中声明 Content-Type: text/event-stream,然后就可以在一个 HTTP 连接上持续向客户端发送数据。

让我用一张图来解释完整的工作流程:

LLM 服务 FastAPI 服务器 浏览器 LLM 服务 FastAPI 服务器 浏览器 连接保持打开 loop [LLM 生成过程中] 连接关闭(或者保持等待新数据) GET /sse/chat (发起请求) HTTP 200 + Content-Type: text/event-stream 请求生成 (1) 返回 token 1 (2) data: {"content": "你"} 请求生成 (3) 返回 token 2 (4) data: {"content": "好"} 生成完成 data: [DONE]

SSE 的数据格式非常简洁:

data: {"message": "Hello"}
data: {"message": "World"}
data: [DONE]

每条消息以 data: 开头,以两个换行符结束。客户端可以通过 EventSource API 来监听这些事件:

// 浏览器端使用 EventSource 监听 SSE
const eventSource = new EventSource('/sse/chat');

eventSource.onmessage = function(event) {
    const data = JSON.parse(event.data);
    console.log('收到消息:', data);

    // 检查是否结束
    if (event.data === '[DONE]') {
        eventSource.close();
    }
};

eventSource.onerror = function(error) {
    console.error('SSE 错误:', error);
    eventSource.close();
};

3.2 FastAPI 中 SSE 的最佳实践

在生产环境中使用 SSE,我们需要考虑很多实际因素:连接管理、断线重连、心跳保活、错误处理等等。

让我给你一个完整的、生产级别的 SSE 实现:

from fastapi import FastAPI, WebSocket, WebSocketDisconnect
from fastapi.responses import StreamingResponse
from typing import Dict, Set, Optional, Any
import asyncio
import json
import time
import uuid
from datetime import datetime
from contextlib import asynccontextmanager

app = FastAPI()

class SSEManager:
    """
    SSE 连接管理器

    这个类负责管理所有活跃的 SSE 连接。
    在生产环境中,你可能需要将这个管理器与 Redis 等
    分布式存储结合使用,以支持多实例部署。

    主要功能:
    1. 连接管理:追踪所有活跃连接
    2. 心跳保活:定期发送心跳,维持连接
    3. 广播支持:向所有连接或特定连接发送消息
    4. 连接清理:自动移除断开连接

    Attributes:
        connections: 活跃的 SSE 连接字典
        heartbeat_interval: 心跳间隔(秒)
    """

    def __init__(self, heartbeat_interval: int = 30):
        """
        初始化 SSE 管理器

        Args:
            heartbeat_interval: 心跳发送间隔,单位秒,默认 30 秒
        """
        # 存储所有活跃连接:{client_id: asyncio.Queue}
        # 使用 Queue 可以方便地实现消息队列
        self.connections: Dict[str, asyncio.Queue] = {}
        # 心跳间隔
        self.heartbeat_interval = heartbeat_interval

    async def connect(self, client_id: str) -> asyncio.Queue:
        """
        建立新的 SSE 连接

        Args:
            client_id: 客户端唯一标识

        Returns:
            用于发送消息的 Queue 对象
        """
        # 创建一个消息队列,用于该连接
        queue = asyncio.Queue(maxsize=100)
        self.connections[client_id] = queue

        print(f"SSE 客户端连接: {client_id}, 当前连接数: {len(self.connections)}")

        # 启动心跳任务(后台运行)
        asyncio.create_task(self._heartbeat(client_id))

        return queue

    async def disconnect(self, client_id: str):
        """
        断开 SSE 连接

        Args:
            client_id: 要断开的客户端标识
        """
        if client_id in self.connections:
            # 关闭队列,通知客户端连接已断开
            await self.connections[client_id].put(None)
            del self.connections[client_id]
            print(f"SSE 客户端断开: {client_id}, 当前连接数: {len(self.connections)}")

    async def send(self, client_id: str, data: Dict[str, Any]):
        """
        向特定客户端发送消息

        Args:
            client_id: 目标客户端标识
            data: 要发送的数据(会被序列化为 JSON)
        """
        if client_id in self.connections:
            await self.connections[client_id].put(json.dumps(data))

    async def broadcast(self, data: Dict[str, Any]):
        """
        广播消息到所有客户端

        Args:
            data: 要广播的数据
        """
        # 遍历所有连接发送消息
        disconnected = []
        for client_id in self.connections:
            try:
                await self.send(client_id, data)
            except Exception as e:
                print(f"广播失败 {client_id}: {e}")
                disconnected.append(client_id)

        # 清理断开的连接
        for client_id in disconnected:
            await self.disconnect(client_id)

    async def _heartbeat(self, client_id: str):
        """
        心跳任务

        定期向客户端发送心跳,保持连接活跃。
        如果连续多次心跳失败,则断开连接。

        Args:
            client_id: 要心跳的客户端标识
        """
        consecutive_failures = 0
        max_failures = 3

        try:
            while client_id in self.connections:
                await asyncio.sleep(self.heartbeat_interval)

                # 发送心跳(使用特殊的 ping 事件)
                try:
                    await self.send(client_id, {
                        "type": "ping",
                        "timestamp": time.time()
                    })
                    consecutive_failures = 0
                except Exception:
                    consecutive_failures += 1
                    if consecutive_failures >= max_failures:
                        break
        except asyncio.CancelledError:
            # 任务被取消,正常退出
            pass
        except Exception as e:
            print(f"心跳任务异常 {client_id}: {e}")
        finally:
            # 清理连接
            await self.disconnect(client_id)


# 创建全局 SSE 管理器实例
sse_manager = SSEManager(heartbeat_interval=30)


@app.get("/sse/chat/{client_id}")
async def sse_chat_endpoint(client_id: str):
    """
    LLM 聊天 SSE 端点

    这是一个典型的 LLM 流式输出端点。
    客户端通过 EventSource 连接到这个端点,
    服务器会将 LLM 生成的 token 实时推送过来。

    SSE 格式说明:
    - data: {"type": "content", "content": "token"} - 内容块
    - data: {"type": "done"} - 生成完成
    - data: {"type": "error", "message": "..."} - 错误信息

    Args:
        client_id: 客户端唯一标识,用于区分不同会话

    Returns:
        SSE 流式响应
    """
    async def generate():
        # 建立连接
        queue = await sse_manager.connect(client_id)

        try:
            # 持续监听消息队列
            while client_id in sse_manager.connections:
                # 等待消息,超时则发送心跳
                try:
                    message = await asyncio.wait_for(
                        queue.get(),
                        timeout=sse_manager.heartbeat_interval
                    )
                except asyncio.TimeoutError:
                    # 心跳:发送 ping
                    yield f"data: {json.dumps({'type': 'ping'})}\n\n"
                    continue

                # 收到 None 表示连接断开
                if message is None:
                    break

                # 发送实际数据(SSE 格式)
                yield f"data: {message}\n\n"

        except Exception as e:
            # 发送错误信息
            yield f"data: {json.dumps({'type': 'error', 'message': str(e)})}\n\n"
        finally:
            # 断开连接
            await sse_manager.disconnect(client_id)

    return StreamingResponse(
        generate(),
        media_type="text/event-stream",
        headers={
            "Cache-Control": "no-cache",
            "Connection": "keep-alive",
            "X-Accel-Buffering": "no"
        }
    )


@app.post("/llm/chat")
async def llm_chat_endpoint(client_id: str, message: str):
    """
    触发 LLM 处理的端点

    客户端通过这个端点发送消息,服务器处理后通过 SSE 推送结果。

    Args:
        client_id: 客户端标识
        message: 用户消息

    Returns:
        处理状态
    """
    # 这里可以调用 LLM 处理
    # 为简化演示,我们模拟流式输出

    # 模拟 LLM 流式生成
    words = ["你好", ",", "感谢", "你的", "提问", "。", "我", "会", "尽力", "帮助", "你", "。"]

    for i, word in enumerate(words):
        # 通过 SSE 发送每个 token
        await sse_manager.send(client_id, {
            "type": "content",
            "content": word,
            "index": i
        })
        await asyncio.sleep(0.1)  # 模拟生成延迟

    # 发送完成信号
    await sse_manager.send(client_id, {
        "type": "done"
    })

    return {"status": "processing", "client_id": client_id}

3.3 SSE 与 LLM 流式输出的完美结合

SSE 是 LLM 流式输出最常用的协议,主要原因是它与 OpenAI 的 API 格式完全兼容。

让我给你展示如何实现 OpenAI 兼容的流式响应:

from fastapi import FastAPI, Request
from fastapi.responses import StreamingResponse
from pydantic import BaseModel, Field
from typing import List, Optional
import asyncio
import json
import uuid
from datetime import datetime

app = FastAPI()


class Message(BaseModel):
    """
    聊天消息模型

    用于描述对话中的一条消息。
    每个消息包含角色(role)和内容(content)。

    Attributes:
        role: 消息角色,system/user/assistant 三种
        content: 消息内容
    """
    role: str
    content: str


class ChatCompletionRequest(BaseModel):
    """
    Chat Completion 请求模型

    对应 OpenAI 的 Chat Completions API 请求格式。
    支持流式和非流式两种模式。

    Attributes:
        model: 模型名称
        messages: 消息列表
        stream: 是否流式输出,默认 False
        temperature: 温度参数,控制随机性
        max_tokens: 最大生成 token 数
    """
    model: str
    messages: List[Message]
    stream: Optional[bool] = False
    temperature: Optional[float] = Field(default=1.0, ge=0, le=2)
    max_tokens: Optional[int] = None


async def generate_openai_stream(
    messages: List[Message],
    model: str = "gpt-3.5-turbo"
):
    """
    生成 OpenAI 兼容格式的流式响应

    这个生成器实现了 OpenAI 的流式响应格式。
    客户端可以使用 OpenAI SDK 的流式模式来消费这个响应。

    OpenAI 流式响应格式:
    1. 首次响应:包含 role 信息
    2. 中间响应:包含 content 内容块
    3. 最后响应:finish_reason 为 "stop"
    4. 结束信号:data: [DONE]

    Args:
        messages: 对话消息历史
        model: 模型名称

    Yields:
        SSE 格式的数据字符串
    """
    request_id = f"chatcmpl-{uuid.uuid4().hex[:8]}"
    created = int(datetime.now().timestamp())

    # 模拟的响应内容
    response_text = "这是一段模拟的 LLM 流式响应。在实际应用中,这里会连接真实的 LLM API。"

    # 步骤 1:首次响应 - 发送角色信息
    # OpenAI 格式要求首次响应必须包含 delta.role
    first_response = {
        "id": request_id,
        "object": "chat.completion.chunk",
        "created": created,
        "model": model,
        "choices": [{
            "delta": {"role": "assistant"},
            "index": 0,
            "finish_reason": None
        }]
    }
    yield f"data: {json.dumps(first_response)}\n\n"

    # 步骤 2:流式发送内容
    # 每个 token(或词)作为一个单独的 chunk 发送
    words = response_text.split()
    for i, word in enumerate(words):
        chunk_response = {
            "id": request_id,
            "object": "chat.completion.chunk",
            "created": created,
            "model": model,
            "choices": [{
                "delta": {"content": word + " "},
                "index": 0,
                "finish_reason": None
            }]
        }
        yield f"data: {json.dumps(chunk_response)}\n\n"

        # 模拟 LLM 生成延迟
        await asyncio.sleep(0.05)

    # 步骤 3:最后响应 - finish_reason 为 "stop"
    # 表示生成完成
    final_response = {
        "id": request_id,
        "object": "chat.completion.chunk",
        "created": created,
        "model": model,
        "choices": [{
            "delta": {},
            "index": 0,
            "finish_reason": "stop"
        }]
    }
    yield f"data: {json.dumps(final_response)}\n\n"

    # 步骤 4:发送结束信号
    # 客户端看到这个信号后应该关闭 EventSource
    yield "data: [DONE]\n\n"


@app.post("/v1/chat/completions")
async def chat_completions(request: ChatCompletionRequest):
    """
    OpenAI 兼容的 Chat Completions API

    这个端点实现了与 OpenAI API 兼容的接口。
    客户端可以直接使用 OpenAI SDK 来调用这个端点。

    支持两种模式:
    - stream=False:一次性返回完整响应
    - stream=True:流式返回响应块

    Args:
        request: Chat Completion 请求

    Returns:
        流式或非流式响应
    """
    if request.stream:
        # 流式模式:使用 SSE
        return StreamingResponse(
            generate_openai_stream(request.messages, request.model),
            media_type="text/event-stream",
            headers={
                "Cache-Control": "no-cache",
                "Connection": "keep-alive",
                "X-Accel-Buffering": "no",
            }
        )
    else:
        # 非流式模式:直接返回完整响应
        # 这里用模拟数据演示
        return {
            "id": f"chatcmpl-{uuid.uuid4().hex[:8]}",
            "object": "chat.completion",
            "created": int(datetime.now().timestamp()),
            "model": request.model,
            "choices": [{
                "message": {
                    "role": "assistant",
                    "content": "这是非流式响应的完整内容"
                },
                "finish_reason": "stop",
                "index": 0
            }]
        }

3.4 SSE 的适用场景与局限性

SSE 的优势

  1. 简单易用:只需要设置正确的 Content-Type,不需要复杂的协议握手
  2. 浏览器原生支持:所有现代浏览器都支持 EventSource API
  3. 自动重连:浏览器会自动处理断线重连
  4. OpenAI 兼容:与主流 LLM API 格式一致

SSE 的局限

  1. 单向通讯:只能服务器→客户端,客户端无法通过同一连接发送数据
  2. 连接数限制:浏览器对同一域名的 SSE 连接数有限制(通常 6 个)
  3. 无二进制支持:只能传输文本数据
  4. 需要长连接:不适合需要在大量设备间频繁切换的场景

四、WebSocket 实战

4.1 WebSocket 与 SSE 的对比

当我们需要双向通讯时,SSE 就不够用了。这就是 WebSocket 的用武之地。

让我用一张图来对比两者:

WebSocket

全双工

服务器

客户端

Server-Sent Events

单向推送

服务器

客户端

特性 SSE WebSocket
通讯方向 单向(Server→Client) 全双工
协议基础 HTTP 独立 WebSocket 协议
连接建立 普通 HTTP 请求 升级握手
浏览器支持 EventSource API 原生 WebSocket API
自动重连 浏览器原生支持 需手动实现
二进制数据 不支持 支持
防火墙兼容性 一般

我的经验法则

  • 只需要服务器推送:用 SSE(更简单,浏览器原生支持)
  • 需要双向通讯:用 WebSocket
  • 需要传输二进制:用 WebSocket
  • 需要极低延迟:用 WebSocket
  • 需要服务间通讯:用 WebSocket 或 gRPC

4.2 FastAPI 中 WebSocket 的基础实现

WebSocket 在 FastAPI 中有原生支持,API 设计得非常好:

from fastapi import FastAPI, WebSocket, WebSocketDisconnect
from typing import Dict
import asyncio
import json

app = FastAPI()


class ConnectionManager:
    """
    WebSocket 连接管理器

    这个类封装了 WebSocket 连接管理的常见操作:
    1. 连接建立与断开
    2. 消息广播
    3. 个人消息发送

    在实际项目中,你可能需要:
    - 将连接信息持久化到 Redis
    - 支持房间/群组功能
    - 添加认证和授权

    Attributes:
        active_connections: 活跃连接字典,{client_id: websocket}
    """

    def __init__(self):
        """
        初始化连接管理器

        创建一个空白的连接字典。
        """
        self.active_connections: Dict[str, WebSocket] = {}

    async def connect(self, client_id: str, websocket: WebSocket):
        """
        接受并注册新的 WebSocket 连接

        WebSocket 连接需要先通过"握手"建立。
        accept() 方法完成握手过程。

        Args:
            client_id: 客户端唯一标识
            websocket: WebSocket 连接对象
        """
        # 接受连接,完成 WebSocket 握手
        await websocket.accept()
        # 注册到连接字典
        self.active_connections[client_id] = websocket
        print(f"WebSocket 连接建立: {client_id}")

    def disconnect(self, client_id: str):
        """
        断开 WebSocket 连接

        从连接字典中移除该客户端。

        Args:
            client_id: 要断开的客户端标识
        """
        if client_id in self.active_connections:
            del self.active_connections[client_id]
            print(f"WebSocket 连接断开: {client_id}")

    async def send_personal_message(self, message: str, client_id: str):
        """
        向特定客户端发送消息

        Args:
            message: 消息内容(会被转换为 JSON)
            client_id: 目标客户端标识

        Raises:
            KeyError: 如果客户端不存在
        """
        if client_id in self.active_connections:
            await self.active_connections[client_id].send_text(message)

    async def broadcast(self, message: str):
        """
        广播消息到所有连接

        遍历所有活跃连接,发送同一消息。
        如果某个连接发送失败,会断开该连接。

        Args:
            message: 要广播的消息内容
        """
        disconnected_clients = []
        for client_id, connection in self.active_connections.items():
            try:
                await connection.send_text(message)
            except Exception as e:
                print(f"广播失败 {client_id}: {e}")
                disconnected_clients.append(client_id)

        # 清理断开的连接
        for client_id in disconnected_clients:
            self.disconnect(client_id)


# 创建全局连接管理器实例
manager = ConnectionManager()


@app.websocket("/ws/chat/{client_id}")
async def websocket_chat(websocket: WebSocket, client_id: str):
    """
    WebSocket 聊天端点

    这是一个典型的 WebSocket 端点。
    与 SSE 不同,WebSocket 可以双向通讯:
    - 客户端可以主动发送消息
    - 服务器可以主动推送消息

    WebSocket 消息流程:
    1. 客户端发起 WebSocket 连接
    2. 服务器接受连接(accept)
    3. 双方可以随时发送消息
    4. 任意一方可以主动断开

    Args:
        websocket: WebSocket 连接对象
        client_id: 客户端标识,用于区分不同用户
    """
    # 建立连接
    await manager.connect(client_id, websocket)

    try:
        # 持续监听客户端消息
        while True:
            # receive_text() 会阻塞,直到收到消息
            # 这是 WebSocket 和 SSE 的关键区别:
            # SSE 只能被动等待推送,WebSocket 可以主动接收
            data = await websocket.receive_text()
            message = json.loads(data)

            print(f"收到 {client_id} 消息: {message}")

            # 处理消息并响应
            response = {
                "type": "response",
                "content": f"收到: {message.get('content', '')}",
                "original": message
            }
            await manager.send_personal_message(json.dumps(response), client_id)

    except WebSocketDisconnect:
        # 客户端主动断开
        manager.disconnect(client_id)
        print(f"客户端 {client_id} 断开连接")
    except Exception as e:
        # 其他异常
        print(f"WebSocket 错误: {e}")
        manager.disconnect(client_id)

4.3 心跳机制与保活

WebSocket 是长连接,如果中间的网络设备(如防火墙、负载均衡器)认为连接空闲,可能会主动断开。这就是为什么我们需要心跳机制:

from fastapi import FastAPI, WebSocket, WebSocketDisconnect
from typing import Dict
import asyncio
import json
import time
import uuid

app = FastAPI()


class WebSocketWithHeartbeat:
    """
    带心跳机制的 WebSocket 管理器

    WebSocket 连接在长时间空闲时可能被网络设备断开。
    心跳机制通过定期发送小数据包来保持连接活跃。

    工作原理:
    1. 客户端或服务器定期发送 ping 消息
    2. 收到 pong 消息表示连接正常
    3. 如果超时未收到响应,断开连接

    Attributes:
        heartbeat_interval: 心跳间隔(秒)
        timeout: 等待响应超时时间(秒)
    """

    def __init__(self, heartbeat_interval: int = 30, timeout: int = 10):
        self.active_connections: Dict[str, WebSocket] = {}
        self.heartbeat_interval = heartbeat_interval
        self.timeout = timeout
        self.heartbeat_tasks: Dict[str, asyncio.Task] = {}

    async def connect(self, client_id: str, websocket: WebSocket):
        """建立连接并启动心跳"""
        await websocket.accept()
        self.active_connections[client_id] = websocket

        # 启动心跳任务
        self.heartbeat_tasks[client_id] = asyncio.create_task(
            self._heartbeat_loop(client_id, websocket)
        )

        print(f"WebSocket 连接建立(含心跳): {client_id}")

    async def _heartbeat_loop(self, client_id: str, websocket: WebSocket):
        """
        心跳循环

        定期发送 ping 消息,检测连接是否存活。

        Args:
            client_id: 客户端标识
            websocket: WebSocket 连接
        """
        try:
            while True:
                # 等待心跳间隔
                await asyncio.sleep(self.heartbeat_interval)

                if client_id not in self.active_connections:
                    break

                try:
                    # 发送 ping
                    await websocket.send_json({
                        "type": "ping",
                        "timestamp": time.time()
                    })

                    # 等待 pong 响应(使用 asyncio.wait_for 实现超时)
                    # 注意:这里简化了,实际应该等待客户端的 pong 响应
                    # 真正的 ping/pong 机制需要更复杂的实现

                except Exception as e:
                    # 发送失败,可能是连接已断开
                    print(f"心跳失败 {client_id}: {e}")
                    break

        except asyncio.CancelledError:
            # 任务被取消,正常退出
            pass
        finally:
            # 清理
            if client_id in self.active_connections:
                await self._close_connection(client_id)

    async def _close_connection(self, client_id: str):
        """关闭连接并清理资源"""
        if client_id in self.active_connections:
            try:
                await self.active_connections[client_id].close()
            except Exception:
                pass
            del self.active_connections[client_id]

        if client_id in self.heartbeat_tasks:
            self.heartbeat_tasks[client_id].cancel()
            del self.heartbeat_tasks[client_id]

        print(f"连接已清理: {client_id}")

    async def disconnect(self, client_id: str):
        """断开连接"""
        # 取消心跳任务
        if client_id in self.heartbeat_tasks:
            self.heartbeat_tasks[client_id].cancel()
            del self.heartbeat_tasks[client_id]

        # 关闭连接
        await self._close_connection(client_id)


heartbeat_manager = WebSocketWithHeartbeat(heartbeat_interval=30)


@app.websocket("/ws/chat/heartbeat/{client_id}")
async def websocket_with_heartbeat(websocket: WebSocket, client_id: str):
    """带心跳的 WebSocket 聊天端点"""
    await heartbeat_manager.connect(client_id, websocket)

    try:
        while True:
            data = await websocket.receive_text()
            message = json.loads(data)

            # 处理消息
            if message.get("type") == "pong":
                # 收到 pong 响应,心跳正常
                print(f"收到 {client_id} 的 pong")

            # 处理其他消息...
            response = {
                "type": "response",
                "content": f"收到: {message.get('content', '')}"
            }
            await heartbeat_manager.active_connections[client_id].send_json(response)

    except WebSocketDisconnect:
        await heartbeat_manager.disconnect(client_id)

4.4 房间管理功能

在很多应用场景中,我们需要“房间”功能:比如群聊、协作编辑、多人游戏等。让我给你一个完整的实现:

from fastapi import FastAPI, WebSocket, WebSocketDisconnect
from typing import Dict, Set, Optional
import asyncio
import json

app = FastAPI()


class RoomManager:
    """
    WebSocket 房间管理器

    这个类实现了房间/群组功能:
    1. 客户端可以加入/离开房间
    2. 房间内的消息会广播给所有人
    3. 支持系统通知(用户加入/离开)
    4. 自动清理空房间

    数据结构设计:
    - rooms: {room_id: {client_id: websocket}} - 房间成员
    - client_rooms: {client_id: room_id} - 客户端所在房间

    Attributes:
        rooms: 房间成员字典
        client_rooms: 客户端房间映射
    """

    def __init__(self):
        # 房间成员: {room_id: {client_id: websocket}}
        self.rooms: Dict[str, Dict[str, WebSocket]] = {}
        # 客户端所在房间: {client_id: room_id}
        self.client_rooms: Dict[str, str] = {}
        # 客户端信息: {client_id: metadata}
        self.client_info: Dict[str, dict] = {}

    async def join_room(self, room_id: str, client_id: str, websocket: WebSocket, info: dict = None):
        """
        客户端加入房间

        如果客户端已经在其他房间,会先让其离开。
        如果房间不存在,会创建新房间。

        Args:
            room_id: 房间标识
            client_id: 客户端标识
            websocket: WebSocket 连接
            info: 客户端附加信息(如用户名)
        """
        # 如果已在其他房间,先退出
        if client_id in self.client_rooms:
            await self.leave_room(client_id)

        # 创建房间(如果不存在)
        if room_id not in self.rooms:
            self.rooms[room_id] = {}

        # 添加成员
        self.rooms[room_id][client_id] = websocket
        self.client_rooms[client_id] = room_id
        self.client_info[client_id] = info or {}

        print(f"客户端 {client_id} 加入房间 {room_id}")

        # 通知房间内其他人
        await self._broadcast_to_room(
            room_id,
            {
                "type": "system",
                "action": "join",
                "client_id": client_id,
                "message": f"用户 {info.get('username', client_id)} 加入了房间"
            },
            exclude=[client_id]
        )

    async def leave_room(self, client_id: str):
        """
        客户端离开房间

        从房间中移除客户端,如果房间为空则删除房间。
        会通知房间内其他人。

        Args:
            client_id: 要离开的客户端标识
        """
        if client_id not in self.client_rooms:
            return

        room_id = self.client_rooms[client_id]
        client_info = self.client_info.get(client_id, {})

        # 通知房间内其他人
        await self._broadcast_to_room(
            room_id,
            {
                "type": "system",
                "action": "leave",
                "client_id": client_id,
                "message": f"用户 {client_info.get('username', client_id)} 离开了房间"
            }
        )

        # 移除成员
        if room_id in self.rooms:
            self.rooms[room_id].pop(client_id, None)

            # 如果房间为空,删除房间
            if not self.rooms[room_id]:
                del self.rooms[room_id]

        # 清理客户端信息
        self.client_rooms.pop(client_id, None)
        self.client_info.pop(client_id, None)

    async def send_to_room(self, room_id: str, message: dict):
        """
        向房间内所有人广播消息

        Args:
            room_id: 目标房间标识
            message: 要广播的消息
        """
        await self._broadcast_to_room(room_id, message)

    async def send_to_client(self, client_id: str, message: dict):
        """
        向特定客户端发送消息

        Args:
            client_id: 目标客户端标识
            message: 要发送的消息
        """
        if client_id in self.client_rooms:
            room_id = self.client_rooms[client_id]
            if room_id in self.rooms and client_id in self.rooms[room_id]:
                await self.rooms[room_id][client_id].send_json(message)

    async def _broadcast_to_room(self, room_id: str, message: dict, exclude: list = None):
        """
        广播消息到房间(排除指定客户端)

        Args:
            room_id: 房间标识
            message: 消息内容
            exclude: 要排除的客户端列表
        """
        exclude = exclude or []

        if room_id not in self.rooms:
            return

        disconnected = []
        for client_id, websocket in self.rooms[room_id].items():
            if client_id in exclude:
                continue

            try:
                await websocket.send_json(message)
            except Exception as e:
                print(f"发送消息失败 {client_id}: {e}")
                disconnected.append(client_id)

        # 清理断开的连接
        for client_id in disconnected:
            await self.leave_room(client_id)


room_manager = RoomManager()


@app.websocket("/ws/room/{room_id}/{client_id}")
async def websocket_room(websocket: WebSocket, room_id: str, client_id: str):
    """
    WebSocket 房间端点

    客户端连接时需要指定:
    - room_id: 房间标识
    - client_id: 客户端标识

    消息格式:
    - 加入: {"type": "join", "username": "..."}
    - 聊天: {"type": "chat", "content": "..."}
    - 离开: {"type": "leave"}

    Args:
        websocket: WebSocket 连接
        room_id: 房间标识
        client_id: 客户端标识
    """
    await websocket.accept()

    # 等待客户端发送加入消息
    try:
        first_message = await asyncio.wait_for(
            websocket.receive_text(),
            timeout=10
        )
        join_data = json.loads(first_message)

        if join_data.get("type") == "join":
            username = join_data.get("username", client_id)
            await room_manager.join_room(
                room_id,
                client_id,
                websocket,
                {"username": username}
            )
    except asyncio.TimeoutError:
        await websocket.close()
        return
    except Exception as e:
        print(f"加入房间失败: {e}")
        await websocket.close()
        return

    try:
        # 持续监听消息
        while True:
            data = await websocket.receive_text()
            message = json.loads(data)

            msg_type = message.get("type")

            if msg_type == "chat":
                # 聊天消息,广播给房间内所有人
                client_info = room_manager.client_info.get(client_id, {})
                await room_manager.send_to_room(room_id, {
                    "type": "chat",
                    "client_id": client_id,
                    "username": client_info.get("username", client_id),
                    "content": message.get("content", "")
                })

            elif msg_type == "typing":
                # 输入状态,只通知其他人
                await room_manager._broadcast_to_room(
                    room_id,
                    {
                        "type": "typing",
                        "client_id": client_id,
                        "username": room_manager.client_info.get(client_id, {}).get("username", client_id)
                    },
                    exclude=[client_id]
                )

            else:
                # 未知消息类型
                await websocket.send_json({
                    "type": "error",
                    "message": f"未知消息类型: {msg_type}"
                })

    except WebSocketDisconnect:
        await room_manager.leave_room(client_id)
    except Exception as e:
        print(f"WebSocket 错误: {e}")
        await room_manager.leave_room(client_id)

五、gRPC 与 Protocol Buffers

5.1 什么时候选择 gRPC?

gRPC 是一个高性能、通用的 RPC 框架,使用 Protocol Buffers 作为接口定义和序列化工具。在 LLM 应用中,gRPC 主要用于:

  1. 服务间通讯:微服务之间的高效调用
  2. 流式处理:支持服务端流式、客户端流式、双向流式
  3. 强类型接口:使用 .proto 文件定义接口,自动生成代码

让我给你展示如何在 FastAPI 中集成 gRPC:

# 首先定义 proto 文件:llm_service.proto
syntax = "proto3";

package llm;

service LLMService {
    // 简单 RPC
    rpc Generate(GenerateRequest) returns (GenerateResponse);

    // 服务端流式 RPC
    rpc StreamGenerate(StreamGenerateRequest) returns (stream StreamGenerateResponse);

    // 双向流式 RPC
    rpc Chat(stream ChatMessage) returns (stream ChatMessage);
}

message GenerateRequest {
    string prompt = 1;
    string model = 2;
    int32 max_tokens = 3;
    float temperature = 4;
}

message GenerateResponse {
    string content = 1;
    int32 tokens_used = 2;
}

message StreamGenerateRequest {
    string prompt = 1;
    string model = 2;
}

message StreamGenerateResponse {
    string chunk = 1;
    bool done = 2;
}

message ChatMessage {
    string role = 1;
    string content = 2;
}

然后使用 grpcio-tools 生成 Python 代码,并在 FastAPI 中调用:

from fastapi import FastAPI
from concurrent.futures import ThreadPoolExecutor
import grpc
import llm_service_pb2
import llm_service_pb2_grpc

app = FastAPI()

# gRPC 通道池(用于复用连接)
executor = ThreadPoolExecutor(max_workers=10)


def get_llm_stub():
    """
    获取 gRPC 存根

    在生产环境中,应该使用连接池来复用通道。
    """
    channel = grpc.insecure_channel('llm-service:50051')
    return llm_service_pb2_grpc.LLMServiceStub(channel)


@app.post("/llm/generate")
async def generate(request: dict):
    """
    简单 RPC 调用示例

    客户端发送请求,服务器返回完整响应。
    类似于传统的 HTTP 请求-响应模式。

    适用场景:
    - 简单的 LLM 调用
    - 不需要流式输出的情况
    """
    stub = get_llm_stub()

    # 构建请求
    req = llm_service_pb2.GenerateRequest(
        prompt=request["prompt"],
        model=request.get("model", "gpt-3.5-turbo"),
        max_tokens=request.get("max_tokens", 1000),
        temperature=request.get("temperature", 1.0)
    )

    # 同步调用(在线程池中执行,避免阻塞)
    future = executor.submit(stub.Generate, req)
    response = future.result()

    return {
        "content": response.content,
        "tokens_used": response.tokens_used
    }


@app.post("/llm/stream-generate")
async def stream_generate(request: dict):
    """
    服务端流式 RPC 示例

    服务器流式发送响应,客户端接收流。
    类似于 SSE,但使用二进制协议效率更高。

    适用场景:
    - LLM 流式输出
    - 大数据量传输
    """
    stub = get_llm_stub()

    req = llm_service_pb2.StreamGenerateRequest(
        prompt=request["prompt"],
        model=request.get("model", "gpt-3.5-turbo")
    )

    # 流式调用
    chunks = []
    for response in stub.StreamGenerate(req):
        if response.done:
            break
        chunks.append(response.chunk)

    return {"content": "".join(chunks)}

5.2 gRPC vs SSE vs WebSocket 选择指南

需要双向通讯?

客户端是浏览器?

需要高吞吐量?

需要服务间通讯?

SSE

WebSocket

gRPC Streaming

HTTP REST

特性 SSE WebSocket gRPC Streaming
客户端支持 浏览器原生 所有平台 需要 gRPC 库
协议复杂度
性能 最好
双向通讯
二进制支持
类型安全
LLM 流式 最佳选择 可用 可用

六、LLM 流式输出的协议选择实战

6.1 场景分析与协议选型

让我用几个真实的业务场景来演示如何选择协议:

场景一:Web 端智能对话助手

后端服务

Web 浏览器

HTTP SSE

流式

React/Vue 页面

EventSource

FastAPI

LLM API

这是最常见的场景,推荐使用 SSE。原因:

  • 浏览器原生支持 EventSource
  • OpenAI API 本身就是 SSE 格式
  • 实现简单,维护成本低

场景二:移动端实时对话应用

后端服务

移动端

WebSocket

流式

iOS/Android App

FastAPI

WebSocket Handler

LLM API

移动端推荐 WebSocket。原因:

  • App 不像浏览器有 EventSource
  • WebSocket 有完善的 SDK 支持
  • 需要双向通讯(发送消息、接收流式响应)

场景三:多人在线协作 AI 助手

协作服务

多个客户端

WebSocket

WebSocket

WebSocket

流式

用户 A

用户 B

用户 C

房间管理

LLM API

多人协作场景推荐 WebSocket + 房间管理。原因:

  • 需要实时广播(一个人说的话其他人要能看到)
  • 需要低延迟的双向通讯

6.2 混合协议架构设计

在实际项目中,我们往往需要同时支持多种协议。下面是一个完整的混合协议网关设计:

from fastapi import FastAPI, Request, WebSocket, WebSocketDisconnect
from fastapi.responses import StreamingResponse
from typing import Dict, Any, Optional
import asyncio
import json
from enum import Enum

app = FastAPI()


class ProtocolType(str, Enum):
    """
    支持的协议类型枚举

    用于标识客户端连接的通讯协议类型。
    """
    SSE = "sse"
    WEBSOCKET = "websocket"
    HTTP_STREAMING = "http_streaming"


class UnifiedMessageFormat:
    """
    统一消息格式

    不同协议的内部消息格式可能不同,
    这个类用于统一格式,便于核心逻辑处理。

    消息格式:
    {
        "type": "content|error|done|ping|pong",
        "data": {...},
        "metadata": {...}
    }
    """

    @staticmethod
    def content(data: str, metadata: Dict = None) -> Dict[str, Any]:
        """构造内容消息"""
        return {
            "type": "content",
            "data": {"content": data},
            "metadata": metadata or {}
        }

    @staticmethod
    def done() -> Dict[str, Any]:
        """构造完成消息"""
        return {"type": "done", "data": {}}

    @staticmethod
    def error(message: str) -> Dict[str, Any]:
        """构造错误消息"""
        return {"type": "error", "data": {"message": message}}

    @staticmethod
    def ping() -> Dict[str, Any]:
        """构造心跳请求"""
        return {"type": "ping", "data": {}}

    @staticmethod
    def pong() -> Dict[str, Any]:
        """构造心跳响应"""
        return {"type": "pong", "data": {}}


class ProtocolAdapter:
    """
    协议适配器

    这个类是混合协议架构的核心。
    它将不同协议的通讯细节抽象化,
    核心业务逻辑只需要处理统一的消息格式。

    设计模式:适配器模式 + 策略模式

    Attributes:
        protocol: 当前使用的协议类型
    """

    def __init__(self, protocol: ProtocolType):
        self.protocol = protocol
        self.queue: Optional[asyncio.Queue] = None
        self.websocket: Optional[WebSocket] = None

    async def initialize_for_sse(self) -> asyncio.Queue:
        """
        初始化 SSE 适配器

        SSE 使用 Queue 作为消息队列,
        生成器从 Queue 中读取消息并发送。

        Returns:
            消息队列
        """
        self.queue = asyncio.Queue()
        return self.queue

    async def initialize_for_websocket(self, websocket: WebSocket) -> 'ProtocolAdapter':
        """
        初始化 WebSocket 适配器

        WebSocket 直接使用 WebSocket 连接发送消息。

        Args:
            websocket: WebSocket 连接

        Returns:
            适配器实例
        """
        await websocket.accept()
        self.websocket = websocket
        return self

    async def send(self, message: Dict[str, Any]):
        """
        发送消息

        根据协议类型选择不同的发送方式。

        Args:
            message: 统一格式的消息
        """
        if self.protocol == ProtocolType.SSE:
            # SSE: 发送到队列
            if self.queue:
                await self.queue.put(json.dumps(message))

        elif self.protocol == ProtocolType.WEBSOCKET:
            # WebSocket: 直接发送
            if self.websocket:
                await self.websocket.send_json(message)

        elif self.protocol == ProtocolType.HTTP_STREAMING:
            # HTTP Streaming: 同样使用队列
            if self.queue:
                await self.queue.put(json.dumps(message))

    async def receive(self) -> Optional[str]:
        """
        接收消息(仅 WebSocket 支持)

        Returns:
            收到的消息内容
        """
        if self.protocol == ProtocolType.WEBSOCKET and self.websocket:
            try:
                return await asyncio.wait_for(
                    self.websocket.receive_text(),
                    timeout=30
                )
            except asyncio.TimeoutError:
                return None
        return None


class LLMGateway:
    """
    LLM 统一网关

    这个类是整个混合协议架构的核心。
    它封装了 LLM 调用的逻辑,并提供统一的接口
    供不同协议的端点使用。

    设计理念:
    1. 核心逻辑与协议解耦
    2. 支持多种协议接入
    3. 统一的错误处理和日志
    """

    def __init__(self):
        # 活跃的适配器
        self.adapters: Dict[str, ProtocolAdapter] = {}

    async def register_adapter(self, client_id: str, adapter: ProtocolAdapter):
        """注册适配器"""
        self.adapters[client_id] = adapter

    async def unregister_adapter(self, client_id: str):
        """注销适配器"""
        self.adapters.pop(client_id, None)

    async def send_to_client(self, client_id: str, message: Dict[str, Any]):
        """向客户端发送消息"""
        if client_id in self.adapters:
            await self.adapters[client_id].send(message)

    async def process_and_stream(
        self,
        client_id: str,
        prompt: str,
        protocol: ProtocolType
    ):
        """
        处理 LLM 请求并流式返回结果

        这是核心的处理方法。
        它调用 LLM,将返回的 token 逐个推送给客户端。

        Args:
            client_id: 客户端标识
            prompt: 用户输入
            protocol: 通讯协议类型
        """
        # 模拟 LLM 流式输出
        words = ["你好", ",", "我是", "AI", "助手", "。"]

        for i, word in enumerate(words):
            # 发送内容
            await self.send_to_client(
                client_id,
                UnifiedMessageFormat.content(word, {"index": i})
            )

            # 模拟生成延迟
            await asyncio.sleep(0.1)

        # 发送完成信号
        await self.send_to_client(client_id, UnifiedMessageFormat.done())


# 创建全局网关实例
llm_gateway = LLMGateway()


# ============ SSE 端点 ============

@app.get("/llm/sse/{client_id}")
async def llm_sse_endpoint(client_id: str):
    """
    SSE 协议的 LLM 端点

    适用于浏览器端的流式对话。
    """
    # 创建 SSE 适配器
    adapter = ProtocolAdapter(ProtocolType.SSE)
    queue = await adapter.initialize_for_sse()

    # 注册到网关
    await llm_gateway.register_adapter(client_id, adapter)

    async def generate():
        try:
            while True:
                # 从队列获取消息
                message = await queue.get()
                if message is None:
                    break
                yield f"data: {message}\n\n"
        finally:
            await llm_gateway.unregister_adapter(client_id)

    return StreamingResponse(
        generate(),
        media_type="text/event-stream",
        headers={
            "Cache-Control": "no-cache",
            "Connection": "keep-alive",
            "X-Accel-Buffering": "no"
        }
    )


# ============ WebSocket 端点 ============

@app.websocket("/llm/ws/{client_id}")
async def llm_websocket_endpoint(websocket: WebSocket, client_id: str):
    """
    WebSocket 协议的 LLM 端点

    适用于移动端或需要双向通讯的场景。
    """
    # 创建 WebSocket 适配器
    adapter = ProtocolAdapter(ProtocolType.WEBSOCKET)
    await adapter.initialize_for_websocket(websocket)

    # 注册到网关
    await llm_gateway.register_adapter(client_id, adapter)

    try:
        # 等待客户端发送消息
        while True:
            data = await adapter.receive()
            if data is None:
                break

            message = json.loads(data)

            if message.get("type") == "prompt":
                # 收到用户输入,处理并流式返回
                prompt = message.get("content", "")
                await llm_gateway.process_and_stream(client_id, prompt, ProtocolType.WEBSOCKET)

    except WebSocketDisconnect:
        pass
    finally:
        await llm_gateway.unregister_adapter(client_id)

七、更多通讯方案:WebRTC、WebTransport 与消息队列

7.1 WebRTC:极致低延迟的音视频交互

如果你想实现类似 OpenAI Advanced Voice Mode 那样的实时语音对话,WebRTC 是目前的行业天花板。

WebRTC(Web Real-Time Communication)是真正的 P2P(点对点) 通信,底层基于 UDP 而非 TCP,天然抗丢包,延迟极低。

后端服务

客户端

UDP P2P

WebRTC

P2P 连接

Data Channel

麦克风

浏览器

VAD\n静音检测

ASR\n语音识别

LLM

TTS\n语音合成

WebRTC 的核心优势

  1. 基于 UDP:没有 TCP 的三次握手和重传机制,延迟最低
  2. Data Channel:除了音视频,还支持传输任意二进制或文本数据
  3. NAT 穿透:内置 ICE/STUN/TURN 协议,自动穿透 NAT
# WebRTC 在 AI 中的典型应用架构
# 简化示例,展示核心概念

class WebRTCAudioHandler:
    """
    WebRTC 音频处理器

    处理前端的 PCM 音频流:
    1. VAD(语音活动检测)
    2. 发送给 ASR 服务
    3. 接收 LLM 响应
    4. 推送 TTS 音频流
    """

    def __init__(self):
        self.vad_enabled = True
        self.sample_rate = 16000

    async def handle_audio_stream(self, audio_chunk: bytes):
        """
        处理音频数据块

        Args:
            audio_chunk: PCM 音频数据
        """
        # 步骤 1: VAD 检测
        if self.vad_enabled and self._is_silence(audio_chunk):
            return None

        # 步骤 2: 发送到 ASR 服务
        text = await self._recognize(audio_chunk)

        # 步骤 3: 调用 LLM
        response = await self._generate(text)

        # 步骤 4: TTS 合成并返回
        audio = await self._synthesize(response)
        return audio

    def _is_silence(self, audio_chunk: bytes) -> bool:
        """简单的静音检测"""
        import struct
        # 计算 RMS
        samples = struct.unpack('<' + 'h' * (len(audio_chunk) // 2), audio_chunk)
        rms = sum(s * s for s in samples) / len(samples) ** 0.5
        return rms < 500  # 阈值

    async def _recognize(self, audio: bytes) -> str:
        """语音识别"""
        # 实际项目应调用 ASR 服务
        return "用户说的什么"

    async def _generate(self, text: str) -> str:
        """LLM 生成"""
        # 实际项目应调用 LLM
        return "LLM 的回复"

    async def _synthesize(self, text: str) -> bytes:
        """语音合成"""
        # 实际项目应调用 TTS 服务
        return b"audio_data"

什么时候选择 WebRTC

  • 需要实时语音/视频对话
  • 需要毫秒级的低延迟
  • 需要支持"打断"交互
  • 对音质要求高

7.2 WebTransport:面向未来的多模态传输

WebTransport 是被广泛认为是 WebSocket 的未来替代品,基于 HTTP/3 (QUIC 协议)。

WebTransport 方案

QUIC

流1\n(音频)

流2\n(文本)

流3\n(UI)

WebSocket 问题

丢失

阻塞

阻塞

TCP

包1

包2

包3

WebTransport 的核心优势

  1. 解决队头阻塞:TCP 如果丢包,所有数据都要等;QUIC 的流相互独立
  2. 基于 UDP:比 TCP 更低的延迟
  3. 多流复用:一个连接可以同时传输多个独立的数据流
# WebTransport 示例代码结构
# 注意:WebTransport 目前需要浏览器实验性支持

class WebTransportServer:
    """
    WebTransport 服务器

    适用于需要同时传输多种数据类型的场景:
    - 音频流
    - 文本日志
    - UI 更新指令
    """

    def __init__(self):
        self.streams = {}  # stream_id -> handler

    async def handle_connection(self, transport):
        """
        处理 WebTransport 连接

        WebTransport 的核心是流(Stream):
        - 每个流是独立的
        - 流可以并行传输,互不干扰
        - 支持双向流和单向流
        """
        # 接收客户端的流
        async for stream in transport.incoming_streams:
            stream_id = stream.stream_id
            self.streams[stream_id] = stream

            # 处理流
            asyncio.create_task(self._handle_stream(stream))

    async def _handle_stream(self, stream):
        """处理单个流"""
        # 读取流数据
        async for data in stream:
            # 根据数据类型处理
            await self._process_data(data, stream)

    async def _process_data(self, data, stream):
        """根据流类型处理"""
        # 可以根据 stream.stream_id 判断数据类型
        pass

什么时候选择 WebTransport

  • 需要传输多种类型的数据(音频+文本+UI)
  • 网络环境不稳定
  • 需要更高的吞吐量
  • 面向未来(目前浏览器支持度有限)

7.3 消息队列:后端服务的解耦利器

虽然不是前端直连的协议,但消息队列在复杂 Agent 架构中不可或缺。

后台任务

消息队列

API 层

客户端

HTTP/WS

发布任务

订阅

订阅

回调

推送

前端

API 网关

Redis PubSub\n/ Kafka\n/ RabbitMQ

任务执行器1

任务执行器2

典型的异步任务场景

  1. 用户触发一个耗时 10 分钟的代码审查任务
  2. 网关立即返回 Task ID,后端将任务扔进消息队列
  3. Worker 慢慢执行,完成后发布结果
  4. 网关监听到结果,通过 WebSocket 推送给前端
import asyncio
import json
from typing import Dict, Any, Optional
import redis.asyncio as redis


class AsyncTaskQueue:
    """
    异步任务队列

    使用 Redis PubSub 实现任务队列。
    适用于耗时任务的异步处理。

    架构:
    1. 前端发起请求,获得 Task ID
    2. 网关将任务放入队列,立即返回
    3. 后台 Worker 取出任务执行
    4. 执行完成后发布结果
    5. 网关收到结果,推送给前端
    """

    def __init__(self, redis_url: str = "redis://localhost"):
        """
        初始化任务队列

        Args:
            redis_url: Redis 连接地址
        """
        self.redis = redis.from_url(redis_url)
        self.pubsub = self.redis.pubsub()
        self.results_channel = "task_results"
        self.task_prefix = "task:"

    async def submit_task(
        self,
        task_id: str,
        task_type: str,
        payload: Dict[str, Any]
    ) -> str:
        """
        提交任务

        Args:
            task_id: 任务唯一 ID
            task_type: 任务类型
            payload: 任务参数

        Returns:
            任务 ID
        """
        # 存储任务信息
        task_data = {
            "id": task_id,
            "type": task_type,
            "payload": payload,
            "status": "pending"
        }

        await self.redis.set(
            f"{self.task_prefix}{task_id}",
            json.dumps(task_data),
            ex=3600  # 1 小时过期
        )

        # 发布任务到队列
        await self.redis.publish(
            f"task_queue:{task_type}",
            json.dumps(task_data)
        )

        return task_id

    async def get_result(self, task_id: str) -> Optional[Dict[str, Any]]:
        """
        获取任务结果

        Args:
            task_id: 任务 ID

        Returns:
            任务结果,如果没有则返回 None
        """
        result_key = f"{self.task_prefix}{task_id}:result"
        result = await self.redis.get(result_key)

        if result:
            return json.loads(result)

        return None

    async def wait_for_result(self, task_id: str, timeout: int = 300) -> Dict[str, Any]:
        """
        等待任务结果

        使用 Redis PubSub 等待结果。

        Args:
            task_id: 任务 ID
            timeout: 超时时间(秒)

        Returns:
            任务结果

        Raises:
            TimeoutError: 等待超时
        """
        channel = f"{self.results_channel}:{task_id}"

        async with self.pubsub() as pubsub:
            await pubsub.subscribe(channel)

            try:
                async for message in pubsub.listen():
                    if message["type"] == "message":
                        return json.loads(message["data"])
            except asyncio.TimeoutError:
                raise TimeoutError(f"等待任务 {task_id} 结果超时")

    async def publish_result(self, task_id: str, result: Dict[str, Any]):
        """
        发布任务结果

        Args:
            task_id: 任务 ID
            result: 任务结果
        """
        # 存储结果
        result_key = f"{self.task_prefix}{task_id}:result"
        await self.redis.set(
            result_key,
            json.dumps(result),
            ex=3600
        )

        # 发布结果通知
        channel = f"{self.results_channel}:{task_id}"
        await self.redis.publish(
            channel,
            json.dumps(result)
        )


# 使用示例
async def example():
    queue = AsyncTaskQueue()

    # 1. 提交任务
    task_id = await queue.submit_task(
        task_id="task_123",
        task_type="code_review",
        payload={"code": "def hello(): pass"}
    )

    print(f"任务已提交: {task_id}")

    # 2. 后台 Worker 执行任务(伪代码)
    # result = await do_code_review(payload)
    # await queue.publish_result(task_id, result)

    # 3. 前端等待结果
    # result = await queue.wait_for_result(task_id)

7.4 技术选型全景图

技术方案

选型维度

单向低延迟

双向低延迟

极致低延迟

后端间

文本流

通用数据

音视频

多模态

前端-后端

前端-后端实时

P2P

后端-后端

异步任务

延迟要求

数据类型

通信节点

SSE

WebSocket

WebRTC

gRPC

WebTransport

消息队列

快速选型指南

场景 推荐技术 特点
标准 AI 打字机 SSE 简单、单向、基于 HTTP
高频交互/UI同步 WebSocket 全双工、低延迟
内部 Agent 微服务 gRPC 强类型、极速二进制
纯语音/视频对话 WebRTC 基于 UDP、极致低延迟
多模态海量并发 WebTransport 解决队头阻塞
耗时异步任务 消息队列 解耦、可靠

八、FastAPI + LangGraph 的优雅结合

8.1 LangGraph 1.2 的新特性和架构

LangChain 在 2025 年已经正式发布了 1.0 稳定版,并快速迭代到 1.2.x。LangGraph 作为 LangChain 的核心编排框架,已经全面确立了以 有向图(DAG) 为核心的运行机制。

LangGraph 1.2 架构

运行时

图定义

存储

SQLite

PostgreSQL

Memory

节点\n(Node)

Pregel 算法

边\n(Edge)

状态\n(State)

检查点\n(Checkpoint)

中断\n(Interrupt)

LangGraph 1.2 的核心特性

  1. 强制 TypedDict 状态管理:清晰的状态定义和类型安全
  2. 成熟的 Checkpoint 机制:支持中断恢复,状态持久化
  3. Interrupt 支持:可以在任意节点暂停,等待外部输入

8.2 SSE 与 WebSocket 在 LangGraph 中的应用分化

在 FastAPI + LangGraph 的架构下,SSE 和 WebSocket 的应用分化非常清晰:

WebSocket (全双工)

前端 WebSocket

WebSocket Endpoint

状态检查点

SSE (单向)

前端 EventSource

StreamingResponse

astream_events

SSE 场景

  • 标准的 AI 对话
  • 流式文本输出
  • 简单的心跳检测

WebSocket 场景

  • 实时打断(Interrupt)
  • 复杂的多轮交互
  • 人机协作(Human-in-the-loop)

8.3 实现优雅的指令打断与状态恢复

这是最关键的实战部分。让我展示如何结合 FastAPI 的 WebSocket 和 LangGraph 的 Checkpoint 机制,实现优雅的"打断"功能:

from fastapi import FastAPI, WebSocket, WebSocketDisconnect
from typing import Dict, Any, Optional
import asyncio
import json


class LangGraphWebSocketAgent:
    """
    LangGraph + WebSocket Agent

    这个类实现了基于 WebSocket 的 LangGraph Agent。
    核心能力是支持随时打断和状态恢复。

    工作流程:
    1. 客户端通过 WebSocket 连接
    2. 发送消息,触发 LangGraph 执行
    3. LangGraph 执行过程中,客户端可以随时发送"打断"指令
    4. 服务端检测到打断,使用 Checkpoint 保存当前状态
    5. 客户端可以发送新指令,从 Checkpoint 恢复继续执行
    """

    def __init__(self, compiled_graph):
        """
        初始化 Agent

        Args:
            compiled_graph: 编译后的 LangGraph
        """
        self.graph = compiled_graph
        self.active_sessions: Dict[str, Dict[str, Any]] = {}

    async def handle_connection(self, websocket: WebSocket, session_id: str):
        """
        处理 WebSocket 连接

        这个方法处理完整的 WebSocket 生命周期:
        1. 接受连接
        2. 监听消息
        3. 处理打断
        4. 断开清理
        """
        await websocket.accept()
        print(f"WebSocket 连接建立: {session_id}")

        # 初始化会话状态
        self.active_sessions[session_id] = {
            "websocket": websocket,
            "current_task": None,
            "is_interrupted": False
        }

        try:
            # 持续监听客户端消息
            while True:
                data = await websocket.receive_text()
                message = json.loads(data)

                msg_type = message.get("type")

                if msg_type == "message":
                    # 处理用户消息
                    await self._handle_message(session_id, message)

                elif msg_type == "interrupt":
                    # 处理打断指令
                    await self._handle_interrupt(session_id)

                elif msg_type == "resume":
                    # 从打断状态恢复
                    await self._handle_resume(session_id, message)

                else:
                    await websocket.send_json({
                        "type": "error",
                        "message": f"Unknown message type: {msg_type}"
                    })

        except WebSocketDisconnect:
            print(f"WebSocket 连接断开: {session_id}")
            await self._cleanup_session(session_id)

    async def _handle_message(self, session_id: str, message: Dict):
        """处理用户消息"""
        session = self.active_sessions[session_id]
        websocket = session["websocket"]

        # 如果正在执行,先打断
        if session.get("current_task"):
            await self._handle_interrupt(session_id)

        # 获取用户输入
        user_input = message.get("content", "")

        # 构建初始状态
        initial_state = {
            "messages": [{"role": "user", "content": user_input}],
            "session_id": session_id
        }

        # 配置(包括 thread_id 用于 Checkpoint)
        config = {
            "configurable": {
                "thread_id": session_id
            }
        }

        # 创建异步任务执行 graph
        task = asyncio.create_task(
            self._run_graph(websocket, initial_state, config)
        )

        session["current_task"] = task
        session["is_interrupted"] = False

    async def _run_graph(
        self,
        websocket: WebSocket,
        initial_state: Dict,
        config: Dict
    ):
        """
        运行 LangGraph

        使用 stream 模式,逐步推送状态到客户端
        """
        try:
            # 使用 stream 模式,逐步获取每个节点的输出
            async for chunk in self.graph.astream(initial_state, config):
                # 检查是否被中断
                session = None
                for s in self.active_sessions.values():
                    if s["websocket"] == websocket:
                        session = s
                        break

                if session and session.get("is_interrupted"):
                    # 被中断了,停止执行
                    print(f"任务被中断: {config['configurable']['thread_id']}")
                    break

                # 推送结果到客户端
                await websocket.send_json({
                    "type": "chunk",
                    "data": chunk
                })

            # 执行完成
            await websocket.send_json({
                "type": "done"
            })

        except Exception as e:
            # 发送错误
            await websocket.send_json({
                "type": "error",
                "message": str(e)
            })

        finally:
            # 清理任务状态
            for s in self.active_sessions.values():
                if s["websocket"] == websocket:
                    s["current_task"] = None

    async def _handle_interrupt(self, session_id: str):
        """
        处理打断指令

        核心逻辑:
        1. 取消当前正在执行的任务
        2. 标记为中断状态
        3. 当前状态已经被 Checkpoint 保存
        """
        session = self.active_sessions[session_id]

        # 取消当前任务
        if session.get("current_task"):
            task = session["current_task"]
            task.cancel()
            try:
                await task
            except asyncio.CancelledError:
                pass

        # 标记中断状态
        session["is_interrupted"] = True
        session["current_task"] = None

        # 通知客户端
        session["websocket"].send_json({
            "type": "interrupted",
            "message": "当前任务已被中断,可以发送新指令或恢复执行"
        })

        print(f"会话 {session_id} 被中断,状态已保存到 Checkpoint")

    async def _handle_resume(self, session_id: str, message: Dict):
        """
        从中断状态恢复

        核心逻辑:
        1. 从 Checkpoint 获取之前的状态
        2. 将新的用户输入追加到状态
        3. 从保存的 Checkpoint 继续执行
        """
        session = self.active_sessions[session_id]
        websocket = session["websocket"]

        # 获取新输入
        new_input = message.get("content", "")

        # 配置
        config = {
            "configurable": {
                "thread_id": session_id
            }
        }

        # 从 Checkpoint 恢复并继续执行
        # 注意:LangGraph 的 resume 方法
        # 实际上是再次调用 invoke,但会从上一个 Checkpoint 恢复
        try:
            # 获取之前保存的状态
            previous_state = await self.graph.get_state(config)

            if previous_state is None:
                await websocket.send_json({
                    "type": "error",
                    "message": "未找到保存的状态"
                })
                return

            # 将新输入追加到消息历史
            previous_state["messages"].append({
                "role": "user",
                "content": new_input
            })

            # 继续执行
            session["is_interrupted"] = False
            task = asyncio.create_task(
                self._run_graph(websocket, previous_state, config)
            )
            session["current_task"] = task

        except Exception as e:
            await websocket.send_json({
                "type": "error",
                "message": f"恢复失败: {str(e)}"
            })

    async def _cleanup_session(self, session_id: str):
        """清理会话"""
        if session_id in self.active_sessions:
            del self.active_sessions[session_id]


# ============ FastAPI 应用 ============

app = FastAPI(title="LangGraph WebSocket Agent API")

# 假设已经创建了编译后的 LangGraph
# from your_langgraph_module import compiled_graph
# agent = LangGraphWebSocketAgent(compiled_graph)


@app.websocket("/ws/agent/{session_id}")
async def websocket_endpoint(websocket: WebSocket, session_id: str):
    """
    Agent WebSocket 端点

    客户端连接后可以:
    - 发送 message 类型消息触发执行
    - 发送 interrupt 类型消息打断执行
    - 发送 resume 类型消息从中断恢复
    """
    # await agent.handle_connection(websocket, session_id)
    pass

8.4 FastAPI 依赖注入与 WebSocket 的结合

FastAPI 的依赖注入系统非常强大,但我们需要一些技巧让它与 WebSocket 优雅结合:

from fastapi import FastAPI, WebSocket, Depends, HTTPException
from typing import Optional
import json


class WebSocketManager:
    """
    WebSocket 连接管理器

    配合 FastAPI 的依赖注入系统使用。
    """

    def __init__(self):
        self.connections = {}

    async def connect(self, session_id: str, websocket: WebSocket):
        await websocket.accept()
        self.connections[session_id] = websocket

    def disconnect(self, session_id: str):
        self.connections.pop(session_id, None)

    def get_connection(self, session_id: str) -> Optional[WebSocket]:
        return self.connections.get(session_id)


# 全局管理器
ws_manager = WebSocketManager()


# 使用依赖注入获取 WebSocket
async def get_websocket(session_id: str) -> WebSocket:
    """
    获取 WebSocket 连接的依赖函数

    用法:
    @app.websocket("/ws/{session_id}")
    async def endpoint(ws = Depends(get_websocket)):
        ...
    """
    ws = ws_manager.get_connection(session_id)
    if not ws:
        # 这里不能抛异常,因为 WebSocket 在连接阶段
        # 应该在连接处理函数中先调用 manager.connect
        pass
    return ws


# 带认证的 WebSocket 依赖
async def get_authenticated_websocket(
    websocket: WebSocket,
    token: str = None
) -> WebSocket:
    """
    带认证的 WebSocket 依赖

    可以在这里验证 JWT token 或其他认证信息

    Args:
        websocket: WebSocket 连接
        token: 可选的 token 参数

    Returns:
        验证后的 WebSocket 连接

    Raises:
        HTTPException: 认证失败
    """
    # 这里可以添加认证逻辑
    # if not token or not verify_token(token):
    #     raise HTTPException(status_code=401, detail="Unauthorized")

    return websocket


# 使用示例
@app.websocket("/ws/chat/{session_id}")
async def chat_endpoint(
    websocket: WebSocket,
    session_id: str,
    # 可以注入多个依赖
    auth_ws = Depends(get_authenticated_websocket)
):
    """聊天端点示例"""
    # 连接
    await ws_manager.connect(session_id, websocket)

    try:
        while True:
            data = await websocket.receive_text()
            message = json.loads(data)

            # 处理消息
            response = {
                "type": "response",
                "content": f"收到: {message.get('content', '')}"
            }
            await websocket.send_json(response)

    finally:
        ws_manager.disconnect(session_id)

九、性能优化与生产实践

9.1 连接池与资源管理

在高并发场景下,连接管理至关重要:

from fastapi import FastAPI
from contextlib import asynccontextmanager
import asyncio

# 全局资源
llm_connection_pool = None
redis_pool = None


@asynccontextmanager
async def lifespan(app: FastAPI):
    """
    应用生命周期管理

    FastAPI 的 lifespan 上下文管理器用于:
    1. 启动时初始化资源(连接池、缓存等)
    2. 关闭时清理资源

    这确保了优雅启动和优雅关闭。
    """
    global llm_connection_pool, redis_pool

    # 启动阶段:初始化资源
    print("正在初始化连接池...")
    # 初始化 LLM 连接池
    llm_connection_pool = asyncio.Semaphore(100)  # 最多 100 个并发连接
    # 初始化 Redis 连接池
    # redis_pool = await aioredis.create_redis_pool(...)
    print("连接池初始化完成")

    yield  # 应用运行中

    # 关闭阶段:清理资源
    print("正在清理资源...")
    # 关闭 Redis
    # await redis_pool.close()
    print("资源清理完成")


app = FastAPI(lifespan=lifespan)


@app.post("/llm/chat")
async def chat(request: dict):
    """
    带限流的 LLM 端点

    使用信号量限制并发数,保护后端 LLM 服务。
    """
    # 获取连接槽位(如果已满会等待)
    async with llm_connection_pool:
        # 调用 LLM 处理请求
        # result = await llm_client.agenerate(...)
        await asyncio.sleep(1)  # 模拟

        return {"status": "ok", "content": "处理完成"}

7.2 背压控制

当 LLM 生成速度大于客户端消费速度时,我们需要背压控制:

import asyncio
from collections import deque

class BackpressureQueue:
    """
    带背压控制的异步队列

    背压(Backpressure)是指当消费者处理速度跟不上时,
    生产者需要减慢速度,以免内存溢出或丢失数据。

    策略:
    1. 队列满时暂停生产
    2. 超时断开慢客户端
    3. 优先级队列区分重要客户
    """

    def __init__(self, max_size: int = 1000, timeout: float = 30.0):
        self.queue = deque(maxlen=max_size)
        self.max_size = max_size
        self.timeout = timeout
        self._waiting_producers = asyncio.Queue()
        self._paused = False

    async def put(self, item):
        """
        添加元素,如果队列满则等待

        Args:
            item: 要添加的元素

        Raises:
            asyncio.TimeoutError: 等待超时
        """
        # 如果队列已满,等待消费者消费
        while len(self.queue) >= self.max_size:
            # 暂停生产者
            self._paused = True

            # 等待队列有空位
            try:
                await asyncio.wait_for(
                    self._waiting_producers.get(),
                    timeout=self.timeout
                )
            except asyncio.TimeoutError:
                # 超时,抛出异常让调用者处理
                raise

        # 添加元素
        self.queue.append(item)

        # 如果之前暂停了,恢复
        if self._paused and len(self.queue) < self.max_size * 0.5:
            self._paused = False
            # 通知等待的生产者
            try:
                self._waiting_producers.put_nowait(True)
            except asyncio.QueueFull:
                pass

    async def get(self):
        """获取元素"""
        item = self.queue.popleft()

        # 如果有等待的生产者,通知它
        if self._paused and not self._waiting_producers.empty():
            try:
                self._waiting_producers.put_nowait(True)
            except asyncio.QueueFull:
                pass

        return item

7.3 Nginx 生产配置

最后,不要忘记 Nginx 的配置优化:

# /etc/nginx/conf.d/llm-api.conf

upstream llm_backend {
    server 127.0.0.1:8000;
    # 多实例部署
    server 127.0.0.1:8001;
    keepalive 32;  # 保持连接复用
}

server {
    listen 443 ssl http2;
    server_name api.example.com;

    ssl_certificate /etc/nginx/ssl/cert.pem;
    ssl_certificate_key /etc/nginx/ssl/key.pem;

    # SSE 端点配置
    location /llm/sse/ {
        # 禁用代理缓冲,确保实时推送
        proxy_buffering off;

        # 正确的 MIME 类型
        proxy_cache off;

        # 超时设置(SSE 是长连接)
        proxy_read_timeout 3600s;
        proxy_send_timeout 3600s;

        # 保持连接
        proxy_http_version 1.1;
        proxy_set_header Connection "";

        # 传递真实 IP
        proxy_set_header X-Real-IP $remote_addr;
        proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;

        proxy_pass http://llm_backend;
    }

    # WebSocket 端点配置
    location /llm/ws/ {
        proxy_http_version 1.1;
        proxy_set_header Upgrade $http_upgrade;
        proxy_set_header Connection "upgrade";

        proxy_read_timeout 3600s;
        proxy_send_timeout 3600s;

        proxy_set_header X-Real-IP $remote_addr;

        proxy_pass http://llm_backend;
    }

    # 普通 HTTP 端点
    location / {
        proxy_pass http://llm_backend;
        proxy_http_version 1.1;
        proxy_set_header Connection "";
    }
}

9. 全双工通信与 AI 实时交互:打破"一问一答"的边界

9.1 通讯模式的本质演进

在深入技术实现之前,我们需要理解通讯领域的三种基本模式,这对于设计 LLM 应用至关重要:

全双工 Full-Duplex

同时双向

A

B

半双工 Half-Duplex

交替传输

A

B

单工 Simplex

单向传输

发送方

接收方

模式 特性 生活例子 软件映射
单工 (Simplex) 数据只能单向传输 收音机、电视接收 传统文件下载
半双工 (Half-Duplex) 双向传输但不能同时 对讲机(按说话、松听) HTTP 请求-响应
全双工 (Full-Duplex) 同时双向传输 电话、微信语音 WebSocket、WebRTC

关键洞察:传统的 HTTP 1.0/1.1 本质上是"问答模式"——客户端发起请求,服务器响应,然后连接关闭。这类似于半双工,必须等待一方说完,另一方才能开口。

WebSocket 协议则建立了真正的全双工通道——连接建立后,双方可以随时向对方发送数据,无需等待对方"说完"。

9.2 AI 实时语音交互:全双工的终极应用

你可能已经体验过 OpenAI 的 Advanced Voice Mode——可以随时打断 AI 说话,不需要等它说完才能开口。这种"丝滑的打断体验"正是全双工通信在 AI 领域的典型应用。

TTS语音合成 大语言模型 VAD检测 用户 TTS语音合成 大语言模型 VAD检测 用户 全双工通道建立 (WebRTC) 阶段一:AI 正在说话(下行数据流) 阶段二:用户打断(上行数据流) 生成语音片段 播放语音 用户开始说话 检测到人类语音,打断信号 立即停止当前生成 保留已播放内容 生成新回复
为什么"打断"必须是全双工?

半双工的尴尬:回想早期的智能音箱,当它在播报内容时,你说话它是听不见的——因为信道被"占用",你必须等它说完。这就是半双工的局限。

全双工的优势:在 WebRTC 建立的连接中,上行数据流(麦克风)和下行数据流(扬声器)是独立的通道。服务器可以同时:

  • 向下发送 AI 生成的语音(占用下行带宽)
  • 向上接收用户的语音(占用上行带宽)

9.3 打断背后的技术架构

实现可打断的 AI 实时对话,需要多层技术的协同:

Agent 编排层

处理层

传输层

WebRTC\n低延迟音视频

WebSocket\n信令传输

VAD\n语音活动检测

ASR\n语音转文字

任务取消机制

Checkpoint\n状态保存

恢复执行

核心技术点:
  1. WebRTC 原生全双工

    • 底层使用 RTP/RTCP 协议族
    • 支持 ICE Candidate 穿透 NAT
    • 实现回声消除(AEC)、噪声抑制(ANS)
  2. VAD 语音活动检测

    • 毫秒级判断输入是噪音还是人声
    • 常用算法:WebRTC VAD、Silero VAD
    • 关键参数:灵敏度、帧长、检测阈值
  3. 任务取消与图流转

    • 一旦检测到打断,立即发送 Abort Signal
    • 停止当前的文本生成/TTS 节点
    • 保存已输出的内容到记忆上下文
    • 将用户最新输入作为新 Prompt 触发推理

9.4 实战:FastAPI + WebSocket 实现打断功能

以下是结合 FastAPI WebSocket 实现优雅打断的完整示例:

from fastapi import FastAPI, WebSocket, WebSocketDisconnect
from typing import Dict, Any, Optional, Set
import asyncio
import json
from enum import Enum


class MessageType(str, Enum):
    """消息类型枚举"""
    CHAT = "chat"           # 普通对话
    INTERRUPT = "interrupt" # 打断指令
    RESUME = "resume"       # 恢复执行
    HEARTBEAT = "heartbeat" # 心跳


class InterruptibleAgent:
    """
    支持打断的 Agent
    
    核心设计:
    1. 使用 asyncio.Event 控制执行流程
    2. 支持随时设置中断标志
    3. Checkpoint 保存中间状态
    """
    
    def __init__(self):
        # 存储所有活跃会话
        self.sessions: Dict[str, "AgentSession"] = {}
    
    async def handle_connection(self, websocket: WebSocket, session_id: str):
        """处理 WebSocket 连接"""
        await websocket.accept()
        
        # 初始化会话状态
        session = AgentSession(session_id, websocket)
        self.sessions[session_id] = session
        
        try:
            # 持续监听消息
            while True:
                data = await websocket.receive_text()
                message = json.loads(data)
                await self._process_message(session, message)
                
        except WebSocketDisconnect:
            self.sessions.pop(session_id, None)
    
    async def _process_message(self, session: "AgentSession", message: Dict):
        """根据消息类型处理"""
        msg_type = message.get("type")
        
        if msg_type == MessageType.CHAT:
            await session.execute_with_interrupt(message.get("content"))
            
        elif msg_type == MessageType.INTERRUPT:
            await session.interrupt()
            
        elif msg_type == MessageType.RESUME:
            await session.resume()
            
        elif msg_type == MessageType.HEARTBEAT:
            await session.send({"type": "heartbeat_ack"})


class AgentSession:
    """
    单个 Agent 会话
    
    管理一个完整的对话生命周期:
    1. 接收用户输入
    2. 执行 Agent 逻辑
    3. 处理打断请求
    4. 管理 Checkpoint
    """
    
    def __init__(self, session_id: str, websocket: WebSocket):
        self.session_id = session_id
        self.websocket = websocket
        
        # 中断控制
        self._interrupt_event = asyncio.Event()
        self._current_task: Optional[asyncio.Task] = None
        
        # Checkpoint 状态
        self.checkpoint_data: Dict[str, Any] = {}
        self.conversation_history: list = []
    
    async def execute_with_interrupt(self, user_input: str):
        """
        执行对话(支持打断)
        
        这个方法展示了如何在执行过程中优雅地处理打断:
        1. 创建异步任务执行 LLM 调用
        2. 持续监听中断事件
        3. 定期保存 Checkpoint
        """
        # 取消之前的任务(如果有)
        if self._current_task and not self._current_task.done():
            self._current_task.cancel()
            try:
                await self._current_task
            except asyncio.CancelledError:
                pass
        
        # 重置中断标志
        self._interrupt_event.clear()
        
        # 创建新任务
        self._current_task = asyncio.create_task(
            self._stream_response(user_input)
        )
    
    async def _stream_response(self, user_input: str):
        """
        流式响应(可被打断)
        
        关键点:
        - 每次发送前检查中断标志
        - 定期保存 Checkpoint
        - 捕获 CancelledError 优雅退出
        """
        self.conversation_history.append({"role": "user", "content": user_input})
        
        # 模拟 LLM 流式输出
        response_parts = []
        for i in range(10):  # 假设分10个片段
            # 🔑 检查是否被中断
            if self._interrupt_event.is_set():
                # 保存 Checkpoint
                self._save_checkpoint(response_parts)
                await self.websocket.send_json({
                    "type": "interrupted",
                    "content": "已中断",
                    "checkpoint_id": f"cp_{self.session_id}"
                })
                return
            
            # 模拟生成内容
            part = f"片段 {i + 1} "
            response_parts.append(part)
            
            # 发送内容片段
            await self.websocket.send_json({
                "type": "content",
                "content": part,
                "progress": (i + 1) / 10
            })
            
            # 定期保存 Checkpoint
            if (i + 1) % 3 == 0:
                self._save_checkpoint(response_parts)
            
            # 模拟生成延迟
            await asyncio.sleep(0.5)
        
        # 完成
        self.conversation_history.append({
            "role": "assistant", 
            "content": "".join(response_parts)
        })
    
    async def interrupt(self):
        """
        处理打断请求
        
        设置中断标志,执行中的任务会在下次检查时退出
        """
        self._interrupt_event.set()
        await self.websocket.send_json({
            "type": "interrupt_ack",
            "message": "打断指令已接收"
        })
    
    async def resume(self):
        """
        从中断恢复
        
        从 Checkpoint 读取状态,继续执行
        """
        if self.checkpoint_data:
            await self.websocket.send_json({
                "type": "resumed",
                "checkpoint": self.checkpoint_data
            })
            # 继续执行逻辑...
    
    def _save_checkpoint(self, response_parts: list):
        """保存 Checkpoint"""
        self.checkpoint_data = {
            "conversation_history": self.conversation_history.copy(),
            "response_parts": response_parts.copy(),
            "timestamp": asyncio.get_event_loop().time()
        }
客户端使用示例:
// 客户端 JavaScript
class InterruptibleClient {
    constructor(url) {
        this.ws = new WebSocket(url);
        this.setupHandlers();
    }
    
    setupHandlers() {
        this.ws.onmessage = (event) => {
            const data = JSON.parse(event.data);
            
            switch (data.type) {
                case 'content':
                    this.onContent(data.content);
                    break;
                case 'interrupted':
                    this.onInterrupted(data);
                    break;
                case 'heartbeat_ack':
                    this.onHeartbeatAck();
                    break;
            }
        };
    }
    
    // 发送消息
    sendMessage(content) {
        this.ws.send(JSON.stringify({
            type: 'chat',
            content: content
        }));
    }
    
    // 打断(用户按下按钮时调用)
    interrupt() {
        this.ws.send(JSON.stringify({
            type: 'interrupt'
        }));
    }
    
    // 从中断恢复
    resume() {
        this.ws.send(JSON.stringify({
            type: 'resume'
        }));
    }
}

// 使用
const client = new InterruptibleClient('ws://localhost:8000/ws/agent');

// 用户随时可以打断
document.getElementById('interruptBtn').onclick = () => {
    client.interrupt();
};

9.5 全双工通讯决策指南

选择通讯协议

需要双向通讯?

只需要服务器推送?

选择 SSE\n简单、浏览器原生

选择 传统 HTTP\n简单请求-响应

需要音视频?

选择 WebRTC\n低延迟、抗丢包

需要传输二进制?

选择 WebSocket\n全双工二进制

需要极低延迟?

选择 WebSocket\n或 WebTransport

需要服务间通讯?

选择 gRPC\n高性能、强类型

选择 WebSocket\n通用双向通讯

场景 推荐协议 原因
LLM 流式输出 SSE 简单、EventSource 原生支持
实时聊天 WebSocket 全双工、低延迟
语音/视频通话 WebRTC 原生支持音视频
多模态(低延迟) WebTransport HTTP/3 QUIC
微服务通讯 gRPC 高性能、协议缓冲
异步任务队列 Redis/Kafka 解耦、削峰填谷

总结

在这篇文章中,我们深入探讨了 FastAPI 支持的多种通讯协议,以及它们在 LLM 应用中的实际应用。

核心要点回顾

  1. SSE 是 LLM 流式输出的首选:它简单、浏览器原生支持、与 OpenAI 格式兼容

  2. WebSocket 用于需要双向通讯的场景:实时对话、多人协作、低延迟需求

  3. gRPC 用于服务间通讯:高性能、强类型、支持双向流式

  4. 混合协议架构是成熟项目的选择:通过统一的消息格式和适配器,让核心逻辑与协议解耦

  5. 生产环境需要考虑的因素:连接池、心跳保活、背压控制、限流熔断

选择合适的通讯协议,只是构建优秀 LLM 应用的一个环节。但正是这些基础设施的完善,才能支撑起丝滑的用户体验。

希望这篇文章能帮助你在实际项目中做出正确的技术选型。如果还有其他问题,欢迎继续交流!


本文基于 FastAPI 官方文档、社区最佳实践以及实际项目经验编写。文中代码均可直接运行,如有问题,欢迎指正。

Logo

AtomGit 是由开放原子开源基金会联合 CSDN 等生态伙伴共同推出的新一代开源与人工智能协作平台。平台坚持“开放、中立、公益”的理念,把代码托管、模型共享、数据集托管、智能体开发体验和算力服务整合在一起,为开发者提供从开发、训练到部署的一站式体验。

更多推荐