AI工具集:服务器端基于云端AI模型使用SSE封装自定义MCP服务

背景

购买 Coding Plan 除了能替代 AI编辑器 内置 Agent 处理代码问题,还可以封装自定义 MCP 服务。在 技术方案 / 代码修改 / 单元测试 / Code Review时候进行审核互补,毕竟单一模型在不同能力的侧重上有所差异。搭建服务器常驻 MCP服务 便于 公司 / 家 / 朋友 调用的方案变得很有意义

本文将一步步带你配置服务器版本 MCP 服务解决多用户设备/多场地同时的使用问题,也可以学习自建MCP服务的过程和注意事项

资源应用介绍

  1. Agent:讯飞星辰 MaaS · Astron Coding Plan
  2. 编辑器:Trae(其他编辑器类同)
  3. 服务器:宝塔面板Node搭建SSE通信(Linux面板11.7.0)

1. Agent套餐

Coding Plan是专为开发者打造的高性能 AI 算力订阅服务,可一站式调用顶流大模型,全面提效代码生产。

1.1 购买套餐

最最最主要是便宜,首月3.9元不限量 https://maas.xfyun.cn/packageSubscription

在这里插入图片描述

1.2 可用模型

在这里插入图片描述

2. 搭建MCP服务

2.1 上传文件

新建一个目录,上传【package.json】【sse.js】

在这里插入图片描述

2.1.1 package.json
{
  "name": "agentmcp",
  "version": "1.0.0",
  "description": "MCP 服务,调用云端 AI 模型",
  "main": "index.js",
  "scripts": {
    "start:sse": "node sse.js"
  },
  "keywords": [],
  "author": "",
  "license": "ISC",
  "dependencies": {
    "@modelcontextprotocol/sdk": "^1.29.0",
    "express": "^4.18.2",
    "zod": "^4.4.3"
  }
}

2.1.2 sse.js

sse.js 是 MCP (Model Context Protocol) 服务的远程部署版本 ,使用 SSE (Server-Sent Events) 协议实现 HTTP 远程访问。本地使用 stdio 模式,和本文不一致

替换自己的 API_KEY,若是其他平台更换 url地址 和模型ID

#!/usr/bin/env node

const { McpServer } = require('@modelcontextprotocol/sdk/server/mcp.js');
const { SSEServerTransport } = require('@modelcontextprotocol/sdk/server/sse.js');
const express = require('express');
const { z } = require('zod');

/**
 * 调用云端 AI 模型进行对话或代码生成
 * @param {Array<{role: 'user'|'assistant', content: string}>} messages - 对话消息数组
 * @param {Object} [options={}] - 配置选项
 * @param {string} [options.model='astron-code-latest'] - 模型名称
 * @param {'openai'|'anthropic'} [options.provider='openai'] - API 提供商类型
 * @returns {Promise<{content: Array<{type: string, text: string}>}>} AI 响应结果
 * @throws {Error} 当 API 请求失败或返回非 JSON 格式时抛出错误
 */
async function callAI(messages, options = {}) {
  const maxTokens = 131072; // 128k - 模型限制
  const OPENAI_URL = 'https://maas-coding-api.cn-huabei-1.xf-yun.com/v2/chat/completions';
  const ANTHROPIC_URL = 'https://maas-coding-api.cn-huabei-1.xf-yun.com/anthropic/v1/messages';
  const MODEL_ID = 'astron-code-latest';
  const API_KEY = 'xxxxxxxx'; // 替换自己的key

  const { model = MODEL_ID, provider = 'openai' } = options;
  const apiUrl = provider === 'anthropic' ? ANTHROPIC_URL : OPENAI_URL;
  const isAnthropic = provider === 'anthropic';
  const body = isAnthropic
    ? { model, max_tokens: maxTokens, messages: messages.map(m => ({ role: m.role, content: m.content })) }
    : { model, messages: messages.map(m => ({ role: m.role, content: m.content })), max_tokens: maxTokens };

  const res = await fetch(apiUrl, {
    method: 'POST',
    headers: { 'Content-Type': 'application/json', 'Authorization': `Bearer ${API_KEY}`, 'X-Api-Key': API_KEY, 'X-Language': 'zh-CN', ...(isAnthropic ? { 'anthropic-version': '2023-06-01' } : {}) },
    body: JSON.stringify(body)
  });

  if (!res.ok) {
    const errText = await res.text();
    throw new Error(`API ${res.status}: ${errText.slice(0, 500)}`);
  }

  const contentType = res.headers.get('content-type') || '';
  if (!contentType.includes('application/json')) {
    const errText = await res.text();
    throw new Error(`API返回非JSON格式(${contentType}): ${errText.slice(0, 200)}`);
  }

  const data = await res.json();

  if (isAnthropic) return { content: [{ type: 'text', text: data.content?.[0]?.text || '' }] };
  return { content: [{ type: 'text', text: data.choices?.[0]?.message?.content || data.choices?.[0]?.message?.reasoning_content || JSON.stringify(data) }] };
}

/**
 * 创建 MCP 服务器实例
 * @description 定义服务名称和版本号,用于与客户端建立通信
 */
const server = new McpServer({
  name: 'agentmcp',
  version: '1.0.0'
});

/**
 * 注册 'chat' 工具
 * @description 提供调用云端 AI 模型进行对话或代码生成的能力
 * 支持 OpenAI 和 Anthropic 兼容接口
 * @param {messages} 对话消息数组,支持 user/assistant 角色
 * @param {model} [可选] 指定使用的模型名称(默认 astron-code-latest)
 * @param {provider} [可选] API 提供商类型(openai 或 anthropic)
 */
server.tool('chat', '调用云端 AI 模型进行对话或代码生成,支持 OpenAI 和 Anthropic 兼容接口', {
  messages: z.array(z.object({
    role: z.enum(['user', 'assistant']),
    content: z.string()
  })),
  model: z.string().optional().describe('模型名称,默认 astron-code-latest'),
  provider: z.enum(['openai', 'anthropic']).optional().describe('API 提供商,默认 openai')
}, async ({ messages, model, provider }) => {
  try {
    return await callAI(messages, { model, provider });
  } catch (err) {
    return { content: [{ type: 'text', text: `[ERROR] ${err.message}` }], isError: true };
  }
});

/**
 * Transport 会话存储
 * @description 使用 Map 存储多个 SSE 连接的 transport 实例
 * 每个连接有唯一的 sessionId,支持多个客户端同时连接
 * 当连接关闭时自动清理对应的 transport,防止内存泄漏
 */
const transports = new Map();

/**
 * 初始化 Express 应用并配置路由
 * @description 创建 HTTP 服务,配置以下端点:
 * - GET /:健康检查端点,返回服务状态信息
 * - GET /sse:建立 SSE 长连接,用于服务端向客户端推送数据
 * - POST /message?sessionId=<id>:接收客户端发送的消息请求
 * @param {number} PORT - 监听端口,默认从环境变量读取,fallback 到 5067
 * @returns {void}
 */
async function main() {
  const app = express();
  const PORT = process.env.PORT || 5067;

  /**
   * 配置 Express 中间件
   * @description 启用 JSON 请求体解析,限制请求大小防止内存溢出
   * POST /message 端点依赖此中间件来解析 MCP 协议消息
   */
  app.use(express.json({ limit: '10mb' }));

  /**
   * 健康检查端点
   * @route GET /
   * @description 返回服务基本信息和运行状态
   * 用于监控服务是否正常运行,以及确认部署成功
   */
  app.get('/', (req, res) => {
    res.json({
      status: 'ok',
      service: 'agentmcp',
      version: '1.0.0',
      mode: 'sse',
      activeConnections: transports.size,
      endpoints: {
        sse: '/sse',
        message: '/message?sessionId=<id>'
      },
      timestamp: new Date().toISOString()
    });
  });

  /**
   * SSE 端点 - 建立长连接
   * @route GET /sse
   * @description 客户端通过此端点建立 Server-Sent Events 连接
   *
   * 工作流程:
   * 1. 接收客户端的 GET 请求
   * 2. 创建 SSEServerTransport 实例,绑定到当前响应对象
   * 3. 将 transport 存储到 transports Map(key=sessionId)
   * 4. 设置连接关闭时的清理回调
   * 5. 将 transport 连接到 MCP server,开始监听事件
   * 6. 保持长连接开放,等待服务端推送数据
   *
   * 支持反向代理部署(如 Nginx),需确保代理配置了正确的 SSE 头
   */
  app.get('/sse', async (req, res) => {
    console.log('[SSE] 新客户端连接:', req.ip, req.headers['user-agent']);
    try {
      /**
       * 创建 SSE Server Transport 实例
       * @param {string} '/message' - POST 消息端点路径
       * @param {Response} res - Express 响应对象,用于建立 SSE 流
       */
      const transport = new SSEServerTransport('/message', res);

      /**
       * 存储 transport 到 Map
       * @description 使用 sessionId 作为 key,支持多客户端并发连接
       * 每个客户端有独立的 session,互不干扰
       */
      transports.set(transport.sessionId, transport);
      console.log(`[SSE] Transport 已创建,sessionId: ${transport.sessionId}`);

      /**
       * 注册连接关闭清理回调
       * @description 当客户端断开连接时:
       * 1. 从 transports Map 中移除对应 transport
       * 2. 释放相关资源,防止内存泄漏
       * 3. 输出日志便于监控和调试
       */
      res.on('close', () => {
        console.log(`[SSE] 客户端断开连接,sessionId: ${transport.sessionId}`);
        transports.delete(transport.sessionId);
      });

      /**
       * 连接到 MCP Server
       * @description 将 transport 连接到 server 后,
       * server 可以通过此 transport 向客户端推送事件和响应
       */
      await server.connect(transport);
      console.log('[SSE] 客户端连接成功');
    } catch (err) {
      console.error('[SSE] 连接失败:', err.message);
      if (!res.headersSent) {
        res.status(500).json({ error: 'SSE connection failed', message: err.message });
      }
    }
  });

  /**
   * 消息处理端点 - 接收客户端请求
   * @route POST /message?sessionId=<id>
   * @description 客户端通过此端点发送工具调用等请求
   *
   * 工作流程:
   * 1. 从查询参数获取 sessionId(必需)
   * 2. 从 transports Map 中查找对应的 transport
   * 3. 验证 transport 类型是否为 SSEServerTransport
   * 4. 将消息转发给 transport 处理
   * 5. Transport 将消息传递给 MCP server 执行对应工具
   * 6. 执行结果通过之前建立的 SSE 连接异步返回给客户端
   *
   * 注意:此端点必须与 /sse 端点配对使用
   * 客户端需要先建立 SSE 连接(GET /sse),获得 sessionId 后才能发送消息
   *
   * @query {string} sessionId - 从 GET /sse 响应中获得的会话 ID
   */
  app.post('/message', async (req, res) => {
    console.log('[Message] 收到请求');

    try {
      /**
       * 获取 sessionId
       * @description 从 URL 查询参数中提取 sessionId
       * 这是 MCP 协议规定的标准方式,用于关联 SSE 连接和消息通道
       */
      const sessionId = req.query.sessionId;

      if (!sessionId) {
        throw new Error('Missing required query parameter: sessionId');
      }

      /**
       * 查找对应的 transport
       * @description 从 transports Map 中根据 sessionId 获取 transport 实例
       * 如果找不到,说明客户端未先建立 SSE 连接或连接已过期
       */
      const transport = transports.get(sessionId);

      if (!transport) {
        throw new Error(`No active SSE connection for sessionId: ${sessionId}. Active sessions: ${Array.from(transports.keys()).join(', ')}`);
      }

      /**
       * 验证 transport 类型
       * @description 确保 transport 是 SSEServerTransport 实例
       * 防止类型混淆导致的安全问题或错误行为
       */
      if (!(transport instanceof SSEServerTransport)) {
        throw new Error('Invalid transport type for this session');
      }

      /**
       * 处理消息
       * @description 将请求转发给 SSEServerTransport 处理
       * transport 内部会解析 MCP 协议消息并路由到正确的处理器
       * 处理结果会自动通过 SSE 连接返回给客户端
       */
      await transport.handlePostMessage(req, res, req.body);
      console.log('[Message] 消息处理成功');
    } catch (err) {
      console.error('[Message] 处理失败:', err.message);
      if (!res.headersSent) {
        res.status(500).json({
          error: 'Message handling failed',
          message: err.message,
          hint: '请确保已先建立 SSE 连接(GET /sse)并获得有效的 sessionId'
        });
      }
    }
  });

  /**
   * 全局错误处理中间件
   * @description 捕获所有未处理的异常,返回统一格式的错误响应
   * 防止服务因未捕获的异常而崩溃
   */
  app.use((err, req, res, next) => {
    console.error('[Error]', err.stack);
    res.status(500).json({
      error: 'Internal Server Error',
      message: err.message
    });
  });

  /**
   * 启动 HTTP 服务器监听
   * @description 在指定端口启动 Express 服务,输出运行状态信息
   * 启动后可通过 http://host:port/sse 访问 MCP 服务
   */
  app.listen(PORT, () => {
    console.log(`========================================`);
    console.log(`MCP Server running in SSE mode`);
    console.log(`Local URL: http://localhost:${PORT}`);
    console.log(`SSE endpoint: http://localhost:${PORT}/sse`);
    console.log(`Message endpoint: http://localhost:${PORT}/message`);
    console.log(`Health check: http://localhost:${PORT}/`);
    console.log(`========================================`);
  });
}

/**
 * 启动 MCP SSE 服务器
 * @description 调用 main 函数启动服务,捕获并输出异常信息到控制台
 * 异常通常包括端口占用、网络配置等问题
 */
main().catch(console.error);

/**
 * 优雅关闭处理
 * @description 监听 SIGINT 信号(Ctrl+C),确保:
 * 1. 关闭所有活跃的 SSE 连接
 * 2. 清空 transports Map
 * 3. 释放 MCP 服务器资源
 * 4. 安全退出进程,避免资源泄漏
 */
process.on('SIGINT', async () => {
  console.log('\n正在关闭 MCP 服务器...');
  console.log(`[Shutdown] 当前活跃连接数: ${transports.size}`);

  /**
   * 关闭所有活跃的 transport
   * @description 遍历 transports Map,逐个关闭每个 SSE 连接
   * 确保所有客户端都能收到连接关闭通知
   */
  for (const [sessionId, transport] of transports) {
    try {
      console.log(`[Shutdown] 正在关闭连接: ${sessionId}`);
      await transport.close();
      transports.delete(sessionId);
    } catch (err) {
      console.error(`[Shutdown] 关闭连接 ${sessionId} 失败:`, err.message);
    }
  }

  await server.close();
  console.log('[Shutdown] MCP 服务器已安全关闭');
  process.exit(0);
});

/**
 * 进程异常处理
 * @description 捕获未处理的 Promise 拒绝,防止进程意外崩溃
 * 输出详细的错误堆栈信息便于排查问题
 */
process.on('unhandledRejection', (reason, promise) => {
  console.error('未处理的 Promise 拒绝:', reason);
});

2.2 配置服务器端

2.2.1 创建node服务

在这里插入图片描述

2.2.2 添加域名(非必须)

不配置可以使用ip调用也行

在这里插入图片描述

2.2.3 配置nginx

在这里插入图片描述

# ========== 核心修复:MCP SSE 服务代理配置 ==========
    location / {
        proxy_pass http://127.0.0.1:5067;
        
        # 基础 Header 设置(只设置一次,避免重复)
        proxy_set_header Host $host;
        proxy_set_header X-Real-IP $remote_addr;
        proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
        proxy_set_header X-Forwarded-Proto $scheme;
        
        # 【关键】请求体大小限制 - MCP 消息可能很大
        client_max_body_size 50m;
        
        # 【关键】SSE 必须配置:禁用所有缓冲
        proxy_buffering off;
        proxy_cache off;
        proxy_request_buffering off;
        
        # 超时设置(SSE 需要长连接)
        proxy_connect_timeout 60s;
        proxy_read_timeout 86400s;
        proxy_send_timeout 86400s;
        
        # HTTP/1.1 长连接支持
        proxy_http_version 1.1;
        proxy_set_header Connection '';
        
        # WebSocket 升级支持(如果需要)
        proxy_set_header Upgrade $http_upgrade;
        
        # ⚠️ 删除这行:不要强制设置 Content-Type!
        # proxy_set_header Content-Type 'application/json';
        
        # 支持分块传输编码
        chunked_transfer_encoding on;
        
        # 禁用对响应的压缩(SSE 不需要)
        gzip off;
        proxy_set_header Accept-Encoding '';
    }

3. 编辑器调用MCP

在这里插入图片描述

JSON 数据
{
  "mcpServers": {
    "agentmcp": {
      "url": "http://xxxx.com/sse"
    }
  }
}

4. 测试功能

发送一个对话,测试mcp服务功能。恭喜搭建完毕,可以在公司 和 在家 调用。

在这里插入图片描述

5. 整体通信流程图

宝塔 Node.js 应用 (单进程服务) Trae 客户端 (MCP Client) 宝塔 Node.js 应用 (单进程服务) Trae 客户端 (MCP Client) 包含三个模块: HTTP 服务器, MCP Server, SSE Transport 阶段 1: 建立 SSE 长连接 阶段 2: 发送工具调用请求 alt [执行成功] [执行失败] 阶段 3: 连接关闭清理 GET /sse 1 生成唯一 sessionId (UUID/时间戳) 2 创建 SSEServerTransport 3 存储 transport 到 transports Map (key: sessionId) 4 transport 连接到 MCP server 5 建立 SSE 连接 (发送 SSE 事件流) 6 POST /message?sessionId=<id> (工具调用请求) 7 通过 transport.handlePostMessage 处理消息 8 转发消息至 MCP server 9 执行工具逻辑 10 返回成功结果 11 SSE 推送成功事件 (异步返回结果) 12 抛出错误 13 错误处理/日志记录 14 SSE 推送错误事件 15 断开连接/超时 16 从 transports Map 删除 (key: sessionId) 17 断开 MCP 连接 18
设计决策 原因 优势
SSE 双通道设计 HTTP 单向限制 服务端可主动推送,无需 WebSocket Map
存储会话 多客户端并发 支持多用户同时连接,互不干扰
Zod 参数校验 类型安全 早失败、自动文档、防注入
三层错误防护 防御性编程 sessionId → session有效性 → 类型验证
优雅关闭 资源管理 无泄漏、数据完整、日志可追踪
分离关注点 可维护性 AI 调用 / MCP 协议 / HTTP 各司其职

6. 方案锁定原因

为什么需要 MCP Server?

MCP 协议要求:
┌─────────────┐     ┌─────────────┐     ┌─────────────┐
│   Client    │ ←→ │   Transport  │ ←→ │   Server     │
│  (Trae IDE) │     │  (SSE/stdio)│     │  (McpServer) │
└─────────────┘     └─────────────┘     └─────────────┘

McpServer 的职责:
1. 定义服务能力(工具、资源、提示词)
2. 处理 JSON-RPC 协议消息
3. 管理会话状态
4. 路由工具调用到对应的 handler

为什么需要 SSE 而不是普通 HTTP?

❌ 普通 HTTP 请求-响应模式:
   Client ──request──> Server ──response──> Client (结束)
   
   问题: 服务器无法主动向客户端推送数据!

✅ SSE (Server-Sent Events):
   Client ──GET /sse──> Server ──保持连接── Client
                          │
                          ├── push event 1 ──> Client
                          ├── push event 2 ──> Client
                          ├── push event 3 ──> Client
                          └── ...
   
   优势: 
   - 服务器可以主动推送(工具执行结果、通知等)
   - 单向通信,简单可靠
   - 自动重连(浏览器原生支持)
   - 基于 HTTP,无需 WebSocket 的复杂性

为什么需要 ?sessionId= 参数?

MCP SSE 协议的双通道设计:

通道 1: SSE 长连接 (GET /sse)
   - 方向: Server → Client
   - 用途: 推送工具执行结果、通知
   - 特点: 长时间保持

通道 2: HTTP 请求 (POST /message?sessionId=xxx)
   - 方向: Client → Server
   - 用途: 发送工具调用请求
   - 特点: 短连接,按需发起

问题: 如何关联这两个通道?
答案: sessionId!

为什么需要全局错误处理?

没有全局错误处理的后果:

场景: 某个路由抛出未捕获异常
结果: 
  ❌ Express 默认行为 -> 返回 HTML 错误页(对 API 不友好)
  ❌ 客户端收到非 JSON 响应 -> 解析失败
  ❌ 可能泄露堆栈信息(安全隐患)
  ❌ 进程可能崩溃(unhandled rejection)

有全局错误处理:
✅ 统一返回 JSON 格式错误
✅ 记录详细日志便于排查
✅ 保护敏感信息(不暴露堆栈给客户端)
✅ 保持进程稳定运行

为什么需要优雅关闭?

❌ 强制关闭 (kill -9):
   问题:
   - SSE 连接突然断开,客户端收到错误
   - 正在处理的请求丢失
   - 文件句柄、网络 socket 未释放
   - 可能导致资源泄漏

✅ 优雅关闭 (Ctrl+C / SIGINT):
   流程:
   1. 停止接受新连接
   2. 等待进行中的请求完成
   3. 通知所有客户端即将关闭
   4. 关闭所有 SSE 连接
   5. 清理所有资源
   6. 安全退出
   
   优势:
   - 客户端体验好(收到关闭通知)
   - 数据完整性(请求不会丢失)
   - 资源无泄漏
   - 日志完整(可追踪关闭原因)

7. 调用建议

在编辑器定义【个人规则】或 【项目规则】,约定自动触发时机 和 内容,避免每次还要在对话框内说明调用,比较麻烦

优先级权重:对话框 > 项目规则 > 个人规则

Logo

AtomGit 是由开放原子开源基金会联合 CSDN 等生态伙伴共同推出的新一代开源与人工智能协作平台。平台坚持“开放、中立、公益”的理念,把代码托管、模型共享、数据集托管、智能体开发体验和算力服务整合在一起,为开发者提供从开发、训练到部署的一站式体验。

更多推荐