AI-SSE与Streameable 以及流式实现中断重连
SSE http
2025年前的mcp协议规范中,通信被强制拆分为两个独立的通道(双断点模式)
客户端发送:通过post请求发送到/message端点。
服务器推送:客户端必须先建立一个get请求到/sse端点,服务器通过这个长连接不断推送事件。
情景:给某人打电话(post),为了听到回复,同步开着另外一条视频通话(sse)
“双车道”。先开一条路专门听服务器说话(SSE),然后再开一条路给服务器发指令(POST)。
// --- 前端代码 (SSE 模式) ---
// 1. 【监听通道】先建立长连接,专门用来接收服务器的推送
const eventSource = new EventSource('/mcp/sse');
eventSource.onmessage = (event) => {
// 接收流式数据
const data = JSON.parse(event.data);
console.log('收到 AI 回复:', data.content);
};
// 2. 【发送通道】通过另一个接口发送请求
// 注意:这里发过去,但拿不到返回值,返回值会走上面的 eventSource
fetch('/mcp/message', {
method: 'POST',
body: JSON.stringify({ prompt: "你好" })
});
Streameable http
统一端点:所有通信(发送请求,接收流式响应)通过同一端点。
按需流式:客户端发送POST请求,并在Header中声明Accept:text/event-stream。服务器根据情况选择返回JSON(块)或者建立SSE流。
情景:微信聊天。发送消息,回复直接就在这条消息的线程里回来,不需要额外建立连接。
“单行道”。发请求的地方就是收回复的地方。一次 fetch 搞定所有事。
// --- 前端代码 (Streamable HTTP 模式) ---
// 【统一通道】发送 POST 请求,并告诉服务器“我要流式回复”
const response = await fetch('/mcp', {
method: 'POST',
headers: {
'Content-Type': 'application/json',
'Accept': 'text/event-stream' // 关键:申请流式响应
},
body: JSON.stringify({ prompt: "你好" })
});
// 直接从同一个请求的响应体中读取流
const reader = response.body.getReader();
const decoder = new TextDecoder();
while (true) {
const { value, done } = await reader.read();
if (done) break;
const text = decoder.decode(value);
console.log('收到 AI 回复:', text);
}
| 维度 | 传统 HTTP + SSE (旧) | Streamable HTTP (新) | 优势 |
|---|---|---|---|
| 端点结构 | 双端点 (/message + /sse) |
单端点 (/mcp) |
简化了 API 设计,减少了配置错误。 |
| 连接模式 | 强制长连接 (SSE 通道必须一直开着) | 按需连接 (请求结束即断开,或升级为流) | 节省服务器资源,避免“连接泄露”。 |
| 防火墙兼容性 | 差 (企业防火墙常切断长连接) | 优 (伪装成普通 HTTP 请求) | 在公司内网或复杂网络环境下更稳定。 |
| 并发性能 | 低 (高并发下 TCP 连接数爆炸) | 高 (支持 HTTP/2 多路复用,连接复用) | 能支撑数千个并发用户,不会拖垮服务器。 |
| 断线恢复 | 困难 (连接断开需重连 SSE 通道) | 支持 (基于 Last-Event-ID 和 Session ID) |
网络波动时能自动续传,体验更丝滑。 |
| 状态管理 | 依赖连接保持状态 | 依赖 Mcp-Session-Id 头 |
支持无状态服务器,易于水平扩展(加机器)。 |
Streameable http流式响应
服务端立马不关闭;
持续通过Connection:keep-alive保持TCP连接;
分块发送数据(Transfer-Encoding:chunked);
客户端通过response.body.getReader()分块读取;
const response = await fetch('/stream');
const reader = response.body.getReader();
while (true) {
const { done, value } = await reader.read();
if (done) break; // 连接关闭
console.log('收到数据块:', value); // Uint8Array
}
一旦reader.read()非正常中断或报错,重新发起fetch请求,HTTP是无状态的,不能恢复一个已经断开的TCP连接。
中断:网络丢失(网络切换或丢失),服务端重启或者崩溃或其他的原因导致流式数据未完成而中断;
重连:中断后重新发起全新的请求;
重连的两种机制(断点续传,指数退避)
断点续传-Last-Event-ID
SSE协议的标准做法,使用Streameable也可以使用这种进行重连。
前端:记录每条消息也就是每条消息块的ID,重连时通过ID进行请求头发送。
GET /events HTTP/1.1
Last-Event-ID: abc123
后端:读取Last-Event-ID头,从该ID后的消息开始推送,避免重复或丢失消息。
指数退避-Exponential Backoff
在中断发起重连时,通过时间间隔进行重连比如 (第一次/1s 第二次/2s 第三次/4s 第四次/8s)成指数的形式时间间隔避免时间间隔太短带给服务器压力,设置最大时间不超过30s
delay = min( base * 2^retryCount + random_jitter, max_delay )
Streameable-代码例子
前端fetchServe.js
// src/utils/fetchServe.js
/**
* 增强版 SSE 客户端(基于 fetch)
* 特性:
* - 自动检测中断并重连(指数退避)
* - 支持 Last-Event-ID 断点续传
* - 提供 onStatus 回调(connecting / connected / disconnected)
* - 完全兼容 Vue 2 生命周期
*/
export class FetchSSEClient {
constructor(url, { onMessage, onError, onStatus }) {
this.url = url;
this.onMessage = onMessage || (() => {});
this.onError = onError || console.error;
this.onStatus = onStatus || (() => {});
this.lastEventId = null;
this.abortController = null;
this.reconnectTimer = null;
this.retryCount = 0;
this.isClosing = false;
this.isConnected = false;
}
start() {
if (this.isClosing) return;
this.connect();
}
async connect() {
this.cleanup();
// 更新状态:正在连接
this.updateStatus('connecting');
const controller = new AbortController();
this.abortController = controller;
try {
const headers = {};
if (this.lastEventId) {
headers['Last-Event-ID'] = this.lastEventId;
}
const response = await fetch(this.url, {
method: 'GET',
headers: {
Accept: 'text/event-stream',
Connection: 'keep-alive',
...headers,
},
signal: controller.signal,
});
if (!response.ok || !response.body) {
throw new Error(`HTTP ${response.status}`);
}
const reader = response.body.getReader();
const decoder = new TextDecoder();
let buffer = '';
this.retryCount = 0;
this.isConnected = true;
this.updateStatus('connected'); // 连接成功!
while (true) {
const { done, value } = await reader.read();
if (done) {
// 服务端主动关闭(如 Nginx 超时、后端重启)
console.warn('[SSE] 服务端关闭了连接');
this.handleDisconnect();
break;
}
if (value?.length) {
buffer += decoder.decode(value, { stream: true });
const lines = buffer.split('\n');
buffer = lines.pop() || '';
let currentId = '';
let currentData = '';
for (const line of lines) {
const trimmed = line.trim();
if (trimmed.startsWith('id:')) {
currentId = trimmed.slice(3).trim();
} else if (trimmed.startsWith('data:')) {
currentData = trimmed.slice(5).trim();
} else if (trimmed === '') {
if (currentData) {
try {
const parsed = JSON.parse(currentData);
if (currentId) this.lastEventId = currentId;
this.onMessage(parsed, currentId);
} catch (e) {
console.warn('SSE 解析失败:', currentData);
}
currentData = '';
currentId = '';
}
}
}
}
}
} catch (err) {
if (controller.signal.aborted) return; // 正常关闭
console.error('[SSE] 连接异常:', err.message);
this.onError(err);
this.handleDisconnect();
}
}
/**
* 统一处理连接断开逻辑
*/
handleDisconnect() {
this.isConnected = false;
this.updateStatus('disconnected');
this.scheduleReconnect();
}
/**
* 调度自动重连(指数退避 + 随机抖动)
*/
scheduleReconnect() {
if (this.isClosing || this.reconnectTimer) return;
const baseDelay = 1000;
const maxDelay = 30000;
const delay = Math.min(
baseDelay * Math.pow(2, this.retryCount) + Math.random() * 1000,
maxDelay
);
this.reconnectTimer = setTimeout(() => {
console.log(`[SSE] 自动重连中... 第 ${this.retryCount + 1} 次尝试`);
this.reconnectTimer = null;
this.retryCount++;
this.start();
}, delay);
}
updateStatus(status) {
this.onStatus(status); // 通知外部(如 Vue 组件更新 UI)
}
close() {
this.isClosing = true;
this.cleanup();
this.updateStatus('closed');
}
cleanup() {
if (this.abortController) {
this.abortController.abort();
this.abortController = null;
}
if (this.reconnectTimer) {
clearTimeout(this.reconnectTimer);
this.reconnectTimer = null;
}
this.retryCount = 0;
}
}
vue2组件EventSource.vue
<!-- src/components/AutoReconnectSSE.vue -->
<template>
<div style="padding: 20px; font-family: Arial, sans-serif;">
<h2>自动重连 SSE 示例</h2>
<!-- 状态指示器 -->
<div :style="{ color: statusColor }">
● 状态: {{ statusText }}
</div>
<!-- 最新消息 -->
<div v-if="latestMessage" style="margin-top: 15px; padding: 10px; background: #f9f9f9;">
<strong>最新消息 (ID: {{ lastId }}):</strong>
<pre>{{ JSON.stringify(latestMessage, null, 2) }}</pre>
</div>
<!-- 手动重连按钮(仅用于调试) -->
<button
v-if="status !== 'connected'"
@click="forceReconnect"
style="margin-top: 10px; padding: 6px 12px;"
>
强制重连
</button>
</div>
</template>
<script>
import { FetchSSEClient } from '@/utils/FetchSSEClient';
// 状态映射
const STATUS_MAP = {
connecting: { text: '连接中...', color: '#ffa500' },
connected: { text: '已连接', color: '#00aa00' },
disconnected: { text: '连接中断,正在重连...', color: '#ff6600' },
closed: { text: '已关闭', color: '#888' },
};
export default {
name: 'AutoReconnectSSE',
data() {
return {
status: 'connecting',
latestMessage: null,
lastId: null,
sseClient: null,
};
},
computed: {
statusText() {
return STATUS_MAP[this.status]?.text || '未知';
},
statusColor() {
return STATUS_MAP[this.status]?.color || '#000';
}
},
created() {
this.initSSE();
},
beforeDestroy() {
// 清理 防止内存泄漏和重复连接
if (this.sseClient) {
this.sseClient.close();
}
},
methods: {
initSSE() {
this.sseClient = new FetchSSEClient('http://localhost:3000/api/events', {
onMessage: (data, id) => {
this.latestMessage = data;
this.lastId = id;
},
onError: (error) => {
console.error('SSE 错误:', error);
},
onStatus: (status) => {
// 自动同步状态到 Vue 响应式数据
this.status = status;
}
});
this.sseClient.start(); // 启动自动连接
},
forceReconnect() {
if (this.sseClient) {
this.sseClient.close();
}
this.status = 'connecting';
this.initSSE();
}
}
};
</script>
node.js模拟后端服务
// server.js
const express = require('express');
const cors = require('cors');
const app = express();
// 启用 CORS,允许前端跨域请求,并支持自定义 Header(如 Last-Event-ID)
app.use(cors({
origin: 'http://localhost:8080', // Vue DevServer 默认地址
credentials: true,
// 暴露 Last-Event-ID 给前端(虽然本例中前端不读响应头,但良好实践)
exposedHeaders: ['Last-Event-ID'],
}));
// 模拟一个全局消息队列(生产环境应替换为 Redis、Kafka 或数据库)
let globalMessageId = 0;
const messageHistory = []; // 保存最近的消息,用于断点续传
/**
* 生成一条新消息并存入历史记录
* @returns {{id: string, data: object}} 新消息对象
*/
function generateMessage() {
globalMessageId++;
const msg = {
id: String(globalMessageId), // SSE 要求 id 是字符串
data: {
timestamp: new Date().toISOString(),
value: Math.floor(Math.random() * 100),
messageId: globalMessageId,
}
};
messageHistory.push(msg);
// 限制内存占用:只保留最近 100 条
if (messageHistory.length > 100) messageHistory.shift();
return msg;
}
// 初始化几条消息,确保客户端首次连接就有数据
for (let i = 0; i < 5; i++) generateMessage();
/**
* SSE 流式接口:/api/events
* 支持标准 SSE 格式 + Last-Event-ID 断点续传
*/
app.get('/api/events', (req, res) => {
// 从请求头中读取客户端上次收到的最后消息 ID
const lastId = req.headers['last-event-id'] || null;
console.log(`[SSE] 客户端重连,Last-Event-ID: ${lastId}`);
// 设置响应头 —— 这是 SSE 的核心!
res.writeHead(200, {
'Content-Type': 'text/event-stream', // 告诉浏览器这是 SSE 流
'Cache-Control': 'no-cache', // 禁用缓存
'Connection': 'keep-alive', // 保持 TCP 连接
'Access-Control-Allow-Origin': '*', // 允许跨域(生产环境应指定域名)
'X-Accel-Buffering': 'no', // 关键!禁用 Nginx/Apache 缓冲
});
// 确定从哪条消息开始推送(实现断点续传)
let startIndex = 0;
if (lastId) {
// 在历史消息中查找 lastId 的位置
const lastIdx = messageHistory.findIndex(m => m.id === lastId);
if (lastIdx !== -1) {
startIndex = lastIdx + 1; // 从下一条开始发送,避免重复
}
// 如果 lastId 太旧(不在 history 中),则从最新或开头发(按业务需求调整)
}
// 推送缺失的历史消息(确保不丢消息)
for (let i = startIndex; i < messageHistory.length; i++) {
const msg = messageHistory[i];
// SSE 标准格式:每条消息以 "id:" 和 "data:" 开头,空行结束
res.write(`id: ${msg.id}\n`);
res.write(`data: ${JSON.stringify(msg.data)}\n\n`);
}
// 启动定时器,持续生成并推送新消息
const interval = setInterval(() => {
const msg = generateMessage();
res.write(`id: ${msg.id}\n`);
res.write(`data: ${JSON.stringify(msg.data)}\n\n`);
// 注意:res.write() 不会自动 flush,但 Node.js 通常会及时发送
}, 2000); // 每 2 秒推送一次
// 监听客户端断开连接(如关闭标签页、网络中断)
req.on('close', () => {
console.log('[SSE] 客户端断开连接,清理定时器');
clearInterval(interval); // 停止生成新消息
res.end(); // 显式关闭响应
});
});
// 启动服务器
const PORT = process.env.PORT || 3000;
app.listen(PORT, () => {
console.log(`SSE 服务已启动,请访问 http://localhost:${PORT}`);
});
根据实际对接接口以及需求来参考进行开发
AtomGit 是由开放原子开源基金会联合 CSDN 等生态伙伴共同推出的新一代开源与人工智能协作平台。平台坚持“开放、中立、公益”的理念,把代码托管、模型共享、数据集托管、智能体开发体验和算力服务整合在一起,为开发者提供从开发、训练到部署的一站式体验。
更多推荐




所有评论(0)