前言

💡 痛点:AI 模型是孤岛?每次都要手动传数据?想让 AI 直接操作你的工具和数据?

🎯 解决方案:掌握 MCP(Model Context Protocol) — AI 模型的"USB 接口",标准化工具集成协议。

MCP 是什么?

MCP模式

Server 1

MCP Host

Server 2

Server 3

LLM Client

LLM

传统模式

App 1

LLM

App 2

App 3

MCP 核心概念:

| 概念 | 说明 | 类比 |
| --- | --- | --- |
| Host | 宿主应用(如 IDE、聊天工具) | 电脑 |
| Client | 与单个 MCP Server 通信 | USB 驱动 |
| Server | 提供工具/资源/提示词 | USB 设备 |
| Protocol | JSON-RPC 2.0 通信协议 | USB 协议 |

传输方式:

| 方式 | 协议 | 适用场景 |
| --- | --- | --- |
| stdio | 标准输入输出 | 本地进程 |
| SSE | Server-Sent Events | 远程 HTTP |
| Streamable HTTP | HTTP + 流式 | 新版推荐 |

一、MCP 协议基础

1.1 JSON-RPC 2.0 通信

# ===== MCP 通信协议 =====

import json
import asyncio
from typing import Dict, Any, Optional

class MCPMessage:
    """Builders and a parser for the JSON-RPC 2.0 messages exchanged over MCP."""

    @staticmethod
    def request(method: str, params: Dict = None, request_id: int = 1) -> str:
        """Serialize a request; a falsy ``params`` is sent as an empty object."""
        payload = {"jsonrpc": "2.0", "id": request_id, "method": method}
        payload["params"] = params if params else {}
        return json.dumps(payload)

    @staticmethod
    def response(result: Any = None, error: Dict = None, request_id: int = 1) -> str:
        """Serialize a response carrying ``error`` when it is truthy, else ``result``."""
        key, value = ("error", error) if error else ("result", result)
        return json.dumps({"jsonrpc": "2.0", "id": request_id, key: value})

    @staticmethod
    def notification(method: str, params: Dict = None) -> str:
        """Serialize a notification: same as a request but without an ``id``."""
        payload = {"jsonrpc": "2.0", "method": method}
        payload["params"] = params if params else {}
        return json.dumps(payload)

    @staticmethod
    def parse(raw: str) -> Dict:
        """Decode a raw JSON message into Python objects."""
        decoded = json.loads(raw)
        return decoded

# Example messages: print one request of each kind plus a notification.
print("=== 初始化请求 ===")
print(MCPMessage.request(
    method="initialize",
    params={
        "protocolVersion": "2024-11-05",
        "capabilities": {
            "roots": {"listChanged": True}
        },
        "clientInfo": {"name": "my-client", "version": "1.0.0"}
    },
    request_id=1
))

print("\n=== 工具调用请求 ===")
print(MCPMessage.request(
    method="tools/call",
    params={
        "name": "read_file",
        "arguments": {"path": "/tmp/test.txt"}
    },
    request_id=2
))

# The message below is a notification (no id, no response expected), not a
# "tools/list" request, so the banner names it as a list-changed notification.
print("\n=== 工具列表变更通知 ===")
print(MCPMessage.notification(
    method="notifications/tools/list_changed"
))

1.2 MCP 生命周期

# ===== MCP 生命周期管理 =====

import asyncio
import json
from typing import Dict, Optional
from enum import Enum

class MCPState(Enum):
    """States of an MCP connection; each value is the state's lowercase name."""
    DISCONNECTED = "disconnected"  # no connection yet
    CONNECTING = "connecting"  # initialize request is being sent
    INITIALIZING = "initializing"  # handshake in progress
    READY = "ready"  # connection usable
    SHUTDOWN = "shutdown"  # connection shut down
    ERROR = "error"  # handshake failed

class MCPLifecycle:
    """Track an MCP client's connection state and build its lifecycle messages.

    The ``create_*`` methods return serialized JSON-RPC strings built by
    ``MCPMessage`` and advance ``state``; ``handle_init_response`` records what
    the server reported about itself.
    """

    def __init__(self, client_name: str = "mcp-client", client_version: str = "1.0.0"):
        """Start disconnected with no server information and a zeroed id counter."""
        self.state = MCPState.DISCONNECTED
        self.client_name = client_name
        self.client_version = client_version
        self.server_info: Dict = {}
        self.server_capabilities: Dict = {}
        self.protocol_version: str = ""
        self.request_counter = 0

    def _next_id(self) -> int:
        """Return the next request id (1, 2, 3, ...)."""
        self.request_counter += 1
        return self.request_counter

    def create_initialize(self) -> str:
        """Build the ``initialize`` request and move to ``CONNECTING``."""
        self.state = MCPState.CONNECTING

        return MCPMessage.request(
            method="initialize",
            params={
                "protocolVersion": "2024-11-05",
                "capabilities": {
                    "roots": {"listChanged": True},
                    "sampling": {}
                },
                "clientInfo": {
                    "name": self.client_name,
                    "version": self.client_version
                }
            },
            request_id=self._next_id()
        )

    def handle_init_response(self, response: Dict):
        """Record the server's ``initialize`` result, or move to ``ERROR``.

        Args:
            response: The decoded JSON-RPC response to the initialize request.
        """
        if "error" in response:
            self.state = MCPState.ERROR
            print(f"初始化失败: {response['error']}")
            return

        result = response.get("result", {})
        self.server_info = result.get("serverInfo", {})
        self.server_capabilities = result.get("capabilities", {})
        self.protocol_version = result.get("protocolVersion", "")

        print(f"✅ 初始化成功")
        print(f"  Server: {self.server_info.get('name')} v{self.server_info.get('version')}")
        print(f"  Protocol: {self.protocol_version}")
        print(f"  Capabilities: {json.dumps(self.server_capabilities, ensure_ascii=False)}")

    def create_initialized(self) -> str:
        """Build the ``notifications/initialized`` notification and move to ``INITIALIZING``."""
        self.state = MCPState.INITIALIZING
        return MCPMessage.notification(
            method="notifications/initialized"
        )

    def set_ready(self):
        """Mark the connection as ready for use."""
        self.state = MCPState.READY
        print("🟢 MCP 连接就绪")

    def create_shutdown(self) -> str:
        """Build the ``shutdown`` request and move to ``SHUTDOWN``.

        Like the other ``create_*`` methods this advances ``state``; previously
        the ``SHUTDOWN`` state was declared but never entered.
        """
        self.state = MCPState.SHUTDOWN
        return MCPMessage.request(
            method="shutdown",
            request_id=self._next_id()
        )

    def get_status(self) -> Dict:
        """Return a JSON-serializable snapshot of the state and server details."""
        return {
            "state": self.state.value,
            "server": self.server_info,
            "protocol": self.protocol_version,
            "capabilities": self.server_capabilities
        }

# Usage
lifecycle = MCPLifecycle(client_name="demo-client", client_version="1.0.0")

# Walk through the lifecycle by hand, starting with the initialize request.
init_request = lifecycle.create_initialize()
print("\n发送: " + init_request)

# Fake the server's answer by serializing a response and decoding it again.
mock_response = MCPMessage.parse(
    MCPMessage.response(
        request_id=1,
        result={
            "protocolVersion": "2024-11-05",
            "capabilities": {
                "tools": {"listChanged": True},
                "resources": {"subscribe": True},
                "prompts": {"listChanged": True},
            },
            "serverInfo": {"name": "demo-server", "version": "1.0.0"},
        },
    )
)

lifecycle.handle_init_response(mock_response)
lifecycle.create_initialized()
lifecycle.set_ready()

print("\n状态: " + json.dumps(lifecycle.get_status(), ensure_ascii=False, indent=2))

二、MCP Server 开发

2.1 基础 Server 框架

# ===== MCP Server 基础框架 =====

import sys
import json
import asyncio
from typing import Dict, Any, Callable, List, Optional
from dataclasses import dataclass, field

@dataclass
class MCPTool:
    """Definition of an MCP tool."""
    name: str
    description: str
    # JSON Schema describing the tool's arguments; defaults to an empty dict.
    input_schema: Dict = field(default_factory=dict)

@dataclass
class MCPResource:
    """Definition of an MCP resource."""
    uri: str
    name: str
    description: str
    # MIME type of the resource; plain text unless stated otherwise.
    mime_type: str = "text/plain"

@dataclass
class MCPPrompt:
    """Definition of an MCP prompt."""
    name: str
    description: str
    # Argument descriptors, one dict per argument; defaults to an empty list.
    arguments: List[Dict] = field(default_factory=list)

class MCPServer:
    """Minimal MCP server: a registry of tools, resources and prompts plus JSON-RPC dispatch.

    Handlers are registered with the ``tool`` / ``resource`` / ``prompt``
    decorators. ``handle_message`` turns one raw JSON-RPC message into an
    optional response string, and ``run_stdio`` serves newline-delimited
    messages over stdin/stdout.
    """

    def __init__(
        self,
        name: str,
        version: str = "1.0.0",
        protocol_version: str = "2024-11-05"
    ):
        """Create an empty server.

        Args:
            name: Server name reported in the ``initialize`` result.
            version: Server version reported in the ``initialize`` result.
            protocol_version: Protocol version reported in the ``initialize`` result.
        """
        self.name = name
        self.version = version
        self.protocol_version = protocol_version
        self.tools: Dict[str, MCPTool] = {}
        self.resources: Dict[str, MCPResource] = {}
        self.prompts: Dict[str, MCPPrompt] = {}
        self.tool_handlers: Dict[str, Callable] = {}
        self.resource_handlers: Dict[str, Callable] = {}
        self.prompt_handlers: Dict[str, Callable] = {}

        # Built-in JSON-RPC methods, keyed by method name.
        self._method_handlers = {
            "initialize": self._handle_initialize,
            "ping": self._handle_ping,
            "tools/list": self._handle_tools_list,
            "tools/call": self._handle_tool_call,
            "resources/list": self._handle_resources_list,
            "resources/read": self._handle_resource_read,
            "prompts/list": self._handle_prompts_list,
            "prompts/get": self._handle_prompt_get,
            "shutdown": self._handle_shutdown,
        }

    # ---- Registration decorators ----

    def tool(
        self,
        name: str,
        description: str,
        input_schema: Dict = None
    ):
        """Return a decorator registering a function as the tool ``name``.

        When ``input_schema`` is omitted (or empty) a schema is inferred from
        the function signature. The decorated function is returned unchanged.
        """
        def decorator(func: Callable):
            self.tools[name] = MCPTool(
                name=name,
                description=description,
                input_schema=input_schema or self._infer_schema(func)
            )
            self.tool_handlers[name] = func
            return func
        return decorator

    def resource(
        self,
        uri: str,
        name: str,
        description: str,
        mime_type: str = "text/plain"
    ):
        """Return a decorator registering a function as the reader for ``uri``."""
        def decorator(func: Callable):
            self.resources[uri] = MCPResource(
                uri=uri, name=name,
                description=description,
                mime_type=mime_type
            )
            self.resource_handlers[uri] = func
            return func
        return decorator

    def prompt(
        self,
        name: str,
        description: str,
        arguments: List[Dict] = None
    ):
        """Return a decorator registering a function as the prompt ``name``."""
        def decorator(func: Callable):
            self.prompts[name] = MCPPrompt(
                name=name,
                description=description,
                arguments=arguments or []
            )
            self.prompt_handlers[name] = func
            return func
        return decorator

    # ---- Method handlers ----

    @staticmethod
    async def _invoke(handler: Callable, args: tuple = (), kwargs: Dict = None) -> Any:
        """Call a user handler and await its result when that result is awaitable.

        Arguments are passed as a tuple/dict rather than ``*args``/``**kwargs``
        so a tool argument named ``handler`` cannot clash with this signature.
        """
        import inspect
        outcome = handler(*args, **(kwargs or {}))
        if inspect.isawaitable(outcome):
            outcome = await outcome
        return outcome

    async def _handle_initialize(self, params: Dict) -> Dict:
        """Answer ``initialize`` with protocol version, capabilities and server info."""
        return {
            "protocolVersion": self.protocol_version,
            "capabilities": {
                "tools": {"listChanged": bool(self.tools)},
                "resources": {
                    "subscribe": True,
                    "listChanged": bool(self.resources)
                },
                "prompts": {"listChanged": bool(self.prompts)}
            },
            "serverInfo": {
                "name": self.name,
                "version": self.version
            }
        }

    async def _handle_ping(self, params: Dict) -> Dict:
        """Answer ``ping`` with an empty result."""
        return {}

    async def _handle_tools_list(self, params: Dict) -> Dict:
        """List every registered tool with its input schema."""
        return {
            "tools": [
                {
                    "name": t.name,
                    "description": t.description,
                    "inputSchema": t.input_schema
                } for t in self.tools.values()
            ]
        }

    async def _handle_tool_call(self, params: Dict) -> Dict:
        """Run a registered tool and wrap its stringified result as text content.

        Unknown tools and exceptions raised by the tool are reported in-band
        with ``isError: True`` rather than as JSON-RPC errors.
        """
        tool_name = params.get("name", "")
        arguments = params.get("arguments", {})

        if tool_name not in self.tool_handlers:
            return {
                "content": [{"type": "text", "text": f"未知工具: {tool_name}"}],
                "isError": True
            }

        try:
            result = await self._invoke(self.tool_handlers[tool_name], kwargs=arguments)
        except Exception as e:
            return {
                "content": [{"type": "text", "text": f"工具执行错误: {e}"}],
                "isError": True
            }

        return {
            "content": [{"type": "text", "text": str(result)}],
            "isError": False
        }

    async def _handle_resources_list(self, params: Dict) -> Dict:
        """List every registered resource."""
        return {
            "resources": [
                {
                    "uri": r.uri,
                    "name": r.name,
                    "description": r.description,
                    "mimeType": r.mime_type
                } for r in self.resources.values()
            ]
        }

    async def _handle_resource_read(self, params: Dict) -> Dict:
        """Read a registered resource and return its stringified content.

        The reply carries the MIME type the resource was registered with
        (falling back to ``text/plain``), so it agrees with ``resources/list``.
        """
        uri = params.get("uri", "")

        handler = self.resource_handlers.get(uri)
        if handler is None:
            return {
                "contents": [{"uri": uri, "mimeType": "text/plain", "text": f"未知资源: {uri}"}]
            }

        content = await self._invoke(handler, args=(uri,))

        registered = self.resources.get(uri)
        mime_type = registered.mime_type if registered is not None else "text/plain"
        return {
            "contents": [{"uri": uri, "mimeType": mime_type, "text": str(content)}]
        }

    async def _handle_prompts_list(self, params: Dict) -> Dict:
        """List every registered prompt."""
        return {
            "prompts": [
                {
                    "name": p.name,
                    "description": p.description,
                    "arguments": p.arguments
                } for p in self.prompts.values()
            ]
        }

    async def _handle_prompt_get(self, params: Dict) -> Dict:
        """Render a registered prompt into its message list."""
        prompt_name = params.get("name", "")
        arguments = params.get("arguments", {})

        if prompt_name not in self.prompt_handlers:
            # NOTE(review): this puts an "error" key inside a *successful*
            # JSON-RPC result; kept for compatibility, but a JSON-RPC error
            # response would be the conventional way to report it.
            return {"error": f"未知提示词: {prompt_name}"}

        messages = await self._invoke(self.prompt_handlers[prompt_name], kwargs=arguments)
        return {"messages": messages}

    async def _handle_shutdown(self, params: Dict) -> Dict:
        """Answer ``shutdown`` with an empty result."""
        return {}

    # ---- Message handling ----

    async def handle_message(self, raw: str) -> Optional[str]:
        """Process one raw JSON-RPC message.

        Args:
            raw: The serialized message.

        Returns:
            The serialized response, or ``None`` when the message is a
            notification (it has no ``id``): notifications never get a
            response, even if the method is unknown or its handler fails.
        """
        try:
            message = json.loads(raw)
        except ValueError as e:
            # json.JSONDecodeError and UnicodeDecodeError are both ValueErrors.
            # The id cannot be determined here, so it is reported as null.
            return MCPMessage.response(
                error={"code": -32700, "message": f"Parse error: {e}"},
                request_id=None
            )

        if not isinstance(message, dict):
            # Batches and bare values are not supported by this server.
            return MCPMessage.response(
                error={"code": -32600, "message": "Invalid Request"},
                request_id=None
            )

        request_id = message.get("id")
        is_notification = request_id is None
        method = message.get("method", "")
        params = message.get("params") or {}

        handler = self._method_handlers.get(method)
        if handler is None:
            if is_notification:
                # e.g. "notifications/initialized": nothing to do, nothing to send.
                return None
            return MCPMessage.response(
                error={"code": -32601, "message": f"Method not found: {method}"},
                request_id=request_id
            )

        try:
            result = await handler(params)
        except Exception as e:
            if is_notification:
                print(f"通知处理失败 ({method}): {e}", file=sys.stderr)
                return None
            return MCPMessage.response(
                error={"code": -32603, "message": f"Internal error: {e}"},
                request_id=request_id
            )

        if is_notification:
            return None
        return MCPMessage.response(result=result, request_id=request_id)

    def _infer_schema(self, func: Callable) -> Dict:
        """Infer a JSON Schema object from a function signature.

        Annotations outside the small type map (including missing ones) are
        described as ``string``; parameters without a default are required.
        """
        import inspect
        sig = inspect.signature(func)
        type_map = {
            str: "string", int: "integer",
            float: "number", bool: "boolean",
            list: "array", dict: "object"
        }

        properties = {}
        required = []

        for param_name, param in sig.parameters.items():
            if param_name == "self":
                continue
            ptype = type_map.get(param.annotation, "string")
            properties[param_name] = {"type": ptype, "description": param_name}
            # Identity check: ``empty`` is a sentinel, and ``==`` could invoke
            # an arbitrary ``__eq__`` on the default value.
            if param.default is inspect.Parameter.empty:
                required.append(param_name)

        return {"type": "object", "properties": properties, "required": required}

    async def run_stdio(self):
        """Serve newline-delimited JSON-RPC over stdin/stdout until stdin closes."""
        loop = asyncio.get_running_loop()
        reader = asyncio.StreamReader()
        protocol = asyncio.StreamReaderProtocol(reader)
        await loop.connect_read_pipe(lambda: protocol, sys.stdin)

        writer_transport, writer_protocol = await loop.connect_write_pipe(
            asyncio.streams.FlowControlMixin, sys.stdout
        )
        writer = asyncio.StreamWriter(
            writer_transport, writer_protocol, reader, loop
        )

        # Diagnostics go to stderr because stdout carries the protocol stream.
        print(f"MCP Server '{self.name}' v{self.version} 启动", file=sys.stderr)

        while True:
            line = await reader.readline()
            if not line:
                break

            # Undecodable bytes become U+FFFD so a bad line cannot crash the loop.
            line = line.decode("utf-8", errors="replace").strip()
            if not line:
                continue

            response = await self.handle_message(line)
            if response:
                writer.write((response + "\n").encode())
                await writer.drain()

# Usage
if __name__ == "__main__":
    server = MCPServer("demo-server", version="1.0.0")

    # Both tools rely on schema inference from their signatures.
    @server.tool(name="echo", description="回显输入文本")
    def echo(text: str) -> str:
        return "Echo: {}".format(text)

    @server.tool(name="add", description="两数相加")
    def add(a: int, b: int) -> int:
        total = a + b
        return total

    print("Server 已创建,注册工具:", list(server.tools))

2.2 文件系统 MCP Server

# ===== 文件系统 MCP Server =====

import os
import json
import asyncio

class FileSystemMCPServer(MCPServer):
    """MCP server exposing file-system tools confined to ``allowed_root``."""

    def __init__(self, allowed_root: str = "/"):
        """Register the file tools and resources.

        Args:
            allowed_root: Directory that every tool path must stay inside;
                stored as an absolute path.
        """
        super().__init__("filesystem-server", "1.0.0")
        self.allowed_root = os.path.abspath(allowed_root)
        self._register_filesystem_tools()
        self._register_filesystem_resources()

    def _safe_path(self, path: str) -> str:
        """Resolve ``path`` and verify that it lies inside ``allowed_root``.

        Symlinks are resolved before the check, and containment is tested on
        whole path components: a plain string-prefix test would accept
        ``/data2`` for the root ``/data``.

        Returns:
            The resolved absolute path.

        Raises:
            ValueError: If the resolved path is outside ``allowed_root``.
        """
        root = os.path.normcase(os.path.realpath(self.allowed_root))
        resolved = os.path.realpath(path)
        try:
            inside = os.path.commonpath([root, os.path.normcase(resolved)]) == root
        except ValueError:
            # Raised e.g. for paths on different drives: certainly not inside.
            inside = False
        if not inside:
            raise ValueError(f"路径超出允许范围: {path}")
        return resolved

    def _register_filesystem_tools(self):
        """Register the read/write/list/search/info tools."""
        import fnmatch

        @self.tool(
            "read_file",
            "读取文件内容",
            {"type": "object", "properties": {
                "path": {"type": "string", "description": "文件路径"},
                "encoding": {"type": "string", "description": "编码(默认 utf-8)"}
            }, "required": ["path"]}
        )
        def read_file(path: str, encoding: str = "utf-8") -> str:
            """Return the text content of the file at ``path``."""
            safe_path = self._safe_path(path)
            with open(safe_path, 'r', encoding=encoding) as f:
                return f.read()

        @self.tool(
            "write_file",
            "写入文件",
            {"type": "object", "properties": {
                "path": {"type": "string", "description": "文件路径"},
                "content": {"type": "string", "description": "写入内容"}
            }, "required": ["path", "content"]}
        )
        def write_file(path: str, content: str) -> str:
            """Write ``content`` to ``path`` as UTF-8, creating parent directories."""
            safe_path = self._safe_path(path)
            os.makedirs(os.path.dirname(safe_path), exist_ok=True)
            with open(safe_path, 'w', encoding='utf-8') as f:
                f.write(content)
            # The opening parenthesis was missing from the original message.
            return f"已写入 {safe_path}({len(content)} 字符)"

        @self.tool(
            "list_directory",
            "列出目录内容",
            {"type": "object", "properties": {
                "path": {"type": "string", "description": "目录路径"},
                "pattern": {"type": "string", "description": "过滤模式(可选)"}
            }, "required": ["path"]}
        )
        def list_directory(path: str, pattern: str = "*") -> str:
            """List entries of ``path`` whose names match ``pattern``, one per line."""
            safe_path = self._safe_path(path)
            entries = []
            for name in os.listdir(safe_path):
                if fnmatch.fnmatch(name, pattern):
                    full = os.path.join(safe_path, name)
                    is_dir = os.path.isdir(full)
                    size = os.path.getsize(full) if not is_dir else 0
                    entries.append(f"{'📁' if is_dir else '📄'} {name} ({size}B)")
            return "\n".join(entries) if entries else "空目录"

        @self.tool(
            "search_files",
            "搜索文件",
            {"type": "object", "properties": {
                "path": {"type": "string", "description": "搜索根路径"},
                "pattern": {"type": "string", "description": "文件名匹配模式"},
                "max_depth": {"type": "integer", "description": "最大深度(默认 3)"}
            }, "required": ["path", "pattern"]}
        )
        def search_files(path: str, pattern: str, max_depth: int = 3) -> str:
            """Recursively find files under ``path`` whose names match ``pattern``."""
            safe_path = self._safe_path(path)
            results = []

            def walk(current: str, depth: int):
                if depth > max_depth:
                    return
                try:
                    for name in os.listdir(current):
                        full = os.path.join(current, name)
                        if os.path.isdir(full):
                            # Symlinked directories are not followed: they could
                            # lead outside allowed_root or form a cycle.
                            if not os.path.islink(full):
                                walk(full, depth + 1)
                        elif fnmatch.fnmatch(name, pattern):
                            results.append(full)
                except PermissionError:
                    # Unreadable directories are skipped on purpose.
                    pass

            walk(safe_path, 0)
            return "\n".join(results) if results else "未找到匹配文件"

        @self.tool(
            "get_file_info",
            "获取文件信息",
            {"type": "object", "properties": {
                "path": {"type": "string", "description": "文件路径"}
            }, "required": ["path"]}
        )
        def get_file_info(path: str) -> str:
            """Return size, kind and timestamps of ``path`` as a JSON string."""
            safe_path = self._safe_path(path)
            stat = os.stat(safe_path)
            return json.dumps({
                "path": safe_path,
                "size": stat.st_size,
                "is_file": os.path.isfile(safe_path),
                "is_dir": os.path.isdir(safe_path),
                "modified": stat.st_mtime,
                "created": stat.st_ctime
            }, ensure_ascii=False, indent=2)

    def _register_filesystem_resources(self):
        """Register the workspace listing resource."""

        @self.resource(
            "file:///workspace",
            "workspace",
            "工作区文件列表"
        )
        def workspace_resource(uri: str) -> str:
            """Return the names directly inside ``allowed_root``, one per line."""
            entries = os.listdir(self.allowed_root)
            return "\n".join(entries)

# Usage
if __name__ == "__main__":
    # Confine the server to the current working directory.
    server = FileSystemMCPServer(".")
    print("文件系统 MCP Server")
    print("允许路径: " + server.allowed_root)
    print(f"工具: {list(server.tools)}")

2.3 数据库 MCP Server

# ===== 数据库 MCP Server =====

class DatabaseMCPServer(MCPServer):
    """MCP server exposing SQL tools backed by SQLite."""

    def __init__(self, connection_string: str = "sqlite:///demo.db"):
        """Store the connection string and register the database tools."""
        super().__init__("database-server", "1.0.0")
        self.connection_string = connection_string
        self._register_database_tools()

    def _get_connection(self):
        """Open a new in-memory SQLite connection for the demo.

        NOTE(review): ``connection_string`` is stored but not used here, and
        every call returns a fresh empty database, so tables created by one
        tool call are not visible to the next — confirm whether that is
        intended beyond the demo.
        """
        import sqlite3
        return sqlite3.connect(":memory:")

    def _register_database_tools(self):
        """Register the query, list_tables and get_table_schema tools."""

        @self.tool(
            "query",
            "执行 SQL 查询",
            {"type": "object", "properties": {
                "sql": {"type": "string", "description": "SQL 查询语句"},
                "params": {"type": "array", "description": "查询参数(可选)"},
                "limit": {"type": "integer", "description": "返回行数限制(默认 100)"}
            }, "required": ["sql"]}
        )
        def query(sql: str, params: list = None, limit: int = 100) -> str:
            """Run ``sql`` against a database seeded with a sample ``users`` table.

            Returns the column names, the total row count and at most ``limit``
            rows as JSON lines; statements without a result set report the
            affected row count; SQL failures are returned as an error string.
            """
            conn = self._get_connection()
            try:
                cursor = conn.cursor()
                # Seed the sample table so the demo has something to query.
                cursor.execute("""
                    CREATE TABLE IF NOT EXISTS users (
                        id INTEGER PRIMARY KEY,
                        name TEXT,
                        email TEXT,
                        created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
                    )
                """)
                cursor.execute("INSERT OR IGNORE INTO users (id, name, email) VALUES (1, '张三', 'zhangsan@example.com')")
                cursor.execute("INSERT OR IGNORE INTO users (id, name, email) VALUES (2, '李四', 'lisi@example.com')")

                cursor.execute(sql, params or [])

                if cursor.description is None:
                    # INSERT/UPDATE/DDL produce no result set; iterating
                    # ``description`` used to fail with a confusing TypeError.
                    conn.commit()
                    return f"执行成功,影响 {cursor.rowcount} 行"

                rows = cursor.fetchall()
                columns = [desc[0] for desc in cursor.description]

                header = f"列名: {columns}\n" + f"返回 {len(rows)} 行\n\n"
                body = "".join(
                    json.dumps(dict(zip(columns, row)), ensure_ascii=False) + "\n"
                    for row in rows[:limit]
                )
                return header + body
            except Exception as e:
                return f"SQL 错误: {e}"
            finally:
                conn.close()

        @self.tool(
            "list_tables",
            "列出所有表",
            {"type": "object", "properties": {}, "required": []}
        )
        def list_tables() -> str:
            """Return the names of all tables in the database."""
            conn = self._get_connection()
            try:
                cursor = conn.cursor()
                cursor.execute("SELECT name FROM sqlite_master WHERE type='table'")
                tables = [row[0] for row in cursor.fetchall()]
                return f"数据表: {', '.join(tables)}" if tables else "没有数据表"
            finally:
                conn.close()

        @self.tool(
            "get_table_schema",
            "获取表结构",
            {"type": "object", "properties": {
                "table": {"type": "string", "description": "表名"}
            }, "required": ["table"]}
        )
        def get_table_schema(table: str) -> str:
            """Describe the columns of ``table``, one line per column."""
            conn = self._get_connection()
            try:
                cursor = conn.cursor()
                # The table name comes from the caller, so it is bound as a
                # parameter of the table-valued pragma function instead of
                # being interpolated into a PRAGMA statement.
                cursor.execute("SELECT * FROM pragma_table_info(?)", (table,))
                columns = cursor.fetchall()

                schema_lines = [f"表: {table}\n"]
                for col in columns:
                    cid, name, dtype, notnull, default, pk = col
                    schema_lines.append(
                        f"  - {name}: {dtype} "
                        f"{'NOT NULL' if notnull else 'NULL'} "
                        f"{'PK' if pk else ''}"
                    )

                return "\n".join(schema_lines)
            except Exception as e:
                return f"错误: {e}"
            finally:
                conn.close()

# Usage
if __name__ == "__main__":
    # Default connection string; only the registered tool names are shown.
    server = DatabaseMCPServer()
    print("数据库 MCP Server")
    print(f"工具: {list(server.tools)}")

三、MCP Client 开发

3.1 stdio Client

# ===== MCP Client (stdio) =====

import asyncio
import json
import sys
from typing import Dict, List, Optional

class MCPStdioClient:
    """MCP client that talks newline-delimited JSON-RPC to a child process over stdio."""

    def __init__(self, server_command: str, server_args: List[str] = None):
        """Remember how to launch the server; nothing is started until ``connect``.

        Args:
            server_command: Executable to launch.
            server_args: Extra command-line arguments for the executable.
        """
        self.server_command = server_command
        self.server_args = server_args or []
        self.process = None
        self.request_id = 0
        self.server_info: Dict = {}
        self.capabilities: Dict = {}
        # Futures of in-flight requests, keyed by request id.
        self._pending: Dict[int, asyncio.Future] = {}
        # Strong reference to the reader task so it cannot be garbage-collected.
        self._reader_task: Optional[asyncio.Task] = None

    async def connect(self):
        """Launch the server process and perform the MCP initialize handshake."""
        # NOTE(review): stderr is piped but never read; a server that writes a
        # lot to stderr could block once the pipe buffer fills.
        self.process = await asyncio.create_subprocess_exec(
            self.server_command, *self.server_args,
            stdin=asyncio.subprocess.PIPE,
            stdout=asyncio.subprocess.PIPE,
            stderr=asyncio.subprocess.PIPE
        )

        # Start reading responses before the first request is sent.
        self._reader_task = asyncio.create_task(self._read_loop())

        init_result = await self.send_request("initialize", {
            "protocolVersion": "2024-11-05",
            "capabilities": {"roots": {"listChanged": True}},
            "clientInfo": {"name": "stdio-client", "version": "1.0.0"}
        })

        self.server_info = init_result.get("serverInfo", {})
        self.capabilities = init_result.get("capabilities", {})

        await self.send_notification("notifications/initialized")

        print(f"✅ 已连接到 {self.server_info.get('name')} v{self.server_info.get('version')}")

    async def _read_loop(self):
        """Route each response line from the server to its waiting request.

        Malformed lines are reported and skipped. When the stream ends, any
        request still waiting fails with ``ConnectionError`` instead of
        hanging until its timeout.
        """
        try:
            while True:
                line = await self.process.stdout.readline()
                if not line:
                    break

                text = line.decode("utf-8", errors="replace").strip()
                if not text:
                    continue

                try:
                    message = json.loads(text)
                except json.JSONDecodeError as e:
                    print(f"读取错误: {e}")
                    continue

                # Only responses are routed; server-initiated requests and
                # notifications carry a "method" and are ignored here.
                if not isinstance(message, dict) or "method" in message:
                    continue
                response_id = message.get("id")
                if not isinstance(response_id, (int, str)):
                    continue

                future = self._pending.pop(response_id, None)
                if future is not None and not future.done():
                    future.set_result(message)
        except asyncio.CancelledError:
            raise
        except Exception as e:
            print(f"读取错误: {e}")
        finally:
            for future in self._pending.values():
                if not future.done():
                    future.set_exception(ConnectionError("MCP server 连接已关闭"))
            self._pending.clear()

    async def send_request(self, method: str, params: Dict = None) -> Dict:
        """Send a request and wait up to 10 seconds for its result.

        Returns:
            The ``result`` member of the response (``{}`` when absent).

        Raises:
            TimeoutError: If no response arrives in time.
            ConnectionError: If the server closes its stdout while waiting.
            Exception: Carrying the JSON-RPC ``error`` object on an error response.
        """
        self.request_id += 1
        request_id = self.request_id

        message = json.dumps({
            "jsonrpc": "2.0",
            "id": request_id,
            "method": method,
            "params": params or {}
        })

        future = asyncio.get_running_loop().create_future()
        self._pending[request_id] = future
        try:
            self.process.stdin.write((message + "\n").encode())
            await self.process.stdin.drain()
            response = await asyncio.wait_for(future, timeout=10)
        except asyncio.TimeoutError:
            raise TimeoutError(f"请求超时: {method}")
        finally:
            # Drop the entry on every exit path so timed-out requests do not
            # accumulate and a late reply cannot hit a cancelled future.
            self._pending.pop(request_id, None)

        if "error" in response:
            raise Exception(response["error"])
        return response.get("result", {})

    async def send_notification(self, method: str, params: Dict = None):
        """Send a notification (no id, no response awaited)."""
        message = json.dumps({
            "jsonrpc": "2.0",
            "method": method,
            "params": params or {}
        })
        self.process.stdin.write((message + "\n").encode())
        await self.process.stdin.drain()

    async def list_tools(self) -> List[Dict]:
        """Return the server's tool descriptors."""
        result = await self.send_request("tools/list")
        return result.get("tools", [])

    async def call_tool(self, name: str, arguments: Dict = None) -> str:
        """Call a tool and return its text content items joined by newlines."""
        result = await self.send_request("tools/call", {
            "name": name,
            "arguments": arguments or {}
        })

        contents = result.get("content", [])
        texts = [c.get("text", "") for c in contents if c.get("type") == "text"]
        return "\n".join(texts)

    async def list_resources(self) -> List[Dict]:
        """Return the server's resource descriptors."""
        result = await self.send_request("resources/list")
        return result.get("resources", [])

    async def read_resource(self, uri: str) -> str:
        """Read a resource and return its text contents joined by newlines.

        Resource content entries are identified by their ``text`` member; they
        carry ``uri``/``mimeType``/``text`` and no ``type`` field, so filtering
        on ``type == "text"`` always produced an empty string.
        """
        result = await self.send_request("resources/read", {"uri": uri})
        contents = result.get("contents", [])
        texts = [c["text"] for c in contents if "text" in c]
        return "\n".join(texts)

    async def disconnect(self):
        """Ask the server to shut down, then close the pipe and reap the process.

        The process is cleaned up even when the shutdown request fails; the
        failure is still raised to the caller afterwards.
        """
        try:
            await self.send_request("shutdown")
        finally:
            self.process.stdin.close()
            await self.process.wait()
            if self._reader_task is not None:
                self._reader_task.cancel()
                self._reader_task = None
        print("🔌 已断开连接")

# Usage
if __name__ == "__main__":
    async def main():
        client = MCPStdioClient(server_command="python", server_args=["my_server.py"])
        await client.connect()

        # Show every tool the server advertises.
        for tool in await client.list_tools():
            print(f"  🔧 {tool['name']}: {tool['description']}")

        # Invoke one tool and print what it returned.
        result = await client.call_tool("echo", arguments={"text": "Hello MCP!"})
        print(f"结果: {result}")

        await client.disconnect()

    asyncio.run(main())

3.2 Streamable HTTP Client

# ===== MCP Client (Streamable HTTP) =====

import aiohttp
import json
from typing import Dict, List, Optional

class MCPHttpClient:
    """MCP client that talks to a server over Streamable HTTP.

    Every JSON-RPC message is POSTed to ``base_url``. When the server returns
    an ``Mcp-Session-Id`` header it is remembered and sent on later messages.
    Replies may arrive as a plain JSON body or as a ``text/event-stream``
    body (both are advertised in the ``Accept`` header); both are handled.
    """

    def __init__(self, base_url: str, headers: Dict = None):
        """
        Args:
            base_url: MCP endpoint URL; trailing slashes are stripped.
            headers: Extra HTTP headers merged over the defaults.
        """
        self.base_url = base_url.rstrip("/")
        self.headers = {
            "Content-Type": "application/json",
            "Accept": "application/json, text/event-stream",
            **(headers or {})
        }
        self.request_id = 0  # last JSON-RPC id handed out
        self.session_id: Optional[str] = None  # taken from the Mcp-Session-Id response header

    def _request_headers(self) -> Dict:
        """Return a copy of the base headers plus the session id, if known."""
        headers = dict(self.headers)
        if self.session_id:
            headers["Mcp-Session-Id"] = self.session_id
        return headers

    @staticmethod
    def _parse_sse_body(body: str, request_id: int) -> Dict:
        """Extract the JSON-RPC response for ``request_id`` from an SSE body.

        Events are separated by blank lines; the ``data:`` lines of one event
        are joined with newlines and decoded as JSON. Events that are not a
        response to ``request_id`` (notifications, server requests) are
        skipped.

        Raises:
            Exception: If the stream contains no matching response.
        """
        data_lines = []
        # Split on the SSE line terminators only (str.splitlines would also
        # split on characters that may legally occur inside JSON strings).
        lines = body.replace("\r\n", "\n").replace("\r", "\n").split("\n")
        # The trailing "" flushes a final event not followed by a blank line.
        for line in lines + [""]:
            if line.startswith("data:"):
                value = line[len("data:"):]
                # A single leading space after the colon is not part of the value.
                data_lines.append(value[1:] if value.startswith(" ") else value)
            elif line == "" and data_lines:
                message = json.loads("\n".join(data_lines))
                data_lines = []
                if (
                    isinstance(message, dict)
                    and message.get("id") == request_id
                    and ("result" in message or "error" in message)
                ):
                    return message
        raise Exception(
            f"No JSON-RPC response for request {request_id} in event stream"
        )

    async def connect(self):
        """Run the ``initialize`` handshake and send ``notifications/initialized``."""
        result = await self.send_request("initialize", {
            "protocolVersion": "2024-11-05",
            "capabilities": {},
            "clientInfo": {"name": "http-client", "version": "1.0.0"}
        })

        print(f"✅ 已连接: {result.get('serverInfo', {}).get('name')}")
        await self.send_notification("notifications/initialized")

    async def send_request(self, method: str, params: Dict = None) -> Dict:
        """Send a JSON-RPC request and return its ``result``.

        Args:
            method: JSON-RPC method name.
            params: Method parameters; an empty dict is sent when omitted.

        Returns:
            The ``result`` member of the response, or ``{}`` if absent.

        Raises:
            Exception: With the JSON-RPC ``error`` object when the server
                reports an error, or when an event-stream reply contains no
                response for this request.
        """
        self.request_id += 1
        request_id = self.request_id

        payload = {
            "jsonrpc": "2.0",
            "id": request_id,
            "method": method,
            "params": params or {}
        }

        async with aiohttp.ClientSession() as session:
            async with session.post(
                self.base_url,
                json=payload,
                headers=self._request_headers()
            ) as resp:
                # Remember the session id
                sid = resp.headers.get("Mcp-Session-Id")
                if sid:
                    self.session_id = sid

                content_type = resp.headers.get("Content-Type", "")
                if content_type.lower().startswith("text/event-stream"):
                    # resp.json() rejects this content type, so decode the
                    # event stream ourselves (SSE is always UTF-8).
                    data = self._parse_sse_body(
                        await resp.text(encoding="utf-8"), request_id
                    )
                else:
                    data = await resp.json()

                if "error" in data:
                    raise Exception(data["error"])
                return data.get("result", {})

    async def send_notification(self, method: str, params: Dict = None):
        """Send a JSON-RPC notification (no ``id``); the reply body is ignored."""
        payload = {
            "jsonrpc": "2.0",
            "method": method,
            "params": params or {}
        }

        async with aiohttp.ClientSession() as session:
            async with session.post(
                self.base_url,
                json=payload,
                headers=self._request_headers()
            ):
                pass

    async def list_tools(self) -> List[Dict]:
        """Return the ``tools`` list from ``tools/list`` (empty if absent)."""
        result = await self.send_request("tools/list")
        return result.get("tools", [])

    async def call_tool(self, name: str, arguments: Dict = None) -> str:
        """Call a tool and return its ``text`` content items joined by newlines."""
        result = await self.send_request("tools/call", {
            "name": name,
            "arguments": arguments or {}
        })
        contents = result.get("content", [])
        return "\n".join(c.get("text", "") for c in contents if c.get("type") == "text")

# 使用示例
if __name__ == '__main__':
    async def main():
        """Demo: connect over HTTP and report how many tools are available."""
        client = MCPHttpClient("http://localhost:8080/mcp")
        await client.connect()

        tool_count = len(await client.list_tools())
        print(f"可用工具: {tool_count} 个")

    asyncio.run(main())

四、MCP 与 LLM 集成

4.1 MCP + OpenAI 函数调用

# ===== MCP + OpenAI 集成 =====

from openai import OpenAI
import asyncio

# Module-level OpenAI client, created when this module is loaded.
# NOTE(review): MCPWithLLM below builds its own client in __init__, so this
# instance looks unused in this section - confirm before removing.
client = OpenAI()

class MCPWithLLM:
    """Bridge between an MCP client and the OpenAI chat-completions API.

    The MCP server's tools are advertised to the model as OpenAI functions.
    Tool calls requested by the model are executed through the MCP client and
    their results are fed back until the model produces a final answer.
    """

    def __init__(self, mcp_client: MCPStdioClient, model: str = "gpt-4o"):
        """
        Args:
            mcp_client: MCP client used to list and call tools.
            model: OpenAI model name used for every completion request.
        """
        self.mcp = mcp_client
        self.model = model
        self.llm = OpenAI()

    async def chat(self, user_message: str, max_tool_calls: int = 5) -> str:
        """Answer ``user_message``, calling MCP tools when the model asks to.

        Args:
            user_message: The user's prompt.
            max_tool_calls: Maximum number of model round-trips; a single
                round may execute several tool calls.

        Returns:
            The model's final text, or a fixed notice when the round limit is
            reached while the model is still requesting tools.
        """
        # Fetch the MCP tool list
        mcp_tools = await self.mcp.list_tools()

        # Convert to the OpenAI function schema. A tool may come without a
        # description, so fall back to an empty one instead of raising.
        openai_tools = [
            {
                "type": "function",
                "function": {
                    "name": t["name"],
                    "description": t.get("description", ""),
                    "parameters": t.get("inputSchema", {})
                }
            } for t in mcp_tools
        ]

        messages = [
            {
                "role": "system",
                "content": "你是一个助手,可以使用 MCP 工具来完成任务。根据需要调用工具,然后基于结果回答用户。"
            },
            {"role": "user", "content": user_message}
        ]

        # Only send tools/tool_choice when there are tools: the API rejects
        # tool_choice without tools, and tools=None is not "omitted".
        request_kwargs = {"model": self.model, "messages": messages}
        if openai_tools:
            request_kwargs["tools"] = openai_tools
            request_kwargs["tool_choice"] = "auto"

        loop = asyncio.get_running_loop()

        for _ in range(max_tool_calls):
            # The SDK call is synchronous; run it in a worker thread so the
            # event loop is not blocked while waiting for the model.
            response = await loop.run_in_executor(
                None,
                lambda: self.llm.chat.completions.create(**request_kwargs)
            )

            message = response.choices[0].message
            messages.append(message)

            if not message.tool_calls:
                return message.content

            # Execute the requested MCP tool calls
            for tool_call in message.tool_calls:
                func_name = tool_call.function.name
                # An empty arguments string means "no arguments".
                func_args = json.loads(tool_call.function.arguments or "{}")

                print(f"  🔧 MCP 调用: {func_name}({json.dumps(func_args, ensure_ascii=False)})")

                # Call the tool through MCP
                result = await self.mcp.call_tool(func_name, func_args)
                print(f"  📋 结果: {result}")

                messages.append({
                    "role": "tool",
                    "tool_call_id": tool_call.id,
                    "content": result
                })

        return "达到最大工具调用次数。"

# 使用
if __name__ == '__main__':
    async def main():
        """Demo: ask the LLM a question that it answers with MCP tools."""
        mcp_client = MCPStdioClient("python", ["filesystem_server.py"])
        await mcp_client.connect()

        try:
            assistant = MCPWithLLM(mcp_client)
            result = await assistant.chat("帮我查看当前目录有哪些文件")
            print(f"\n回答: {result}")
        finally:
            # Disconnect even when the chat fails.
            await mcp_client.disconnect()

    asyncio.run(main())

4.2 MCP + Claude 集成

# ===== MCP + Claude 集成 =====

import anthropic

class MCPWithClaude:
    """Bridge between an MCP client and the Anthropic Messages API.

    MCP tools are advertised to Claude as tools. ``tool_use`` blocks in the
    reply are executed through the MCP client and their results are sent back
    as ``tool_result`` blocks until Claude stops asking for tools.
    """

    def __init__(self, mcp_client, api_key: str = None,
                 model: str = "claude-sonnet-4-20250514"):
        """
        Args:
            mcp_client: Object providing async ``list_tools()`` and
                ``call_tool(name, arguments)``.
            api_key: Passed to ``anthropic.Anthropic``.
            model: Model name used for every request.
        """
        self.mcp = mcp_client
        self.client = anthropic.Anthropic(api_key=api_key)
        self.model = model

    async def chat(self, user_message: str, max_turns: int = 5) -> str:
        """Answer ``user_message``, running MCP tools when Claude asks to.

        Args:
            user_message: The user's prompt.
            max_turns: Maximum number of model round-trips.

        Returns:
            The text blocks of the first reply that does not request a tool,
            joined by newlines, or a fixed notice when ``max_turns`` is
            exhausted.
        """
        mcp_tools = await self.mcp.list_tools()

        # Convert to the Claude tool schema. Missing descriptions become "",
        # and a missing schema becomes an empty object schema (an empty dict
        # is not a valid input_schema).
        claude_tools = [
            {
                "name": t["name"],
                "description": t.get("description", ""),
                "input_schema": t.get("inputSchema") or {"type": "object", "properties": {}}
            } for t in mcp_tools
        ]

        messages = [{"role": "user", "content": user_message}]
        loop = asyncio.get_running_loop()

        for _ in range(max_turns):
            # The SDK call is synchronous; run it in a worker thread so the
            # event loop is not blocked while waiting for the model.
            response = await loop.run_in_executor(
                None,
                lambda: self.client.messages.create(
                    model=self.model,
                    max_tokens=4096,
                    system="使用 MCP 工具完成任务。",
                    tools=claude_tools,
                    messages=messages
                )
            )

            # Re-encode the reply as plain dicts for the message history
            assistant_content = []
            for block in response.content:
                if block.type == "text":
                    assistant_content.append({"type": "text", "text": block.text})
                elif block.type == "tool_use":
                    assistant_content.append({
                        "type": "tool_use",
                        "id": block.id,
                        "name": block.name,
                        "input": block.input
                    })

            messages.append({"role": "assistant", "content": assistant_content})

            if response.stop_reason != "tool_use":
                # No tool to run (end_turn, max_tokens, ...): return the text
                # rather than re-sending a history that ends with an
                # assistant turn.
                texts = [b.text for b in response.content if b.type == "text"]
                return "\n".join(texts)

            # Execute the requested tools
            tool_results = []
            for block in response.content:
                if block.type == "tool_use":
                    result = await self.mcp.call_tool(block.name, block.input)
                    tool_results.append({
                        "type": "tool_result",
                        "tool_use_id": block.id,
                        "content": result
                    })

            messages.append({"role": "user", "content": tool_results})

        return "达到最大轮次。"

五、MCP 资源与提示词

5.1 资源(Resources)系统

# ===== MCP 资源系统 =====

class ResourceBasedMCPServer(MCPServer):
    """MCP server that exposes read-only resources.

    Registers three built-in demo resources plus one resource per regular
    file found directly inside ``data_dir``.
    """

    def __init__(self, data_dir: str = "./data"):
        """
        Args:
            data_dir: Directory whose files are published as resources; it
                is skipped when it does not exist.
        """
        super().__init__("resource-server", "1.0.0")
        self.data_dir = data_dir
        self._register_resources()

    def _register_resources(self):
        """Register the built-in resources and the files under ``data_dir``."""
        import os

        @self.resource(
            "file:///config",
            "app-config",
            "应用配置文件",
            mime_type="application/json"
        )
        def config_resource(uri: str) -> str:
            return json.dumps({
                "app_name": "Demo App",
                "version": "1.0",
                "debug": False
            }, indent=2)

        @self.resource(
            "file:///logs/recent",
            "recent-logs",
            "最近日志"
        )
        def logs_resource(uri: str) -> str:
            return """[2026-06-08 09:00] Server started
[2026-06-08 09:01] Connection established
[2026-06-08 09:05] Query executed (23ms)
[2026-06-08 09:10] Cache refreshed"""

        @self.resource(
            "file:///stats",
            "system-stats",
            "系统统计信息"
        )
        def stats_resource(uri: str) -> str:
            return json.dumps({
                "uptime": "2h 35m",
                "requests": 1247,
                "avg_latency": "45ms",
                "error_rate": "0.3%"
            }, indent=2)

        # Dynamic resources: one per regular file directly inside data_dir
        if os.path.exists(self.data_dir):
            for filename in os.listdir(self.data_dir):
                filepath = os.path.join(self.data_dir, filename)
                if os.path.isfile(filepath):
                    self._register_dynamic_resource(filepath)

    def _register_dynamic_resource(self, filepath: str):
        """Register ``filepath`` as a resource whose handler returns its text.

        The resource is named after the file's base name. The file is opened
        on every read, decoded as UTF-8.
        """
        # Imported here so the method does not depend on a module-level
        # import that this section of the file does not provide.
        from pathlib import Path

        path = Path(filepath)
        # Build a well-formed file URI: as_uri() requires an absolute path
        # and percent-encodes special characters. Plain string formatting
        # produced URIs such as "file:///./data/x.txt".
        uri = path.resolve().as_uri()
        name = path.name

        @self.resource(uri, name, f"文件: {name}")
        def file_resource(uri: str = uri) -> str:
            with open(filepath, 'r', encoding='utf-8') as f:
                return f.read()

        self.resource_handlers[uri] = file_resource

# 使用
if __name__ == '__main__':
    # Demo: print every registered resource with its name and description.
    server = ResourceBasedMCPServer()
    print("资源:")
    for uri, res in server.resources.items():
        summary = f"{res.name} - {res.description}"
        print(f"  📄 {uri}: {summary}")

5.2 提示词(Prompts)系统

# ===== MCP 提示词系统 =====

class PromptBasedMCPServer(MCPServer):
    """MCP server that exposes reusable prompt templates.

    Each prompt handler takes the declared arguments and returns a list of
    chat messages (``role`` / ``content`` dicts).
    """

    def __init__(self):
        super().__init__("prompt-server", "1.0.0")
        self._register_prompts()

    def _register_prompts(self):
        """Register the built-in prompt templates."""

        @self.prompt(
            "code_review",
            "代码审查提示词",
            [{"name": "code", "description": "要审查的代码", "required": True}]
        )
        def code_review_prompt(code: str) -> list:
            # The template string was previously left unterminated (a syntax
            # error); the code is wrapped in a fenced block and the string
            # is closed here.
            return [
                {
                    "role": "user",
                    "content": f"""请审查以下代码,关注:
1. 潜在 bug
2. 安全问题
3. 性能优化
4. 代码规范

代码:
```python
{code}
```"""
                }
            ]

        @self.prompt(
            "sql_optimizer",
            "SQL 优化提示词",
            [{"name": "sql", "description": "SQL 查询语句", "required": True}]
        )
        def sql_optimizer_prompt(sql: str) -> list:
            return [
                {
                    "role": "user",
                    "content": f"""请优化以下 SQL 查询:
1. 分析执行计划
2. 建议索引优化
3. 重写优化版本

SQL: {sql}"""
                }
            ]

        @self.prompt(
            "document_summary",
            "文档摘要提示词",
            [{"name": "document", "description": "文档内容", "required": True}]
        )
        def document_summary_prompt(document: str) -> list:
            return [
                {
                    "role": "user",
                    "content": f"请用 3-5 句话总结以下文档的要点:\n\n{document}"
                }
            ]

        @self.prompt(
            "debug_assistant",
            "调试助手提示词",
            [
                {"name": "error", "description": "错误信息", "required": True},
                {"name": "context", "description": "上下文代码", "required": False}
            ]
        )
        def debug_assistant_prompt(error: str, context: str = "") -> list:
            # The optional context section is only included when provided.
            content = f"错误信息:\n{error}\n"
            if context:
                content += f"\n相关代码:\n{context}\n"
            content += "\n请分析可能的原因和解决方案。"

            return [{"role": "user", "content": content}]

# 使用
if __name__ == '__main__':
    # Demo: print every registered prompt with its argument names.
    server = PromptBasedMCPServer()
    print("提示词:")
    for name, prompt in server.prompts.items():
        args_desc = ", ".join([a["name"] for a in prompt.arguments])
        print(f"  💬 {name}({args_desc}): {prompt.description}")

六、多 Server 协作

6.1 MCP Server 管理器

# ===== 多 Server 管理 =====

import asyncio
from typing import Dict, List, Optional

class MCPServerManager:
    """Registry of MCP servers reachable in-process or through a client.

    ``servers`` holds in-process ``MCPServer`` objects whose handlers are
    called directly; ``clients`` holds ``MCPStdioClient`` connections.
    """

    def __init__(self):
        self.servers: Dict[str, MCPServer] = {}
        self.clients: Dict[str, MCPStdioClient] = {}

    def register_local(self, name: str, server: MCPServer):
        """Register an in-process server under ``name``."""
        self.servers[name] = server

    async def connect_remote(self, name: str, command: str, args: List[str] = None):
        """Create an ``MCPStdioClient`` for ``command``/``args``, connect it
        and register it under ``name``."""
        client = MCPStdioClient(command, args)
        await client.connect()
        self.clients[name] = client

    async def list_all_tools(self) -> List[Dict]:
        """Return the tools of every registered server.

        Each entry has ``source`` (the registration name), ``name``,
        ``description`` and ``inputSchema``.
        """
        all_tools = []

        # In-process servers
        for server_name, server in self.servers.items():
            for tool in server.tools.values():
                all_tools.append({
                    "source": server_name,
                    "name": tool.name,
                    "description": tool.description,
                    "inputSchema": tool.input_schema
                })

        # Servers reached through a client
        for client_name, client in self.clients.items():
            tools = await client.list_tools()
            for tool in tools:
                all_tools.append({
                    "source": client_name,
                    "name": tool["name"],
                    # A tool may come without a description.
                    "description": tool.get("description", ""),
                    "inputSchema": tool.get("inputSchema", {})
                })

        return all_tools

    async def call_tool(self, server_name: str, tool_name: str, arguments: Dict = None) -> str:
        """Call ``tool_name`` on the server registered as ``server_name``.

        In-process handlers are invoked directly with ``arguments`` as
        keyword arguments; both plain and ``async`` handlers are supported.

        Returns:
            The tool output as a string, or a message naming the missing
            tool/server when the lookup fails.
        """
        if server_name in self.servers:
            # In-process server
            server = self.servers[server_name]
            handler = server.tool_handlers.get(tool_name)
            if not handler:
                return f"工具不存在: {tool_name}"

            result = handler(**(arguments or {}))
            # An async handler returns a coroutine: await it (calling it
            # again, as was done before, raises TypeError).
            if asyncio.iscoroutine(result):
                result = await result
            return str(result)

        if server_name in self.clients:
            # Server reached through a client
            return await self.clients[server_name].call_tool(tool_name, arguments)

        return f"Server 不存在: {server_name}"

# 使用
if __name__ == '__main__':
    async def main():
        """Demo: register a local filesystem server and list all tools."""
        manager = MCPServerManager()

        # Register an in-process server
        manager.register_local("filesystem", FileSystemMCPServer("."))

        # Print every tool together with the server it comes from
        for entry in await manager.list_all_tools():
            print(f"  [{entry['source']}] {entry['name']}: {entry['description']}")

    asyncio.run(main())

6.2 统一工具代理

# ===== 统一 MCP 工具代理 =====

class UnifiedMCPAgent:
    """LLM agent that routes tool calls to the right MCP server.

    Tools from every server known to the manager are exposed to the model
    under the qualified name ``<server>__<tool>``.
    """

    def __init__(self, manager: MCPServerManager, model: str = "gpt-4o"):
        """
        Args:
            manager: Server manager used to list and call tools.
            model: OpenAI model name used for every completion request.
        """
        self.manager = manager
        self.model = model
        self.llm = OpenAI()

    async def chat(self, message: str, max_turns: int = 10) -> str:
        """Answer ``message``, routing the model's tool calls via the manager.

        Args:
            message: The user's prompt.
            max_turns: Maximum number of model round-trips.

        Returns:
            The model's final text, or a fixed notice when ``max_turns`` is
            exhausted while the model is still requesting tools.
        """
        all_tools = await self.manager.list_all_tools()

        # Prefix each tool with its server to avoid name clashes, and keep an
        # exact qualified-name -> (server, tool) table so routing does not
        # depend on where "__" happens to occur in the names.
        routes = {}
        openai_tools = []
        for tool in all_tools:
            qualified_name = f"{tool['source']}__{tool['name']}"
            routes[qualified_name] = (tool['source'], tool['name'])
            openai_tools.append({
                "type": "function",
                "function": {
                    "name": qualified_name,
                    "description": f"[{tool['source']}] {tool['description']}",
                    "parameters": tool.get("inputSchema", {})
                }
            })

        messages = [
            {
                "role": "system",
                "content": "你是一个助手,可以调用多个 MCP Server 的工具。工具名格式为 server__tool_name。"
            },
            {"role": "user", "content": message}
        ]

        # Only send tools/tool_choice when there are tools: the API rejects
        # tool_choice without tools, and tools=None is not "omitted".
        request_kwargs = {"model": self.model, "messages": messages}
        if openai_tools:
            request_kwargs["tools"] = openai_tools
            request_kwargs["tool_choice"] = "auto"

        loop = asyncio.get_running_loop()

        for _ in range(max_turns):
            # The SDK call is synchronous; run it in a worker thread so the
            # event loop is not blocked while waiting for the model.
            response = await loop.run_in_executor(
                None,
                lambda: self.llm.chat.completions.create(**request_kwargs)
            )

            msg = response.choices[0].message
            messages.append(msg)

            if not msg.tool_calls:
                return msg.content

            for tool_call in msg.tool_calls:
                full_name = tool_call.function.name

                if full_name in routes:
                    server_name, tool_name = routes[full_name]
                else:
                    # Unknown name: fall back to splitting on the first "__",
                    # then to the first local server.
                    parts = full_name.split("__", 1)
                    if len(parts) == 2:
                        server_name, tool_name = parts
                    else:
                        server_name = next(iter(self.manager.servers), "")
                        tool_name = full_name

                # An empty arguments string means "no arguments".
                func_args = json.loads(tool_call.function.arguments or "{}")

                print(f"  🔧 [{server_name}].{tool_name}({json.dumps(func_args, ensure_ascii=False)})")

                # Normalise to str before slicing so a non-string tool
                # result cannot break the preview below.
                result = str(await self.manager.call_tool(server_name, tool_name, func_args))
                print(f"  📋 {result[:200]}")

                messages.append({
                    "role": "tool",
                    "tool_call_id": tool_call.id,
                    "content": result[:4000]
                })

        return "达到最大轮次。"

七、生产案例

7.1 IDE MCP Server

# ===== 案例:IDE MCP Server =====

class IDEMCPServer(MCPServer):
    """MCP server exposing IDE-style tools for one project directory.

    Tools: ``search_code``, ``get_diagnostics`` (simulated), ``apply_edit``
    and ``run_tests`` (simulated).
    """

    def __init__(self, project_root: str):
        """
        Args:
            project_root: Directory that is searched by ``search_code`` and
                that ``apply_edit`` is confined to.
        """
        super().__init__("ide-server", "1.0.0")
        self.project_root = project_root
        self._buffer_contents: Dict[str, str] = {}
        self._register_ide_tools()

    def _register_ide_tools(self):
        """Register the IDE tools."""

        @self.tool(
            "search_code",
            "搜索代码",
            {"type": "object", "properties": {
                "pattern": {"type": "string", "description": "搜索模式(正则或文本)"},
                "file_pattern": {"type": "string", "description": "文件过滤(如 *.py)"},
                "context_lines": {"type": "integer", "description": "上下文行数"}
            }, "required": ["pattern"]}
        )
        def search_code(pattern: str, file_pattern: str = "*", context_lines: int = 2) -> str:
            """Search files under the project root; return at most 20 hits with context."""
            import fnmatch
            import os
            import re

            max_results = 20

            try:
                regex = re.compile(pattern)
            except re.error:
                # The tool accepts plain text as well as regex, so a pattern
                # that is not a valid regex is matched literally.
                regex = re.compile(re.escape(pattern))

            # A negative value would produce an empty or inverted window.
            context_lines = max(0, context_lines)

            results = []
            for root, dirs, files in os.walk(self.project_root):
                for f in files:
                    if not fnmatch.fnmatch(f, file_pattern):
                        continue
                    filepath = os.path.join(root, f)
                    try:
                        with open(filepath, 'r', encoding='utf-8') as fh:
                            lines = fh.readlines()
                    except (UnicodeDecodeError, OSError):
                        # Skip binary, unreadable or vanished files.
                        continue

                    for i, line in enumerate(lines):
                        if not regex.search(line):
                            continue
                        start = max(0, i - context_lines)
                        end = min(len(lines), i + context_lines + 1)
                        context = "".join(
                            f"{j+1:4d} | {lines[j]}" for j in range(start, end)
                        )
                        results.append(f"\n📄 {filepath}:{i+1}\n{context}")
                        # Stop scanning as soon as the result cap is reached.
                        if len(results) >= max_results:
                            return "\n".join(results)

            return "\n".join(results) if results else "未找到匹配"

        @self.tool(
            "get_diagnostics",
            "获取诊断信息",
            {"type": "object", "properties": {
                "file_path": {"type": "string", "description": "文件路径"}
            }, "required": ["file_path"]}
        )
        def get_diagnostics(file_path: str) -> str:
            # Simulated diagnostics: a fixed list tagged with file_path
            return json.dumps([
                {
                    "severity": "warning",
                    "line": 42,
                    "message": "未使用的变量 'tmp'",
                    "file": file_path
                },
                {
                    "severity": "error",
                    "line": 15,
                    "message": "缺少类型注解",
                    "file": file_path
                }
            ], ensure_ascii=False, indent=2)

        @self.tool(
            "apply_edit",
            "应用编辑",
            {"type": "object", "properties": {
                "file_path": {"type": "string", "description": "文件路径"},
                "old_text": {"type": "string", "description": "要替换的文本"},
                "new_text": {"type": "string", "description": "替换为新文本"}
            }, "required": ["file_path", "old_text", "new_text"]}
        )
        def apply_edit(file_path: str, old_text: str, new_text: str) -> str:
            """Replace the first occurrence of ``old_text`` in ``file_path``."""
            import os

            # The path comes from the model, so refuse to touch anything
            # outside the project root (symlinks and ".." are resolved first).
            root = os.path.realpath(self.project_root)
            target = os.path.realpath(file_path)
            try:
                inside_project = os.path.commonpath([root, target]) == root
            except ValueError:
                # e.g. paths on different drives
                inside_project = False
            if not inside_project:
                return f"编辑失败: 路径不在项目目录内: {file_path}"

            try:
                with open(file_path, 'r', encoding='utf-8') as f:
                    content = f.read()

                if old_text not in content:
                    return f"未找到匹配文本"

                new_content = content.replace(old_text, new_text, 1)

                with open(file_path, 'w', encoding='utf-8') as f:
                    f.write(new_content)

                return f"已编辑 {file_path}"
            except Exception as e:
                return f"编辑失败: {e}"

        @self.tool(
            "run_tests",
            "运行测试",
            {"type": "object", "properties": {
                "test_pattern": {"type": "string", "description": "测试匹配模式"}
            }, "required": []}
        )
        def run_tests(test_pattern: str = "*") -> str:
            # Simulated result: no tests are actually executed
            return f"运行测试: {test_pattern}\n✅ 12/12 passed (3.2s)"

# 使用
if __name__ == '__main__':
    # Demo: show the names of the registered IDE tools.
    server = IDEMCPServer(".")
    print(f"IDE Server 工具: {[*server.tools.keys()]}")

7.2 Git MCP Server

# ===== 案例:Git MCP Server =====

class GitMCPServer(MCPServer):
    """MCP server exposing basic Git commands for one repository."""

    def __init__(self, repo_path: str):
        """
        Args:
            repo_path: Working directory in which every git command runs.
        """
        super().__init__("git-server", "1.0.0")
        self.repo_path = repo_path
        self._register_git_tools()

    def _run_git(self, *args) -> str:
        """Run ``git <args>`` in the repository.

        Returns:
            The stripped stdout, or the stripped stderr when stdout is empty.
        """
        import subprocess
        result = subprocess.run(
            ["git"] + list(args),
            cwd=self.repo_path,
            capture_output=True,
            text=True
        )
        return result.stdout.strip() or result.stderr.strip()

    def _register_git_tools(self):
        """Register the git tools."""

        @self.tool("git_status", "查看 Git 状态")
        def git_status() -> str:
            return self._run_git("status", "--short")

        @self.tool("git_log", "查看提交历史")
        def git_log(limit: int = 10) -> str:
            # "-n <count>" limits the number of commits; the previous
            # "--<count>" form is not a valid git option.
            count = max(1, int(limit))
            return self._run_git("log", "-n", str(count), "--oneline", "--decorate")

        @self.tool("git_diff", "查看差异")
        def git_diff(file: str = "", staged: bool = False) -> str:
            args = ["diff"]
            if staged:
                args.append("--staged")
            if file:
                # "--" makes git treat the value as a path even when it
                # starts with "-".
                args.extend(["--", file])
            return self._run_git(*args)

        @self.tool("git_branch", "查看/创建分支")
        def git_branch(name: str = "", create: bool = False) -> str:
            if create and name:
                return self._run_git("checkout", "-b", name)
            return self._run_git("branch", "-a")

        @self.tool("git_commit", "提交更改")
        def git_commit(message: str, files: str = ".") -> str:
            # Stage first; "--" keeps the path from being parsed as an option.
            self._run_git("add", "--", files)
            return self._run_git("commit", "-m", message)

# 使用
if __name__ == '__main__':
    # Demo: show the names of the registered Git tools.
    server = GitMCPServer(".")
    print(f"Git Server 工具: {[*server.tools.keys()]}")

7.3 Claude Desktop 配置

// claude_desktop_config.json 配置示例(注意:标准 JSON 不支持注释,实际使用时请删除本行)
{
    "mcpServers": {
        "filesystem": {
            "command": "python",
            "args": ["C:/path/to/filesystem_server.py"],
            "env": {
                "ALLOWED_ROOT": "C:/Users/user/projects"
            }
        },
        "database": {
            "command": "python",
            "args": ["C:/path/to/database_server.py"],
            "env": {
                "DATABASE_URL": "sqlite:///demo.db"
            }
        },
        "git": {
            "command": "python",
            "args": ["C:/path/to/git_server.py"],
            "env": {
                "REPO_PATH": "."
            }
        },
        "remote-api": {
            "url": "http://localhost:8080/mcp",
            "headers": {
                "Authorization": "Bearer token123"
            }
        }
    }
}
# ===== Claude Desktop 配置生成器 =====

import json
import os

def generate_claude_config(
    output_path: str,
    servers: Dict[str, Dict]
):
    """Write a Claude Desktop configuration file.

    Args:
        output_path: Destination file; it is overwritten if it exists.
        servers: Mapping of server name to its launch settings, stored under
            the ``mcpServers`` key.

    The file is written as UTF-8 JSON with 4-space indentation and
    non-ASCII characters left unescaped.
    """
    config = {"mcpServers": servers}

    # Create the target directory first so open() does not fail with
    # FileNotFoundError when it does not exist yet.
    parent_dir = os.path.dirname(output_path)
    if parent_dir:
        os.makedirs(parent_dir, exist_ok=True)

    with open(output_path, 'w', encoding='utf-8') as f:
        json.dump(config, f, ensure_ascii=False, indent=4)

    print(f"✅ 配置已生成: {output_path}")

# Windows Claude Desktop 配置路径
claude_config_path = os.path.expandvars(r"%APPDATA%\Claude\claude_desktop_config.json")

# Example configuration: a filesystem server and a database server
if __name__ == '__main__':
    generate_claude_config(
        claude_config_path,
        {
            "filesystem": dict(
                command="python",
                args=["filesystem_server.py"],
                env={"ALLOWED_ROOT": os.getcwd()},
            ),
            "database": dict(
                command="python",
                args=["database_server.py"],
                env={"DATABASE_URL": "sqlite:///demo.db"},
            ),
        },
    )

八、总结

8.1 MCP 架构全景

能力

Server层

Host层

LLM Client

MCP Host

Client 1

Client 2

Client 3

File Server

DB Server

Git Server

read_file / write_file

list_directory

query / list_tables

git_status / git_log

8.2 MCP 三大能力对比

能力 用途 类比
Tools 执行操作(读写文件/查询DB) 函数调用
Resources 提供数据(配置/日志/文件) GET 接口
Prompts 提供模板(代码审查/调试) 提示词模板

8.3 最佳实践

实践 说明
安全性 路径检查、输入验证、权限控制
错误处理 标准化错误码、超时机制
资源管理 连接池、内存限制、清理
版本管理 协议版本协商、向后兼容
调试 stderr 日志、请求追踪

8.4 MCP vs 其他方案

方案 标准化 生态 复杂度
MCP ✅ 标准协议 Claude/Cursor
Function Calling 模型绑定 OpenAI/Claude
LangChain Tools 框架绑定 Python/JS
OpenAPI ✅ 通用标准 广泛

本文基于 MCP Protocol 2024-11-05 版本编写。MCP 正在快速发展,请关注官方最新文档。

Logo

AtomGit 是由开放原子开源基金会联合 CSDN 等生态伙伴共同推出的新一代开源与人工智能协作平台。平台坚持“开放、中立、公益”的理念,把代码托管、模型共享、数据集托管、智能体开发体验和算力服务整合在一起,为开发者提供从开发、训练到部署的一站式体验。

更多推荐