本文基于 sse-starlette v3.3.3 源码,系统剖析该库的架构设计、核心机制与生产实践。同时结合 A2A(Agent-to-Agent)Python SDK 中的实际集成案例,展示 SSE 技术在 AI Agent 流式通信场景中的工程应用。

一、背景与定位

Server-Sent Events(SSE)是 W3C 定义的一种基于 HTTP 的服务端推送技术。与 WebSocket 的全双工通信不同,SSE 采用单向推送模型——服务端通过一个持久的 HTTP 连接,以纯文本流的形式向客户端持续发送事件。其协议格式简洁,天然兼容 HTTP 基础设施(代理、负载均衡、CDN),且浏览器原生支持自动重连。

sse-starlette 是 Python 生态中针对 Starlette/FastAPI 框架的 SSE 实现库。该库的代码量极为精简——核心仅由 event.pysse.py 两个文件构成,但在这有限的代码中实现了完整的 SSE 协议支持、连接生命周期管理、优雅关闭协调以及多线程安全等关键能力。

二、核心组件架构

sse-starlette 的架构由三个核心组件构成,各司其职:

┌─────────────────────────────────────────────────────────┐
│                    对外公共 API                          │
│  EventSourceResponse  ServerSentEvent  JSONServerSentEvent │
└──────────┬──────────────────┬───────────────────────────┘
           │                  │
           ▼                  ▼
┌─────────────────┐  ┌────────────────────┐
│   sse.py        │  │   event.py         │
│                 │  │                    │
│ EventSourceResponse│  │ ServerSentEvent    │
│ AppStatus       │  │ JSONServerSentEvent│
│ _ShutdownState  │  │ ensure_bytes()     │
│ _shutdown_watcher│  │                    │
└─────────────────┘  └────────────────────┘

2.1 ServerSentEvent:SSE 协议编码器

ServerSentEvent 类负责将结构化数据编码为符合 SSE 规范的字节流。SSE 协议定义了以下字段:

字段 格式 说明
data data: <内容>\r\n 事件数据,多行数据拆分为多个 data:
event event: <类型>\r\n 事件类型,客户端据此分发处理
id id: <标识>\r\n 事件 ID,用于断线重连时的位置恢复
retry retry: <毫秒>\r\n 建议客户端的重连间隔
comment : <注释>\r\n 注释行,常用于心跳保活

每个事件以一个额外的空行(\r\n)作为结束标记。ServerSentEvent.encode() 方法严格按照此规范生成字节流:

class ServerSentEvent:
    _LINE_SEP_EXPR = re.compile(r"\r\n|\r|\n")

    def encode(self) -> bytes:
        buffer = io.StringIO()
        if self.comment is not None:
            for chunk in self._LINE_SEP_EXPR.split(str(self.comment)):
                buffer.write(f": {chunk}{self._sep}")

        if self.id is not None:
            buffer.write("id: " + self._LINE_SEP_EXPR.sub("", self.id) + self._sep)

        if self.event is not None:
            buffer.write("event: " + self._LINE_SEP_EXPR.sub("", self.event) + self._sep)

        if self.data is not None:
            for chunk in self._LINE_SEP_EXPR.split(str(self.data)):
                buffer.write(f"data: {chunk}{self._sep}")

        if self.retry is not None:
            buffer.write(f"retry: {self.retry}{self._sep}")

        buffer.write(self._sep)  # 事件结束的空行
        return buffer.getvalue().encode("utf-8")

值得注意的实现细节:

  • idevent 字段中的换行符被静默移除(sub("", ...)),防止注入非法的多行字段
  • datacomment 字段中的换行符被拆分为多个独立行,符合 SSE 规范对多行数据的处理要求
  • 分隔符(sep)可配置为 \r\n\r\n,以适配不同的网络环境

此外,库还提供了 JSONServerSentEvent 子类,在构造时自动将 Python 对象序列化为紧凑的 JSON 字符串(separators=(",", ":")ensure_ascii=False),简化了 JSON 数据的 SSE 传输。

辅助函数 ensure_bytes() 则提供了统一的类型转换入口,支持 bytesdictServerSentEvent 及任意对象的自动编码:

def ensure_bytes(data, sep):
    if isinstance(data, bytes):       return data
    if isinstance(data, ServerSentEvent): return data.encode()
    if isinstance(data, dict):
        data["sep"] = sep
        return ServerSentEvent(**data).encode()
    return ServerSentEvent(str(data), sep=sep).encode()

这一设计使得开发者可以灵活地向 EventSourceResponse 的生成器中 yield 不同类型的数据——字典、字符串、ServerSentEvent 对象甚至原始字节——库会自动处理编码。

2.2 EventSourceResponse:连接生命周期管理器

EventSourceResponse 继承自 Starlette 的 Response 类,是整个库的核心。它不仅负责将数据流转换为 SSE 格式的 HTTP 响应,更重要的是管理 SSE 连接的完整生命周期——包括数据推送、心跳保活、客户端断开检测和服务器关闭协调。

构造与初始化

构造函数接受丰富的配置参数,覆盖了 SSE 连接管理的各个方面:

EventSourceResponse(
    content,                      # 异步/同步迭代器,产生 SSE 事件数据
    status_code=200,              # HTTP 状态码
    headers=None,                 # 附加 HTTP 头
    ping=15,                      # 心跳间隔(秒),0 禁用
    sep="\r\n",                   # 行分隔符
    ping_message_factory=None,    # 自定义心跳消息工厂
    data_sender_callable=None,    # 推送式数据发送协程
    send_timeout=None,            # 单次发送超时(秒)
    client_close_handler_callable=None,  # 客户端断开回调
    shutdown_event=None,          # 协作式关闭事件
    shutdown_grace_period=0,      # 关闭宽限期(秒)
)

初始化阶段,库自动设置了三个关键的 HTTP 响应头:

响应头 作用
Cache-Control no-store(默认,可覆盖) 禁止缓存 SSE 流
Connection keep-alive 维持持久连接
X-Accel-Buffering no 禁用 Nginx 等反向代理的响应缓冲

其中 Cache-Control 被设计为可覆盖的(setdefault),以支持 Fastly 等 CDN 的扇出代理场景。

任务组竞赛模型

EventSourceResponse.__call__ 是 ASGI 协议的入口点。该方法创建一个 anyio 任务组,同时启动四个并发任务,采用"竞赛"模式运行——任何一个任务先完成,即取消其余所有任务:

async def __call__(self, scope, receive, send):
    async with anyio.create_task_group() as task_group:
        async def cancel_on_finish(coro):
            await coro()
            task_group.cancel_scope.cancel()

        task_group.start_soon(cancel_on_finish, lambda: self._stream_response(send))
        task_group.start_soon(cancel_on_finish, lambda: self._ping(send))
        task_group.start_soon(cancel_on_finish, self._listen_for_exit_signal_with_grace)
        task_group.start_soon(cancel_on_finish, lambda: self._listen_for_disconnect(receive))

        if self.data_sender_callable:
            task_group.start_soon(self.data_sender_callable)

这四个任务各自承担明确的职责:

任务 方法 职责 完成条件
数据推送 _stream_response 从迭代器读取数据,编码为 SSE 格式发送 迭代器耗尽
心跳保活 _ping 定期发送注释行,防止代理/负载均衡器超时断开 永不主动完成
关闭监听 _listen_for_exit_signal_with_grace 监听服务器关闭信号 收到关闭信号
断开检测 _listen_for_disconnect 监听客户端断开连接 收到 http.disconnect

cancel_on_finish 包装器确保了"先完成者胜出"的语义:无论是生成器正常耗尽、客户端主动断开、服务器发起关闭,还是发送超时,都能通过统一的取消机制终止整个连接。

数据推送与发送超时

_stream_response 方法负责从迭代器中逐项读取数据并发送给客户端。当配置了 send_timeout 时,每次发送操作都受到超时保护:

async def _stream_response(self, send):
    await send({"type": "http.response.start", "status": self.status_code, "headers": self.raw_headers})

    async for data in self.body_iterator:
        chunk = ensure_bytes(data, self.sep)
        with anyio.move_on_after(self.send_timeout) as cancel_scope:
            await send({"type": "http.response.body", "body": chunk, "more_body": True})

        if cancel_scope and cancel_scope.cancel_called:
            aclose = getattr(self.body_iterator, "aclose", None)
            if aclose is not None:
                await aclose()
            raise SendTimeoutError()

    async with self._send_lock:
        self.active = False
        await send({"type": "http.response.body", "body": b"", "more_body": False})

send_timeout 机制解决了一个生产环境中的常见问题:当客户端进程被挂起(如 Ctrl+Z)或网络中断但 TCP 连接未关闭时,send() 调用会因内核发送缓冲区满而无限阻塞。通过 anyio.move_on_after 设置超时,服务端可以主动检测并清理这类"僵尸连接",释放服务器资源。

注意 _send_lock 的使用——它确保了 _stream_response_ping 两个任务不会同时向同一个连接写入数据,避免了 SSE 消息的交错损坏。

心跳保活

_ping 方法按配置的间隔(默认 15 秒)发送心跳消息。默认的心跳格式为 SSE 注释行(以 : 开头),对客户端不可见但能维持连接活性:

: ping - 2025-03-20T10:30:00+00:00

开发者可通过 ping_message_factory 参数自定义心跳内容。心跳机制对于穿越反向代理(Nginx 默认 60 秒超时)和负载均衡器(HAProxy 等)的场景至关重要。

2.3 AppStatus 与关闭检测:两层架构

SSE 连接的优雅关闭是一个工程难题——服务器收到 SIGTERM 信号后,需要通知所有活跃的 SSE 连接进行清理,而不是粗暴地断开。sse-starlette 通过两层架构解决这一问题。

第一层:进程级信号捕获
class AppStatus:
    should_exit = False
    original_handler = None

    @staticmethod
    def handle_exit(*args, **kwargs):
        if AppStatus.enable_automatic_graceful_drain:
            AppStatus.should_exit = True
        if AppStatus.original_handler is not None:
            AppStatus.original_handler(*args, **kwargs)

# 模块加载时自动 monkey-patch uvicorn 的退出处理器
try:
    from uvicorn.main import Server
    AppStatus.original_handler = Server.handle_exit
    Server.handle_exit = AppStatus.handle_exit
except ImportError:
    pass

该机制在模块导入时通过 monkey-patch 替换 uvicorn 的 Server.handle_exit 方法。当 uvicorn 收到 SIGTERM/SIGINT 信号时,AppStatus.should_exit 标志被设置为 True,同时原始的 uvicorn 退出处理器仍然被调用,确保 uvicorn 自身的关闭流程不受影响。

对于 monkey-patch 失效的场景(如 uvicorn 版本变更导致 API 不兼容),库提供了回退方案——通过 signal.getsignal(SIGTERM) 内省信号处理器,找到 uvicorn 的 Server 实例并直接检查其 should_exit 属性:

def _get_uvicorn_server():
    try:
        handler = signal.getsignal(signal.SIGTERM)
        if hasattr(handler, "__self__"):
            server = handler.__self__
            if hasattr(server, "should_exit"):
                return server
    except Exception:
        pass
    return None
第二层:线程级事件广播
_thread_state = threading.local()

@dataclass
class _ShutdownState:
    events: Set[anyio.Event] = field(default_factory=set)
    watcher_started: bool = False

每个线程维护独立的 _ShutdownState(通过 threading.local()),包含该线程上所有 SSE 连接注册的 anyio.Event 集合。每个线程启动一个 _shutdown_watcher 协程,以 0.5 秒的间隔轮询 AppStatus.should_exit,一旦检测到关闭信号,即广播给该线程上的所有连接:

线程 A(主事件循环)              线程 B(辅助事件循环)
┌──────────────────────┐      ┌──────────────────────┐
│ _ShutdownState       │      │ _ShutdownState       │
│   events: {ev1, ev2} │      │   events: {ev3}      │
│   watcher: 轮询中     │      │   watcher: 轮询中     │
└──────────────────────┘      └──────────────────────┘
         │                              │
    检测到 should_exit              检测到 should_exit
         │                              │
    ev1.set(), ev2.set()           ev3.set()

使用 threading.local() 而非 contextvars.ContextVar 是一个经过深思熟虑的设计选择(Issue #152 修复)。ContextVar 在每个异步上下文中创建独立副本,可能导致同一线程上启动多个冗余的 watcher;而 threading.local() 确保每个线程(通常对应一个事件循环)仅有一个 watcher 实例。

三、协作式关闭机制(v3.3.0)

在 v3.3.0 之前,服务器关闭时所有 SSE 生成器会立即收到 CancelledError,无法向客户端发送告别消息。v3.3.0 引入的协作式关闭机制解决了这一问题。

3.1 工作原理

开发者在创建 EventSourceResponse 时传入一个 anyio.Event 对象和宽限期:

shutdown_event = anyio.Event()

async def generate():
    while not shutdown_event.is_set():
        yield {"data": "tick"}
        with anyio.move_on_after(1.0):
            await shutdown_event.wait()
    # 关闭信号已到达,发送告别事件
    yield {"event": "shutdown", "data": "server is shutting down, goodbye"}

EventSourceResponse(
    generate(),
    shutdown_event=shutdown_event,
    shutdown_grace_period=5.0,
)

当服务器关闭时,_listen_for_exit_signal_with_grace 方法执行以下流程:

1. _listen_for_exit_signal() 返回(检测到关闭信号)
2. 设置 shutdown_event(通知生成器)
3. 启动宽限期计时器(move_on_after(grace_period))
4. 等待 self.active 变为 False(生成器正常退出)
   ├── 生成器在宽限期内退出 → 干净关闭,告别事件已送达客户端
   └── 宽限期超时 → 方法返回,触发 cancel_on_finish,生成器被强制取消

3.2 设计约束

shutdown_grace_period 必须小于 ASGI 服务器的优雅关闭超时(如 uvicorn 的 --timeout-graceful-shutdown)。否则,进程会在宽限期结束前被操作系统强制终止,导致告别事件无法送达。

当未配置 shutdown_event 时(默认行为),关闭流程与 v3.3.0 之前完全一致——立即取消,无宽限期。这确保了向后兼容性。

四、连接终止场景全景

基于任务组竞赛模型,sse-starlette 能够统一处理所有连接终止场景:

场景 先完成的任务 生成器的命运
生成器正常耗尽 _stream_response 正常退出,告别事件已发送
客户端主动断开 _listen_for_disconnect 收到 CancelledError
服务器关闭(无宽限期) _listen_for_exit_signal_with_grace 收到 CancelledError
服务器关闭(有宽限期,生成器配合) _stream_response 正常退出,告别事件已发送
服务器关闭(有宽限期,生成器未配合) _listen_for_exit_signal_with_grace(超时后) 收到 CancelledError
发送超时 _stream_response(抛出异常) 迭代器被关闭,SendTimeoutError 传播

这种统一的处理模型大幅简化了开发者的心智负担——无论连接因何终止,清理逻辑都通过 anyio 的结构化并发机制自动执行。

五、生产环境实践

5.1 数据库会话的线程安全

在 SSE 生成器中使用数据库会话时,存在一个容易被忽视的陷阱。由于 EventSourceResponseanyio 任务组中消费生成器,生成器的执行上下文与请求处理函数的上下文不同。如果将依赖注入获取的数据库会话传入生成器,会导致跨任务的会话复用,引发线程安全问题。

正确的做法是在生成器内部创建独立的数据库会话:

# ✗ 错误:会话来自依赖注入,跨任务复用
async def wrong_stream(session: AsyncSession, request: Request):
    result = await session.execute(query)  # 危险:跨任务使用会话
    for row in result:
        yield {"data": row.name}

# ✓ 正确:在生成器内部创建会话
async def correct_stream(request: Request):
    async with AsyncSessionLocal() as session:  # 安全:会话在生成器上下文中创建
        result = await session.execute(query)
        for row in result:
            if await request.is_disconnected():
                break
            yield {"data": row.name}

5.2 Memory Channel 模式

对于复杂的数据流场景(如多生产者合并、背压控制),sse-starlette 支持使用 anyio 的 Memory Channel 替代生成器:

async def data_producer(send_channel):
    async with send_channel:
        for i in range(10):
            await send_channel.send({"data": f"Item {i}"})
            await anyio.sleep(1)

async def endpoint(request):
    send_channel, receive_channel = anyio.create_memory_object_stream(max_buffer_size=10)
    return EventSourceResponse(
        receive_channel,
        data_sender_callable=partial(data_producer, send_channel),
    )

data_sender_callable 参数指定的协程会在任务组中与其他任务并行运行。当 send_channel 关闭时,receive_channel 的迭代自然结束,触发 _stream_response 完成,进而取消整个任务组。

这种模式的优势在于将数据生产与 SSE 传输解耦,支持有界缓冲区的背压控制(max_buffer_size),并且可以通过多个生产者向同一个 channel 写入来实现流合并。

5.3 广播模式

sse-starlette 本身不内置广播功能,但其灵活的迭代器协议使得实现广播变得简单。典型的模式是为每个客户端维护一个独立的 asyncio.Queue,广播时向所有队列投递消息:

class MessageBroadcaster:
    def __init__(self):
        self._clients: list[asyncio.Queue] = []

    def add_client(self) -> asyncio.Queue:
        queue = asyncio.Queue()
        self._clients.append(queue)
        return queue

    async def broadcast(self, message: str):
        for queue in self._clients:
            queue.put_nowait(ServerSentEvent(data=message))

客户端通过实现 __aiter__/__anext__ 协议从各自的队列中消费事件,EventSourceResponse 无需任何修改即可驱动这一模式。

5.4 反向代理与 CDN 配置

SSE 流在经过网络基础设施时容易遇到缓冲问题。sse-starlette 默认设置的 X-Accel-Buffering: no 头可以禁用 Nginx 的响应缓冲,但在更复杂的部署拓扑中,可能还需要额外的配置:

location /events {
    proxy_pass http://localhost:8000;
    proxy_http_version 1.1;
    proxy_set_header Connection '';
    proxy_buffering off;
    chunked_transfer_encoding off;
}

对于 某些 CDN,由于其默认缓冲约 100KB 数据后才开始向客户端刷新,SSE 的实时性会受到严重影响。在此类场景下,建议将 SSE 端点排除在 CDN 缓存之外,或使用支持 SSE 透传的 CDN 配置。

六、在 A2A Python SDK 中的集成实践

sse-starlette 在 A2A(Agent-to-Agent)协议的 Python SDK 中承担了 Agent 间流式通信的底层传输职责。A2A 协议定义了 SendStreamingMessageSubscribeToTask 两个流式操作,SDK 通过 EventSourceResponse 将 Agent 产生的事件流实时推送给客户端。

6.1 集成架构

SDK 的服务端在两个位置使用了 EventSourceResponse

JSON-RPC 应用jsonrpc_app.py)——处理 message/streamtasks/resubscribe 请求时,将 AsyncGenerator[SendStreamingMessageResponse] 转换为 SSE 流:

def _create_response(self, context, handler_result):
    if isinstance(handler_result, AsyncGenerator):
        async def event_generator(stream):
            async for item in stream:
                yield {'data': item.root.model_dump_json(exclude_none=True)}

        return EventSourceResponse(event_generator(handler_result), headers=headers)

REST 适配器rest_adapter.py)——在 /v1/message:stream/v1/tasks/{id}:subscribe 端点中执行相同的转换:

async def _handle_streaming_request(self, method, request):
    await request.body()  # 预消费请求体,防止死锁

    async def event_generator(stream):
        async for item in stream:
            yield {'data': item}

    return EventSourceResponse(event_generator(method(request, call_context)))

6.2 请求体预消费

REST 适配器中的 await request.body() 调用值得特别关注。Starlette 的 Request.body() 只能被消费一次,且其内部实现依赖于 ASGI 的 receive 回调。而 EventSourceResponse 启动后,receive 回调被 _listen_for_disconnect 任务独占用于监听客户端断开。如果在 EventSourceResponse 启动后再尝试读取请求体,receive 调用会因为竞争而导致死锁。

SDK 通过在创建 EventSourceResponse 之前预消费请求体来规避这一问题。这是一个在 sse-starlette 与 Starlette 集成时容易踩到的坑,SDK 的处理方式值得借鉴。

6.3 集成模式总结

A2A SDK 对 sse-starlette 的使用体现了一个清晰的集成模式:

  1. 业务层产生 AsyncGenerator,yield 领域对象(如 SendStreamingMessageResponse
  2. 适配层通过一个轻量的 event_generator 函数将领域对象序列化为 {'data': json_string} 格式的字典
  3. EventSourceResponse 接管后续的所有 SSE 协议细节——编码、心跳、断开检测、关闭协调

这种分层使得业务代码完全不需要感知 SSE 协议的存在,实现了传输层与业务逻辑的彻底解耦。

七、总结

sse-starlette 以极为精简的代码量(核心约 400 行)实现了一个生产级的 SSE 方案。其设计中有几个值得关注的工程决策:

  • 任务组竞赛模型:通过 anyio 的结构化并发,将数据推送、心跳保活、断开检测和关闭协调统一在一个优雅的并发框架中,任何终止条件都能触发一致的清理流程
  • 两层关闭检测:进程级的信号捕获(monkey-patch + 信号内省回退)与线程级的事件广播(threading.local())相结合,兼顾了可靠性和多线程安全性
  • 协作式关闭shutdown_event + shutdown_grace_period 机制在不破坏向后兼容性的前提下,为生成器提供了优雅退出的能力
  • 灵活的数据输入:支持异步生成器、同步迭代器、Memory Channel 和自定义异步迭代器协议,适配各种数据生产模式

对于需要在 Python 异步 Web 框架中实现服务端推送的场景——无论是 AI Agent 的流式响应、实时数据仪表盘,还是事件驱动的通知系统——sse-starlette 都是一个值得信赖的基础设施组件。


参考资料:

Logo

AtomGit 是由开放原子开源基金会联合 CSDN 等生态伙伴共同推出的新一代开源与人工智能协作平台。平台坚持“开放、中立、公益”的理念,把代码托管、模型共享、数据集托管、智能体开发体验和算力服务整合在一起,为开发者提供从开发、训练到部署的一站式体验。

更多推荐