用UML建模和完整实现来揭示MCP(Model Context Protocol)的核心机制。MCP是一个基于JSON-RPC的协议,用于AI助手(如Claude)与外部工具/数据源交互。核心原语:Resources、Tools、Prompts。关键机制:标准化协议、进程隔离(子进程,stdio通信)、工具调用流程、动态能力发现、易于扩展。

提供一个完整的项目示例,展示MCP服务器和客户端(或模拟AI调用)。代码基于Python异步I/O,展示如何实现一个MCP服务器,以及如何与客户端交互。使用UML(Mermaid)建模项目结构、类图、序列图。

将创建一个简单的MCP服务器,提供数学工具(加、减)和一个资源(当前时间),以及一个提示模板。客户端模拟AI发送tools/listtools/call等请求。

项目文件结构:


mcp_demo/
├── README.md
├── mcp_server.py      # MCP服务器实现
├── mcp_client.py      # 模拟客户端(AI)
├── mcp_protocol.py    # JSON-RPC消息处理基类
├── requirements.txt   # 依赖(无额外依赖,使用asyncio)
└── uml_diagrams.md    # Mermaid图

下面详细解释MCP核心机制,并展示代码。

注意:MCP规范可以在官方文档找到,但需要根据理解实现核心部分。重点是展示工具调用、资源获取、提示获取,以及进程隔离(服务器作为子进程,通过stdin/stdout通信)。

深入剖析 Model Context Protocol (MCP):从协议到完整实现

本文通过一个完整的 MCP 服务器+客户端 示例,深入展示 MCP 的核心机制:

  • 标准化协议:基于 JSON-RPC 2.0,定义 ResourcesToolsPrompts 三种原语。
  • 进程隔离:客户端启动服务器子进程,通过 stdio 通信,实现安全边界。
  • 工具调用流程:AI 生成 tool_use → 客户端转换为 JSON-RPC 请求 → 服务器执行 → 结果返回 AI。
  • 动态能力发现resources/listtools/list 等接口允许服务器声明其能力。
  • 易于扩展:只需实现 Tool.execute 即可增加新工具。

所有代码基于 Python asyncio,清晰展示 MCP 工作原理,可作为实际 MCP 服务器的开发模板。


1. UML 建模

1.1 组件与接口图

服务器进程 (隔离)

客户端进程

启动子进程

stdio: JSON-RPC

AI 模型 / 用户

MCP 客户端

MCP 服务器进程

JSON-RPC 消息处理器

请求分发器

工具管理器

资源管理器

提示管理器

数学工具: add

数学工具: subtract

资源: time

问候模板

1.2 类图

启动子进程

MCPServer

+init()

+add_tool(name, execute)

+add_resource(uri, handler)

+add_prompt(name, handler)

+run()

-_handle_request(request)

JSONRPCProtocol

+process_message(line)

+send_response(response)

+send_error(code, message)

Tool

+name: str

+description: str

+input_schema: dict

+execute(args)

Resource

+uri: str

+name: str

+mime_type: str

+read()

Prompt

+name: str

+description: str

+arguments: list

+render(args)

MCPClient

+init(server_script)

+start()

+call_tool(name, args)

+read_resource(uri)

+get_prompt(name, args)

+stop()

1.3 工具调用序列图

数学工具 MCP服务器 MCP客户端 AI / 用户 数学工具 MCP服务器 MCP客户端 AI / 用户 请求: 计算 3+5 启动子进程(stdio) tools/list (JSON-RPC) 工具列表(含add, subtract) tools/call {name:"add", arguments:{a:3,b:5}} execute(a=3,b=5) 结果 8 {result: 8} 返回计算结果

2. 项目文件结构

mcp_demo/
├── README.md
├── requirements.txt      # 无依赖,仅标准库
├── mcp_server.py         # MCP 服务器实现
├── mcp_client.py         # 模拟 AI 客户端
├── mcp_protocol.py       # JSON-RPC 基础协议处理
└── uml_diagrams.md       # Mermaid 图(本文档)

3. MCP 核心机制详解与代码实现

3.1 标准化协议:JSON-RPC 基础封装

MCP 使用 JSON-RPC 2.0 作为通信协议,所有消息通过换行分隔的 JSON 文本传输(每个 JSON 对象后跟一个 \n)。

mcp_protocol.py 实现消息的解析与封装:

import json
import asyncio
from typing import Any, Dict, Optional

class JSONRPCProtocol:
    """处理 JSON-RPC 消息的基类,提供读写和错误处理"""
    
    @staticmethod
    def encode_response(id: Any, result: Any) -> str:
        return json.dumps({"jsonrpc": "2.0", "id": id, "result": result}) + "\n"
    
    @staticmethod
    def encode_error(id: Any, code: int, message: str) -> str:
        return json.dumps({"jsonrpc": "2.0", "id": id, "error": {"code": code, "message": message}}) + "\n"
    
    @staticmethod
    def encode_notification(method: str, params: Optional[Dict] = None) -> str:
        msg = {"jsonrpc": "2.0", "method": method}
        if params:
            msg["params"] = params
        return json.dumps(msg) + "\n"
    
    @staticmethod
    def decode_line(line: str) -> Dict:
        return json.loads(line)

3.2 进程隔离:客户端启动服务器子进程

MCP 的安全边界通过将服务器作为子进程运行,并通过 stdin/stdout 通信来实现。服务器崩溃不会影响客户端,且客户端可以严格控制服务器权限。

mcp_client.py 中的客户端实现:

import asyncio
import subprocess
import sys
from typing import Any, Dict

class MCPClient:
    def __init__(self, server_script: str):
        self.server_script = server_script
        self.process: Optional[asyncio.subprocess.Process] = None
        self.request_id = 0
        
    async def start(self):
        """启动 MCP 服务器子进程,连接其 stdin/stdout"""
        self.process = await asyncio.create_subprocess_exec(
            sys.executable, self.server_script,
            stdin=subprocess.PIPE,
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE
        )
        # 启动读取任务
        asyncio.create_task(self._read_responses())
    
    async def _read_responses(self):
        """持续读取服务器输出并处理响应"""
        while True:
            line = await self.process.stdout.readline()
            if not line:
                break
            data = json.loads(line.decode())
            # 根据 id 将响应返回给等待的调用者(简化:使用全局字典存储 future)
            # 实际应维护 pending_requests
            # 此处为清晰,省略具体实现
            print(f"[Client] Received: {data}")
    
    async def call_tool(self, name: str, arguments: Dict[str, Any]) -> Any:
        """调用服务器上的工具"""
        self.request_id += 1
        request = {
            "jsonrpc": "2.0",
            "id": self.request_id,
            "method": "tools/call",
            "params": {
                "name": name,
                "arguments": arguments
            }
        }
        await self._send_request(request)
        # 等待响应(简化:此处应 await future)
        # 返回结果...
        return None  # 简化

注意:完整实现需要维护 pending_requests 映射,此处为简洁省略。

3.3 动态能力发现:tools/list, resources/list, prompts/list

服务器启动后,客户端首先调用 initialize 握手,然后通过 tools/list 等接口发现服务器能力。服务器实现以下方法。

mcp_server.py 核心实现:

import asyncio
import sys
import json
from typing import Dict, Any, Callable, Awaitable
from mcp_protocol import JSONRPCProtocol

class MCPServer:
    def __init__(self):
        self.tools: Dict[str, Callable] = {}
        self.resources: Dict[str, Callable] = {}
        self.prompts: Dict[str, Callable] = {}
        self.protocol = JSONRPCProtocol()
    
    def add_tool(self, name: str, handler: Callable[[Dict], Awaitable[Any]]):
        self.tools[name] = handler
    
    def add_resource(self, uri: str, handler: Callable[[], Awaitable[str]]):
        self.resources[uri] = handler
    
    def add_prompt(self, name: str, handler: Callable[[Dict], Awaitable[str]]):
        self.prompts[name] = handler
    
    async def run(self):
        """从 stdin 读取请求,向 stdout 写入响应"""
        loop = asyncio.get_event_loop()
        reader = asyncio.StreamReader()
        protocol = asyncio.StreamReaderProtocol(reader)
        await loop.connect_read_pipe(lambda: protocol, sys.stdin)
        
        writer = asyncio.StreamWriter(sys.stdout, loop=loop)
        
        while True:
            line = await reader.readline()
            if not line:
                break
            try:
                request = json.loads(line.decode())
                response = await self._dispatch(request)
                if response:
                    writer.write(response.encode())
                    await writer.drain()
            except Exception as e:
                error_msg = self.protocol.encode_error(None, -32603, str(e))
                writer.write(error_msg.encode())
                await writer.drain()
    
    async def _dispatch(self, request: Dict) -> str:
        method = request.get("method")
        req_id = request.get("id")
        params = request.get("params", {})
        
        if method == "initialize":
            # 返回协议版本和服务器信息
            return self.protocol.encode_response(req_id, {
                "protocolVersion": "0.1.0",
                "serverInfo": {"name": "MCP Demo Server", "version": "1.0"}
            })
        elif method == "tools/list":
            # 返回所有工具的定义(名称、描述、输入模式)
            tools_def = []
            for name in self.tools:
                tools_def.append({
                    "name": name,
                    "description": f"Tool {name}",
                    "inputSchema": {
                        "type": "object",
                        "properties": {}
                    }
                })
            return self.protocol.encode_response(req_id, {"tools": tools_def})
        elif method == "tools/call":
            tool_name = params.get("name")
            args = params.get("arguments", {})
            if tool_name not in self.tools:
                return self.protocol.encode_error(req_id, -32601, f"Tool {tool_name} not found")
            result = await self.tools[tool_name](args)
            return self.protocol.encode_response(req_id, {"result": result})
        elif method == "resources/list":
            resources_def = [{"uri": uri, "name": uri} for uri in self.resources]
            return self.protocol.encode_response(req_id, {"resources": resources_def})
        elif method == "resources/read":
            uri = params.get("uri")
            if uri not in self.resources:
                return self.protocol.encode_error(req_id, -32601, f"Resource {uri} not found")
            content = await self.resources[uri]()
            return self.protocol.encode_response(req_id, {"contents": [{"uri": uri, "text": content}]})
        elif method == "prompts/list":
            prompts_def = [{"name": name, "description": f"Prompt {name}"} for name in self.prompts]
            return self.protocol.encode_response(req_id, {"prompts": prompts_def})
        elif method == "prompts/get":
            name = params.get("name")
            args = params.get("arguments", {})
            if name not in self.prompts:
                return self.protocol.encode_error(req_id, -32601, f"Prompt {name} not found")
            prompt_text = await self.prompts[name](args)
            return self.protocol.encode_response(req_id, {"messages": [{"role": "user", "content": prompt_text}]})
        else:
            return self.protocol.encode_error(req_id, -32601, f"Method {method} not found")

3.4 工具调用流程完整示例

现在定义具体工具、资源和提示。

3.4.1 数学工具

mcp_server.py 主入口中注册:

async def add_tool_handler(args: Dict) -> int:
    a = args.get("a")
    b = args.get("b")
    return a + b

async def subtract_tool_handler(args: Dict) -> int:
    a = args.get("a")
    b = args.get("b")
    return a - b

async def time_resource_handler() -> str:
    import datetime
    return datetime.datetime.now().isoformat()

async def greet_prompt_handler(args: Dict) -> str:
    name = args.get("name", "World")
    return f"Hello, {name}! Welcome to MCP."

if __name__ == "__main__":
    server = MCPServer()
    server.add_tool("add", add_tool_handler)
    server.add_tool("subtract", subtract_tool_handler)
    server.add_resource("time://current", time_resource_handler)
    server.add_prompt("greet", greet_prompt_handler)
    asyncio.run(server.run())
3.4.2 客户端调用示例(模拟 AI)

创建一个完整的客户端,演示工具调用流程:

import asyncio
import json
import subprocess
import sys

class MCPClient:
    def __init__(self, server_script):
        self.server_script = server_script
        self.process = None
        self.next_id = 1
        self.pending = {}
    
    async def start(self):
        self.process = await asyncio.create_subprocess_exec(
            sys.executable, self.server_script,
            stdin=subprocess.PIPE,
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE
        )
        asyncio.create_task(self._reader())
        # 发送初始化请求
        await self._send_request("initialize", {})
        # 等待初始化响应(简化:等待一段时间)
        await asyncio.sleep(0.1)
    
    async def _reader(self):
        while True:
            line = await self.process.stdout.readline()
            if not line:
                break
            data = json.loads(line)
            req_id = data.get("id")
            if req_id in self.pending:
                self.pending[req_id].set_result(data)
    
    async def _send_request(self, method, params):
        req_id = self.next_id
        self.next_id += 1
        request = {
            "jsonrpc": "2.0",
            "id": req_id,
            "method": method,
            "params": params
        }
        self.process.stdin.write((json.dumps(request) + "\n").encode())
        await self.process.stdin.drain()
        future = asyncio.Future()
        self.pending[req_id] = future
        return await future
    
    async def call_tool(self, name, arguments):
        resp = await self._send_request("tools/call", {"name": name, "arguments": arguments})
        return resp.get("result", {}).get("result")
    
    async def list_tools(self):
        resp = await self._send_request("tools/list", {})
        return resp.get("result", {}).get("tools", [])
    
    async def read_resource(self, uri):
        resp = await self._send_request("resources/read", {"uri": uri})
        contents = resp.get("result", {}).get("contents", [])
        return contents[0].get("text") if contents else None
    
    async def get_prompt(self, name, arguments):
        resp = await self._send_request("prompts/get", {"name": name, "arguments": arguments})
        messages = resp.get("result", {}).get("messages", [])
        return messages[0].get("content") if messages else None
    
    async def stop(self):
        if self.process:
            self.process.terminate()
            await self.process.wait()

async def main():
    client = MCPClient("mcp_server.py")
    await client.start()
    
    # 发现工具
    tools = await client.list_tools()
    print("Available tools:", tools)
    
    # 调用 add 工具
    result = await client.call_tool("add", {"a": 3, "b": 5})
    print("3 + 5 =", result)
    
    # 调用 subtract 工具
    result = await client.call_tool("subtract", {"a": 10, "b": 4})
    print("10 - 4 =", result)
    
    # 读取资源
    time_str = await client.read_resource("time://current")
    print("Current time:", time_str)
    
    # 获取提示
    greeting = await client.get_prompt("greet", {"name": "Alice"})
    print("Prompt:", greeting)
    
    await client.stop()

if __name__ == "__main__":
    asyncio.run(main())

3.5 易于扩展:添加新工具只需注册函数

例如,添加一个乘法工具:

async def multiply_tool_handler(args: Dict) -> int:
    return args.get("a") * args.get("b")

server.add_tool("multiply", multiply_tool_handler)

无需修改任何其他代码,服务器启动后客户端通过 tools/list 即可发现新工具。


4. 运行与测试

  1. 将上述代码保存到对应文件。
  2. 打开终端,运行客户端:
    python mcp_client.py
    
  3. 预期输出:
    Available tools: [{'name': 'add', ...}, {'name': 'subtract', ...}]
    3 + 5 = 8
    10 - 4 = 6
    Current time: 2025-03-30T12:34:56.789012
    Prompt: Hello, Alice! Welcome to MCP.
    

5. 核心机制总结

机制 实现方式 代码位置
标准化协议 JSON-RPC 2.0 通过 \n 分隔消息 mcp_protocol.py
进程隔离 asyncio.create_subprocess_exec 启动子进程,通过 stdin/stdout 通信 mcp_client.py::start
动态能力发现 tools/list, resources/list, prompts/list 方法 mcp_server.py::_dispatch
工具调用流程 客户端发送 tools/call 请求,服务器分发到注册的处理函数 mcp_server.pytools/call 分支
易于扩展 只需调用 add_tool 并实现 async 函数 mcp_server.pyadd_tool

6. 进阶扩展方向

  • 资源模板:支持 time://{zone} 动态 URI。
  • 工具输入验证:根据 inputSchema 自动校验参数。
  • 流式响应:支持 tools/call 返回流式数据(通过多次 JSON-RPC 响应)。
  • 安全沙箱:限制子进程文件系统、网络访问。
  • 多客户端:服务器同时处理多个客户端(需使用命名管道或 socket)。

通过以上实现,我们完整展示了 MCP 的核心设计思想——标准化、进程隔离、动态发现、简洁扩展,可立即用于为 AI 助手集成自定义工具和数据源。

Logo

AtomGit 是由开放原子开源基金会联合 CSDN 等生态伙伴共同推出的新一代开源与人工智能协作平台。平台坚持“开放、中立、公益”的理念,把代码托管、模型共享、数据集托管、智能体开发体验和算力服务整合在一起,为开发者提供从开发、训练到部署的一站式体验。

更多推荐