实现一个完整项目剖析 Model Context Protocol (MCP):从协议到完整实现
用UML建模和完整实现来揭示MCP(Model Context Protocol)的核心机制。MCP是一个基于JSON-RPC的协议,用于AI助手(如Claude)与外部工具/数据源交互。核心原语:Resources、Tools、Prompts。关键机制:标准化协议、进程隔离(子进程,stdio通信)、工具调用流程、动态能力发现、易于扩展。
提供一个完整的项目示例,展示MCP服务器和客户端(或模拟AI调用)。代码基于Python异步I/O,展示如何实现一个MCP服务器,以及如何与客户端交互。使用UML(Mermaid)建模项目结构、类图、序列图。
将创建一个简单的MCP服务器,提供数学工具(加、减)和一个资源(当前时间),以及一个提示模板。客户端模拟AI发送tools/list、tools/call等请求。
项目文件结构:
mcp_demo/
├── README.md
├── mcp_server.py # MCP服务器实现
├── mcp_client.py # 模拟客户端(AI)
├── mcp_protocol.py # JSON-RPC消息处理基类
├── requirements.txt # 依赖(无额外依赖,使用asyncio)
└── uml_diagrams.md # Mermaid图
下面详细解释MCP核心机制,并展示代码。
注意:MCP规范可以在官方文档找到,但需要根据理解实现核心部分。重点是展示工具调用、资源获取、提示获取,以及进程隔离(服务器作为子进程,通过stdin/stdout通信)。
深入剖析 Model Context Protocol (MCP):从协议到完整实现
本文通过一个完整的 MCP 服务器+客户端 示例,深入展示 MCP 的核心机制:
- 标准化协议:基于 JSON-RPC 2.0,定义
Resources、Tools、Prompts三种原语。 - 进程隔离:客户端启动服务器子进程,通过
stdio通信,实现安全边界。 - 工具调用流程:AI 生成
tool_use→ 客户端转换为 JSON-RPC 请求 → 服务器执行 → 结果返回 AI。 - 动态能力发现:
resources/list、tools/list等接口允许服务器声明其能力。 - 易于扩展:只需实现
Tool.execute即可增加新工具。
所有代码基于 Python asyncio,清晰展示 MCP 工作原理,可作为实际 MCP 服务器的开发模板。
1. UML 建模
1.1 组件与接口图
1.2 类图
1.3 工具调用序列图
2. 项目文件结构
mcp_demo/
├── README.md
├── requirements.txt # 无依赖,仅标准库
├── mcp_server.py # MCP 服务器实现
├── mcp_client.py # 模拟 AI 客户端
├── mcp_protocol.py # JSON-RPC 基础协议处理
└── uml_diagrams.md # Mermaid 图(本文档)
3. MCP 核心机制详解与代码实现
3.1 标准化协议:JSON-RPC 基础封装
MCP 使用 JSON-RPC 2.0 作为通信协议,所有消息通过换行分隔的 JSON 文本传输(每个 JSON 对象后跟一个 \n)。
mcp_protocol.py 实现消息的解析与封装:
import json
import asyncio
from typing import Any, Dict, Optional
class JSONRPCProtocol:
"""处理 JSON-RPC 消息的基类,提供读写和错误处理"""
@staticmethod
def encode_response(id: Any, result: Any) -> str:
return json.dumps({"jsonrpc": "2.0", "id": id, "result": result}) + "\n"
@staticmethod
def encode_error(id: Any, code: int, message: str) -> str:
return json.dumps({"jsonrpc": "2.0", "id": id, "error": {"code": code, "message": message}}) + "\n"
@staticmethod
def encode_notification(method: str, params: Optional[Dict] = None) -> str:
msg = {"jsonrpc": "2.0", "method": method}
if params:
msg["params"] = params
return json.dumps(msg) + "\n"
@staticmethod
def decode_line(line: str) -> Dict:
return json.loads(line)
3.2 进程隔离:客户端启动服务器子进程
MCP 的安全边界通过将服务器作为子进程运行,并通过 stdin/stdout 通信来实现。服务器崩溃不会影响客户端,且客户端可以严格控制服务器权限。
mcp_client.py 中的客户端实现:
import asyncio
import subprocess
import sys
from typing import Any, Dict
class MCPClient:
def __init__(self, server_script: str):
self.server_script = server_script
self.process: Optional[asyncio.subprocess.Process] = None
self.request_id = 0
async def start(self):
"""启动 MCP 服务器子进程,连接其 stdin/stdout"""
self.process = await asyncio.create_subprocess_exec(
sys.executable, self.server_script,
stdin=subprocess.PIPE,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE
)
# 启动读取任务
asyncio.create_task(self._read_responses())
async def _read_responses(self):
"""持续读取服务器输出并处理响应"""
while True:
line = await self.process.stdout.readline()
if not line:
break
data = json.loads(line.decode())
# 根据 id 将响应返回给等待的调用者(简化:使用全局字典存储 future)
# 实际应维护 pending_requests
# 此处为清晰,省略具体实现
print(f"[Client] Received: {data}")
async def call_tool(self, name: str, arguments: Dict[str, Any]) -> Any:
"""调用服务器上的工具"""
self.request_id += 1
request = {
"jsonrpc": "2.0",
"id": self.request_id,
"method": "tools/call",
"params": {
"name": name,
"arguments": arguments
}
}
await self._send_request(request)
# 等待响应(简化:此处应 await future)
# 返回结果...
return None # 简化
注意:完整实现需要维护 pending_requests 映射,此处为简洁省略。
3.3 动态能力发现:tools/list, resources/list, prompts/list
服务器启动后,客户端首先调用 initialize 握手,然后通过 tools/list 等接口发现服务器能力。服务器实现以下方法。
mcp_server.py 核心实现:
import asyncio
import sys
import json
from typing import Dict, Any, Callable, Awaitable
from mcp_protocol import JSONRPCProtocol
class MCPServer:
def __init__(self):
self.tools: Dict[str, Callable] = {}
self.resources: Dict[str, Callable] = {}
self.prompts: Dict[str, Callable] = {}
self.protocol = JSONRPCProtocol()
def add_tool(self, name: str, handler: Callable[[Dict], Awaitable[Any]]):
self.tools[name] = handler
def add_resource(self, uri: str, handler: Callable[[], Awaitable[str]]):
self.resources[uri] = handler
def add_prompt(self, name: str, handler: Callable[[Dict], Awaitable[str]]):
self.prompts[name] = handler
async def run(self):
"""从 stdin 读取请求,向 stdout 写入响应"""
loop = asyncio.get_event_loop()
reader = asyncio.StreamReader()
protocol = asyncio.StreamReaderProtocol(reader)
await loop.connect_read_pipe(lambda: protocol, sys.stdin)
writer = asyncio.StreamWriter(sys.stdout, loop=loop)
while True:
line = await reader.readline()
if not line:
break
try:
request = json.loads(line.decode())
response = await self._dispatch(request)
if response:
writer.write(response.encode())
await writer.drain()
except Exception as e:
error_msg = self.protocol.encode_error(None, -32603, str(e))
writer.write(error_msg.encode())
await writer.drain()
async def _dispatch(self, request: Dict) -> str:
method = request.get("method")
req_id = request.get("id")
params = request.get("params", {})
if method == "initialize":
# 返回协议版本和服务器信息
return self.protocol.encode_response(req_id, {
"protocolVersion": "0.1.0",
"serverInfo": {"name": "MCP Demo Server", "version": "1.0"}
})
elif method == "tools/list":
# 返回所有工具的定义(名称、描述、输入模式)
tools_def = []
for name in self.tools:
tools_def.append({
"name": name,
"description": f"Tool {name}",
"inputSchema": {
"type": "object",
"properties": {}
}
})
return self.protocol.encode_response(req_id, {"tools": tools_def})
elif method == "tools/call":
tool_name = params.get("name")
args = params.get("arguments", {})
if tool_name not in self.tools:
return self.protocol.encode_error(req_id, -32601, f"Tool {tool_name} not found")
result = await self.tools[tool_name](args)
return self.protocol.encode_response(req_id, {"result": result})
elif method == "resources/list":
resources_def = [{"uri": uri, "name": uri} for uri in self.resources]
return self.protocol.encode_response(req_id, {"resources": resources_def})
elif method == "resources/read":
uri = params.get("uri")
if uri not in self.resources:
return self.protocol.encode_error(req_id, -32601, f"Resource {uri} not found")
content = await self.resources[uri]()
return self.protocol.encode_response(req_id, {"contents": [{"uri": uri, "text": content}]})
elif method == "prompts/list":
prompts_def = [{"name": name, "description": f"Prompt {name}"} for name in self.prompts]
return self.protocol.encode_response(req_id, {"prompts": prompts_def})
elif method == "prompts/get":
name = params.get("name")
args = params.get("arguments", {})
if name not in self.prompts:
return self.protocol.encode_error(req_id, -32601, f"Prompt {name} not found")
prompt_text = await self.prompts[name](args)
return self.protocol.encode_response(req_id, {"messages": [{"role": "user", "content": prompt_text}]})
else:
return self.protocol.encode_error(req_id, -32601, f"Method {method} not found")
3.4 工具调用流程完整示例
现在定义具体工具、资源和提示。
3.4.1 数学工具
在 mcp_server.py 主入口中注册:
async def add_tool_handler(args: Dict) -> int:
a = args.get("a")
b = args.get("b")
return a + b
async def subtract_tool_handler(args: Dict) -> int:
a = args.get("a")
b = args.get("b")
return a - b
async def time_resource_handler() -> str:
import datetime
return datetime.datetime.now().isoformat()
async def greet_prompt_handler(args: Dict) -> str:
name = args.get("name", "World")
return f"Hello, {name}! Welcome to MCP."
if __name__ == "__main__":
server = MCPServer()
server.add_tool("add", add_tool_handler)
server.add_tool("subtract", subtract_tool_handler)
server.add_resource("time://current", time_resource_handler)
server.add_prompt("greet", greet_prompt_handler)
asyncio.run(server.run())
3.4.2 客户端调用示例(模拟 AI)
创建一个完整的客户端,演示工具调用流程:
import asyncio
import json
import subprocess
import sys
class MCPClient:
def __init__(self, server_script):
self.server_script = server_script
self.process = None
self.next_id = 1
self.pending = {}
async def start(self):
self.process = await asyncio.create_subprocess_exec(
sys.executable, self.server_script,
stdin=subprocess.PIPE,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE
)
asyncio.create_task(self._reader())
# 发送初始化请求
await self._send_request("initialize", {})
# 等待初始化响应(简化:等待一段时间)
await asyncio.sleep(0.1)
async def _reader(self):
while True:
line = await self.process.stdout.readline()
if not line:
break
data = json.loads(line)
req_id = data.get("id")
if req_id in self.pending:
self.pending[req_id].set_result(data)
async def _send_request(self, method, params):
req_id = self.next_id
self.next_id += 1
request = {
"jsonrpc": "2.0",
"id": req_id,
"method": method,
"params": params
}
self.process.stdin.write((json.dumps(request) + "\n").encode())
await self.process.stdin.drain()
future = asyncio.Future()
self.pending[req_id] = future
return await future
async def call_tool(self, name, arguments):
resp = await self._send_request("tools/call", {"name": name, "arguments": arguments})
return resp.get("result", {}).get("result")
async def list_tools(self):
resp = await self._send_request("tools/list", {})
return resp.get("result", {}).get("tools", [])
async def read_resource(self, uri):
resp = await self._send_request("resources/read", {"uri": uri})
contents = resp.get("result", {}).get("contents", [])
return contents[0].get("text") if contents else None
async def get_prompt(self, name, arguments):
resp = await self._send_request("prompts/get", {"name": name, "arguments": arguments})
messages = resp.get("result", {}).get("messages", [])
return messages[0].get("content") if messages else None
async def stop(self):
if self.process:
self.process.terminate()
await self.process.wait()
async def main():
client = MCPClient("mcp_server.py")
await client.start()
# 发现工具
tools = await client.list_tools()
print("Available tools:", tools)
# 调用 add 工具
result = await client.call_tool("add", {"a": 3, "b": 5})
print("3 + 5 =", result)
# 调用 subtract 工具
result = await client.call_tool("subtract", {"a": 10, "b": 4})
print("10 - 4 =", result)
# 读取资源
time_str = await client.read_resource("time://current")
print("Current time:", time_str)
# 获取提示
greeting = await client.get_prompt("greet", {"name": "Alice"})
print("Prompt:", greeting)
await client.stop()
if __name__ == "__main__":
asyncio.run(main())
3.5 易于扩展:添加新工具只需注册函数
例如,添加一个乘法工具:
async def multiply_tool_handler(args: Dict) -> int:
return args.get("a") * args.get("b")
server.add_tool("multiply", multiply_tool_handler)
无需修改任何其他代码,服务器启动后客户端通过 tools/list 即可发现新工具。
4. 运行与测试
- 将上述代码保存到对应文件。
- 打开终端,运行客户端:
python mcp_client.py - 预期输出:
Available tools: [{'name': 'add', ...}, {'name': 'subtract', ...}] 3 + 5 = 8 10 - 4 = 6 Current time: 2025-03-30T12:34:56.789012 Prompt: Hello, Alice! Welcome to MCP.
5. 核心机制总结
| 机制 | 实现方式 | 代码位置 |
|---|---|---|
| 标准化协议 | JSON-RPC 2.0 通过 \n 分隔消息 |
mcp_protocol.py |
| 进程隔离 | asyncio.create_subprocess_exec 启动子进程,通过 stdin/stdout 通信 |
mcp_client.py::start |
| 动态能力发现 | tools/list, resources/list, prompts/list 方法 |
mcp_server.py::_dispatch |
| 工具调用流程 | 客户端发送 tools/call 请求,服务器分发到注册的处理函数 |
mcp_server.py 的 tools/call 分支 |
| 易于扩展 | 只需调用 add_tool 并实现 async 函数 |
mcp_server.py 的 add_tool |
6. 进阶扩展方向
- 资源模板:支持
time://{zone}动态 URI。 - 工具输入验证:根据
inputSchema自动校验参数。 - 流式响应:支持
tools/call返回流式数据(通过多次 JSON-RPC 响应)。 - 安全沙箱:限制子进程文件系统、网络访问。
- 多客户端:服务器同时处理多个客户端(需使用命名管道或 socket)。
通过以上实现,我们完整展示了 MCP 的核心设计思想——标准化、进程隔离、动态发现、简洁扩展,可立即用于为 AI 助手集成自定义工具和数据源。
AtomGit 是由开放原子开源基金会联合 CSDN 等生态伙伴共同推出的新一代开源与人工智能协作平台。平台坚持“开放、中立、公益”的理念,把代码托管、模型共享、数据集托管、智能体开发体验和算力服务整合在一起,为开发者提供从开发、训练到部署的一站式体验。
更多推荐

所有评论(0)