sse-starlette 源码深度解析:SSE 协议在 Python 异步生态中的工程实现
本文基于 sse-starlette v3.3.3 源码,系统剖析该库的架构设计、核心机制与生产实践。同时结合 A2A(Agent-to-Agent)Python SDK 中的实际集成案例,展示 SSE 技术在 AI Agent 流式通信场景中的工程应用。
一、背景与定位
Server-Sent Events(SSE)是 W3C 定义的一种基于 HTTP 的服务端推送技术。与 WebSocket 的全双工通信不同,SSE 采用单向推送模型——服务端通过一个持久的 HTTP 连接,以纯文本流的形式向客户端持续发送事件。其协议格式简洁,天然兼容 HTTP 基础设施(代理、负载均衡、CDN),且浏览器原生支持自动重连。
sse-starlette 是 Python 生态中针对 Starlette/FastAPI 框架的 SSE 实现库。该库的代码量极为精简——核心仅由 event.py 和 sse.py 两个文件构成,但在这有限的代码中实现了完整的 SSE 协议支持、连接生命周期管理、优雅关闭协调以及多线程安全等关键能力。
二、核心组件架构
sse-starlette 的架构由三个核心组件构成,各司其职:
┌─────────────────────────────────────────────────────────┐
│ 对外公共 API │
│ EventSourceResponse ServerSentEvent JSONServerSentEvent │
└──────────┬──────────────────┬───────────────────────────┘
│ │
▼ ▼
┌─────────────────┐ ┌────────────────────┐
│ sse.py │ │ event.py │
│ │ │ │
│ EventSourceResponse│ │ ServerSentEvent │
│ AppStatus │ │ JSONServerSentEvent│
│ _ShutdownState │ │ ensure_bytes() │
│ _shutdown_watcher│ │ │
└─────────────────┘ └────────────────────┘
2.1 ServerSentEvent:SSE 协议编码器
ServerSentEvent 类负责将结构化数据编码为符合 SSE 规范的字节流。SSE 协议定义了以下字段:
| 字段 | 格式 | 说明 |
|---|---|---|
data |
data: <内容>\r\n |
事件数据,多行数据拆分为多个 data: 行 |
event |
event: <类型>\r\n |
事件类型,客户端据此分发处理 |
id |
id: <标识>\r\n |
事件 ID,用于断线重连时的位置恢复 |
retry |
retry: <毫秒>\r\n |
建议客户端的重连间隔 |
comment |
: <注释>\r\n |
注释行,常用于心跳保活 |
每个事件以一个额外的空行(\r\n)作为结束标记。ServerSentEvent.encode() 方法严格按照此规范生成字节流:
class ServerSentEvent:
_LINE_SEP_EXPR = re.compile(r"\r\n|\r|\n")
def encode(self) -> bytes:
buffer = io.StringIO()
if self.comment is not None:
for chunk in self._LINE_SEP_EXPR.split(str(self.comment)):
buffer.write(f": {chunk}{self._sep}")
if self.id is not None:
buffer.write("id: " + self._LINE_SEP_EXPR.sub("", self.id) + self._sep)
if self.event is not None:
buffer.write("event: " + self._LINE_SEP_EXPR.sub("", self.event) + self._sep)
if self.data is not None:
for chunk in self._LINE_SEP_EXPR.split(str(self.data)):
buffer.write(f"data: {chunk}{self._sep}")
if self.retry is not None:
buffer.write(f"retry: {self.retry}{self._sep}")
buffer.write(self._sep) # 事件结束的空行
return buffer.getvalue().encode("utf-8")
值得注意的实现细节:
id和event字段中的换行符被静默移除(sub("", ...)),防止注入非法的多行字段data和comment字段中的换行符被拆分为多个独立行,符合 SSE 规范对多行数据的处理要求- 分隔符(
sep)可配置为\r\n、\r或\n,以适配不同的网络环境
此外,库还提供了 JSONServerSentEvent 子类,在构造时自动将 Python 对象序列化为紧凑的 JSON 字符串(separators=(",", ":"),ensure_ascii=False),简化了 JSON 数据的 SSE 传输。
辅助函数 ensure_bytes() 则提供了统一的类型转换入口,支持 bytes、dict、ServerSentEvent 及任意对象的自动编码:
def ensure_bytes(data, sep):
if isinstance(data, bytes): return data
if isinstance(data, ServerSentEvent): return data.encode()
if isinstance(data, dict):
data["sep"] = sep
return ServerSentEvent(**data).encode()
return ServerSentEvent(str(data), sep=sep).encode()
这一设计使得开发者可以灵活地向 EventSourceResponse 的生成器中 yield 不同类型的数据——字典、字符串、ServerSentEvent 对象甚至原始字节——库会自动处理编码。
2.2 EventSourceResponse:连接生命周期管理器
EventSourceResponse 继承自 Starlette 的 Response 类,是整个库的核心。它不仅负责将数据流转换为 SSE 格式的 HTTP 响应,更重要的是管理 SSE 连接的完整生命周期——包括数据推送、心跳保活、客户端断开检测和服务器关闭协调。
构造与初始化
构造函数接受丰富的配置参数,覆盖了 SSE 连接管理的各个方面:
EventSourceResponse(
content, # 异步/同步迭代器,产生 SSE 事件数据
status_code=200, # HTTP 状态码
headers=None, # 附加 HTTP 头
ping=15, # 心跳间隔(秒),0 禁用
sep="\r\n", # 行分隔符
ping_message_factory=None, # 自定义心跳消息工厂
data_sender_callable=None, # 推送式数据发送协程
send_timeout=None, # 单次发送超时(秒)
client_close_handler_callable=None, # 客户端断开回调
shutdown_event=None, # 协作式关闭事件
shutdown_grace_period=0, # 关闭宽限期(秒)
)
初始化阶段,库自动设置了三个关键的 HTTP 响应头:
| 响应头 | 值 | 作用 |
|---|---|---|
Cache-Control |
no-store(默认,可覆盖) |
禁止缓存 SSE 流 |
Connection |
keep-alive |
维持持久连接 |
X-Accel-Buffering |
no |
禁用 Nginx 等反向代理的响应缓冲 |
其中 Cache-Control 被设计为可覆盖的(setdefault),以支持 Fastly 等 CDN 的扇出代理场景。
任务组竞赛模型
EventSourceResponse.__call__ 是 ASGI 协议的入口点。该方法创建一个 anyio 任务组,同时启动四个并发任务,采用"竞赛"模式运行——任何一个任务先完成,即取消其余所有任务:
async def __call__(self, scope, receive, send):
async with anyio.create_task_group() as task_group:
async def cancel_on_finish(coro):
await coro()
task_group.cancel_scope.cancel()
task_group.start_soon(cancel_on_finish, lambda: self._stream_response(send))
task_group.start_soon(cancel_on_finish, lambda: self._ping(send))
task_group.start_soon(cancel_on_finish, self._listen_for_exit_signal_with_grace)
task_group.start_soon(cancel_on_finish, lambda: self._listen_for_disconnect(receive))
if self.data_sender_callable:
task_group.start_soon(self.data_sender_callable)
这四个任务各自承担明确的职责:
| 任务 | 方法 | 职责 | 完成条件 |
|---|---|---|---|
| 数据推送 | _stream_response |
从迭代器读取数据,编码为 SSE 格式发送 | 迭代器耗尽 |
| 心跳保活 | _ping |
定期发送注释行,防止代理/负载均衡器超时断开 | 永不主动完成 |
| 关闭监听 | _listen_for_exit_signal_with_grace |
监听服务器关闭信号 | 收到关闭信号 |
| 断开检测 | _listen_for_disconnect |
监听客户端断开连接 | 收到 http.disconnect |
cancel_on_finish 包装器确保了"先完成者胜出"的语义:无论是生成器正常耗尽、客户端主动断开、服务器发起关闭,还是发送超时,都能通过统一的取消机制终止整个连接。
数据推送与发送超时
_stream_response 方法负责从迭代器中逐项读取数据并发送给客户端。当配置了 send_timeout 时,每次发送操作都受到超时保护:
async def _stream_response(self, send):
await send({"type": "http.response.start", "status": self.status_code, "headers": self.raw_headers})
async for data in self.body_iterator:
chunk = ensure_bytes(data, self.sep)
with anyio.move_on_after(self.send_timeout) as cancel_scope:
await send({"type": "http.response.body", "body": chunk, "more_body": True})
if cancel_scope and cancel_scope.cancel_called:
aclose = getattr(self.body_iterator, "aclose", None)
if aclose is not None:
await aclose()
raise SendTimeoutError()
async with self._send_lock:
self.active = False
await send({"type": "http.response.body", "body": b"", "more_body": False})
send_timeout 机制解决了一个生产环境中的常见问题:当客户端进程被挂起(如 Ctrl+Z)或网络中断但 TCP 连接未关闭时,send() 调用会因内核发送缓冲区满而无限阻塞。通过 anyio.move_on_after 设置超时,服务端可以主动检测并清理这类"僵尸连接",释放服务器资源。
注意 _send_lock 的使用——它确保了 _stream_response 和 _ping 两个任务不会同时向同一个连接写入数据,避免了 SSE 消息的交错损坏。
心跳保活
_ping 方法按配置的间隔(默认 15 秒)发送心跳消息。默认的心跳格式为 SSE 注释行(以 : 开头),对客户端不可见但能维持连接活性:
: ping - 2025-03-20T10:30:00+00:00
开发者可通过 ping_message_factory 参数自定义心跳内容。心跳机制对于穿越反向代理(Nginx 默认 60 秒超时)和负载均衡器(HAProxy 等)的场景至关重要。
2.3 AppStatus 与关闭检测:两层架构
SSE 连接的优雅关闭是一个工程难题——服务器收到 SIGTERM 信号后,需要通知所有活跃的 SSE 连接进行清理,而不是粗暴地断开。sse-starlette 通过两层架构解决这一问题。
第一层:进程级信号捕获
class AppStatus:
should_exit = False
original_handler = None
@staticmethod
def handle_exit(*args, **kwargs):
if AppStatus.enable_automatic_graceful_drain:
AppStatus.should_exit = True
if AppStatus.original_handler is not None:
AppStatus.original_handler(*args, **kwargs)
# 模块加载时自动 monkey-patch uvicorn 的退出处理器
try:
from uvicorn.main import Server
AppStatus.original_handler = Server.handle_exit
Server.handle_exit = AppStatus.handle_exit
except ImportError:
pass
该机制在模块导入时通过 monkey-patch 替换 uvicorn 的 Server.handle_exit 方法。当 uvicorn 收到 SIGTERM/SIGINT 信号时,AppStatus.should_exit 标志被设置为 True,同时原始的 uvicorn 退出处理器仍然被调用,确保 uvicorn 自身的关闭流程不受影响。
对于 monkey-patch 失效的场景(如 uvicorn 版本变更导致 API 不兼容),库提供了回退方案——通过 signal.getsignal(SIGTERM) 内省信号处理器,找到 uvicorn 的 Server 实例并直接检查其 should_exit 属性:
def _get_uvicorn_server():
try:
handler = signal.getsignal(signal.SIGTERM)
if hasattr(handler, "__self__"):
server = handler.__self__
if hasattr(server, "should_exit"):
return server
except Exception:
pass
return None
第二层:线程级事件广播
_thread_state = threading.local()
@dataclass
class _ShutdownState:
events: Set[anyio.Event] = field(default_factory=set)
watcher_started: bool = False
每个线程维护独立的 _ShutdownState(通过 threading.local()),包含该线程上所有 SSE 连接注册的 anyio.Event 集合。每个线程启动一个 _shutdown_watcher 协程,以 0.5 秒的间隔轮询 AppStatus.should_exit,一旦检测到关闭信号,即广播给该线程上的所有连接:
线程 A(主事件循环) 线程 B(辅助事件循环)
┌──────────────────────┐ ┌──────────────────────┐
│ _ShutdownState │ │ _ShutdownState │
│ events: {ev1, ev2} │ │ events: {ev3} │
│ watcher: 轮询中 │ │ watcher: 轮询中 │
└──────────────────────┘ └──────────────────────┘
│ │
检测到 should_exit 检测到 should_exit
│ │
ev1.set(), ev2.set() ev3.set()
使用 threading.local() 而非 contextvars.ContextVar 是一个经过深思熟虑的设计选择(Issue #152 修复)。ContextVar 在每个异步上下文中创建独立副本,可能导致同一线程上启动多个冗余的 watcher;而 threading.local() 确保每个线程(通常对应一个事件循环)仅有一个 watcher 实例。
三、协作式关闭机制(v3.3.0)
在 v3.3.0 之前,服务器关闭时所有 SSE 生成器会立即收到 CancelledError,无法向客户端发送告别消息。v3.3.0 引入的协作式关闭机制解决了这一问题。
3.1 工作原理
开发者在创建 EventSourceResponse 时传入一个 anyio.Event 对象和宽限期:
shutdown_event = anyio.Event()
async def generate():
while not shutdown_event.is_set():
yield {"data": "tick"}
with anyio.move_on_after(1.0):
await shutdown_event.wait()
# 关闭信号已到达,发送告别事件
yield {"event": "shutdown", "data": "server is shutting down, goodbye"}
EventSourceResponse(
generate(),
shutdown_event=shutdown_event,
shutdown_grace_period=5.0,
)
当服务器关闭时,_listen_for_exit_signal_with_grace 方法执行以下流程:
1. _listen_for_exit_signal() 返回(检测到关闭信号)
2. 设置 shutdown_event(通知生成器)
3. 启动宽限期计时器(move_on_after(grace_period))
4. 等待 self.active 变为 False(生成器正常退出)
├── 生成器在宽限期内退出 → 干净关闭,告别事件已送达客户端
└── 宽限期超时 → 方法返回,触发 cancel_on_finish,生成器被强制取消
3.2 设计约束
shutdown_grace_period 必须小于 ASGI 服务器的优雅关闭超时(如 uvicorn 的 --timeout-graceful-shutdown)。否则,进程会在宽限期结束前被操作系统强制终止,导致告别事件无法送达。
当未配置 shutdown_event 时(默认行为),关闭流程与 v3.3.0 之前完全一致——立即取消,无宽限期。这确保了向后兼容性。
四、连接终止场景全景
基于任务组竞赛模型,sse-starlette 能够统一处理所有连接终止场景:
| 场景 | 先完成的任务 | 生成器的命运 |
|---|---|---|
| 生成器正常耗尽 | _stream_response |
正常退出,告别事件已发送 |
| 客户端主动断开 | _listen_for_disconnect |
收到 CancelledError |
| 服务器关闭(无宽限期) | _listen_for_exit_signal_with_grace |
收到 CancelledError |
| 服务器关闭(有宽限期,生成器配合) | _stream_response |
正常退出,告别事件已发送 |
| 服务器关闭(有宽限期,生成器未配合) | _listen_for_exit_signal_with_grace(超时后) |
收到 CancelledError |
| 发送超时 | _stream_response(抛出异常) |
迭代器被关闭,SendTimeoutError 传播 |
这种统一的处理模型大幅简化了开发者的心智负担——无论连接因何终止,清理逻辑都通过 anyio 的结构化并发机制自动执行。
五、生产环境实践
5.1 数据库会话的线程安全
在 SSE 生成器中使用数据库会话时,存在一个容易被忽视的陷阱。由于 EventSourceResponse 在 anyio 任务组中消费生成器,生成器的执行上下文与请求处理函数的上下文不同。如果将依赖注入获取的数据库会话传入生成器,会导致跨任务的会话复用,引发线程安全问题。
正确的做法是在生成器内部创建独立的数据库会话:
# ✗ 错误:会话来自依赖注入,跨任务复用
async def wrong_stream(session: AsyncSession, request: Request):
result = await session.execute(query) # 危险:跨任务使用会话
for row in result:
yield {"data": row.name}
# ✓ 正确:在生成器内部创建会话
async def correct_stream(request: Request):
async with AsyncSessionLocal() as session: # 安全:会话在生成器上下文中创建
result = await session.execute(query)
for row in result:
if await request.is_disconnected():
break
yield {"data": row.name}
5.2 Memory Channel 模式
对于复杂的数据流场景(如多生产者合并、背压控制),sse-starlette 支持使用 anyio 的 Memory Channel 替代生成器:
async def data_producer(send_channel):
async with send_channel:
for i in range(10):
await send_channel.send({"data": f"Item {i}"})
await anyio.sleep(1)
async def endpoint(request):
send_channel, receive_channel = anyio.create_memory_object_stream(max_buffer_size=10)
return EventSourceResponse(
receive_channel,
data_sender_callable=partial(data_producer, send_channel),
)
data_sender_callable 参数指定的协程会在任务组中与其他任务并行运行。当 send_channel 关闭时,receive_channel 的迭代自然结束,触发 _stream_response 完成,进而取消整个任务组。
这种模式的优势在于将数据生产与 SSE 传输解耦,支持有界缓冲区的背压控制(max_buffer_size),并且可以通过多个生产者向同一个 channel 写入来实现流合并。
5.3 广播模式
sse-starlette 本身不内置广播功能,但其灵活的迭代器协议使得实现广播变得简单。典型的模式是为每个客户端维护一个独立的 asyncio.Queue,广播时向所有队列投递消息:
class MessageBroadcaster:
def __init__(self):
self._clients: list[asyncio.Queue] = []
def add_client(self) -> asyncio.Queue:
queue = asyncio.Queue()
self._clients.append(queue)
return queue
async def broadcast(self, message: str):
for queue in self._clients:
queue.put_nowait(ServerSentEvent(data=message))
客户端通过实现 __aiter__/__anext__ 协议从各自的队列中消费事件,EventSourceResponse 无需任何修改即可驱动这一模式。
5.4 反向代理与 CDN 配置
SSE 流在经过网络基础设施时容易遇到缓冲问题。sse-starlette 默认设置的 X-Accel-Buffering: no 头可以禁用 Nginx 的响应缓冲,但在更复杂的部署拓扑中,可能还需要额外的配置:
location /events {
proxy_pass http://localhost:8000;
proxy_http_version 1.1;
proxy_set_header Connection '';
proxy_buffering off;
chunked_transfer_encoding off;
}
对于 某些 CDN,由于其默认缓冲约 100KB 数据后才开始向客户端刷新,SSE 的实时性会受到严重影响。在此类场景下,建议将 SSE 端点排除在 CDN 缓存之外,或使用支持 SSE 透传的 CDN 配置。
六、在 A2A Python SDK 中的集成实践
sse-starlette 在 A2A(Agent-to-Agent)协议的 Python SDK 中承担了 Agent 间流式通信的底层传输职责。A2A 协议定义了 SendStreamingMessage 和 SubscribeToTask 两个流式操作,SDK 通过 EventSourceResponse 将 Agent 产生的事件流实时推送给客户端。
6.1 集成架构
SDK 的服务端在两个位置使用了 EventSourceResponse:
JSON-RPC 应用(jsonrpc_app.py)——处理 message/stream 和 tasks/resubscribe 请求时,将 AsyncGenerator[SendStreamingMessageResponse] 转换为 SSE 流:
def _create_response(self, context, handler_result):
if isinstance(handler_result, AsyncGenerator):
async def event_generator(stream):
async for item in stream:
yield {'data': item.root.model_dump_json(exclude_none=True)}
return EventSourceResponse(event_generator(handler_result), headers=headers)
REST 适配器(rest_adapter.py)——在 /v1/message:stream 和 /v1/tasks/{id}:subscribe 端点中执行相同的转换:
async def _handle_streaming_request(self, method, request):
await request.body() # 预消费请求体,防止死锁
async def event_generator(stream):
async for item in stream:
yield {'data': item}
return EventSourceResponse(event_generator(method(request, call_context)))
6.2 请求体预消费
REST 适配器中的 await request.body() 调用值得特别关注。Starlette 的 Request.body() 只能被消费一次,且其内部实现依赖于 ASGI 的 receive 回调。而 EventSourceResponse 启动后,receive 回调被 _listen_for_disconnect 任务独占用于监听客户端断开。如果在 EventSourceResponse 启动后再尝试读取请求体,receive 调用会因为竞争而导致死锁。
SDK 通过在创建 EventSourceResponse 之前预消费请求体来规避这一问题。这是一个在 sse-starlette 与 Starlette 集成时容易踩到的坑,SDK 的处理方式值得借鉴。
6.3 集成模式总结
A2A SDK 对 sse-starlette 的使用体现了一个清晰的集成模式:
- 业务层产生
AsyncGenerator,yield 领域对象(如SendStreamingMessageResponse) - 适配层通过一个轻量的
event_generator函数将领域对象序列化为{'data': json_string}格式的字典 EventSourceResponse接管后续的所有 SSE 协议细节——编码、心跳、断开检测、关闭协调
这种分层使得业务代码完全不需要感知 SSE 协议的存在,实现了传输层与业务逻辑的彻底解耦。
七、总结
sse-starlette 以极为精简的代码量(核心约 400 行)实现了一个生产级的 SSE 方案。其设计中有几个值得关注的工程决策:
- 任务组竞赛模型:通过
anyio的结构化并发,将数据推送、心跳保活、断开检测和关闭协调统一在一个优雅的并发框架中,任何终止条件都能触发一致的清理流程 - 两层关闭检测:进程级的信号捕获(monkey-patch + 信号内省回退)与线程级的事件广播(
threading.local())相结合,兼顾了可靠性和多线程安全性 - 协作式关闭:
shutdown_event+shutdown_grace_period机制在不破坏向后兼容性的前提下,为生成器提供了优雅退出的能力 - 灵活的数据输入:支持异步生成器、同步迭代器、Memory Channel 和自定义异步迭代器协议,适配各种数据生产模式
对于需要在 Python 异步 Web 框架中实现服务端推送的场景——无论是 AI Agent 的流式响应、实时数据仪表盘,还是事件驱动的通知系统——sse-starlette 都是一个值得信赖的基础设施组件。
参考资料:
AtomGit 是由开放原子开源基金会联合 CSDN 等生态伙伴共同推出的新一代开源与人工智能协作平台。平台坚持“开放、中立、公益”的理念,把代码托管、模型共享、数据集托管、智能体开发体验和算力服务整合在一起,为开发者提供从开发、训练到部署的一站式体验。
更多推荐




所有评论(0)