MCP技术深度指南:从开发到测试的完整实战手册

“给AI装上双手,让它真正能干活”——这就是MCP存在的意义


##引言:为什么MCP值得你认真对待?

MCP是什么?一句话说清楚

想象一下,在USB标准出现之前,每个外设都需要专门的驱动和接口——打印机用一种线,鼠标用另一种,键盘又不一样。混乱吧?MCP(Model Context Protocol)就是AI领域的"USB标准"。

它由Anthropic公司提出,核心目标是解决AI助手的"功能孤岛"问题。简单说,就是让Claude、Cursor这些AI能通过统一的方式调用外部工具、数据源和服务。

MCP能干什么?

  • Tools(工具):让AI能执行操作,比如查询数据库、调用API
  • Resources(资源):让AI能读取数据,比如文件内容、数据库记录
  • Prompts(提示模板):预定义的交互模式,提高效率

为什么选择FastMCP?

FastMCP是Anthropic官方维护的Python框架,它的核心价值就一个字:。用最Pythonic的方式,几行代码就能搭建一个功能完整的MCP服务器。
仓库地址:https://github.com/PrefectHQ/fastmcp


开发实战:基于FastMCP从零开始

环境搭建:工欲善其事

方式一:传统pip方式
# 创建虚拟环境(强烈推荐)
python -m venv mcp-env

# 激活环境
# Windows PowerShell:
.\mcp-env\Scripts\Activate.ps1
# Linux/Mac:
source mcp-env/bin/activate

# 安装MCP
pip install "mcp[cli]"
方式二:现代uv方式(推荐)
# 安装uv(如果还没有)
pip install uv

# 初始化项目
uv init mcp-server-demo
cd mcp-server-demo

# 添加依赖
uv add "mcp[cli]"
uv add fastmcp

# 激活环境
source .venv/bin/activate  # Linux/Mac
# 或
.\venv\Scripts\Activate.ps1  # Windows

为什么推荐uv? 因为它比pip快10-100倍,而且依赖管理更优雅。

第一个MCP服务器:Hello World

创建文件 hello_server.py

from mcp.server.fastmcp import FastMCP

# 初始化服务器
mcp = FastMCP("我的第一个MCP服务器")

# 定义一个工具
@mcp.tool()
def greet(name: str) -> str:
    """生成个性化问候语
    
    Args:
        name: 用户名称
        
    Returns:
        问候语字符串
    """
    return f"🎉 你好,{name}!很高兴认识你!"

# 定义一个资源
@mcp.resource("config://settings")
def get_settings() -> dict:
    """获取应用配置"""
    return {
        "theme": "dark",
        "language": "zh-CN",
        "version": "1.0.0"
    }

# 定义一个提示模板
@mcp.prompt()
def code_review_prompt(code: str) -> str:
    """代码审查提示模板"""
    return f"""请审查以下代码并指出潜在问题:

{code}


请从以下角度分析:
1. 代码风格
2. 潜在bug
3. 性能问题
4. 安全隐患
"""

# 启动服务器
if __name__ == "__main__":
    mcp.run(transport="stdio")

代码解析:

  1. @mcp.tool() 装饰器把普通函数变成MCP工具,函数的docstring会被AI读取理解
  2. @mcp.resource() 定义资源,通过URI访问
  3. @mcp.prompt() 定义提示模板,方便复用
  4. transport="stdio" 表示使用标准输入输出通信

进阶示例:建材产品查询服务

创建文件 product_server.py

from mcp.server.fastmcp import FastMCP, Context
import asyncio

mcp = FastMCP("建材产品查询服务")

PRODUCT_CATEGORIES = [
    "石材", "陶瓷", "门墙柜一体", "家具", "地板",
    "楼梯", "卫浴", "家电", "灯饰", "墙板", "门窗", "金属玻璃"
]

PRODUCTS_DB = {
    "16": {"id": "16", "name": "进口大理石", "category": "石材", "price": 580, "unit": "平方米"},
    "18": {"id": "18", "name": "高端瓷砖", "category": "陶瓷", "price": 120, "unit": "平方米"},
    "20": {"id": "20", "name": "实木地板", "category": "地板", "price": 320, "unit": "平方米"},
    "22": {"id": "22", "name": "智能马桶", "category": "卫浴", "price": 2800, "unit": "套"},
    "24": {"id": "24", "name": "铝合金门窗", "category": "门窗", "price": 450, "unit": "平方米"},
}

@mcp.tool()
async def search_products(query: str, limit: int = 100, ctx: Context = None) -> list:
    """根据查询条件检索建材产品
    
    检索范围限定于以下建材产品大类:
    石材、陶瓷、门墙柜一体、家具、地板、楼梯、卫浴、家电、灯饰、墙板、门窗、金属玻璃
    
    Args:
        query: 搜索关键词,如"大理石"、"瓷砖"、"地板"等
        limit: 返回结果数量限制,默认100
        
    Returns:
        匹配的产品列表
    """
    if ctx:
        await ctx.info(f"正在搜索产品: {query}")
    
    await asyncio.sleep(0.3)
    
    results = []
    for product in PRODUCTS_DB.values():
        if query.lower() in product["name"].lower() or query.lower() in product["category"].lower():
            results.append(product)
    
    if ctx:
        await ctx.report_progress(100, 100, f"找到 {len(results)} 个产品")
    
    return results[:limit]

@mcp.tool()
async def get_products_by_ids(ids: list[str], ctx: Context = None) -> list:
    """根据ID列表获取商品详细信息
    
    Args:
        ids: 商品ID列表,如 ["16", "18"]
        
    Returns:
        商品详细信息列表
    """
    if ctx:
        await ctx.info(f"查询产品详情: {ids}")
    
    results = []
    for id in ids:
        if id in PRODUCTS_DB:
            results.append(PRODUCTS_DB[id])
    
    return results

@mcp.tool()
def list_categories() -> list:
    """获取支持的建材品类列表"""
    return PRODUCT_CATEGORIES

@mcp.resource("product://categories")
def get_categories_resource() -> str:
    """建材品类资源"""
    return "支持的品类:" + "、".join(PRODUCT_CATEGORIES)

@mcp.resource("product://{product_id}")
def get_product_resource(product_id: str) -> dict:
    """单个产品资源"""
    return PRODUCTS_DB.get(product_id, {"error": "产品不存在"})

if __name__ == "__main__":
    mcp.run(transport="streamable-http", host="0.0.0.0", port=8000)

新知识点:

  1. Context 对象提供日志记录、进度报告等功能
  2. async 函数支持异步操作
  3. ctx.report_progress() 可以报告任务进度
  4. streamable-http 传输方式支持远程访问
  5. URI模板 product://{product_id} 支持动态参数

配置管理:让服务器更灵活

创建配置文件 config.py

import os
from dataclasses import dataclass
from typing import Optional

@dataclass
class ServerConfig:
    """服务器配置"""
    name: str = "MCP Server"
    version: str = "1.0.0"
    transport: str = "stdio"
    host: str = "0.0.0.0"
    port: int = 8000
    debug: bool = False
    
    @classmethod
    def from_env(cls) -> "ServerConfig":
        """从环境变量加载配置"""
        return cls(
            name=os.getenv("MCP_SERVER_NAME", "MCP Server"),
            version=os.getenv("MCP_SERVER_VERSION", "1.0.0"),
            transport=os.getenv("MCP_TRANSPORT", "stdio"),
            host=os.getenv("MCP_HOST", "0.0.0.0"),
            port=int(os.getenv("MCP_PORT", "8000")),
            debug=os.getenv("MCP_DEBUG", "false").lower() == "true",
        )

# 使用示例
config = ServerConfig.from_env()
mcp = FastMCP(config.name)

模块化设计:大型项目必备

项目结构:

george_product_service/
├── __init__.py
├── main.py              # 入口文件
├── config.py            # 配置管理
├── tools/
│   ├── __init__.py
│   ├── product.py       # 产品相关工具
│   └── category.py      # 品类相关工具
├── resources/
│   ├── __init__.py
│   └── products.py      # 产品资源
└── prompts/
    ├── __init__.py
    └── templates.py     # 提示模板

tools/product.py

from mcp.server.fastmcp import FastMCP

def register_product_tools(mcp: FastMCP):
    """注册产品相关工具"""
    
    @mcp.tool()
    async def search_products(query: str, limit: int = 100) -> list:
        """搜索建材产品"""
        results = []
        for product in PRODUCTS_DB.values():
            if query.lower() in product["name"].lower():
                results.append(product)
        return results[:limit]
    
    @mcp.tool()
    async def get_products_by_ids(ids: list[str]) -> list:
        """根据ID获取产品详情"""
        return [PRODUCTS_DB.get(id) for id in ids if id in PRODUCTS_DB]

main.py

from mcp.server.fastmcp import FastMCP
from tools.product import register_product_tools
from tools.category import register_category_tools
from resources.products import register_product_resources
from prompts.templates import register_prompts

mcp = FastMCP("建材产品查询服务")

register_product_tools(mcp)
register_category_tools(mcp)
register_product_resources(mcp)
register_prompts(mcp)

if __name__ == "__main__":
    mcp.run(transport="streamable-http", host="0.0.0.0", port=8000)

传输方式选择

传输方式 适用场景 优点 缺点
stdio 本地开发、Claude Desktop 简单、无需网络配置 仅限本地
sse 远程服务、Web应用 支持远程访问 需要HTTP服务器
streamable-http 生产环境、云部署 高性能、可扩展 配置稍复杂

stdio模式启动:

mcp.run(transport="stdio")

HTTP模式启动:

mcp.run(transport="streamable-http", host="0.0.0.0", port=8000)

客户端配置:让AI认识你的服务器

CoPaw配置

编辑配置:

{
  "key": "my-tesat-mcp",
  "name": "my-tesat-mcp",
  "description": "",
  "enabled": true,
  "transport": "streamable_http",
  "url": "http://127.0.0.1:8000/mcp",
  "headers": {},
  "command": "",
  "args": [],
  "env": {},
  "cwd": ""
}

在这里插入图片描述

Cursor配置

在项目根目录创建 .cursor/mcp.json

{
  "mcpServers": {
    "local-tools": {
      "url": "http://127.0.0.1:8000/mcp"
    }
  }
}

测试策略:MCP Inspector完全指南

什么是MCP Inspector?

MCP Inspector是官方提供的可视化调试工具,让你能直观地测试MCP服务器的各项功能。简单说,就是给你的MCP服务器装了个"体检仪"。
仓库地址:https://github.com/modelcontextprotocol/inspector

安装与启动

最简单的方式(推荐):

npx @modelcontextprotocol/inspector

启动后访问 http://localhost:5173(或控制台显示的地址)。

自定义端口:

CLIENT_PORT=8080 SERVER_PORT=3000 npx @modelcontextprotocol/inspector

Docker方式:

docker run --rm --network host -p 6274:6274 -p 6277:6277 \
  ghcr.io/modelcontextprotocol/inspector:latest

源码安装方式:
下载源码

#进入根目录
cd inspector
npm install
npm run dev

连接MCP服务器

方式一:STDIO模式(本地开发)
# 直接指定Python脚本
npx @modelcontextprotocol/inspector python product_server.py

# 使用uv运行
npx @modelcontextprotocol/inspector \
  uv --directory /项目路径 run main.py

# 传递环境变量
npx @modelcontextprotocol/inspector \
  -e API_KEY=your-key \
  -e DEBUG=true \
  python server.py
方式二:SSE/HTTP模式(远程服务)
# 连接远程服务器
npx @modelcontextprotocol/inspector --url http://localhost:8000/mcp

# 带认证的连接
npx @modelcontextprotocol/inspector --url https://api.example.com/mcp
方式三:普通启动后图形化连接

在这里插入图片描述
根据mcp启动方式,选择类型,输入url地址后,点击 Connect
在这里插入图片描述
MCP服务正常响应的
在这里插入图片描述
具体测试某个tool
在这里插入图片描述
在这里插入图片描述
Run Tool 后,下方会有请求回来的数据

Inspector核心功能详解

1. 服务器连接面板
  • 显示协议版本、服务器名称、连接状态
  • 支持手动重连和断开
  • 配置命令行参数和环境变量
2. Tools标签页(工具测试)

这是最常用的功能:

  1. 查看工具列表:显示所有注册的工具
  2. 查看Schema:每个工具的参数类型和描述
  3. 执行测试:输入参数,点击Run执行
  4. 查看结果:实时显示返回值和执行状态

测试示例:

// 工具:search_products
// 输入参数:
{
  "query": "大理石",
  "limit": 10
}

// 输出结果:
{
  "content": [
    {
      "type": "text",
      "text": "[{\"id\": \"16\", \"name\": \"进口大理石\", \"category\": \"石材\", \"price\": 580, \"unit\": \"平方米\"}]"
    }
  ]
}
3. Resources标签页(资源管理)
  • 浏览所有可用资源
  • 查看资源URI和MIME类型
  • 测试资源读取功能
  • 支持资源订阅测试
4. Prompts标签页(提示测试)
  • 查看提示模板列表
  • 测试不同参数组合
  • 预览生成的消息
5. Notifications面板(通知监控)
  • 实时显示服务器日志
  • 记录所有消息交互
  • 便于调试问题

测试用例设计

下面以 my-tesat-mcp MCP为例,演示完整的测试流程。这是一个建材产品查询服务,提供产品搜索和详情查询功能。

基础功能测试
import asyncio
from mcp import ClientSession
from mcp.client.streamable_http import streamablehttp_client

async def test_product_service():
    """测试建材产品服务MCP"""
    
    async with streamablehttp_client("http://127.0.0.1:8000/mcp") as (read_stream, write_stream):
        async with ClientSession(read_stream, write_stream) as session:
            await session.initialize()
            
            print("✅ MCP连接成功")
            
            result = await session.call_tool("search_products", {"query": "大理石", "limit": 10})
            products = result.content[0].text if result.content else []
            print(f"✅ 产品搜索测试通过,找到 {len(products)} 个产品")
            
            if products and len(products) > 0:
                product_id = products[0].get("id", "16")
                result = await session.call_tool("get_products_by_ids", {"ids": [product_id]})
                print(f"✅ 产品详情查询测试通过")
            
            result = await session.call_tool("search_products", {"query": "不存在的产品xyz", "limit": 5})
            print(f"✅ 空结果处理测试通过")

asyncio.run(test_product_service())
边界条件测试
async def test_edge_cases():
    """测试边界条件"""
    
    async with streamablehttp_client("http://127.0.0.1:8000/mcp") as (read_stream, write_stream):
        async with ClientSession(read_stream, write_stream) as session:
            await session.initialize()
            
            try:
                await session.call_tool("search_products", {"query": ""})
            except Exception as e:
                print(f"✅ 空查询正确处理: {type(e).__name__}")
            
            result = await session.call_tool("search_products", {"query": "瓷砖@#$%^&*()", "limit": 5})
            print(f"✅ 特殊字符处理正常")
            
            result = await session.call_tool("search_products", {"query": "地板", "limit": 1000})
            print(f"✅ 大limit值处理正常")
            
            try:
                await session.call_tool("get_products_by_ids", {"ids": ["invalid_id"]})
            except Exception as e:
                print(f"✅ 无效ID正确处理: {type(e).__name__}")
            
            try:
                await session.call_tool("unknown_tool", {})
            except Exception as e:
                print(f"✅ 未知工具正确拒绝: {type(e).__name__}")

asyncio.run(test_edge_cases())
性能测试
import time
import asyncio

async def test_performance():
    """性能测试"""
    
    async with streamablehttp_client("http://127.0.0.1:8000/mcp") as (read_stream, write_stream):
        async with ClientSession(read_stream, write_stream) as session:
            await session.initialize()
            
            start = time.time()
            await session.call_tool("search_products", {"query": "石材", "limit": 20})
            latency = time.time() - start
            print(f"单次搜索延迟: {latency:.3f}s")
            
            start = time.time()
            tasks = [
                session.call_tool("search_products", {"query": category, "limit": 10})
                for category in ["石材", "陶瓷", "地板", "卫浴", "门窗"]
            ]
            results = await asyncio.gather(*tasks)
            total_time = time.time() - start
            print(f"并发5次搜索总时间: {total_time:.3f}s")
            print(f"平均每次: {total_time/5:.3f}s")
            
            start = time.time()
            await session.call_tool("get_products_by_ids", {"ids": ["16", "18", "20", "22", "24"]})
            latency = time.time() - start
            print(f"批量ID查询延迟: {latency:.3f}s")

asyncio.run(test_performance())

自动化测试流程

创建测试文件 test_product_service.py

import pytest
import asyncio
from mcp import ClientSession
from mcp.client.streamable_http import streamablehttp_client

@pytest.fixture
async def session():
    """测试会话fixture"""
    async with streamablehttp_client("http://127.0.0.1:8000/mcp") as (read_stream, write_stream):
        async with ClientSession(read_stream, write_stream) as session:
            await session.initialize()
            yield session

@pytest.mark.asyncio
async def test_search_products(session):
    """测试产品搜索"""
    result = await session.call_tool("search_products", {"query": "大理石", "limit": 10})
    assert result.content is not None
    print("✅ 产品搜索测试通过")

@pytest.mark.asyncio
async def test_get_products_by_ids(session):
    """测试ID查询"""
    result = await session.call_tool("get_products_by_ids", {"ids": ["16", "18"]})
    assert result.content is not None
    print("✅ ID查询测试通过")

@pytest.mark.asyncio
async def test_search_categories(session):
    """测试各品类搜索"""
    categories = ["石材", "陶瓷", "门墙柜一体", "家具", "地板", "楼梯", "卫浴", "家电", "灯饰", "墙板", "门窗", "金属玻璃"]
    for category in categories:
        result = await session.call_tool("search_products", {"query": category, "limit": 5})
        assert result.content is not None
    print(f"✅ {len(categories)}个品类搜索测试通过")

@pytest.mark.asyncio
async def test_empty_result(session):
    """测试空结果"""
    result = await session.call_tool("search_products", {"query": "不存在的xyz产品", "limit": 5})
    assert result.content is not None
    print("✅ 空结果处理测试通过")

运行测试:

pytest test_product_service.py -v

Inspector测试实操

启动Inspector后,连接到 http://127.0.0.1:8000/mcp

测试 search_products 工具:

{
  "query": "大理石",
  "limit": 10
}

测试 get_products_by_ids 工具:

{
  "ids": ["16", "18"]
}

测试结果分析

关注指标:

  1. 响应时间:正常应在100ms-1s之间
  2. 成功率:应达到99%以上
  3. 错误类型:关注错误分布
  4. 资源消耗:内存、CPU使用情况

常见问题排查:

现象 可能原因 排查方法
连接超时 服务器未启动、端口被占用 检查进程、端口
工具未显示 装饰器配置错误 检查docstring格式
参数验证失败 类型不匹配 检查参数类型定义
返回为空 函数逻辑问题 添加日志调试

⚠️ 坑点描述:那些年踩过的坑

坑一:Windows路径问题

问题描述:

Error executing MCP tool: Not connected

产生原因:

Windows使用反斜杠 \ 作为路径分隔符,而MCP配置中可能混用了正斜杠 /

解决方案:

// ❌ 错误写法
{
  "command": "python",
  "args": ["C:/Users/name/server.py"]
}

// ✅ 正确写法(使用双反斜杠或正斜杠)
{
  "command": "python",
  "args": ["C:\\Users\\name\\server.py"]
}

// ✅ 更好的写法(使用uv)
{
  "command": "uv",
  "args": [
    "--directory",
    "C:\\Users\\name\\project",
    "run",
    "server.py"
  ]
}

最佳实践:

# 使用pathlib处理路径
from pathlib import Path

# 获取当前文件目录
CURRENT_DIR = Path(__file__).parent
CONFIG_FILE = CURRENT_DIR / "config.json"

# 跨平台路径
data_path = Path.home() / "data" / "mcp_data"

坑二:装饰器顺序问题

问题描述:

工具函数在Inspector中不显示,或者参数类型丢失。

产生原因:

装饰器顺序错误,或者使用了不兼容的装饰器。

解决方案:

# ❌ 错误:装饰器顺序不对
@mcp.tool()
@cache_result  # 自定义装饰器
def my_tool(x: int) -> str:
    return str(x)

# ✅ 正确:确保@mcp.tool()在最外层
@cache_result
@mcp.tool()
def my_tool(x: int) -> str:
    return str(x)

# ✅ 或者使用functools.wraps
from functools import wraps

def cache_result(func):
    @wraps(func)  # 保留原函数的元信息
    def wrapper(*args, **kwargs):
        return func(*args, **kwargs)
    return wrapper

@mcp.tool()
@cache_result
def my_tool(x: int) -> str:
    return str(x)

坑三:异步函数处理不当

问题描述:

RuntimeWarning: coroutine was never awaited

产生原因:

在同步上下文中调用了异步函数,或者混用了async/await。

解决方案:

# ❌ 错误:同步函数中调用异步操作
@mcp.tool()
def bad_tool(url: str) -> str:
    import aiohttp
    async def fetch():
        async with aiohttp.ClientSession() as session:
            async with session.get(url) as response:
                return await response.text()
    return fetch()  # 返回了coroutine对象

# ✅ 正确:使用async def
@mcp.tool()
async def good_tool(url: str) -> str:
    import aiohttp
    async with aiohttp.ClientSession() as session:
        async with session.get(url) as response:
            return await response.text()

# ✅ 或者使用同步库
@mcp.tool()
def sync_tool(url: str) -> str:
    import requests
    response = requests.get(url)
    return response.text

坑四:类型注解缺失或错误

问题描述:

AI无法正确理解参数类型,导致调用失败。

产生原因:

缺少类型注解,或使用了复杂类型但未正确声明。

解决方案:

from typing import Optional, List, Dict, Any
from pydantic import BaseModel, Field

# ❌ 错误:没有类型注解
@mcp.tool()
def search(query):  # AI不知道query是什么类型
    return f"搜索: {query}"

# ✅ 正确:添加类型注解
@mcp.tool()
def search(query: str) -> str:
    """搜索功能
    
    Args:
        query: 搜索关键词
        
    Returns:
        搜索结果
    """
    return f"搜索: {query}"

# ✅ 更好:使用Pydantic定义复杂类型
class SearchParams(BaseModel):
    """搜索参数"""
    query: str = Field(description="搜索关键词")
    limit: int = Field(default=10, description="返回结果数量")
    filters: Optional[Dict[str, Any]] = Field(default=None, description="过滤条件")

@mcp.tool()
def advanced_search(params: SearchParams) -> List[str]:
    """高级搜索"""
    return [f"结果{i}" for i in range(params.limit)]

坑五:资源URI格式错误

问题描述:

Error: Resource not found

产生原因:

URI格式不规范,或者URI模板参数处理错误。

解决方案:

# ❌ 错误:URI格式不规范
@mcp.resource("myfile.txt")  # 缺少协议前缀
def get_file():
    return "content"

# ✅ 正确:使用标准URI格式
@mcp.resource("file:///path/to/myfile.txt")
def get_file():
    return "content"

# ✅ 使用URI模板
@mcp.resource("user://{user_id}/profile")
def get_user_profile(user_id: str) -> dict:
    return {"id": user_id, "name": "用户名"}

# ✅ 常见URI格式示例
# file:///absolute/path/to/file
# config://app_settings
# database://table_name
# http://api.example.com/data

坑六:Context对象使用错误

问题描述:

TypeError: Context object is not available

产生原因:

Context对象需要通过类型注解注入,不能直接实例化。

解决方案:

from mcp.server.fastmcp import FastMCP, Context

mcp = FastMCP("test")

# ❌ 错误:直接创建Context
@mcp.tool()
def bad_tool(ctx: Context):  # 没有通过参数注入
    ctx.info("这条消息不会显示")

# ✅ 正确:通过类型注解自动注入
@mcp.tool()
async def good_tool(name: str, ctx: Context) -> str:
    """正确使用Context"""
    await ctx.info(f"处理请求: {name}")
    await ctx.report_progress(50, 100, "处理中...")
    return f"Hello, {name}!"

# ✅ Context的常用方法
@mcp.tool()
async def demo_context(ctx: Context) -> str:
    # 日志记录
    await ctx.info("信息日志")
    await ctx.warning("警告日志")
    await ctx.error("错误日志")
    
    # 进度报告
    await ctx.report_progress(25, 100, "第一步完成")
    
    # 读取资源
    # content = await ctx.read_resource("file:///path/to/file")
    
    return "完成"

坑七:错误处理不当

问题描述:

工具执行失败时,AI收到的是无意义的错误信息。

产生原因:

没有正确捕获和处理异常。

解决方案:

from mcp.server.fastmcp import FastMCP
from mcp.shared.exceptions import McpError

mcp = FastMCP("error-demo")

# ❌ 错误:没有错误处理
@mcp.tool()
def divide_bad(a: int, b: int) -> float:
    return a / b  # b=0时会崩溃

# ✅ 正确:捕获并返回友好错误
@mcp.tool()
def divide_good(a: int, b: int) -> str:
    """安全除法
    
    Args:
        a: 被除数
        b: 除数
        
    Returns:
        计算结果或错误信息
    """
    try:
        if b == 0:
            return "错误:除数不能为零"
        result = a / b
        return f"{a} ÷ {b} = {result}"
    except Exception as e:
        return f"计算错误:{str(e)}"

# ✅ 更好:使用自定义异常
class ToolError(Exception):
    """工具执行错误"""
    pass

@mcp.tool()
async def fetch_url(url: str, ctx: Context) -> str:
    """获取网页内容"""
    import aiohttp
    
    try:
        async with aiohttp.ClientSession() as session:
            async with session.get(url, timeout=10) as response:
                if response.status != 200:
                    raise ToolError(f"HTTP错误: {response.status}")
                return await response.text()
    except aiohttp.ClientError as e:
        await ctx.error(f"网络请求失败: {e}")
        return f"请求失败: {str(e)}"
    except ToolError as e:
        await ctx.error(str(e))
        return str(e)

坑八:性能瓶颈

问题描述:

工具响应缓慢,并发处理能力差。

产生原因:

同步阻塞操作、资源未复用、缺少缓存。

解决方案:

import asyncio
from functools import lru_cache
from mcp.server.fastmcp import FastMCP, Context

mcp = FastMCP("performance-demo")

# ✅ 使用缓存
@lru_cache(maxsize=100)
def expensive_computation(n: int) -> int:
    """耗时计算,使用缓存"""
    # 模拟耗时操作
    import time
    time.sleep(1)
    return n * n

@mcp.tool()
def cached_tool(n: int) -> str:
    """使用缓存的工具"""
    result = expensive_computation(n)
    return f"计算结果: {result}"

# ✅ 使用连接池
import aiohttp

_session = None

async def get_session():
    global _session
    if _session is None:
        _session = aiohttp.ClientSession()
    return _session

@mcp.tool()
async def fetch_with_pool(url: str) -> str:
    """使用连接池的HTTP请求"""
    session = await get_session()
    async with session.get(url) as response:
        return await response.text()

# ✅ 批量处理
@mcp.tool()
async def batch_process(items: list[str], ctx: Context) -> list[str]:
    """批量处理,并发执行"""
    await ctx.info(f"开始处理 {len(items)} 个项目")
    
    async def process_one(item: str) -> str:
        await asyncio.sleep(0.1)  # 模拟处理
        return f"处理完成: {item}"
    
    results = await asyncio.gather(*[process_one(item) for item in items])
    await ctx.info("所有项目处理完成")
    return list(results)

坑九:环境变量和配置问题

问题描述:

配置文件读取失败,环境变量未生效。

产生原因:

路径问题、环境变量作用域、配置格式错误。

解决方案:

import os
import json
from pathlib import Path
from typing import Optional

# ✅ 正确的环境变量读取
class Config:
    def __init__(self):
        # 使用getenv提供默认值
        self.api_key = os.getenv("API_KEY", "")
        self.debug = os.getenv("DEBUG", "false").lower() == "true"
        self.port = int(os.getenv("PORT", "8000"))
        
    @staticmethod
    def from_env_file(filepath: str = ".env"):
        """从.env文件加载"""
        env_path = Path(filepath)
        if env_path.exists():
            with open(env_path) as f:
                for line in f:
                    if "=" in line and not line.startswith("#"):
                        key, value = line.strip().split("=", 1)
                        os.environ[key] = value.strip("\"'")

# ✅ 配置文件处理
def load_config(config_path: Optional[str] = None) -> dict:
    """加载配置文件"""
    if config_path is None:
        # 默认配置路径
        config_path = Path(__file__).parent / "config.json"
    else:
        config_path = Path(config_path)
    
    if not config_path.exists():
        return {}
    
    with open(config_path, encoding="utf-8") as f:
        return json.load(f)

# ✅ 在MCP工具中使用
from mcp.server.fastmcp import FastMCP

config = Config()
config.from_env_file()
mcp = FastMCP("config-demo", config={"debug": config.debug})

@mcp.tool()
def get_config_value(key: str) -> str:
    """获取配置值"""
    return os.getenv(key, "未设置")

坑十:客户端配置不生效

问题描述:

修改了配置文件,但客户端没有识别到新的MCP服务器。

产生原因:

配置文件格式错误、路径问题、未重启应用。

排查步骤:

  1. 验证JSON格式:使用JSON验证器检查格式
  2. 重启客户端:完全退出后重新打开
# 添加调试日志
import logging

logging.basicConfig(
    level=logging.DEBUG,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)

logger = logging.getLogger(__name__)

@mcp.tool()
def debug_tool(message: str) -> str:
    logger.debug(f"收到消息: {message}")
    logger.info("处理中...")
    return f"已处理: {message}"

生产部署、MCP认证机制:保护你的服务器安全

当你的MCP服务器需要暴露给外部世界时,认证就成了必须考虑的问题。毕竟,你不想让任何人都能随意调用你的服务吧?

认证方式概览

MCP支持两种主要的认证方式:

认证方式 适用场景 安全级别 实现复杂度
API Key 本地开发、内部服务、快速原型 中等 简单
OAuth 2.1 生产环境、多用户、第三方集成 较复杂

方式一:API Key认证

API Key是最简单直接的认证方式,适合快速开发和内部服务调用。

服务端实现
from mcp.server.fastmcp import FastMCP, Context
from fastapi import HTTPException, Header
import os

mcp = FastMCP("protected-service")

API_KEY = os.getenv("MCP_API_KEY", "your-secret-key-here")

def verify_api_key(api_key: str = None):
    """验证API Key"""
    if not api_key or api_key != API_KEY:
        raise HTTPException(status_code=401, detail="Invalid API Key")
    return True

@mcp.tool()
async def protected_tool(data: str, ctx: Context, api_key: str = None) -> str:
    """需要认证的工具
    
    Args:
        data: 输入数据
        api_key: API密钥(通过Header传递)
    """
    verify_api_key(api_key)
    return f"处理成功: {data}"

if __name__ == "__main__":
    mcp.run(transport="streamable-http", host="0.0.0.0", port=8000)
使用FastAPI中间件(推荐)
from fastapi import FastAPI, Request, HTTPException
from fastapi.middleware import Middleware
from mcp.server.fastmcp import FastMCP
import os

API_KEY = os.getenv("MCP_API_KEY", "your-secret-key")

async def auth_middleware(request: Request, call_next):
    """认证中间件"""
    if request.url.path.startswith("/mcp"):
        api_key = request.headers.get("X-API-Key")
        if not api_key or api_key != API_KEY:
            raise HTTPException(status_code=401, detail="Unauthorized")
    return await call_next(request)

mcp = FastMCP("secure-service")

@mcp.tool()
def secure_query(query: str) -> str:
    """安全查询工具"""
    return f"查询结果: {query}"

app = mcp.http_app()
app.middleware("http")(auth_middleware)

if __name__ == "__main__":
    import uvicorn
    uvicorn.run(app, host="0.0.0.0", port=8000)
客户端配置

在CoPaw/Cursor中配置:

{
  "mcpServers": {
    "secure-service": {
      "url": "http://127.0.0.1:8000/mcp",
      "headers": {
        "X-API-Key": "your-secret-key"
      }
    }
  }
}

在Inspector中测试:

连接时添加Header:

  • Header名称:X-API-Key
  • Header值:your-secret-key

方式二:OAuth 2.1认证

OAuth 2.1是MCP官方推荐的认证标准,适合生产环境和多用户场景。

OAuth 2.1核心概念
┌─────────────┐     ┌─────────────┐     ┌─────────────┐
│ MCP Client  │────▶│ Auth Server │────▶│ MCP Server  │
│  (AI应用)   │     │ (授权服务器) │     │ (资源服务器) │
└─────────────┘     └─────────────┘     └─────────────┘
       │                   │                   │
       │  1.请求授权       │                   │
       │──────────────────▶│                   │
       │                   │                   │
       │  2.返回授权码     │                   │
       │◀──────────────────│                   │
       │                   │                   │
       │  3.用授权码换Token│                   │
       │──────────────────▶│                   │
       │                   │                   │
       │  4.返回Access     │                   │
       │    Token          │                   │
       │◀──────────────────│                   │
       │                   │                   │
       │  5.带Token调用MCP │                   │
       │──────────────────────────────────────▶│
       │                   │                   │
       │  6.验证Token返回数据                  │
       │◀──────────────────────────────────────│
服务端实现(使用Authlib)
from mcp.server.fastmcp import FastMCP
from fastapi import FastAPI, Depends, HTTPException
from authlib.integrations.fastapi_client import OAuth
from authlib.oauth2.rfc7591 import ClientRegistration
import os

mcp = FastMCP("oauth-protected-service")

AUTH_SERVER_URL = os.getenv("AUTH_SERVER_URL", "https://auth.example.com")
CLIENT_ID = os.getenv("OAUTH_CLIENT_ID")
CLIENT_SECRET = os.getenv("OAUTH_CLIENT_SECRET")

oauth = OAuth()
oauth.register(
    name="mcp_auth",
    client_id=CLIENT_ID,
    client_secret=CLIENT_SECRET,
    server_metadata_url=f"{AUTH_SERVER_URL}/.well-known/oauth-authorization-server",
    client_kwargs={"scope": "mcp:read mcp:write"}
)

async def verify_token(token: str):
    """验证OAuth Token"""
    try:
        introspection_url = f"{AUTH_SERVER_URL}/introspect"
        async with oauth.mcp_auth.get(introspection_url, token=token) as resp:
            data = await resp.json()
            if not data.get("active"):
                raise HTTPException(status_code=401, detail="Token expired or invalid")
            return data
    except Exception as e:
        raise HTTPException(status_code=401, detail=str(e))

@mcp.tool()
async def oauth_protected_tool(query: str, token_info: dict = Depends(verify_token)) -> str:
    """OAuth保护的工具"""
    user_id = token_info.get("sub", "unknown")
    return f"用户 {user_id} 的查询结果: {query}"
PKCE流程(公共客户端)

对于没有客户端密钥的公共客户端(如移动应用、浏览器应用),需要使用PKCE(Proof Key for Code Exchange):

import hashlib
import base64
import secrets

def generate_pkce_challenge():
    """生成PKCE challenge"""
    code_verifier = secrets.token_urlsafe(32)
    code_challenge = base64.urlsafe_b64encode(
        hashlib.sha256(code_verifier.encode()).digest()
    ).decode().rstrip('=')
    return code_verifier, code_challenge

async def pkce_auth_flow():
    """PKCE授权流程"""
    code_verifier, code_challenge = generate_pkce_challenge()
    
    auth_url = f"{AUTH_SERVER_URL}/authorize?" + \
        f"client_id={CLIENT_ID}&" + \
        f"redirect_uri=http://localhost:8000/callback&" + \
        f"scope=mcp:read&" + \
        f"response_type=code&" + \
        f"code_challenge={code_challenge}&" + \
        f"code_challenge_method=S256"
    
    print(f"请访问: {auth_url}")
    
    auth_code = await get_auth_code_from_callback()
    
    token_url = f"{AUTH_SERVER_URL}/token"
    token_data = {
        "grant_type": "authorization_code",
        "code": auth_code,
        "redirect_uri": "http://localhost:8000/callback",
        "client_id": CLIENT_ID,
        "code_verifier": code_verifier
    }
    
    async with aiohttp.ClientSession() as session:
        async with session.post(token_url, data=token_data) as resp:
            return await resp.json()
动态客户端注册(RFC 7591)

MCP支持动态客户端注册,让客户端自动注册到授权服务器:

async def register_client():
    """动态注册OAuth客户端"""
    registration_url = f"{AUTH_SERVER_URL}/register"
    
    client_metadata = {
        "client_name": "MCP Client",
        "redirect_uris": ["http://localhost:8000/callback"],
        "grant_types": ["authorization_code"],
        "response_types": ["code"],
        "scope": "mcp:read mcp:write",
        "token_endpoint_auth_method": "none"  # 公共客户端
    }
    
    async with aiohttp.ClientSession() as session:
        async with session.post(registration_url, json=client_metadata) as resp:
            return await resp.json()

MCP元数据发现

MCP服务器需要提供OAuth元数据发现端点:

from fastapi import FastAPI

app = FastAPI()

@app.get("/.well-known/oauth-authorization-server")
async def oauth_metadata():
    """OAuth服务器元数据"""
    return {
        "issuer": "https://your-server.com",
        "authorization_endpoint": "https://your-server.com/oauth/authorize",
        "token_endpoint": "https://your-server.com/oauth/token",
        "introspection_endpoint": "https://your-server.com/oauth/introspect",
        "revocation_endpoint": "https://your-server.com/oauth/revoke",
        "registration_endpoint": "https://your-server.com/oauth/register",
        "scopes_supported": ["mcp:read", "mcp:write"],
        "response_types_supported": ["code"],
        "grant_types_supported": ["authorization_code", "refresh_token"],
        "code_challenge_methods_supported": ["S256"],
        "token_endpoint_auth_methods_supported": ["client_secret_basic", "none"]
    }

认证最佳实践

1. API Key管理
import os
from pathlib import Path

class APIKeyManager:
    """API Key管理器"""
    
    def __init__(self):
        self.keys_file = Path.home() / ".mcp" / "api_keys.json"
        self.keys = self._load_keys()
    
    def _load_keys(self) -> dict:
        """加载API Keys"""
        if self.keys_file.exists():
            import json
            with open(self.keys_file) as f:
                return json.load(f)
        return {}
    
    def generate_key(self, name: str, expires_days: int = 30) -> str:
        """生成新的API Key"""
        import secrets
        from datetime import datetime, timedelta
        
        key = secrets.token_urlsafe(32)
        self.keys[name] = {
            "key": key,
            "created": datetime.now().isoformat(),
            "expires": (datetime.now() + timedelta(days=expires_days)).isoformat()
        }
        self._save_keys()
        return key
    
    def verify_key(self, key: str) -> bool:
        """验证API Key"""
        from datetime import datetime
        
        for name, data in self.keys.items():
            if data["key"] == key:
                expires = datetime.fromisoformat(data["expires"])
                if datetime.now() < expires:
                    return True
        return False
    
    def _save_keys(self):
        """保存API Keys"""
        import json
        self.keys_file.parent.mkdir(parents=True, exist_ok=True)
        with open(self.keys_file, "w") as f:
            json.dump(self.keys, f, indent=2)

key_manager = APIKeyManager()

@mcp.tool()
async def protected_operation(data: str, api_key: str) -> str:
    """受保护的操作"""
    if not key_manager.verify_key(api_key):
        raise HTTPException(status_code=401, detail="Invalid or expired API Key")
    return f"操作成功: {data}"
2. Token刷新机制
class TokenManager:
    """Token管理器"""
    
    def __init__(self):
        self.tokens = {}
        self.refresh_tokens = {}
    
    async def refresh_access_token(self, refresh_token: str) -> dict:
        """刷新Access Token"""
        token_url = f"{AUTH_SERVER_URL}/token"
        
        data = {
            "grant_type": "refresh_token",
            "refresh_token": refresh_token,
            "client_id": CLIENT_ID,
            "client_secret": CLIENT_SECRET
        }
        
        async with aiohttp.ClientSession() as session:
            async with session.post(token_url, data=data) as resp:
                new_tokens = await resp.json()
                return new_tokens
    
    async def get_valid_token(self, user_id: str) -> str:
        """获取有效的Token"""
        token_info = self.tokens.get(user_id)
        if not token_info:
            raise HTTPException(status_code=401, detail="No token found")
        
        from datetime import datetime
        if datetime.now() > datetime.fromisoformat(token_info["expires_at"]):
            new_tokens = await self.refresh_access_token(token_info["refresh_token"])
            self.tokens[user_id] = {
                "access_token": new_tokens["access_token"],
                "refresh_token": new_tokens.get("refresh_token", token_info["refresh_token"]),
                "expires_at": (datetime.now() + timedelta(seconds=new_tokens["expires_in"])).isoformat()
            }
        
        return self.tokens[user_id]["access_token"]
3. 安全配置建议
import os
from dataclasses import dataclass

@dataclass
class SecurityConfig:
    """安全配置"""
    
    api_key_length: int = 32
    token_expire_seconds: int = 3600
    refresh_token_expire_days: int = 30
    max_failed_attempts: int = 5
    rate_limit_per_minute: int = 60
    
    @classmethod
    def from_env(cls) -> "SecurityConfig":
        return cls(
            api_key_length=int(os.getenv("MCP_API_KEY_LENGTH", "32")),
            token_expire_seconds=int(os.getenv("MCP_TOKEN_EXPIRE", "3600")),
            refresh_token_expire_days=int(os.getenv("MCP_REFRESH_EXPIRE", "30")),
            max_failed_attempts=int(os.getenv("MCP_MAX_FAILED", "5")),
            rate_limit_per_minute=int(os.getenv("MCP_RATE_LIMIT", "60"))
        )

security_config = SecurityConfig.from_env()

认证常见问题

问题一:Token过期处理
async def handle_expired_token(token: str, ctx: Context):
    """处理Token过期"""
    try:
        result = await verify_token(token)
        return result
    except HTTPException as e:
        if e.status_code == 401:
            await ctx.warning("Token已过期,请重新授权")
            raise HTTPException(
                status_code=401,
                detail="Token expired. Please re-authenticate."
            )
        raise
问题二:跨域认证
from fastapi.middleware.cors import CORSMiddleware

app = mcp.http_app()

app.add_middleware(
    CORSMiddleware,
    allow_origins=["https://trusted-client.com"],
    allow_credentials=True,
    allow_methods=["GET", "POST"],
    allow_headers=["Authorization", "Content-Type", "X-API-Key"]
)
问题三:多租户隔离
@mcp.tool()
async def tenant_isolated_tool(
    query: str,
    token_info: dict = Depends(verify_token),
    ctx: Context
) -> str:
    """租户隔离的工具"""
    tenant_id = token_info.get("tenant_id")
    user_id = token_info.get("sub")
    
    await ctx.info(f"租户 {tenant_id} 用户 {user_id} 发起查询")
    
    result = await query_tenant_data(tenant_id, query)
    return result

核心要点回顾

  1. FastMCP让开发变简单:几行代码就能创建功能完整的MCP服务器
  2. Inspector是调试神器:可视化测试,事半功倍
  3. 坑点都有解:理解原理,问题迎刃而解
  4. 认证很重要:API Key适合快速开发,OAuth 2.1适合生产环境

学习资源


“MCP让AI从’能说’变成’能做’,这是AI应用落地的重要一步。”

希望这份指南能帮助你顺利踏上MCP开发之旅!有问题随时交流,一起探索这个充满可能性的新世界!

Logo

AtomGit 是由开放原子开源基金会联合 CSDN 等生态伙伴共同推出的新一代开源与人工智能协作平台。平台坚持“开放、中立、公益”的理念,把代码托管、模型共享、数据集托管、智能体开发体验和算力服务整合在一起,为开发者提供从开发、训练到部署的一站式体验。

更多推荐