SSE http

2025年前的mcp协议规范中,通信被强制拆分为两个独立的通道(双断点模式)

客户端发送:通过post请求发送到/message端点。

服务器推送:客户端必须先建立一个get请求到/sse端点,服务器通过这个长连接不断推送事件。

情景:给某人打电话(post),为了听到回复,同步开着另外一条视频通话(sse)

双车道”。先开一条路专门听服务器说话(SSE),然后再开一条路给服务器发指令(POST)。

// --- 前端代码 (SSE 模式) ---

// 1. 【监听通道】先建立长连接,专门用来接收服务器的推送
const eventSource = new EventSource('/mcp/sse'); 

eventSource.onmessage = (event) => {
    // 接收流式数据
    const data = JSON.parse(event.data);
    console.log('收到 AI 回复:', data.content);
};

// 2. 【发送通道】通过另一个接口发送请求
// 注意:这里发过去,但拿不到返回值,返回值会走上面的 eventSource
fetch('/mcp/message', {
    method: 'POST',
    body: JSON.stringify({ prompt: "你好" })
});

Streameable http

统一端点:所有通信(发送请求,接收流式响应)通过同一端点。

按需流式:客户端发送POST请求,并在Header中声明Accept:text/event-stream。服务器根据情况选择返回JSON(块)或者建立SSE流。

情景:微信聊天。发送消息,回复直接就在这条消息的线程里回来,不需要额外建立连接。

单行道”。发请求的地方就是收回复的地方。一次 fetch 搞定所有事。

// --- 前端代码 (Streamable HTTP 模式) ---

// 【统一通道】发送 POST 请求,并告诉服务器“我要流式回复”
const response = await fetch('/mcp', {
    method: 'POST',
    headers: {
        'Content-Type': 'application/json',
        'Accept': 'text/event-stream' // 关键:申请流式响应
    },
    body: JSON.stringify({ prompt: "你好" })
});

// 直接从同一个请求的响应体中读取流
const reader = response.body.getReader();
const decoder = new TextDecoder();

while (true) {
    const { value, done } = await reader.read();
    if (done) break;
    const text = decoder.decode(value);
    console.log('收到 AI 回复:', text);
}
维度 传统 HTTP + SSE (旧) Streamable HTTP (新) 优势
端点结构 双端点 (/message + /sse) 单端点 (/mcp) 简化了 API 设计,减少了配置错误。
连接模式 强制长连接 (SSE 通道必须一直开着) 按需连接 (请求结束即断开,或升级为流) 节省服务器资源,避免“连接泄露”。
防火墙兼容性 差 (企业防火墙常切断长连接) 优 (伪装成普通 HTTP 请求) 在公司内网或复杂网络环境下更稳定。
并发性能 低 (高并发下 TCP 连接数爆炸) 高 (支持 HTTP/2 多路复用,连接复用) 能支撑数千个并发用户,不会拖垮服务器。
断线恢复 困难 (连接断开需重连 SSE 通道) 支持 (基于 Last-Event-ID 和 Session ID) 网络波动时能自动续传,体验更丝滑。
状态管理 依赖连接保持状态 依赖 Mcp-Session-Id 支持无状态服务器,易于水平扩展(加机器)。

Streameable http流式响应

服务端立马不关闭;

持续通过Connection:keep-alive保持TCP连接;

分块发送数据(Transfer-Encoding:chunked);

客户端通过response.body.getReader()分块读取;

const response = await fetch('/stream');
const reader = response.body.getReader();

while (true) {
  const { done, value } = await reader.read();
  if (done) break; // 连接关闭
  console.log('收到数据块:', value); // Uint8Array
}

一旦reader.read()非正常中断或报错重新发起fetch请求,HTTP是无状态的,不能恢复一个已经断开的TCP连接。

中断:网络丢失(网络切换或丢失),服务端重启或者崩溃或其他的原因导致流式数据未完成而中断;

重连:中断后重新发起全新的请求;

重连的两种机制(断点续传,指数退避)

断点续传-Last-Event-ID

SSE协议的标准做法,使用Streameable也可以使用这种进行重连。

前端:记录每条消息也就是每条消息块的ID,重连时通过ID进行请求头发送。

GET /events HTTP/1.1
Last-Event-ID: abc123

后端:读取Last-Event-ID头,从该ID后的消息开始推送,避免重复或丢失消息。

指数退避-Exponential Backoff

在中断发起重连时,通过时间间隔进行重连比如 (第一次/1s 第二次/2s 第三次/4s 第四次/8s)成指数的形式时间间隔避免时间间隔太短带给服务器压力,设置最大时间不超过30s

delay = min( base * 2^retryCount + random_jitter, max_delay )

Streameable-代码例子

前端fetchServe.js

// src/utils/fetchServe.js

/**
 * 增强版 SSE 客户端(基于 fetch)
 * 特性:
 * - 自动检测中断并重连(指数退避)
 * - 支持 Last-Event-ID 断点续传
 * - 提供 onStatus 回调(connecting / connected / disconnected)
 * - 完全兼容 Vue 2 生命周期
 */
export class FetchSSEClient {
  constructor(url, { onMessage, onError, onStatus }) {
    this.url = url;
    this.onMessage = onMessage || (() => {});
    this.onError = onError || console.error;
    this.onStatus = onStatus || (() => {});

    this.lastEventId = null;
    this.abortController = null;
    this.reconnectTimer = null;
    this.retryCount = 0;
    this.isClosing = false;
    this.isConnected = false;
  }

  start() {
    if (this.isClosing) return;
    this.connect();
  }

  async connect() {
    this.cleanup();

    // 更新状态:正在连接
    this.updateStatus('connecting');

    const controller = new AbortController();
    this.abortController = controller;

    try {
      const headers = {};
      if (this.lastEventId) {
        headers['Last-Event-ID'] = this.lastEventId;
      }

      const response = await fetch(this.url, {
        method: 'GET',
        headers: {
          Accept: 'text/event-stream',
          Connection: 'keep-alive',
          ...headers,
        },
        signal: controller.signal,
      });

      if (!response.ok || !response.body) {
        throw new Error(`HTTP ${response.status}`);
      }

      const reader = response.body.getReader();
      const decoder = new TextDecoder();
      let buffer = '';

      this.retryCount = 0;
      this.isConnected = true;
      this.updateStatus('connected'); // 连接成功!

      while (true) {
        const { done, value } = await reader.read();

        if (done) {
          // 服务端主动关闭(如 Nginx 超时、后端重启)
          console.warn('[SSE] 服务端关闭了连接');
          this.handleDisconnect();
          break;
        }

        if (value?.length) {
          buffer += decoder.decode(value, { stream: true });
          const lines = buffer.split('\n');
          buffer = lines.pop() || '';

          let currentId = '';
          let currentData = '';

          for (const line of lines) {
            const trimmed = line.trim();
            if (trimmed.startsWith('id:')) {
              currentId = trimmed.slice(3).trim();
            } else if (trimmed.startsWith('data:')) {
              currentData = trimmed.slice(5).trim();
            } else if (trimmed === '') {
              if (currentData) {
                try {
                  const parsed = JSON.parse(currentData);
                  if (currentId) this.lastEventId = currentId;
                  this.onMessage(parsed, currentId);
                } catch (e) {
                  console.warn('SSE 解析失败:', currentData);
                }
                currentData = '';
                currentId = '';
              }
            }
          }
        }
      }
    } catch (err) {
      if (controller.signal.aborted) return; // 正常关闭

      console.error('[SSE] 连接异常:', err.message);
      this.onError(err);
      this.handleDisconnect();
    }
  }

  /**
   * 统一处理连接断开逻辑
   */
  handleDisconnect() {
    this.isConnected = false;
    this.updateStatus('disconnected');
    this.scheduleReconnect();
  }

  /**
   * 调度自动重连(指数退避 + 随机抖动)
   */
  scheduleReconnect() {
    if (this.isClosing || this.reconnectTimer) return;

    const baseDelay = 1000;
    const maxDelay = 30000;
    const delay = Math.min(
      baseDelay * Math.pow(2, this.retryCount) + Math.random() * 1000,
      maxDelay
    );

    this.reconnectTimer = setTimeout(() => {
      console.log(`[SSE] 自动重连中... 第 ${this.retryCount + 1} 次尝试`);
      this.reconnectTimer = null;
      this.retryCount++;
      this.start();
    }, delay);
  }

  updateStatus(status) {
    this.onStatus(status); // 通知外部(如 Vue 组件更新 UI)
  }

  close() {
    this.isClosing = true;
    this.cleanup();
    this.updateStatus('closed');
  }

  cleanup() {
    if (this.abortController) {
      this.abortController.abort();
      this.abortController = null;
    }
    if (this.reconnectTimer) {
      clearTimeout(this.reconnectTimer);
      this.reconnectTimer = null;
    }
    this.retryCount = 0;
  }
}

vue2组件EventSource.vue

<!-- src/components/AutoReconnectSSE.vue -->
<template>
  <div style="padding: 20px; font-family: Arial, sans-serif;">
    <h2>自动重连 SSE 示例</h2>
    
    <!-- 状态指示器 -->
    <div :style="{ color: statusColor }">
      ● 状态: {{ statusText }}
    </div>

    <!-- 最新消息 -->
    <div v-if="latestMessage" style="margin-top: 15px; padding: 10px; background: #f9f9f9;">
      <strong>最新消息 (ID: {{ lastId }}):</strong>
      <pre>{{ JSON.stringify(latestMessage, null, 2) }}</pre>
    </div>

    <!-- 手动重连按钮(仅用于调试) -->
    <button 
      v-if="status !== 'connected'" 
      @click="forceReconnect"
      style="margin-top: 10px; padding: 6px 12px;"
    >
      强制重连
    </button>
  </div>
</template>

<script>
import { FetchSSEClient } from '@/utils/FetchSSEClient';

// 状态映射
const STATUS_MAP = {
  connecting: { text: '连接中...', color: '#ffa500' },
  connected: { text: '已连接', color: '#00aa00' },
  disconnected: { text: '连接中断,正在重连...', color: '#ff6600' },
  closed: { text: '已关闭', color: '#888' },
};

export default {
  name: 'AutoReconnectSSE',
  data() {
    return {
      status: 'connecting',
      latestMessage: null,
      lastId: null,
      sseClient: null,
    };
  },

  computed: {
    statusText() {
      return STATUS_MAP[this.status]?.text || '未知';
    },
    statusColor() {
      return STATUS_MAP[this.status]?.color || '#000';
    }
  },

  created() {
    this.initSSE();
  },

  beforeDestroy() {
    // 清理 防止内存泄漏和重复连接
    if (this.sseClient) {
      this.sseClient.close();
    }
  },

  methods: {
    initSSE() {
      this.sseClient = new FetchSSEClient('http://localhost:3000/api/events', {
        onMessage: (data, id) => {
          this.latestMessage = data;
          this.lastId = id;
        },
        onError: (error) => {
          console.error('SSE 错误:', error);
        },
        onStatus: (status) => {
          // 自动同步状态到 Vue 响应式数据
          this.status = status;
        }
      });

      this.sseClient.start(); // 启动自动连接
    },

    forceReconnect() {
      if (this.sseClient) {
        this.sseClient.close();
      }
      this.status = 'connecting';
      this.initSSE();
    }
  }
};
</script>

node.js模拟后端服务

// server.js
const express = require('express');
const cors = require('cors');

const app = express();

// 启用 CORS,允许前端跨域请求,并支持自定义 Header(如 Last-Event-ID)
app.use(cors({
  origin: 'http://localhost:8080', // Vue DevServer 默认地址
  credentials: true,
  // 暴露 Last-Event-ID 给前端(虽然本例中前端不读响应头,但良好实践)
  exposedHeaders: ['Last-Event-ID'],
}));

// 模拟一个全局消息队列(生产环境应替换为 Redis、Kafka 或数据库)
let globalMessageId = 0;
const messageHistory = []; // 保存最近的消息,用于断点续传

/**
 * 生成一条新消息并存入历史记录
 * @returns {{id: string, data: object}} 新消息对象
 */
function generateMessage() {
  globalMessageId++;
  const msg = {
    id: String(globalMessageId), // SSE 要求 id 是字符串
    data: {
      timestamp: new Date().toISOString(),
      value: Math.floor(Math.random() * 100),
      messageId: globalMessageId,
    }
  };
  messageHistory.push(msg);
  // 限制内存占用:只保留最近 100 条
  if (messageHistory.length > 100) messageHistory.shift();
  return msg;
}

// 初始化几条消息,确保客户端首次连接就有数据
for (let i = 0; i < 5; i++) generateMessage();

/**
 * SSE 流式接口:/api/events
 * 支持标准 SSE 格式 + Last-Event-ID 断点续传
 */
app.get('/api/events', (req, res) => {
  // 从请求头中读取客户端上次收到的最后消息 ID
  const lastId = req.headers['last-event-id'] || null;
  console.log(`[SSE] 客户端重连,Last-Event-ID: ${lastId}`);

  // 设置响应头 —— 这是 SSE 的核心!
  res.writeHead(200, {
    'Content-Type': 'text/event-stream',     // 告诉浏览器这是 SSE 流
    'Cache-Control': 'no-cache',             // 禁用缓存
    'Connection': 'keep-alive',              // 保持 TCP 连接
    'Access-Control-Allow-Origin': '*',      // 允许跨域(生产环境应指定域名)
    'X-Accel-Buffering': 'no',               // 关键!禁用 Nginx/Apache 缓冲
  });

  // 确定从哪条消息开始推送(实现断点续传)
  let startIndex = 0;
  if (lastId) {
    // 在历史消息中查找 lastId 的位置
    const lastIdx = messageHistory.findIndex(m => m.id === lastId);
    if (lastIdx !== -1) {
      startIndex = lastIdx + 1; // 从下一条开始发送,避免重复
    }
    // 如果 lastId 太旧(不在 history 中),则从最新或开头发(按业务需求调整)
  }

  // 推送缺失的历史消息(确保不丢消息)
  for (let i = startIndex; i < messageHistory.length; i++) {
    const msg = messageHistory[i];
    // SSE 标准格式:每条消息以 "id:" 和 "data:" 开头,空行结束
    res.write(`id: ${msg.id}\n`);
    res.write(`data: ${JSON.stringify(msg.data)}\n\n`);
  }

  // 启动定时器,持续生成并推送新消息
  const interval = setInterval(() => {
    const msg = generateMessage();
    res.write(`id: ${msg.id}\n`);
    res.write(`data: ${JSON.stringify(msg.data)}\n\n`);
    // 注意:res.write() 不会自动 flush,但 Node.js 通常会及时发送
  }, 2000); // 每 2 秒推送一次

  // 监听客户端断开连接(如关闭标签页、网络中断)
  req.on('close', () => {
    console.log('[SSE] 客户端断开连接,清理定时器');
    clearInterval(interval); // 停止生成新消息
    res.end();              // 显式关闭响应
  });
});

// 启动服务器
const PORT = process.env.PORT || 3000;
app.listen(PORT, () => {
  console.log(`SSE 服务已启动,请访问 http://localhost:${PORT}`);
});

根据实际对接接口以及需求来参考进行开发

Logo

AtomGit 是由开放原子开源基金会联合 CSDN 等生态伙伴共同推出的新一代开源与人工智能协作平台。平台坚持“开放、中立、公益”的理念,把代码托管、模型共享、数据集托管、智能体开发体验和算力服务整合在一起,为开发者提供从开发、训练到部署的一站式体验。

更多推荐