MCP 协议实战
·
前言
💡 痛点:AI 模型是孤岛?每次都要手动传数据?想让 AI 直接操作你的工具和数据?
🎯 解决方案:掌握 MCP(Model Context Protocol) — AI 模型的"USB 接口",标准化工具集成协议。
MCP 是什么?
MCP 核心概念:
| 概念 | 说明 | 类比 |
|---|---|---|
| Host | 宿主应用(如 IDE、聊天工具) | 电脑 |
| Client | 与单个 MCP Server 通信 | USB 驱动 |
| Server | 提供工具/资源/提示词 | USB 设备 |
| Protocol | JSON-RPC 2.0 通信协议 | USB 协议 |
传输方式:
| 方式 | 协议 | 适用场景 |
|---|---|---|
| stdio | 标准输入输出 | 本地进程 |
| SSE | Server-Sent Events | 远程 HTTP(旧版传输,新版规范已由 Streamable HTTP 取代) |
| Streamable HTTP | HTTP + 流式 | 新版推荐 |
一、MCP 协议基础
1.1 JSON-RPC 2.0 通信
# ===== MCP 通信协议 =====
import json
import asyncio
from typing import Dict, Any, Optional
class MCPMessage:
    """Helpers for building and parsing the JSON-RPC 2.0 messages MCP uses."""

    @staticmethod
    def request(method: str, params: Dict = None, request_id: int = 1) -> str:
        """Serialize a request; a falsy ``params`` is sent as an empty object."""
        envelope = {"jsonrpc": "2.0", "id": request_id, "method": method}
        envelope["params"] = params if params else {}
        return json.dumps(envelope)

    @staticmethod
    def response(result: Any = None, error: Dict = None, request_id: int = 1) -> str:
        """Serialize a response carrying ``error`` when truthy, else ``result``."""
        envelope = {"jsonrpc": "2.0", "id": request_id}
        key, value = ("error", error) if error else ("result", result)
        envelope[key] = value
        return json.dumps(envelope)

    @staticmethod
    def notification(method: str, params: Dict = None) -> str:
        """Serialize a notification (no ``id``, so no response is expected)."""
        envelope = {"jsonrpc": "2.0", "method": method}
        envelope["params"] = params if params else {}
        return json.dumps(envelope)

    @staticmethod
    def parse(raw: str) -> Dict:
        """Decode a raw JSON message."""
        decoded = json.loads(raw)
        return decoded
# Example messages
print("=== 初始化请求 ===")
print(MCPMessage.request(
    method="initialize",
    params={
        "protocolVersion": "2024-11-05",
        "capabilities": {
            "roots": {"listChanged": True}
        },
        "clientInfo": {"name": "my-client", "version": "1.0.0"}
    },
    request_id=1
))

print("\n=== 工具调用请求 ===")
print(MCPMessage.request(
    method="tools/call",
    params={
        "name": "read_file",
        "arguments": {"path": "/tmp/test.txt"}
    },
    request_id=2
))

# The message below is a notification (it has no id), not a tools/list
# request, so the banner names it as a list-changed notification.
print("\n=== 工具列表变更通知 ===")
print(MCPMessage.notification(
    method="notifications/tools/list_changed"
))
1.2 MCP 生命周期
# ===== MCP 生命周期管理 =====
import asyncio
import json
from typing import Dict, Optional
from enum import Enum
class MCPState(Enum):
    """Connection states tracked by ``MCPLifecycle``."""

    DISCONNECTED = "disconnected"
    CONNECTING = "connecting"
    INITIALIZING = "initializing"
    READY = "ready"
    SHUTDOWN = "shutdown"
    ERROR = "error"


class MCPLifecycle:
    """Client-side bookkeeping for the MCP handshake and shutdown.

    The class only builds messages and records state; sending the messages
    over a transport is left to the caller.
    """

    def __init__(self, client_name: str = "mcp-client", client_version: str = "1.0.0"):
        self.state = MCPState.DISCONNECTED
        self.client_name = client_name
        self.client_version = client_version
        # Filled in from the server's initialize response.
        self.server_info: Dict = {}
        self.server_capabilities: Dict = {}
        self.protocol_version: str = ""
        self.request_counter = 0

    def _next_id(self) -> int:
        """Return the next request id (1, 2, 3, ...)."""
        self.request_counter += 1
        return self.request_counter

    def create_initialize(self) -> str:
        """Build the ``initialize`` request and mark the state as CONNECTING."""
        self.state = MCPState.CONNECTING
        return MCPMessage.request(
            method="initialize",
            params={
                "protocolVersion": "2024-11-05",
                "capabilities": {
                    "roots": {"listChanged": True},
                    "sampling": {}
                },
                "clientInfo": {
                    "name": self.client_name,
                    "version": self.client_version
                }
            },
            request_id=self._next_id()
        )

    def handle_init_response(self, response: Dict):
        """Record server info from the initialize response.

        An ``error`` member moves the state to ERROR and leaves the stored
        server details untouched.
        """
        if "error" in response:
            self.state = MCPState.ERROR
            print(f"初始化失败: {response['error']}")
            return
        result = response.get("result", {})
        self.server_info = result.get("serverInfo", {})
        self.server_capabilities = result.get("capabilities", {})
        self.protocol_version = result.get("protocolVersion", "")
        print(f"✅ 初始化成功")
        print(f" Server: {self.server_info.get('name')} v{self.server_info.get('version')}")
        print(f" Protocol: {self.protocol_version}")
        print(f" Capabilities: {json.dumps(self.server_capabilities, ensure_ascii=False)}")

    def create_initialized(self) -> str:
        """Build the ``notifications/initialized`` notification."""
        self.state = MCPState.INITIALIZING
        return MCPMessage.notification(
            method="notifications/initialized"
        )

    def set_ready(self):
        """Mark the connection as ready for normal requests."""
        self.state = MCPState.READY
        print("🟢 MCP 连接就绪")

    def create_shutdown(self) -> str:
        """Build the ``shutdown`` request and mark the state as SHUTDOWN."""
        # Without this assignment the SHUTDOWN state could never be reached
        # and get_status() kept reporting "ready" after shutdown was requested.
        self.state = MCPState.SHUTDOWN
        return MCPMessage.request(
            method="shutdown",
            request_id=self._next_id()
        )

    def get_status(self) -> Dict:
        """Return a JSON-serializable snapshot of the connection."""
        return {
            "state": self.state.value,
            "server": self.server_info,
            "protocol": self.protocol_version,
            "capabilities": self.server_capabilities
        }
# Usage: walk through the handshake against a canned server reply.
lifecycle = MCPLifecycle("demo-client", "1.0.0")

init_request = lifecycle.create_initialize()
print(f"\n发送: {init_request}")

# Simulated server response to the initialize request (id 1).
mock_response = MCPMessage.parse(MCPMessage.response(
    request_id=1,
    result={
        "protocolVersion": "2024-11-05",
        "capabilities": {
            "tools": {"listChanged": True},
            "resources": {"subscribe": True},
            "prompts": {"listChanged": True}
        },
        "serverInfo": {"name": "demo-server", "version": "1.0.0"}
    },
))
lifecycle.handle_init_response(mock_response)
lifecycle.create_initialized()
lifecycle.set_ready()

status_text = json.dumps(lifecycle.get_status(), ensure_ascii=False, indent=2)
print(f"\n状态: {status_text}")
二、MCP Server 开发
2.1 基础 Server 框架
# ===== MCP Server 基础框架 =====
import sys
import json
import asyncio
from typing import Dict, Any, Callable, List, Optional
from dataclasses import dataclass, field
@dataclass
class MCPTool:
    """Description of a tool as advertised by ``tools/list``."""

    name: str
    description: str
    # JSON Schema for the tool's arguments.
    input_schema: Dict = field(default_factory=dict)


@dataclass
class MCPResource:
    """Description of a resource as advertised by ``resources/list``."""

    uri: str
    name: str
    description: str
    mime_type: str = "text/plain"


@dataclass
class MCPPrompt:
    """Description of a prompt as advertised by ``prompts/list``."""

    name: str
    description: str
    # Argument descriptors (dicts) passed through to clients unchanged.
    arguments: List[Dict] = field(default_factory=list)


class MCPServer:
    """Minimal MCP server with decorator-based registration.

    Tools, resources and prompts are registered through the ``tool``,
    ``resource`` and ``prompt`` decorators. ``handle_message`` dispatches a
    single JSON-RPC message; ``run_stdio`` serves newline-delimited messages
    over stdin/stdout.
    """

    def __init__(
        self,
        name: str,
        version: str = "1.0.0",
        protocol_version: str = "2024-11-05"
    ):
        self.name = name
        self.version = version
        self.protocol_version = protocol_version
        self.tools: Dict[str, MCPTool] = {}
        self.resources: Dict[str, MCPResource] = {}
        self.prompts: Dict[str, MCPPrompt] = {}
        self.tool_handlers: Dict[str, Callable] = {}
        self.resource_handlers: Dict[str, Callable] = {}
        self.prompt_handlers: Dict[str, Callable] = {}
        # Built-in JSON-RPC methods, keyed by method name.
        self._method_handlers = {
            "initialize": self._handle_initialize,
            "ping": self._handle_ping,
            "tools/list": self._handle_tools_list,
            "tools/call": self._handle_tool_call,
            "resources/list": self._handle_resources_list,
            "resources/read": self._handle_resource_read,
            "prompts/list": self._handle_prompts_list,
            "prompts/get": self._handle_prompt_get,
            "shutdown": self._handle_shutdown,
        }

    # ---- Registration decorators ----
    def tool(
        self,
        name: str,
        description: str,
        input_schema: Dict = None
    ):
        """Register the decorated function as a tool.

        When ``input_schema`` is omitted (or empty) a schema is inferred from
        the function signature.
        """
        def decorator(func: Callable):
            self.tools[name] = MCPTool(
                name=name,
                description=description,
                input_schema=input_schema or self._infer_schema(func)
            )
            self.tool_handlers[name] = func
            return func
        return decorator

    def resource(
        self,
        uri: str,
        name: str,
        description: str,
        mime_type: str = "text/plain"
    ):
        """Register the decorated function as the reader for ``uri``."""
        def decorator(func: Callable):
            self.resources[uri] = MCPResource(
                uri=uri, name=name,
                description=description,
                mime_type=mime_type
            )
            self.resource_handlers[uri] = func
            return func
        return decorator

    def prompt(
        self,
        name: str,
        description: str,
        arguments: List[Dict] = None
    ):
        """Register the decorated function as a prompt template."""
        def decorator(func: Callable):
            self.prompts[name] = MCPPrompt(
                name=name,
                description=description,
                arguments=arguments or []
            )
            self.prompt_handlers[name] = func
            return func
        return decorator

    # ---- Method handlers ----
    @staticmethod
    async def _invoke(handler: Callable, *args, **kwargs) -> Any:
        """Call ``handler``, awaiting it when it is a coroutine function."""
        if asyncio.iscoroutinefunction(handler):
            return await handler(*args, **kwargs)
        return handler(*args, **kwargs)

    async def _handle_initialize(self, params: Dict) -> Dict:
        """Return protocol version, capabilities and server info."""
        return {
            "protocolVersion": self.protocol_version,
            "capabilities": {
                "tools": {"listChanged": bool(self.tools)},
                "resources": {
                    "subscribe": True,
                    "listChanged": bool(self.resources)
                },
                "prompts": {"listChanged": bool(self.prompts)}
            },
            "serverInfo": {
                "name": self.name,
                "version": self.version
            }
        }

    async def _handle_ping(self, params: Dict) -> Dict:
        """Answer ``ping`` with an empty result."""
        return {}

    async def _handle_tools_list(self, params: Dict) -> Dict:
        """List every registered tool."""
        return {
            "tools": [
                {
                    "name": t.name,
                    "description": t.description,
                    "inputSchema": t.input_schema
                } for t in self.tools.values()
            ]
        }

    async def _handle_tool_call(self, params: Dict) -> Dict:
        """Run a tool; failures are reported in-band with ``isError``."""
        tool_name = params.get("name", "")
        # ``or {}`` also covers an explicit ``"arguments": null``.
        arguments = params.get("arguments") or {}
        handler = self.tool_handlers.get(tool_name)
        if handler is None:
            return {
                "content": [{"type": "text", "text": f"未知工具: {tool_name}"}],
                "isError": True
            }
        try:
            result = await self._invoke(handler, **arguments)
        except Exception as e:
            # Tool failures go back to the caller as content, not as a
            # protocol-level error.
            return {
                "content": [{"type": "text", "text": f"工具执行错误: {e}"}],
                "isError": True
            }
        return {
            "content": [{"type": "text", "text": str(result)}],
            "isError": False
        }

    async def _handle_resources_list(self, params: Dict) -> Dict:
        """List every registered resource."""
        return {
            "resources": [
                {
                    "uri": r.uri,
                    "name": r.name,
                    "description": r.description,
                    "mimeType": r.mime_type
                } for r in self.resources.values()
            ]
        }

    async def _handle_resource_read(self, params: Dict) -> Dict:
        """Read one resource and report it with its registered MIME type."""
        uri = params.get("uri", "")
        handler = self.resource_handlers.get(uri)
        if handler is None:
            return {
                "contents": [{"uri": uri, "mimeType": "text/plain", "text": f"未知资源: {uri}"}]
            }
        content = await self._invoke(handler, uri)
        # Use the MIME type given at registration instead of always
        # claiming text/plain.
        registered = self.resources.get(uri)
        mime_type = registered.mime_type if registered is not None else "text/plain"
        return {
            "contents": [{"uri": uri, "mimeType": mime_type, "text": str(content)}]
        }

    async def _handle_prompts_list(self, params: Dict) -> Dict:
        """List every registered prompt."""
        return {
            "prompts": [
                {
                    "name": p.name,
                    "description": p.description,
                    "arguments": p.arguments
                } for p in self.prompts.values()
            ]
        }

    async def _handle_prompt_get(self, params: Dict) -> Dict:
        """Render a prompt by calling its handler with the given arguments."""
        prompt_name = params.get("name", "")
        arguments = params.get("arguments") or {}
        handler = self.prompt_handlers.get(prompt_name)
        if handler is None:
            return {"error": f"未知提示词: {prompt_name}"}
        messages = await self._invoke(handler, **arguments)
        return {"messages": messages}

    async def _handle_shutdown(self, params: Dict) -> Dict:
        """Acknowledge ``shutdown`` with an empty result."""
        return {}

    # ---- Message dispatch ----
    async def handle_message(self, raw: str) -> Optional[str]:
        """Dispatch one raw JSON-RPC message.

        Returns the serialized response, or ``None`` for notifications
        (messages without an id), which must never be answered.
        """
        try:
            message = json.loads(raw)
        except json.JSONDecodeError as e:
            # The id is unknown when the message cannot be parsed.
            return MCPMessage.response(
                error={"code": -32700, "message": f"Parse error: {e}"},
                request_id=None
            )
        if not isinstance(message, dict):
            return MCPMessage.response(
                error={"code": -32600, "message": "Invalid Request"},
                request_id=None
            )

        request_id = message.get("id")
        is_notification = request_id is None
        method = message.get("method", "")
        params = message.get("params") or {}

        handler = self._method_handlers.get(method)
        if handler is None:
            # Unhandled notifications such as notifications/initialized are
            # ignored instead of being answered with an error.
            if is_notification:
                return None
            return MCPMessage.response(
                error={"code": -32601, "message": f"Method not found: {method}"},
                request_id=request_id
            )

        try:
            result = await handler(params)
        except Exception as e:
            if is_notification:
                return None
            # Echo the real request id so the client can match the failure
            # to its pending request.
            return MCPMessage.response(
                error={"code": -32603, "message": f"Internal error: {e}"},
                request_id=request_id
            )
        if is_notification:
            return None
        return MCPMessage.response(result=result, request_id=request_id)

    def _infer_schema(self, func: Callable) -> Dict:
        """Derive a JSON Schema from ``func``'s signature.

        Unknown or missing annotations map to ``"string"``; parameters
        without a default are listed as required.
        """
        import inspect
        type_map = {
            str: "string", int: "integer",
            float: "number", bool: "boolean",
            list: "array", dict: "object"
        }
        properties = {}
        required = []
        for param_name, param in inspect.signature(func).parameters.items():
            if param_name == "self":
                continue
            ptype = type_map.get(param.annotation, "string")
            properties[param_name] = {"type": ptype, "description": param_name}
            # Identity check: ``==`` would invoke the default value's own
            # __eq__, which may not return a plain bool.
            if param.default is inspect.Parameter.empty:
                required.append(param_name)
        return {"type": "object", "properties": properties, "required": required}

    async def run_stdio(self):
        """Serve newline-delimited JSON-RPC over stdin/stdout until EOF."""
        loop = asyncio.get_running_loop()
        reader = asyncio.StreamReader()
        protocol = asyncio.StreamReaderProtocol(reader)
        await loop.connect_read_pipe(lambda: protocol, sys.stdin)
        writer_transport, writer_protocol = await loop.connect_write_pipe(
            asyncio.streams.FlowControlMixin, sys.stdout
        )
        writer = asyncio.StreamWriter(
            writer_transport, writer_protocol, reader, loop
        )
        # stdout carries protocol traffic, so the banner goes to stderr.
        print(f"MCP Server '{self.name}' v{self.version} 启动", file=sys.stderr)
        while True:
            line = await reader.readline()
            if not line:
                break
            line = line.decode().strip()
            if not line:
                continue
            response = await self.handle_message(line)
            if response:
                writer.write((response + "\n").encode())
                await writer.drain()
# Usage: register two tools on a demo server and list them.
if __name__ == '__main__':
    server = MCPServer("demo-server", "1.0.0")

    @server.tool("echo", "回显输入文本")
    def echo(text: str) -> str:
        return f"Echo: {text}"

    @server.tool("add", "两数相加")
    def add(a: int, b: int) -> int:
        return a + b

    print("Server 已创建,注册工具:", list(server.tools))
2.2 文件系统 MCP Server
# ===== 文件系统 MCP Server =====
import os
import json
import asyncio
class FileSystemMCPServer(MCPServer):
    """MCP server exposing file operations confined to one root directory."""

    def __init__(self, allowed_root: str = "/"):
        super().__init__("filesystem-server", "1.0.0")
        self.allowed_root = os.path.abspath(allowed_root)
        self._register_filesystem_tools()
        self._register_filesystem_resources()

    def _safe_path(self, path: str) -> str:
        """Resolve ``path`` and ensure it lies inside ``allowed_root``.

        Returns the resolved absolute path.

        Raises:
            ValueError: if the resolved path is outside the allowed root.
        """
        # realpath resolves symlinks and "..", so a link inside the root
        # cannot be used to reach a file outside of it.
        root = os.path.realpath(self.allowed_root)
        resolved = os.path.realpath(path)
        try:
            # Compare whole path components: a plain startswith() would
            # accept "/data2/x" for the root "/data".
            inside = os.path.commonpath([root, resolved]) == root
        except ValueError:
            # commonpath raises for paths on different drives.
            inside = False
        if not inside:
            raise ValueError(f"路径超出允许范围: {path}")
        return resolved

    def _register_filesystem_tools(self):
        """Register the file-system tools on this server."""
        import fnmatch

        @self.tool(
            "read_file",
            "读取文件内容",
            {"type": "object", "properties": {
                "path": {"type": "string", "description": "文件路径"},
                "encoding": {"type": "string", "description": "编码(默认 utf-8)"}
            }, "required": ["path"]}
        )
        def read_file(path: str, encoding: str = "utf-8") -> str:
            safe_path = self._safe_path(path)
            with open(safe_path, 'r', encoding=encoding) as f:
                return f.read()

        @self.tool(
            "write_file",
            "写入文件",
            {"type": "object", "properties": {
                "path": {"type": "string", "description": "文件路径"},
                "content": {"type": "string", "description": "写入内容"}
            }, "required": ["path", "content"]}
        )
        def write_file(path: str, content: str) -> str:
            safe_path = self._safe_path(path)
            # Create missing parent directories before writing.
            os.makedirs(os.path.dirname(safe_path), exist_ok=True)
            with open(safe_path, 'w', encoding='utf-8') as f:
                f.write(content)
            return f"已写入 {safe_path}({len(content)} 字符)"

        @self.tool(
            "list_directory",
            "列出目录内容",
            {"type": "object", "properties": {
                "path": {"type": "string", "description": "目录路径"},
                "pattern": {"type": "string", "description": "过滤模式(可选)"}
            }, "required": ["path"]}
        )
        def list_directory(path: str, pattern: str = "*") -> str:
            safe_path = self._safe_path(path)
            entries = []
            for name in os.listdir(safe_path):
                if not fnmatch.fnmatch(name, pattern):
                    continue
                full = os.path.join(safe_path, name)
                is_dir = os.path.isdir(full)
                # Directories are reported with size 0.
                size = os.path.getsize(full) if not is_dir else 0
                entries.append(f"{'📁' if is_dir else '📄'} {name} ({size}B)")
            return "\n".join(entries) if entries else "空目录"

        @self.tool(
            "search_files",
            "搜索文件",
            {"type": "object", "properties": {
                "path": {"type": "string", "description": "搜索根路径"},
                "pattern": {"type": "string", "description": "文件名匹配模式"},
                "max_depth": {"type": "integer", "description": "最大深度(默认 3)"}
            }, "required": ["path", "pattern"]}
        )
        def search_files(path: str, pattern: str, max_depth: int = 3) -> str:
            safe_path = self._safe_path(path)
            results = []

            def walk(current: str, depth: int):
                if depth > max_depth:
                    return
                try:
                    for name in os.listdir(current):
                        full = os.path.join(current, name)
                        if os.path.isdir(full):
                            walk(full, depth + 1)
                        elif fnmatch.fnmatch(name, pattern):
                            results.append(full)
                except PermissionError:
                    # Unreadable directories are skipped on purpose.
                    pass

            walk(safe_path, 0)
            return "\n".join(results) if results else "未找到匹配文件"

        @self.tool(
            "get_file_info",
            "获取文件信息",
            {"type": "object", "properties": {
                "path": {"type": "string", "description": "文件路径"}
            }, "required": ["path"]}
        )
        def get_file_info(path: str) -> str:
            safe_path = self._safe_path(path)
            stat = os.stat(safe_path)
            return json.dumps({
                "path": safe_path,
                "size": stat.st_size,
                "is_file": os.path.isfile(safe_path),
                "is_dir": os.path.isdir(safe_path),
                "modified": stat.st_mtime,
                "created": stat.st_ctime
            }, ensure_ascii=False, indent=2)

    def _register_filesystem_resources(self):
        """Register the workspace listing resource."""
        @self.resource(
            "file:///workspace",
            "workspace",
            "工作区文件列表"
        )
        def workspace_resource(uri: str) -> str:
            entries = os.listdir(self.allowed_root)
            return "\n".join(entries)
# Usage: build a server rooted at the current directory and show its setup.
if __name__ == '__main__':
    server = FileSystemMCPServer(allowed_root=".")
    print("文件系统 MCP Server")
    print(f"允许路径: {server.allowed_root}")
    tool_names = list(server.tools)
    print(f"工具: {tool_names}")
2.3 数据库 MCP Server
# ===== 数据库 MCP Server =====
class DatabaseMCPServer(MCPServer):
    """MCP server exposing SQL tools backed by a throwaway SQLite database."""

    def __init__(self, connection_string: str = "sqlite:///demo.db"):
        super().__init__("database-server", "1.0.0")
        self.connection_string = connection_string
        self._register_database_tools()

    def _get_connection(self):
        """Open a fresh in-memory SQLite connection.

        ``connection_string`` is stored but not used here: every call gets a
        new, empty ``:memory:`` database, so nothing persists between tool
        calls.
        """
        import sqlite3
        return sqlite3.connect(":memory:")

    def _register_database_tools(self):
        """Register the database tools on this server."""
        @self.tool(
            "query",
            "执行 SQL 查询",
            {"type": "object", "properties": {
                "sql": {"type": "string", "description": "SQL 查询语句"},
                "params": {"type": "array", "description": "查询参数(可选)"},
                "limit": {"type": "integer", "description": "返回行数限制(默认 100)"}
            }, "required": ["sql"]}
        )
        def query(sql: str, params: list = None, limit: int = 100) -> str:
            conn = self._get_connection()
            try:
                cursor = conn.cursor()
                # Seed the sample table so the demo has data to query.
                cursor.execute("""
                    CREATE TABLE IF NOT EXISTS users (
                        id INTEGER PRIMARY KEY,
                        name TEXT,
                        email TEXT,
                        created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
                    )
                """)
                cursor.execute("INSERT OR IGNORE INTO users (id, name, email) VALUES (1, '张三', 'zhangsan@example.com')")
                cursor.execute("INSERT OR IGNORE INTO users (id, name, email) VALUES (2, '李四', 'lisi@example.com')")
                cursor.execute(sql, params or [])
                # Statements that return no rows (INSERT/UPDATE/DDL) leave
                # description as None; report success instead of failing
                # while building the column list.
                if cursor.description is None:
                    conn.commit()
                    return f"执行成功,影响 {cursor.rowcount} 行"
                rows = cursor.fetchall()
                columns = [desc[0] for desc in cursor.description]
                lines = [f"列名: {columns}", f"返回 {len(rows)} 行", ""]
                lines.extend(
                    json.dumps(dict(zip(columns, row)), ensure_ascii=False)
                    for row in rows[:limit]
                )
                return "\n".join(lines) + "\n"
            except Exception as e:
                # Errors are returned as text so the tool result reaches
                # the caller instead of an exception.
                return f"SQL 错误: {e}"
            finally:
                conn.close()

        @self.tool(
            "list_tables",
            "列出所有表",
            {"type": "object", "properties": {}, "required": []}
        )
        def list_tables() -> str:
            conn = self._get_connection()
            try:
                cursor = conn.cursor()
                cursor.execute("SELECT name FROM sqlite_master WHERE type='table'")
                tables = [row[0] for row in cursor.fetchall()]
                return f"数据表: {', '.join(tables)}" if tables else "没有数据表"
            finally:
                conn.close()

        @self.tool(
            "get_table_schema",
            "获取表结构",
            {"type": "object", "properties": {
                "table": {"type": "string", "description": "表名"}
            }, "required": ["table"]}
        )
        def get_table_schema(table: str) -> str:
            conn = self._get_connection()
            try:
                cursor = conn.cursor()
                # The table-valued form of the pragma accepts a bound
                # parameter, so the table name is never spliced into SQL.
                cursor.execute("SELECT * FROM pragma_table_info(?)", (table,))
                schema_lines = [f"表: {table}\n"]
                for cid, name, dtype, notnull, default, pk in cursor.fetchall():
                    schema_lines.append(
                        f" - {name}: {dtype} "
                        f"{'NOT NULL' if notnull else 'NULL'} "
                        f"{'PK' if pk else ''}"
                    )
                return "\n".join(schema_lines)
            except Exception as e:
                return f"错误: {e}"
            finally:
                conn.close()
# Usage: build the database server and show its registered tools.
if __name__ == '__main__':
    server = DatabaseMCPServer()
    print("数据库 MCP Server")
    tool_names = list(server.tools)
    print(f"工具: {tool_names}")
三、MCP Client 开发
3.1 stdio Client
# ===== MCP Client (stdio) =====
import asyncio
import json
import sys
from typing import Dict, List, Optional
class MCPStdioClient:
    """MCP client that talks to a server subprocess over stdin/stdout.

    Messages are newline-delimited JSON. A background task reads the
    server's stdout and resolves the future of the matching request id.
    """

    def __init__(self, server_command: str, server_args: List[str] = None):
        self.server_command = server_command
        self.server_args = server_args or []
        self.process = None
        self.request_id = 0
        self.server_info: Dict = {}
        self.capabilities: Dict = {}
        # Futures of in-flight requests, keyed by request id.
        self._pending: Dict[int, asyncio.Future] = {}
        # Kept so the reader task is not garbage-collected while running.
        self._reader_task: Optional[asyncio.Task] = None

    async def connect(self):
        """Start the server process and perform the MCP handshake."""
        self.process = await asyncio.create_subprocess_exec(
            self.server_command, *self.server_args,
            stdin=asyncio.subprocess.PIPE,
            stdout=asyncio.subprocess.PIPE,
            stderr=asyncio.subprocess.PIPE
        )
        self._reader_task = asyncio.create_task(self._read_loop())
        init_result = await self.send_request("initialize", {
            "protocolVersion": "2024-11-05",
            "capabilities": {"roots": {"listChanged": True}},
            "clientInfo": {"name": "stdio-client", "version": "1.0.0"}
        })
        self.server_info = init_result.get("serverInfo", {})
        self.capabilities = init_result.get("capabilities", {})
        await self.send_notification("notifications/initialized")
        print(f"✅ 已连接到 {self.server_info.get('name')} v{self.server_info.get('version')}")

    async def _read_loop(self):
        """Read server messages until EOF and resolve pending requests."""
        try:
            while True:
                line = await self.process.stdout.readline()
                if not line:
                    break
                text = line.decode().strip()
                if not text:
                    continue
                try:
                    message = json.loads(text)
                except json.JSONDecodeError as e:
                    # One malformed line must not stop the whole client.
                    print(f"读取错误: {e}")
                    continue
                if not isinstance(message, dict):
                    continue
                future = self._pending.pop(message.get("id"), None)
                # A request that already timed out has a finished future;
                # its late reply is dropped.
                if future is not None and not future.done():
                    future.set_result(message)
        except Exception as e:
            print(f"读取错误: {e}")
        finally:
            # The stream is gone: fail whatever is still waiting instead of
            # letting each request run into its timeout.
            for future in self._pending.values():
                if not future.done():
                    future.set_exception(ConnectionError("MCP Server 连接已关闭"))
            self._pending.clear()

    async def send_request(self, method: str, params: Dict = None) -> Dict:
        """Send a request and wait up to 10 seconds for its result.

        Raises:
            TimeoutError: if no response arrives in time.
            Exception: carrying the JSON-RPC error object on an error reply.
        """
        self.request_id += 1
        request_id = self.request_id
        message = json.dumps({
            "jsonrpc": "2.0",
            "id": request_id,
            "method": method,
            "params": params or {}
        })
        future = asyncio.get_running_loop().create_future()
        self._pending[request_id] = future
        try:
            self.process.stdin.write((message + "\n").encode())
            await self.process.stdin.drain()
            response = await asyncio.wait_for(future, timeout=10)
        except asyncio.TimeoutError:
            raise TimeoutError(f"请求超时: {method}") from None
        finally:
            # Always forget the request so timed-out entries do not pile up.
            self._pending.pop(request_id, None)
        if "error" in response:
            raise Exception(response["error"])
        return response.get("result", {})

    async def send_notification(self, method: str, params: Dict = None):
        """Send a notification; no response is awaited."""
        message = json.dumps({
            "jsonrpc": "2.0",
            "method": method,
            "params": params or {}
        })
        self.process.stdin.write((message + "\n").encode())
        await self.process.stdin.drain()

    async def list_tools(self) -> List[Dict]:
        """Return the server's tool descriptors."""
        result = await self.send_request("tools/list")
        return result.get("tools", [])

    async def call_tool(self, name: str, arguments: Dict = None) -> str:
        """Call a tool and return its text content joined by newlines."""
        result = await self.send_request("tools/call", {
            "name": name,
            "arguments": arguments or {}
        })
        contents = result.get("content", [])
        texts = [c.get("text", "") for c in contents if c.get("type") == "text"]
        return "\n".join(texts)

    async def list_resources(self) -> List[Dict]:
        """Return the server's resource descriptors."""
        result = await self.send_request("resources/list")
        return result.get("resources", [])

    async def read_resource(self, uri: str) -> str:
        """Read a resource and return its text contents joined by newlines."""
        result = await self.send_request("resources/read", {"uri": uri})
        contents = result.get("contents", [])
        # Resource contents carry uri/mimeType/text and no "type" key, so
        # text entries are recognised by the presence of "text".
        texts = [c["text"] for c in contents if "text" in c]
        return "\n".join(texts)

    async def disconnect(self):
        """Ask the server to shut down, then close the pipe and reap it."""
        try:
            await self.send_request("shutdown")
        finally:
            # Release the process even when the shutdown request fails.
            self.process.stdin.close()
            await self.process.wait()
            if self._reader_task is not None:
                await self._reader_task
                self._reader_task = None
        print("🔌 已断开连接")
# Usage: connect to a server script, list its tools and call one.
if __name__ == '__main__':
    async def main():
        client = MCPStdioClient("python", ["my_server.py"])
        await client.connect()

        # List tools
        for tool in await client.list_tools():
            print(f" 🔧 {tool['name']}: {tool['description']}")

        # Call a tool
        result = await client.call_tool("echo", {"text": "Hello MCP!"})
        print(f"结果: {result}")

        await client.disconnect()

    asyncio.run(main())
3.2 Streamable HTTP Client
# ===== MCP Client (Streamable HTTP) =====
import aiohttp
import json
from typing import Dict, List, Optional
class MCPHttpClient:
    """MCP client for the Streamable HTTP transport.

    Each JSON-RPC message is POSTed to ``base_url``. The server may answer
    a request with plain JSON or with a Server-Sent Events stream; both are
    handled.
    """

    def __init__(self, base_url: str, headers: Dict = None):
        self.base_url = base_url.rstrip("/")
        self.headers = {
            "Content-Type": "application/json",
            "Accept": "application/json, text/event-stream",
            **(headers or {})
        }
        self.request_id = 0
        # Session id handed out by the server, echoed on later requests.
        self.session_id: Optional[str] = None

    @staticmethod
    def _extract_message(content_type: str, body: str) -> Dict:
        """Decode a response body into one JSON-RPC response message.

        A body whose content type is not ``text/event-stream`` is parsed as
        JSON. For an event stream, the first event whose data is a message
        with ``result`` or ``error`` is returned.

        Raises:
            ValueError: if an event stream contains no response message.
        """
        if "text/event-stream" not in (content_type or "").lower():
            return json.loads(body)
        data_lines: List[str] = []
        # The extra empty line flushes a final event that was not
        # terminated by a blank line.
        for line in body.splitlines() + [""]:
            if line.startswith("data:"):
                payload = line[len("data:"):]
                # Per the SSE format, one leading space after the colon is
                # not part of the data.
                data_lines.append(payload[1:] if payload.startswith(" ") else payload)
            elif not line and data_lines:
                message = json.loads("\n".join(data_lines))
                data_lines = []
                if isinstance(message, dict) and ("result" in message or "error" in message):
                    return message
        raise ValueError("SSE 响应中没有 JSON-RPC 响应")

    async def _post(self, payload: Dict, expect_response: bool) -> Optional[Dict]:
        """POST one message; return the decoded reply when one is expected."""
        headers = dict(self.headers)
        if self.session_id:
            headers["Mcp-Session-Id"] = self.session_id
        async with aiohttp.ClientSession() as session:
            async with session.post(
                self.base_url,
                json=payload,
                headers=headers
            ) as resp:
                # Remember the session id for later requests.
                sid = resp.headers.get("Mcp-Session-Id")
                if sid:
                    self.session_id = sid
                if not expect_response:
                    # Notifications stay best-effort: the reply is ignored.
                    return None
                # Surface HTTP failures instead of trying to decode an
                # error page as JSON-RPC.
                resp.raise_for_status()
                # NOTE: text() reads until the body ends; for an event
                # stream this assumes the server closes it after replying.
                body = await resp.text()
                return self._extract_message(resp.headers.get("Content-Type", ""), body)

    async def connect(self):
        """Perform the MCP handshake."""
        result = await self.send_request("initialize", {
            "protocolVersion": "2024-11-05",
            "capabilities": {},
            "clientInfo": {"name": "http-client", "version": "1.0.0"}
        })
        print(f"✅ 已连接: {result.get('serverInfo', {}).get('name')}")
        await self.send_notification("notifications/initialized")

    async def send_request(self, method: str, params: Dict = None) -> Dict:
        """Send a request and return its ``result``.

        Raises:
            Exception: carrying the JSON-RPC error object on an error reply.
        """
        self.request_id += 1
        payload = {
            "jsonrpc": "2.0",
            "id": self.request_id,
            "method": method,
            "params": params or {}
        }
        data = await self._post(payload, expect_response=True)
        if "error" in data:
            raise Exception(data["error"])
        return data.get("result", {})

    async def send_notification(self, method: str, params: Dict = None):
        """Send a notification; the HTTP reply is ignored."""
        payload = {
            "jsonrpc": "2.0",
            "method": method,
            "params": params or {}
        }
        await self._post(payload, expect_response=False)

    async def list_tools(self) -> List[Dict]:
        """Return the server's tool descriptors."""
        result = await self.send_request("tools/list")
        return result.get("tools", [])

    async def call_tool(self, name: str, arguments: Dict = None) -> str:
        """Call a tool and return its text content joined by newlines."""
        result = await self.send_request("tools/call", {
            "name": name,
            "arguments": arguments or {}
        })
        contents = result.get("content", [])
        return "\n".join(c.get("text", "") for c in contents if c.get("type") == "text")
# Usage example: connect over HTTP and count the available tools.
if __name__ == '__main__':
    async def main():
        client = MCPHttpClient("http://localhost:8080/mcp")
        await client.connect()
        tool_count = len(await client.list_tools())
        print(f"可用工具: {tool_count} 个")

    asyncio.run(main())
四、MCP 与 LLM 集成
4.1 MCP + OpenAI 函数调用
# ===== MCP + OpenAI 集成 =====
from openai import OpenAI
import asyncio
client = OpenAI()
class MCPWithLLM:
    """Chat loop that lets an OpenAI model call tools served over MCP."""

    def __init__(self, mcp_client: MCPStdioClient, model: str = "gpt-4o"):
        self.mcp = mcp_client
        self.model = model
        self.llm = OpenAI()

    async def chat(self, user_message: str, max_tool_calls: int = 5) -> str:
        """Answer ``user_message``, running MCP tools the model asks for.

        Returns the model's final text, or a fixed notice once
        ``max_tool_calls`` rounds were used without a final answer.
        """
        mcp_tools = await self.mcp.list_tools()
        # Convert MCP tool descriptors to the OpenAI function schema.
        openai_tools = [
            {
                "type": "function",
                "function": {
                    "name": t["name"],
                    "description": t["description"],
                    "parameters": t.get("inputSchema", {})
                }
            } for t in mcp_tools
        ]
        messages = [
            {
                "role": "system",
                "content": "你是一个助手,可以使用 MCP 工具来完成任务。根据需要调用工具,然后基于结果回答用户。"
            },
            {"role": "user", "content": user_message}
        ]
        request = {"model": self.model, "messages": messages}
        # tool_choice is only sent together with tools: a request with
        # tool_choice but no tools is rejected by the API.
        if openai_tools:
            request["tools"] = openai_tools
            request["tool_choice"] = "auto"

        for _ in range(max_tool_calls):
            response = self.llm.chat.completions.create(**request)
            message = response.choices[0].message
            messages.append(message)
            if not message.tool_calls:
                return message.content

            for tool_call in message.tool_calls:
                func_name = tool_call.function.name
                raw_args = tool_call.function.arguments
                try:
                    # An empty argument string means "no arguments".
                    func_args = json.loads(raw_args) if raw_args else {}
                except json.JSONDecodeError as e:
                    # Report malformed arguments back to the model instead
                    # of aborting the whole conversation.
                    result = f"工具参数解析失败: {e}"
                else:
                    print(f" 🔧 MCP 调用: {func_name}({json.dumps(func_args, ensure_ascii=False)})")
                    result = await self.mcp.call_tool(func_name, func_args)
                    print(f" 📋 结果: {result}")
                messages.append({
                    "role": "tool",
                    "tool_call_id": tool_call.id,
                    "content": result
                })
        return "达到最大工具调用次数。"
# Usage: let the model answer a question using the filesystem server.
if __name__ == '__main__':
    async def main():
        mcp_client = MCPStdioClient("python", ["filesystem_server.py"])
        await mcp_client.connect()

        assistant = MCPWithLLM(mcp_client)
        answer = await assistant.chat("帮我查看当前目录有哪些文件")
        print(f"\n回答: {answer}")

        await mcp_client.disconnect()

    asyncio.run(main())
4.2 MCP + Claude 集成
# ===== MCP + Claude 集成 =====
import anthropic
class MCPWithClaude:
    """Chat loop that lets a Claude model call tools served over MCP."""

    def __init__(self, mcp_client, api_key: str = None):
        self.mcp = mcp_client
        self.client = anthropic.Anthropic(api_key=api_key)

    async def chat(self, user_message: str, max_turns: int = 5) -> str:
        """Answer ``user_message``, running MCP tools the model asks for.

        Returns the model's text once it stops for any reason other than a
        tool request, or a fixed notice after ``max_turns`` rounds.
        """
        mcp_tools = await self.mcp.list_tools()
        # Convert MCP tool descriptors to the Claude tool schema.
        claude_tools = [
            {
                "name": t["name"],
                "description": t["description"],
                "input_schema": t.get("inputSchema", {})
            } for t in mcp_tools
        ]
        messages = [{"role": "user", "content": user_message}]

        for _ in range(max_turns):
            response = self.client.messages.create(
                model="claude-sonnet-4-20250514",
                max_tokens=4096,
                system="使用 MCP 工具完成任务。",
                tools=claude_tools if claude_tools else [],
                messages=messages
            )

            # Echo the assistant turn back into the history as plain dicts.
            assistant_content = []
            for block in response.content:
                if block.type == "text":
                    assistant_content.append({"type": "text", "text": block.text})
                elif block.type == "tool_use":
                    assistant_content.append({
                        "type": "tool_use",
                        "id": block.id,
                        "name": block.name,
                        "input": block.input
                    })
            messages.append({"role": "assistant", "content": assistant_content})

            # Any stop reason other than a tool request ends the loop; going
            # around again would send a history ending in an assistant turn.
            if response.stop_reason != "tool_use":
                texts = [b.text for b in response.content if b.type == "text"]
                return "\n".join(texts)

            tool_results = []
            for block in response.content:
                if block.type == "tool_use":
                    result = await self.mcp.call_tool(block.name, block.input)
                    tool_results.append({
                        "type": "tool_result",
                        "tool_use_id": block.id,
                        "content": result
                    })
            messages.append({"role": "user", "content": tool_results})
        return "达到最大轮次。"
五、MCP 资源与提示词
5.1 资源(Resources)系统
# ===== MCP 资源系统 =====
class ResourceBasedMCPServer(MCPServer):
    """MCP server that exposes fixed sample resources plus files in a directory."""

    def __init__(self, data_dir: str = "./data"):
        super().__init__("resource-server", "1.0.0")
        self.data_dir = data_dir
        self._register_resources()

    def _register_resources(self):
        """Register the sample resources and one resource per data file."""
        import os

        @self.resource(
            "file:///config",
            "app-config",
            "应用配置文件",
            mime_type="application/json"
        )
        def config_resource(uri: str) -> str:
            return json.dumps({
                "app_name": "Demo App",
                "version": "1.0",
                "debug": False
            }, indent=2)

        @self.resource(
            "file:///logs/recent",
            "recent-logs",
            "最近日志"
        )
        def logs_resource(uri: str) -> str:
            return "\n".join([
                "[2026-06-08 09:00] Server started",
                "[2026-06-08 09:01] Connection established",
                "[2026-06-08 09:05] Query executed (23ms)",
                "[2026-06-08 09:10] Cache refreshed",
            ])

        @self.resource(
            "file:///stats",
            "system-stats",
            "系统统计信息"
        )
        def stats_resource(uri: str) -> str:
            return json.dumps({
                "uptime": "2h 35m",
                "requests": 1247,
                "avg_latency": "45ms",
                "error_rate": "0.3%"
            }, indent=2)

        # Dynamic resources: one per regular file directly inside data_dir.
        if os.path.exists(self.data_dir):
            for filename in os.listdir(self.data_dir):
                filepath = os.path.join(self.data_dir, filename)
                if os.path.isfile(filepath):
                    self._register_dynamic_resource(filepath)

    def _register_dynamic_resource(self, filepath: str):
        """Register ``filepath`` as a resource whose content is the file text."""
        # Imported here because this method must not rely on the import
        # that is local to _register_resources.
        import os
        uri = f"file:///{filepath}"
        name = os.path.basename(filepath)

        # The decorator records both the descriptor and the handler, so no
        # separate handler registration is needed.
        @self.resource(uri, name, f"文件: {name}")
        def file_resource(uri: str = uri) -> str:
            with open(filepath, 'r', encoding='utf-8') as f:
                return f.read()
# Usage: list every resource the server exposes.
if __name__ == '__main__':
    server = ResourceBasedMCPServer()
    print("资源:")
    for uri in server.resources:
        res = server.resources[uri]
        print(f" 📄 {uri}: {res.name} - {res.description}")
5.2 提示词(Prompts)系统
# ===== MCP 提示词系统 =====
class PromptBasedMCPServer(MCPServer):
    """MCP server that exposes a set of reusable prompt templates."""

    def __init__(self):
        super().__init__("prompt-server", "1.0.0")
        self._register_prompts()

    def _register_prompts(self):
        """Register the prompt templates on this server."""
        @self.prompt(
            "code_review",
            "代码审查提示词",
            [{"name": "code", "description": "要审查的代码", "required": True}]
        )
        def code_review_prompt(code: str) -> list:
            # Built from explicit lines: the original triple-quoted f-string
            # was never terminated.
            content = "\n".join([
                "请审查以下代码,关注:",
                "1. 潜在 bug",
                "2. 安全问题",
                "3. 性能优化",
                "4. 代码规范",
                "代码:",
                f"{code}",
            ])
            return [{"role": "user", "content": content}]

        @self.prompt(
            "sql_optimizer",
            "SQL 优化提示词",
            [{"name": "sql", "description": "SQL 查询语句", "required": True}]
        )
        def sql_optimizer_prompt(sql: str) -> list:
            content = "\n".join([
                "请优化以下 SQL 查询:",
                "1. 分析执行计划",
                "2. 建议索引优化",
                "3. 重写优化版本",
                f"SQL: {sql}",
            ])
            return [{"role": "user", "content": content}]

        @self.prompt(
            "document_summary",
            "文档摘要提示词",
            [{"name": "document", "description": "文档内容", "required": True}]
        )
        def document_summary_prompt(document: str) -> list:
            return [
                {
                    "role": "user",
                    "content": f"请用 3-5 句话总结以下文档的要点:\n\n{document}"
                }
            ]

        @self.prompt(
            "debug_assistant",
            "调试助手提示词",
            [
                {"name": "error", "description": "错误信息", "required": True},
                {"name": "context", "description": "上下文代码", "required": False}
            ]
        )
        def debug_assistant_prompt(error: str, context: str = "") -> list:
            content = f"错误信息:\n{error}\n"
            # The code section is only included when context is given.
            if context:
                content += f"\n相关代码:\n{context}\n"
            content += "\n请分析可能的原因和解决方案。"
            return [{"role": "user", "content": content}]
# Usage: list every prompt with its argument names.
if __name__ == '__main__':
    server = PromptBasedMCPServer()
    print("提示词:")
    for name, prompt in server.prompts.items():
        arg_names = [a["name"] for a in prompt.arguments]
        args_desc = ", ".join(arg_names)
        print(f" 💬 {name}({args_desc}): {prompt.description}")
六、多 Server 协作
6.1 MCP Server 管理器
# ===== 多 Server 管理 =====
import asyncio
from typing import Dict, List, Optional
class MCPServerManager:
    """Registry of in-process MCP servers and stdio-connected MCP clients."""

    def __init__(self):
        # In-process servers whose handlers are called directly.
        self.servers: Dict[str, MCPServer] = {}
        # Servers reached through a connected stdio client.
        self.clients: Dict[str, MCPStdioClient] = {}

    def register_local(self, name: str, server: MCPServer):
        """Register an in-process server under ``name``."""
        self.servers[name] = server

    async def connect_remote(self, name: str, command: str, args: List[str] = None):
        """Start a server subprocess, connect over stdio, and register it."""
        client = MCPStdioClient(command, args)
        await client.connect()
        self.clients[name] = client

    async def list_all_tools(self) -> List[Dict]:
        """Return the tools of every server, each tagged with its ``source``."""
        all_tools = []
        # In-process servers
        for server_name, server in self.servers.items():
            for tool in server.tools.values():
                all_tools.append({
                    "source": server_name,
                    "name": tool.name,
                    "description": tool.description,
                    "inputSchema": tool.input_schema
                })
        # Servers behind a client connection
        for client_name, client in self.clients.items():
            for tool in await client.list_tools():
                all_tools.append({
                    "source": client_name,
                    "name": tool["name"],
                    "description": tool["description"],
                    "inputSchema": tool.get("inputSchema", {})
                })
        return all_tools

    async def call_tool(self, server_name: str, tool_name: str, arguments: Dict = None) -> str:
        """Call ``tool_name`` on the named server and return its result as text.

        An unknown server or tool yields a descriptive message rather than
        an exception.
        """
        if server_name in self.servers:
            handler = self.servers[server_name].tool_handlers.get(tool_name)
            if handler is None:
                return f"工具不存在: {tool_name}"
            result = handler(**(arguments or {}))
            # An async handler returns a coroutine object; await that object
            # (calling it again, as before, raised TypeError).
            if asyncio.iscoroutinefunction(handler):
                result = await result
            return str(result)
        if server_name in self.clients:
            return await self.clients[server_name].call_tool(tool_name, arguments)
        return f"Server 不存在: {server_name}"
# Demo: register one local server and print the aggregated tool list.
if __name__ == '__main__':
    async def main():
        registry = MCPServerManager()
        # Expose the file-system server under the name "filesystem".
        registry.register_local("filesystem", FileSystemMCPServer("."))
        # Print one line per tool, prefixed with the server it came from.
        for entry in await registry.list_all_tools():
            print(f" [{entry['source']}] {entry['name']}: {entry['description']}")

    asyncio.run(main())
6.2 统一工具代理
# ===== 统一 MCP 工具代理 =====
class UnifiedMCPAgent:
    """Chat agent that exposes every managed MCP tool to an OpenAI model.

    Tools from all servers are advertised under the qualified name
    ``<server>__<tool>`` so that equally named tools on different servers
    do not collide. Tool calls requested by the model are routed back
    through the manager.
    """

    def __init__(self, manager: "MCPServerManager", model: str = "gpt-4o"):
        self.manager = manager
        self.model = model
        self.llm = OpenAI()

    def _resolve_route(self, full_name: str, routes: Dict) -> tuple:
        """Map a qualified tool name back to ``(server_name, tool_name)``."""
        # Exact lookup first: it stays correct even when a server or tool
        # name itself contains "__".
        if full_name in routes:
            return routes[full_name]
        parts = full_name.split("__", 1)
        if len(parts) == 2:
            return parts[0], parts[1]
        # Unqualified name: fall back to the first local server, if any.
        return next(iter(self.manager.servers), ""), full_name

    async def chat(self, message: str, max_turns: int = 10) -> str:
        """Answer ``message``, letting the model call MCP tools as needed.

        Args:
            message: The user's message.
            max_turns: Maximum number of model round-trips.

        Returns:
            The model's final text answer, or a fixed notice when
            ``max_turns`` is exhausted while the model keeps calling tools.
        """
        all_tools = await self.manager.list_all_tools()

        # Prefix each tool with its server name and remember the mapping so
        # calls can be routed without re-parsing the qualified name.
        openai_tools = []
        routes = {}
        for tool in all_tools:
            qualified_name = f"{tool['source']}__{tool['name']}"
            routes[qualified_name] = (tool["source"], tool["name"])
            openai_tools.append({
                "type": "function",
                "function": {
                    "name": qualified_name,
                    "description": f"[{tool['source']}] {tool['description']}",
                    # An empty schema is replaced by an explicit
                    # "object with no properties" schema.
                    "parameters": tool.get("inputSchema")
                    or {"type": "object", "properties": {}},
                },
            })

        messages = [
            {
                "role": "system",
                "content": "你是一个助手,可以调用多个 MCP Server 的工具。工具名格式为 server__tool_name。"
            },
            {"role": "user", "content": message}
        ]

        # ``tool_choice`` is only valid together with ``tools``; send
        # neither when no tool is available.
        request_kwargs = {"model": self.model, "messages": messages}
        if openai_tools:
            request_kwargs["tools"] = openai_tools
            request_kwargs["tool_choice"] = "auto"

        for _ in range(max_turns):
            # NOTE(review): this is a synchronous client call inside a
            # coroutine, so it blocks the event loop while waiting.
            response = self.llm.chat.completions.create(**request_kwargs)
            msg = response.choices[0].message
            messages.append(msg)
            if not msg.tool_calls:
                return msg.content or ""

            for tool_call in msg.tool_calls:
                server_name, tool_name = self._resolve_route(
                    tool_call.function.name, routes
                )
                try:
                    func_args = json.loads(tool_call.function.arguments or "{}")
                except json.JSONDecodeError as exc:
                    # Report malformed arguments back to the model instead
                    # of aborting the whole conversation.
                    result = f"参数解析失败: {exc}"
                else:
                    print(f" 🔧 [{server_name}].{tool_name}({json.dumps(func_args, ensure_ascii=False)})")
                    result = await self.manager.call_tool(server_name, tool_name, func_args)

                # Results are not guaranteed to be strings; normalise once
                # before slicing for the log line and the tool message.
                result_text = str(result)
                print(f" 📋 {result_text[:200]}")
                messages.append({
                    "role": "tool",
                    "tool_call_id": tool_call.id,
                    "content": result_text[:4000]
                })
        return "达到最大轮次。"
七、生产案例
7.1 IDE MCP Server
# ===== 案例:IDE MCP Server =====
class IDEMCPServer(MCPServer):
    """MCP server exposing IDE-style tools for one project directory.

    Registered tools: ``search_code``, ``get_diagnostics``, ``apply_edit``
    and ``run_tests``. ``get_diagnostics`` and ``run_tests`` return canned
    demo output.
    """

    def __init__(self, project_root: str):
        super().__init__("ide-server", "1.0.0")
        self.project_root = project_root
        self._buffer_contents: Dict[str, str] = {}
        self._register_ide_tools()

    def _resolve_in_project(self, file_path: str) -> Optional[str]:
        """Return the real path of ``file_path`` if it lies inside the project root, else ``None``."""
        import os

        root = os.path.realpath(self.project_root)
        candidate = os.path.realpath(file_path)
        try:
            inside = os.path.commonpath([root, candidate]) == root
        except ValueError:
            # Raised e.g. for paths on different Windows drives.
            inside = False
        return candidate if inside else None

    def _register_ide_tools(self):
        """Register the IDE tools on this server via ``self.tool``."""

        @self.tool(
            "search_code",
            "搜索代码",
            {"type": "object", "properties": {
                "pattern": {"type": "string", "description": "搜索模式(正则或文本)"},
                "file_pattern": {"type": "string", "description": "文件过滤(如 *.py)"},
                "context_lines": {"type": "integer", "description": "上下文行数"}
            }, "required": ["pattern"]}
        )
        def search_code(pattern: str, file_pattern: str = "*", context_lines: int = 2) -> str:
            """Search files under the project root line by line.

            Returns up to 20 matches, each with ``context_lines`` lines of
            surrounding context, or a "no match" message.
            """
            import fnmatch
            import os
            import re

            max_results = 20
            try:
                regex = re.compile(pattern)
            except re.error:
                # The tool accepts "regex or text": an invalid regex is
                # searched as literal text instead of raising.
                regex = re.compile(re.escape(pattern))
            context_lines = max(0, context_lines)

            results = []
            for root, _dirs, files in os.walk(self.project_root):
                for filename in files:
                    if not fnmatch.fnmatch(filename, file_pattern):
                        continue
                    filepath = os.path.join(root, filename)
                    try:
                        with open(filepath, 'r', encoding='utf-8') as fh:
                            lines = fh.readlines()
                    except (UnicodeDecodeError, OSError):
                        # Skip binary or unreadable files (permissions,
                        # broken symlinks, ...).
                        continue
                    for i, line in enumerate(lines):
                        if not regex.search(line):
                            continue
                        start = max(0, i - context_lines)
                        end = min(len(lines), i + context_lines + 1)
                        context = "".join(
                            f"{j+1:4d} | {lines[j]}" for j in range(start, end)
                        )
                        results.append(f"\n📄 {filepath}:{i+1}\n{context}")
                        # Stop walking as soon as the cap is reached.
                        if len(results) >= max_results:
                            return "\n".join(results)
            return "\n".join(results) if results else "未找到匹配"

        @self.tool(
            "get_diagnostics",
            "获取诊断信息",
            {"type": "object", "properties": {
                "file_path": {"type": "string", "description": "文件路径"}
            }, "required": ["file_path"]}
        )
        def get_diagnostics(file_path: str) -> str:
            """Return canned diagnostics for ``file_path`` as a JSON string."""
            # Simulated result: the file is not actually analysed.
            return json.dumps([
                {
                    "severity": "warning",
                    "line": 42,
                    "message": "未使用的变量 'tmp'",
                    "file": file_path
                },
                {
                    "severity": "error",
                    "line": 15,
                    "message": "缺少类型注解",
                    "file": file_path
                }
            ], ensure_ascii=False, indent=2)

        @self.tool(
            "apply_edit",
            "应用编辑",
            {"type": "object", "properties": {
                "file_path": {"type": "string", "description": "文件路径"},
                "old_text": {"type": "string", "description": "要替换的文本"},
                "new_text": {"type": "string", "description": "替换为新文本"}
            }, "required": ["file_path", "old_text", "new_text"]}
        )
        def apply_edit(file_path: str, old_text: str, new_text: str) -> str:
            """Replace the first occurrence of ``old_text`` in a project file.

            Returns a status message; failures are reported as text rather
            than raised.
            """
            # Tool arguments come from the model: refuse to touch anything
            # outside the project root.
            target = self._resolve_in_project(file_path)
            if target is None:
                return f"编辑失败: 路径超出项目根目录: {file_path}"
            try:
                with open(target, 'r', encoding='utf-8') as f:
                    content = f.read()
                if old_text not in content:
                    return "未找到匹配文本"
                new_content = content.replace(old_text, new_text, 1)
                with open(target, 'w', encoding='utf-8') as f:
                    f.write(new_content)
                return f"已编辑 {file_path}"
            except Exception as e:
                # Tool boundary: every failure is turned into a message.
                return f"编辑失败: {e}"

        @self.tool(
            "run_tests",
            "运行测试",
            {"type": "object", "properties": {
                "test_pattern": {"type": "string", "description": "测试匹配模式"}
            }, "required": []}
        )
        def run_tests(test_pattern: str = "*") -> str:
            """Return a canned test report; no tests are actually run."""
            return f"运行测试: {test_pattern}\n✅ 12/12 passed (3.2s)"
# Demo: show which tools the IDE server exposes.
if __name__ == '__main__':
    ide_server = IDEMCPServer(".")
    ide_tool_names = list(ide_server.tools.keys())
    print(f"IDE Server 工具: {ide_tool_names}")
7.2 Git MCP Server
# ===== 案例:Git MCP Server =====
class GitMCPServer(MCPServer):
    """MCP server exposing basic Git operations for one repository.

    Registered tools: ``git_status``, ``git_log``, ``git_diff``,
    ``git_branch`` and ``git_commit``. All of them run the ``git``
    executable with ``repo_path`` as the working directory.
    """

    def __init__(self, repo_path: str):
        super().__init__("git-server", "1.0.0")
        self.repo_path = repo_path
        self._register_git_tools()

    def _run_git(self, *args) -> str:
        """Run ``git <args>`` in the repository.

        Returns:
            The stripped stdout, or the stripped stderr when stdout is
            empty. The exit code is not inspected.
        """
        import subprocess

        # Argument list, not a shell string, so nothing is shell-interpreted.
        result = subprocess.run(
            ["git"] + list(args),
            cwd=self.repo_path,
            capture_output=True,
            text=True
        )
        return result.stdout.strip() or result.stderr.strip()

    def _register_git_tools(self):
        """Register the Git tools on this server via ``self.tool``."""

        @self.tool("git_status", "查看 Git 状态")
        def git_status() -> str:
            return self._run_git("status", "--short")

        @self.tool("git_log", "查看提交历史")
        def git_log(limit: int = 10) -> str:
            # ``git log`` takes the commit count as ``-n <N>``; the former
            # ``--<N>`` spelling is not a valid option.
            count = max(int(limit), 0)
            return self._run_git("log", "-n", str(count), "--oneline", "--decorate")

        @self.tool("git_diff", "查看差异")
        def git_diff(file: str = "", staged: bool = False) -> str:
            args = ["diff"]
            if staged:
                args.append("--staged")
            if file:
                # "--" stops a path starting with "-" from being parsed as
                # an option.
                args.extend(["--", file])
            return self._run_git(*args)

        @self.tool("git_branch", "查看/创建分支")
        def git_branch(name: str = "", create: bool = False) -> str:
            # Without ``create`` (or without a name) this only lists branches.
            if create and name:
                return self._run_git("checkout", "-b", name)
            return self._run_git("branch", "-a")

        @self.tool("git_commit", "提交更改")
        def git_commit(message: str, files: str = ".") -> str:
            # Stage first, then commit; "--" keeps ``files`` from being
            # parsed as an option.
            self._run_git("add", "--", files)
            return self._run_git("commit", "-m", message)
# Demo: show which tools the Git server exposes.
if __name__ == '__main__':
    git_server = GitMCPServer(".")
    git_tool_names = list(git_server.tools.keys())
    print(f"Git Server 工具: {git_tool_names}")
7.3 Claude Desktop 配置
// claude_desktop_config.json 配置示例
{
"mcpServers": {
"filesystem": {
"command": "python",
"args": ["C:/path/to/filesystem_server.py"],
"env": {
"ALLOWED_ROOT": "C:/Users/user/projects"
}
},
"database": {
"command": "python",
"args": ["C:/path/to/database_server.py"],
"env": {
"DATABASE_URL": "sqlite:///demo.db"
}
},
"git": {
"command": "python",
"args": ["C:/path/to/git_server.py"],
"env": {
"REPO_PATH": "."
}
},
"remote-api": {
"url": "http://localhost:8080/mcp",
"headers": {
"Authorization": "Bearer token123"
}
}
}
}
# ===== Claude Desktop 配置生成器 =====
import json
import os
def generate_claude_config(
    output_path: str,
    servers: Dict[str, Dict]
):
    """Write a Claude Desktop configuration file.

    Args:
        output_path: Destination file. Missing parent directories are
            created; an existing file is overwritten.
        servers: Mapping of server name to its launch/connection settings,
            stored under the top-level ``mcpServers`` key.
    """
    config = {"mcpServers": servers}

    # The config directory may not exist yet (e.g. on a fresh install).
    # A bare file name has no directory part, so there is nothing to create.
    parent_dir = os.path.dirname(output_path)
    if parent_dir:
        os.makedirs(parent_dir, exist_ok=True)

    with open(output_path, 'w', encoding='utf-8') as f:
        json.dump(config, f, ensure_ascii=False, indent=4)
    print(f"✅ 配置已生成: {output_path}")
# Location of the Claude Desktop config file on Windows.
claude_config_path = os.path.expandvars(
    r"%APPDATA%\Claude\claude_desktop_config.json"
)

# Demo: write a config that launches the file-system and database servers.
if __name__ == '__main__':
    filesystem_entry = {
        "command": "python",
        "args": ["filesystem_server.py"],
        "env": {"ALLOWED_ROOT": os.getcwd()},
    }
    database_entry = {
        "command": "python",
        "args": ["database_server.py"],
        "env": {"DATABASE_URL": "sqlite:///demo.db"},
    }
    generate_claude_config(
        claude_config_path,
        {"filesystem": filesystem_entry, "database": database_entry},
    )
八、总结
8.1 MCP 架构全景
8.2 MCP 三大能力对比
| 能力 | 用途 | 类比 |
|---|---|---|
| Tools | 执行操作(读写文件/查询DB) | 函数调用 |
| Resources | 提供数据(配置/日志/文件) | GET 接口 |
| Prompts | 提供模板(代码审查/调试) | 提示词模板 |
8.3 最佳实践
| 实践 | 说明 |
|---|---|
| 安全性 | 路径检查、输入验证、权限控制 |
| 错误处理 | 标准化错误码、超时机制 |
| 资源管理 | 连接池、内存限制、清理 |
| 版本管理 | 协议版本协商、向后兼容 |
| 调试 | stderr 日志、请求追踪 |
8.4 MCP vs 其他方案
| 方案 | 标准化 | 生态 | 复杂度 |
|---|---|---|---|
| MCP | ✅ 标准协议 | Claude/Cursor | 中 |
| Function Calling | 模型绑定 | OpenAI/Claude | 低 |
| LangChain Tools | 框架绑定 | Python/JS | 中 |
| OpenAPI | ✅ 通用标准 | 广泛 | 高 |
本文基于 MCP Protocol 2024-11-05 版本编写。MCP 正在快速发展,请关注官方最新文档。
AtomGit 是由开放原子开源基金会联合 CSDN 等生态伙伴共同推出的新一代开源与人工智能协作平台。平台坚持“开放、中立、公益”的理念,把代码托管、模型共享、数据集托管、智能体开发体验和算力服务整合在一起,为开发者提供从开发、训练到部署的一站式体验。
更多推荐



所有评论(0)