MCP技术深度指南:从开发到测试的完整实战手册
MCP技术深度指南:从开发到测试的完整实战手册
“给AI装上双手,让它真正能干活”——这就是MCP存在的意义
##引言:为什么MCP值得你认真对待?
MCP是什么?一句话说清楚
想象一下,在USB标准出现之前,每个外设都需要专门的驱动和接口——打印机用一种线,鼠标用另一种,键盘又不一样。混乱吧?MCP(Model Context Protocol)就是AI领域的"USB标准"。
它由Anthropic公司提出,核心目标是解决AI助手的"功能孤岛"问题。简单说,就是让Claude、Cursor这些AI能通过统一的方式调用外部工具、数据源和服务。
MCP能干什么?
- Tools(工具):让AI能执行操作,比如查询数据库、调用API
- Resources(资源):让AI能读取数据,比如文件内容、数据库记录
- Prompts(提示模板):预定义的交互模式,提高效率
为什么选择FastMCP?
FastMCP是Anthropic官方维护的Python框架,它的核心价值就一个字:快。用最Pythonic的方式,几行代码就能搭建一个功能完整的MCP服务器。
仓库地址:https://github.com/PrefectHQ/fastmcp
开发实战:基于FastMCP从零开始
环境搭建:工欲善其事
方式一:传统pip方式
# 创建虚拟环境(强烈推荐)
python -m venv mcp-env
# 激活环境
# Windows PowerShell:
.\mcp-env\Scripts\Activate.ps1
# Linux/Mac:
source mcp-env/bin/activate
# 安装MCP
pip install "mcp[cli]"
方式二:现代uv方式(推荐)
# 安装uv(如果还没有)
pip install uv
# 初始化项目
uv init mcp-server-demo
cd mcp-server-demo
# 添加依赖
uv add "mcp[cli]"
uv add fastmcp
# 激活环境
source .venv/bin/activate # Linux/Mac
# 或
.\venv\Scripts\Activate.ps1 # Windows
为什么推荐uv? 因为它比pip快10-100倍,而且依赖管理更优雅。
第一个MCP服务器:Hello World
创建文件 hello_server.py:
from mcp.server.fastmcp import FastMCP
# 初始化服务器
mcp = FastMCP("我的第一个MCP服务器")
# 定义一个工具
@mcp.tool()
def greet(name: str) -> str:
"""生成个性化问候语
Args:
name: 用户名称
Returns:
问候语字符串
"""
return f"🎉 你好,{name}!很高兴认识你!"
# 定义一个资源
@mcp.resource("config://settings")
def get_settings() -> dict:
"""获取应用配置"""
return {
"theme": "dark",
"language": "zh-CN",
"version": "1.0.0"
}
# 定义一个提示模板
@mcp.prompt()
def code_review_prompt(code: str) -> str:
"""代码审查提示模板"""
return f"""请审查以下代码并指出潜在问题:
{code}
请从以下角度分析:
1. 代码风格
2. 潜在bug
3. 性能问题
4. 安全隐患
"""
# 启动服务器
if __name__ == "__main__":
mcp.run(transport="stdio")
代码解析:
@mcp.tool()装饰器把普通函数变成MCP工具,函数的docstring会被AI读取理解@mcp.resource()定义资源,通过URI访问@mcp.prompt()定义提示模板,方便复用transport="stdio"表示使用标准输入输出通信
进阶示例:建材产品查询服务
创建文件 product_server.py:
from mcp.server.fastmcp import FastMCP, Context
import asyncio
mcp = FastMCP("建材产品查询服务")
PRODUCT_CATEGORIES = [
"石材", "陶瓷", "门墙柜一体", "家具", "地板",
"楼梯", "卫浴", "家电", "灯饰", "墙板", "门窗", "金属玻璃"
]
PRODUCTS_DB = {
"16": {"id": "16", "name": "进口大理石", "category": "石材", "price": 580, "unit": "平方米"},
"18": {"id": "18", "name": "高端瓷砖", "category": "陶瓷", "price": 120, "unit": "平方米"},
"20": {"id": "20", "name": "实木地板", "category": "地板", "price": 320, "unit": "平方米"},
"22": {"id": "22", "name": "智能马桶", "category": "卫浴", "price": 2800, "unit": "套"},
"24": {"id": "24", "name": "铝合金门窗", "category": "门窗", "price": 450, "unit": "平方米"},
}
@mcp.tool()
async def search_products(query: str, limit: int = 100, ctx: Context = None) -> list:
"""根据查询条件检索建材产品
检索范围限定于以下建材产品大类:
石材、陶瓷、门墙柜一体、家具、地板、楼梯、卫浴、家电、灯饰、墙板、门窗、金属玻璃
Args:
query: 搜索关键词,如"大理石"、"瓷砖"、"地板"等
limit: 返回结果数量限制,默认100
Returns:
匹配的产品列表
"""
if ctx:
await ctx.info(f"正在搜索产品: {query}")
await asyncio.sleep(0.3)
results = []
for product in PRODUCTS_DB.values():
if query.lower() in product["name"].lower() or query.lower() in product["category"].lower():
results.append(product)
if ctx:
await ctx.report_progress(100, 100, f"找到 {len(results)} 个产品")
return results[:limit]
@mcp.tool()
async def get_products_by_ids(ids: list[str], ctx: Context = None) -> list:
"""根据ID列表获取商品详细信息
Args:
ids: 商品ID列表,如 ["16", "18"]
Returns:
商品详细信息列表
"""
if ctx:
await ctx.info(f"查询产品详情: {ids}")
results = []
for id in ids:
if id in PRODUCTS_DB:
results.append(PRODUCTS_DB[id])
return results
@mcp.tool()
def list_categories() -> list:
"""获取支持的建材品类列表"""
return PRODUCT_CATEGORIES
@mcp.resource("product://categories")
def get_categories_resource() -> str:
"""建材品类资源"""
return "支持的品类:" + "、".join(PRODUCT_CATEGORIES)
@mcp.resource("product://{product_id}")
def get_product_resource(product_id: str) -> dict:
"""单个产品资源"""
return PRODUCTS_DB.get(product_id, {"error": "产品不存在"})
if __name__ == "__main__":
mcp.run(transport="streamable-http", host="0.0.0.0", port=8000)
新知识点:
Context对象提供日志记录、进度报告等功能async函数支持异步操作ctx.report_progress()可以报告任务进度streamable-http传输方式支持远程访问- URI模板
product://{product_id}支持动态参数
配置管理:让服务器更灵活
创建配置文件 config.py:
import os
from dataclasses import dataclass
from typing import Optional
@dataclass
class ServerConfig:
"""服务器配置"""
name: str = "MCP Server"
version: str = "1.0.0"
transport: str = "stdio"
host: str = "0.0.0.0"
port: int = 8000
debug: bool = False
@classmethod
def from_env(cls) -> "ServerConfig":
"""从环境变量加载配置"""
return cls(
name=os.getenv("MCP_SERVER_NAME", "MCP Server"),
version=os.getenv("MCP_SERVER_VERSION", "1.0.0"),
transport=os.getenv("MCP_TRANSPORT", "stdio"),
host=os.getenv("MCP_HOST", "0.0.0.0"),
port=int(os.getenv("MCP_PORT", "8000")),
debug=os.getenv("MCP_DEBUG", "false").lower() == "true",
)
# 使用示例
config = ServerConfig.from_env()
mcp = FastMCP(config.name)
模块化设计:大型项目必备
项目结构:
george_product_service/
├── __init__.py
├── main.py # 入口文件
├── config.py # 配置管理
├── tools/
│ ├── __init__.py
│ ├── product.py # 产品相关工具
│ └── category.py # 品类相关工具
├── resources/
│ ├── __init__.py
│ └── products.py # 产品资源
└── prompts/
├── __init__.py
└── templates.py # 提示模板
tools/product.py:
from mcp.server.fastmcp import FastMCP
def register_product_tools(mcp: FastMCP):
"""注册产品相关工具"""
@mcp.tool()
async def search_products(query: str, limit: int = 100) -> list:
"""搜索建材产品"""
results = []
for product in PRODUCTS_DB.values():
if query.lower() in product["name"].lower():
results.append(product)
return results[:limit]
@mcp.tool()
async def get_products_by_ids(ids: list[str]) -> list:
"""根据ID获取产品详情"""
return [PRODUCTS_DB.get(id) for id in ids if id in PRODUCTS_DB]
main.py:
from mcp.server.fastmcp import FastMCP
from tools.product import register_product_tools
from tools.category import register_category_tools
from resources.products import register_product_resources
from prompts.templates import register_prompts
mcp = FastMCP("建材产品查询服务")
register_product_tools(mcp)
register_category_tools(mcp)
register_product_resources(mcp)
register_prompts(mcp)
if __name__ == "__main__":
mcp.run(transport="streamable-http", host="0.0.0.0", port=8000)
传输方式选择
| 传输方式 | 适用场景 | 优点 | 缺点 |
|---|---|---|---|
stdio |
本地开发、Claude Desktop | 简单、无需网络配置 | 仅限本地 |
sse |
远程服务、Web应用 | 支持远程访问 | 需要HTTP服务器 |
streamable-http |
生产环境、云部署 | 高性能、可扩展 | 配置稍复杂 |
stdio模式启动:
mcp.run(transport="stdio")
HTTP模式启动:
mcp.run(transport="streamable-http", host="0.0.0.0", port=8000)
客户端配置:让AI认识你的服务器
CoPaw配置

编辑配置:
{
"key": "my-tesat-mcp",
"name": "my-tesat-mcp",
"description": "",
"enabled": true,
"transport": "streamable_http",
"url": "http://127.0.0.1:8000/mcp",
"headers": {},
"command": "",
"args": [],
"env": {},
"cwd": ""
}

Cursor配置
在项目根目录创建 .cursor/mcp.json:
{
"mcpServers": {
"local-tools": {
"url": "http://127.0.0.1:8000/mcp"
}
}
}
测试策略:MCP Inspector完全指南
什么是MCP Inspector?
MCP Inspector是官方提供的可视化调试工具,让你能直观地测试MCP服务器的各项功能。简单说,就是给你的MCP服务器装了个"体检仪"。
仓库地址:https://github.com/modelcontextprotocol/inspector
安装与启动
最简单的方式(推荐):
npx @modelcontextprotocol/inspector
启动后访问 http://localhost:5173(或控制台显示的地址)。
自定义端口:
CLIENT_PORT=8080 SERVER_PORT=3000 npx @modelcontextprotocol/inspector
Docker方式:
docker run --rm --network host -p 6274:6274 -p 6277:6277 \
ghcr.io/modelcontextprotocol/inspector:latest
源码安装方式:
下载源码
#进入根目录
cd inspector
npm install
npm run dev
连接MCP服务器
方式一:STDIO模式(本地开发)
# 直接指定Python脚本
npx @modelcontextprotocol/inspector python product_server.py
# 使用uv运行
npx @modelcontextprotocol/inspector \
uv --directory /项目路径 run main.py
# 传递环境变量
npx @modelcontextprotocol/inspector \
-e API_KEY=your-key \
-e DEBUG=true \
python server.py
方式二:SSE/HTTP模式(远程服务)
# 连接远程服务器
npx @modelcontextprotocol/inspector --url http://localhost:8000/mcp
# 带认证的连接
npx @modelcontextprotocol/inspector --url https://api.example.com/mcp
方式三:普通启动后图形化连接

根据mcp启动方式,选择类型,输入url地址后,点击 Connect
MCP服务正常响应的
具体测试某个tool

Run Tool 后,下方会有请求回来的数据
Inspector核心功能详解
1. 服务器连接面板
- 显示协议版本、服务器名称、连接状态
- 支持手动重连和断开
- 配置命令行参数和环境变量
2. Tools标签页(工具测试)
这是最常用的功能:
- 查看工具列表:显示所有注册的工具
- 查看Schema:每个工具的参数类型和描述
- 执行测试:输入参数,点击Run执行
- 查看结果:实时显示返回值和执行状态
测试示例:
// 工具:search_products
// 输入参数:
{
"query": "大理石",
"limit": 10
}
// 输出结果:
{
"content": [
{
"type": "text",
"text": "[{\"id\": \"16\", \"name\": \"进口大理石\", \"category\": \"石材\", \"price\": 580, \"unit\": \"平方米\"}]"
}
]
}
3. Resources标签页(资源管理)
- 浏览所有可用资源
- 查看资源URI和MIME类型
- 测试资源读取功能
- 支持资源订阅测试
4. Prompts标签页(提示测试)
- 查看提示模板列表
- 测试不同参数组合
- 预览生成的消息
5. Notifications面板(通知监控)
- 实时显示服务器日志
- 记录所有消息交互
- 便于调试问题
测试用例设计
下面以 my-tesat-mcp MCP为例,演示完整的测试流程。这是一个建材产品查询服务,提供产品搜索和详情查询功能。
基础功能测试
import asyncio
from mcp import ClientSession
from mcp.client.streamable_http import streamablehttp_client
async def test_product_service():
"""测试建材产品服务MCP"""
async with streamablehttp_client("http://127.0.0.1:8000/mcp") as (read_stream, write_stream):
async with ClientSession(read_stream, write_stream) as session:
await session.initialize()
print("✅ MCP连接成功")
result = await session.call_tool("search_products", {"query": "大理石", "limit": 10})
products = result.content[0].text if result.content else []
print(f"✅ 产品搜索测试通过,找到 {len(products)} 个产品")
if products and len(products) > 0:
product_id = products[0].get("id", "16")
result = await session.call_tool("get_products_by_ids", {"ids": [product_id]})
print(f"✅ 产品详情查询测试通过")
result = await session.call_tool("search_products", {"query": "不存在的产品xyz", "limit": 5})
print(f"✅ 空结果处理测试通过")
asyncio.run(test_product_service())
边界条件测试
async def test_edge_cases():
"""测试边界条件"""
async with streamablehttp_client("http://127.0.0.1:8000/mcp") as (read_stream, write_stream):
async with ClientSession(read_stream, write_stream) as session:
await session.initialize()
try:
await session.call_tool("search_products", {"query": ""})
except Exception as e:
print(f"✅ 空查询正确处理: {type(e).__name__}")
result = await session.call_tool("search_products", {"query": "瓷砖@#$%^&*()", "limit": 5})
print(f"✅ 特殊字符处理正常")
result = await session.call_tool("search_products", {"query": "地板", "limit": 1000})
print(f"✅ 大limit值处理正常")
try:
await session.call_tool("get_products_by_ids", {"ids": ["invalid_id"]})
except Exception as e:
print(f"✅ 无效ID正确处理: {type(e).__name__}")
try:
await session.call_tool("unknown_tool", {})
except Exception as e:
print(f"✅ 未知工具正确拒绝: {type(e).__name__}")
asyncio.run(test_edge_cases())
性能测试
import time
import asyncio
async def test_performance():
"""性能测试"""
async with streamablehttp_client("http://127.0.0.1:8000/mcp") as (read_stream, write_stream):
async with ClientSession(read_stream, write_stream) as session:
await session.initialize()
start = time.time()
await session.call_tool("search_products", {"query": "石材", "limit": 20})
latency = time.time() - start
print(f"单次搜索延迟: {latency:.3f}s")
start = time.time()
tasks = [
session.call_tool("search_products", {"query": category, "limit": 10})
for category in ["石材", "陶瓷", "地板", "卫浴", "门窗"]
]
results = await asyncio.gather(*tasks)
total_time = time.time() - start
print(f"并发5次搜索总时间: {total_time:.3f}s")
print(f"平均每次: {total_time/5:.3f}s")
start = time.time()
await session.call_tool("get_products_by_ids", {"ids": ["16", "18", "20", "22", "24"]})
latency = time.time() - start
print(f"批量ID查询延迟: {latency:.3f}s")
asyncio.run(test_performance())
自动化测试流程
创建测试文件 test_product_service.py:
import pytest
import asyncio
from mcp import ClientSession
from mcp.client.streamable_http import streamablehttp_client
@pytest.fixture
async def session():
"""测试会话fixture"""
async with streamablehttp_client("http://127.0.0.1:8000/mcp") as (read_stream, write_stream):
async with ClientSession(read_stream, write_stream) as session:
await session.initialize()
yield session
@pytest.mark.asyncio
async def test_search_products(session):
"""测试产品搜索"""
result = await session.call_tool("search_products", {"query": "大理石", "limit": 10})
assert result.content is not None
print("✅ 产品搜索测试通过")
@pytest.mark.asyncio
async def test_get_products_by_ids(session):
"""测试ID查询"""
result = await session.call_tool("get_products_by_ids", {"ids": ["16", "18"]})
assert result.content is not None
print("✅ ID查询测试通过")
@pytest.mark.asyncio
async def test_search_categories(session):
"""测试各品类搜索"""
categories = ["石材", "陶瓷", "门墙柜一体", "家具", "地板", "楼梯", "卫浴", "家电", "灯饰", "墙板", "门窗", "金属玻璃"]
for category in categories:
result = await session.call_tool("search_products", {"query": category, "limit": 5})
assert result.content is not None
print(f"✅ {len(categories)}个品类搜索测试通过")
@pytest.mark.asyncio
async def test_empty_result(session):
"""测试空结果"""
result = await session.call_tool("search_products", {"query": "不存在的xyz产品", "limit": 5})
assert result.content is not None
print("✅ 空结果处理测试通过")
运行测试:
pytest test_product_service.py -v
Inspector测试实操
启动Inspector后,连接到 http://127.0.0.1:8000/mcp:
测试 search_products 工具:
{
"query": "大理石",
"limit": 10
}
测试 get_products_by_ids 工具:
{
"ids": ["16", "18"]
}
测试结果分析
关注指标:
- 响应时间:正常应在100ms-1s之间
- 成功率:应达到99%以上
- 错误类型:关注错误分布
- 资源消耗:内存、CPU使用情况
常见问题排查:
| 现象 | 可能原因 | 排查方法 |
|---|---|---|
| 连接超时 | 服务器未启动、端口被占用 | 检查进程、端口 |
| 工具未显示 | 装饰器配置错误 | 检查docstring格式 |
| 参数验证失败 | 类型不匹配 | 检查参数类型定义 |
| 返回为空 | 函数逻辑问题 | 添加日志调试 |
⚠️ 坑点描述:那些年踩过的坑
坑一:Windows路径问题
问题描述:
Error executing MCP tool: Not connected
产生原因:
Windows使用反斜杠 \ 作为路径分隔符,而MCP配置中可能混用了正斜杠 /。
解决方案:
// ❌ 错误写法
{
"command": "python",
"args": ["C:/Users/name/server.py"]
}
// ✅ 正确写法(使用双反斜杠或正斜杠)
{
"command": "python",
"args": ["C:\\Users\\name\\server.py"]
}
// ✅ 更好的写法(使用uv)
{
"command": "uv",
"args": [
"--directory",
"C:\\Users\\name\\project",
"run",
"server.py"
]
}
最佳实践:
# 使用pathlib处理路径
from pathlib import Path
# 获取当前文件目录
CURRENT_DIR = Path(__file__).parent
CONFIG_FILE = CURRENT_DIR / "config.json"
# 跨平台路径
data_path = Path.home() / "data" / "mcp_data"
坑二:装饰器顺序问题
问题描述:
工具函数在Inspector中不显示,或者参数类型丢失。
产生原因:
装饰器顺序错误,或者使用了不兼容的装饰器。
解决方案:
# ❌ 错误:装饰器顺序不对
@mcp.tool()
@cache_result # 自定义装饰器
def my_tool(x: int) -> str:
return str(x)
# ✅ 正确:确保@mcp.tool()在最外层
@cache_result
@mcp.tool()
def my_tool(x: int) -> str:
return str(x)
# ✅ 或者使用functools.wraps
from functools import wraps
def cache_result(func):
@wraps(func) # 保留原函数的元信息
def wrapper(*args, **kwargs):
return func(*args, **kwargs)
return wrapper
@mcp.tool()
@cache_result
def my_tool(x: int) -> str:
return str(x)
坑三:异步函数处理不当
问题描述:
RuntimeWarning: coroutine was never awaited
产生原因:
在同步上下文中调用了异步函数,或者混用了async/await。
解决方案:
# ❌ 错误:同步函数中调用异步操作
@mcp.tool()
def bad_tool(url: str) -> str:
import aiohttp
async def fetch():
async with aiohttp.ClientSession() as session:
async with session.get(url) as response:
return await response.text()
return fetch() # 返回了coroutine对象
# ✅ 正确:使用async def
@mcp.tool()
async def good_tool(url: str) -> str:
import aiohttp
async with aiohttp.ClientSession() as session:
async with session.get(url) as response:
return await response.text()
# ✅ 或者使用同步库
@mcp.tool()
def sync_tool(url: str) -> str:
import requests
response = requests.get(url)
return response.text
坑四:类型注解缺失或错误
问题描述:
AI无法正确理解参数类型,导致调用失败。
产生原因:
缺少类型注解,或使用了复杂类型但未正确声明。
解决方案:
from typing import Optional, List, Dict, Any
from pydantic import BaseModel, Field
# ❌ 错误:没有类型注解
@mcp.tool()
def search(query): # AI不知道query是什么类型
return f"搜索: {query}"
# ✅ 正确:添加类型注解
@mcp.tool()
def search(query: str) -> str:
"""搜索功能
Args:
query: 搜索关键词
Returns:
搜索结果
"""
return f"搜索: {query}"
# ✅ 更好:使用Pydantic定义复杂类型
class SearchParams(BaseModel):
"""搜索参数"""
query: str = Field(description="搜索关键词")
limit: int = Field(default=10, description="返回结果数量")
filters: Optional[Dict[str, Any]] = Field(default=None, description="过滤条件")
@mcp.tool()
def advanced_search(params: SearchParams) -> List[str]:
"""高级搜索"""
return [f"结果{i}" for i in range(params.limit)]
坑五:资源URI格式错误
问题描述:
Error: Resource not found
产生原因:
URI格式不规范,或者URI模板参数处理错误。
解决方案:
# ❌ 错误:URI格式不规范
@mcp.resource("myfile.txt") # 缺少协议前缀
def get_file():
return "content"
# ✅ 正确:使用标准URI格式
@mcp.resource("file:///path/to/myfile.txt")
def get_file():
return "content"
# ✅ 使用URI模板
@mcp.resource("user://{user_id}/profile")
def get_user_profile(user_id: str) -> dict:
return {"id": user_id, "name": "用户名"}
# ✅ 常见URI格式示例
# file:///absolute/path/to/file
# config://app_settings
# database://table_name
# http://api.example.com/data
坑六:Context对象使用错误
问题描述:
TypeError: Context object is not available
产生原因:
Context对象需要通过类型注解注入,不能直接实例化。
解决方案:
from mcp.server.fastmcp import FastMCP, Context
mcp = FastMCP("test")
# ❌ 错误:直接创建Context
@mcp.tool()
def bad_tool(ctx: Context): # 没有通过参数注入
ctx.info("这条消息不会显示")
# ✅ 正确:通过类型注解自动注入
@mcp.tool()
async def good_tool(name: str, ctx: Context) -> str:
"""正确使用Context"""
await ctx.info(f"处理请求: {name}")
await ctx.report_progress(50, 100, "处理中...")
return f"Hello, {name}!"
# ✅ Context的常用方法
@mcp.tool()
async def demo_context(ctx: Context) -> str:
# 日志记录
await ctx.info("信息日志")
await ctx.warning("警告日志")
await ctx.error("错误日志")
# 进度报告
await ctx.report_progress(25, 100, "第一步完成")
# 读取资源
# content = await ctx.read_resource("file:///path/to/file")
return "完成"
坑七:错误处理不当
问题描述:
工具执行失败时,AI收到的是无意义的错误信息。
产生原因:
没有正确捕获和处理异常。
解决方案:
from mcp.server.fastmcp import FastMCP
from mcp.shared.exceptions import McpError
mcp = FastMCP("error-demo")
# ❌ 错误:没有错误处理
@mcp.tool()
def divide_bad(a: int, b: int) -> float:
return a / b # b=0时会崩溃
# ✅ 正确:捕获并返回友好错误
@mcp.tool()
def divide_good(a: int, b: int) -> str:
"""安全除法
Args:
a: 被除数
b: 除数
Returns:
计算结果或错误信息
"""
try:
if b == 0:
return "错误:除数不能为零"
result = a / b
return f"{a} ÷ {b} = {result}"
except Exception as e:
return f"计算错误:{str(e)}"
# ✅ 更好:使用自定义异常
class ToolError(Exception):
"""工具执行错误"""
pass
@mcp.tool()
async def fetch_url(url: str, ctx: Context) -> str:
"""获取网页内容"""
import aiohttp
try:
async with aiohttp.ClientSession() as session:
async with session.get(url, timeout=10) as response:
if response.status != 200:
raise ToolError(f"HTTP错误: {response.status}")
return await response.text()
except aiohttp.ClientError as e:
await ctx.error(f"网络请求失败: {e}")
return f"请求失败: {str(e)}"
except ToolError as e:
await ctx.error(str(e))
return str(e)
坑八:性能瓶颈
问题描述:
工具响应缓慢,并发处理能力差。
产生原因:
同步阻塞操作、资源未复用、缺少缓存。
解决方案:
import asyncio
from functools import lru_cache
from mcp.server.fastmcp import FastMCP, Context
mcp = FastMCP("performance-demo")
# ✅ 使用缓存
@lru_cache(maxsize=100)
def expensive_computation(n: int) -> int:
"""耗时计算,使用缓存"""
# 模拟耗时操作
import time
time.sleep(1)
return n * n
@mcp.tool()
def cached_tool(n: int) -> str:
"""使用缓存的工具"""
result = expensive_computation(n)
return f"计算结果: {result}"
# ✅ 使用连接池
import aiohttp
_session = None
async def get_session():
global _session
if _session is None:
_session = aiohttp.ClientSession()
return _session
@mcp.tool()
async def fetch_with_pool(url: str) -> str:
"""使用连接池的HTTP请求"""
session = await get_session()
async with session.get(url) as response:
return await response.text()
# ✅ 批量处理
@mcp.tool()
async def batch_process(items: list[str], ctx: Context) -> list[str]:
"""批量处理,并发执行"""
await ctx.info(f"开始处理 {len(items)} 个项目")
async def process_one(item: str) -> str:
await asyncio.sleep(0.1) # 模拟处理
return f"处理完成: {item}"
results = await asyncio.gather(*[process_one(item) for item in items])
await ctx.info("所有项目处理完成")
return list(results)
坑九:环境变量和配置问题
问题描述:
配置文件读取失败,环境变量未生效。
产生原因:
路径问题、环境变量作用域、配置格式错误。
解决方案:
import os
import json
from pathlib import Path
from typing import Optional
# ✅ 正确的环境变量读取
class Config:
def __init__(self):
# 使用getenv提供默认值
self.api_key = os.getenv("API_KEY", "")
self.debug = os.getenv("DEBUG", "false").lower() == "true"
self.port = int(os.getenv("PORT", "8000"))
@staticmethod
def from_env_file(filepath: str = ".env"):
"""从.env文件加载"""
env_path = Path(filepath)
if env_path.exists():
with open(env_path) as f:
for line in f:
if "=" in line and not line.startswith("#"):
key, value = line.strip().split("=", 1)
os.environ[key] = value.strip("\"'")
# ✅ 配置文件处理
def load_config(config_path: Optional[str] = None) -> dict:
"""加载配置文件"""
if config_path is None:
# 默认配置路径
config_path = Path(__file__).parent / "config.json"
else:
config_path = Path(config_path)
if not config_path.exists():
return {}
with open(config_path, encoding="utf-8") as f:
return json.load(f)
# ✅ 在MCP工具中使用
from mcp.server.fastmcp import FastMCP
config = Config()
config.from_env_file()
mcp = FastMCP("config-demo", config={"debug": config.debug})
@mcp.tool()
def get_config_value(key: str) -> str:
"""获取配置值"""
return os.getenv(key, "未设置")
坑十:客户端配置不生效
问题描述:
修改了配置文件,但客户端没有识别到新的MCP服务器。
产生原因:
配置文件格式错误、路径问题、未重启应用。
排查步骤:
- 验证JSON格式:使用JSON验证器检查格式
- 重启客户端:完全退出后重新打开
# 添加调试日志
import logging
logging.basicConfig(
level=logging.DEBUG,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)
@mcp.tool()
def debug_tool(message: str) -> str:
logger.debug(f"收到消息: {message}")
logger.info("处理中...")
return f"已处理: {message}"
生产部署、MCP认证机制:保护你的服务器安全
当你的MCP服务器需要暴露给外部世界时,认证就成了必须考虑的问题。毕竟,你不想让任何人都能随意调用你的服务吧?
认证方式概览
MCP支持两种主要的认证方式:
| 认证方式 | 适用场景 | 安全级别 | 实现复杂度 |
|---|---|---|---|
| API Key | 本地开发、内部服务、快速原型 | 中等 | 简单 |
| OAuth 2.1 | 生产环境、多用户、第三方集成 | 高 | 较复杂 |
方式一:API Key认证
API Key是最简单直接的认证方式,适合快速开发和内部服务调用。
服务端实现
from mcp.server.fastmcp import FastMCP, Context
from fastapi import HTTPException, Header
import os
mcp = FastMCP("protected-service")
API_KEY = os.getenv("MCP_API_KEY", "your-secret-key-here")
def verify_api_key(api_key: str = None):
"""验证API Key"""
if not api_key or api_key != API_KEY:
raise HTTPException(status_code=401, detail="Invalid API Key")
return True
@mcp.tool()
async def protected_tool(data: str, ctx: Context, api_key: str = None) -> str:
"""需要认证的工具
Args:
data: 输入数据
api_key: API密钥(通过Header传递)
"""
verify_api_key(api_key)
return f"处理成功: {data}"
if __name__ == "__main__":
mcp.run(transport="streamable-http", host="0.0.0.0", port=8000)
使用FastAPI中间件(推荐)
from fastapi import FastAPI, Request, HTTPException
from fastapi.middleware import Middleware
from mcp.server.fastmcp import FastMCP
import os
API_KEY = os.getenv("MCP_API_KEY", "your-secret-key")
async def auth_middleware(request: Request, call_next):
"""认证中间件"""
if request.url.path.startswith("/mcp"):
api_key = request.headers.get("X-API-Key")
if not api_key or api_key != API_KEY:
raise HTTPException(status_code=401, detail="Unauthorized")
return await call_next(request)
mcp = FastMCP("secure-service")
@mcp.tool()
def secure_query(query: str) -> str:
"""安全查询工具"""
return f"查询结果: {query}"
app = mcp.http_app()
app.middleware("http")(auth_middleware)
if __name__ == "__main__":
import uvicorn
uvicorn.run(app, host="0.0.0.0", port=8000)
客户端配置
在CoPaw/Cursor中配置:
{
"mcpServers": {
"secure-service": {
"url": "http://127.0.0.1:8000/mcp",
"headers": {
"X-API-Key": "your-secret-key"
}
}
}
}
在Inspector中测试:
连接时添加Header:
- Header名称:
X-API-Key - Header值:
your-secret-key
方式二:OAuth 2.1认证
OAuth 2.1是MCP官方推荐的认证标准,适合生产环境和多用户场景。
OAuth 2.1核心概念
┌─────────────┐ ┌─────────────┐ ┌─────────────┐
│ MCP Client │────▶│ Auth Server │────▶│ MCP Server │
│ (AI应用) │ │ (授权服务器) │ │ (资源服务器) │
└─────────────┘ └─────────────┘ └─────────────┘
│ │ │
│ 1.请求授权 │ │
│──────────────────▶│ │
│ │ │
│ 2.返回授权码 │ │
│◀──────────────────│ │
│ │ │
│ 3.用授权码换Token│ │
│──────────────────▶│ │
│ │ │
│ 4.返回Access │ │
│ Token │ │
│◀──────────────────│ │
│ │ │
│ 5.带Token调用MCP │ │
│──────────────────────────────────────▶│
│ │ │
│ 6.验证Token返回数据 │
│◀──────────────────────────────────────│
服务端实现(使用Authlib)
from mcp.server.fastmcp import FastMCP
from fastapi import FastAPI, Depends, HTTPException
from authlib.integrations.fastapi_client import OAuth
from authlib.oauth2.rfc7591 import ClientRegistration
import os
mcp = FastMCP("oauth-protected-service")
AUTH_SERVER_URL = os.getenv("AUTH_SERVER_URL", "https://auth.example.com")
CLIENT_ID = os.getenv("OAUTH_CLIENT_ID")
CLIENT_SECRET = os.getenv("OAUTH_CLIENT_SECRET")
oauth = OAuth()
oauth.register(
name="mcp_auth",
client_id=CLIENT_ID,
client_secret=CLIENT_SECRET,
server_metadata_url=f"{AUTH_SERVER_URL}/.well-known/oauth-authorization-server",
client_kwargs={"scope": "mcp:read mcp:write"}
)
async def verify_token(token: str):
"""验证OAuth Token"""
try:
introspection_url = f"{AUTH_SERVER_URL}/introspect"
async with oauth.mcp_auth.get(introspection_url, token=token) as resp:
data = await resp.json()
if not data.get("active"):
raise HTTPException(status_code=401, detail="Token expired or invalid")
return data
except Exception as e:
raise HTTPException(status_code=401, detail=str(e))
@mcp.tool()
async def oauth_protected_tool(query: str, token_info: dict = Depends(verify_token)) -> str:
"""OAuth保护的工具"""
user_id = token_info.get("sub", "unknown")
return f"用户 {user_id} 的查询结果: {query}"
PKCE流程(公共客户端)
对于没有客户端密钥的公共客户端(如移动应用、浏览器应用),需要使用PKCE(Proof Key for Code Exchange):
import hashlib
import base64
import secrets
def generate_pkce_challenge():
"""生成PKCE challenge"""
code_verifier = secrets.token_urlsafe(32)
code_challenge = base64.urlsafe_b64encode(
hashlib.sha256(code_verifier.encode()).digest()
).decode().rstrip('=')
return code_verifier, code_challenge
async def pkce_auth_flow():
"""PKCE授权流程"""
code_verifier, code_challenge = generate_pkce_challenge()
auth_url = f"{AUTH_SERVER_URL}/authorize?" + \
f"client_id={CLIENT_ID}&" + \
f"redirect_uri=http://localhost:8000/callback&" + \
f"scope=mcp:read&" + \
f"response_type=code&" + \
f"code_challenge={code_challenge}&" + \
f"code_challenge_method=S256"
print(f"请访问: {auth_url}")
auth_code = await get_auth_code_from_callback()
token_url = f"{AUTH_SERVER_URL}/token"
token_data = {
"grant_type": "authorization_code",
"code": auth_code,
"redirect_uri": "http://localhost:8000/callback",
"client_id": CLIENT_ID,
"code_verifier": code_verifier
}
async with aiohttp.ClientSession() as session:
async with session.post(token_url, data=token_data) as resp:
return await resp.json()
动态客户端注册(RFC 7591)
MCP支持动态客户端注册,让客户端自动注册到授权服务器:
async def register_client():
"""动态注册OAuth客户端"""
registration_url = f"{AUTH_SERVER_URL}/register"
client_metadata = {
"client_name": "MCP Client",
"redirect_uris": ["http://localhost:8000/callback"],
"grant_types": ["authorization_code"],
"response_types": ["code"],
"scope": "mcp:read mcp:write",
"token_endpoint_auth_method": "none" # 公共客户端
}
async with aiohttp.ClientSession() as session:
async with session.post(registration_url, json=client_metadata) as resp:
return await resp.json()
MCP元数据发现
MCP服务器需要提供OAuth元数据发现端点:
from fastapi import FastAPI
app = FastAPI()
@app.get("/.well-known/oauth-authorization-server")
async def oauth_metadata():
"""OAuth服务器元数据"""
return {
"issuer": "https://your-server.com",
"authorization_endpoint": "https://your-server.com/oauth/authorize",
"token_endpoint": "https://your-server.com/oauth/token",
"introspection_endpoint": "https://your-server.com/oauth/introspect",
"revocation_endpoint": "https://your-server.com/oauth/revoke",
"registration_endpoint": "https://your-server.com/oauth/register",
"scopes_supported": ["mcp:read", "mcp:write"],
"response_types_supported": ["code"],
"grant_types_supported": ["authorization_code", "refresh_token"],
"code_challenge_methods_supported": ["S256"],
"token_endpoint_auth_methods_supported": ["client_secret_basic", "none"]
}
认证最佳实践
1. API Key管理
import os
from pathlib import Path
class APIKeyManager:
"""API Key管理器"""
def __init__(self):
self.keys_file = Path.home() / ".mcp" / "api_keys.json"
self.keys = self._load_keys()
def _load_keys(self) -> dict:
"""加载API Keys"""
if self.keys_file.exists():
import json
with open(self.keys_file) as f:
return json.load(f)
return {}
def generate_key(self, name: str, expires_days: int = 30) -> str:
"""生成新的API Key"""
import secrets
from datetime import datetime, timedelta
key = secrets.token_urlsafe(32)
self.keys[name] = {
"key": key,
"created": datetime.now().isoformat(),
"expires": (datetime.now() + timedelta(days=expires_days)).isoformat()
}
self._save_keys()
return key
def verify_key(self, key: str) -> bool:
"""验证API Key"""
from datetime import datetime
for name, data in self.keys.items():
if data["key"] == key:
expires = datetime.fromisoformat(data["expires"])
if datetime.now() < expires:
return True
return False
def _save_keys(self):
"""保存API Keys"""
import json
self.keys_file.parent.mkdir(parents=True, exist_ok=True)
with open(self.keys_file, "w") as f:
json.dump(self.keys, f, indent=2)
key_manager = APIKeyManager()
@mcp.tool()
async def protected_operation(data: str, api_key: str) -> str:
"""受保护的操作"""
if not key_manager.verify_key(api_key):
raise HTTPException(status_code=401, detail="Invalid or expired API Key")
return f"操作成功: {data}"
2. Token刷新机制
class TokenManager:
"""Token管理器"""
def __init__(self):
self.tokens = {}
self.refresh_tokens = {}
async def refresh_access_token(self, refresh_token: str) -> dict:
"""刷新Access Token"""
token_url = f"{AUTH_SERVER_URL}/token"
data = {
"grant_type": "refresh_token",
"refresh_token": refresh_token,
"client_id": CLIENT_ID,
"client_secret": CLIENT_SECRET
}
async with aiohttp.ClientSession() as session:
async with session.post(token_url, data=data) as resp:
new_tokens = await resp.json()
return new_tokens
async def get_valid_token(self, user_id: str) -> str:
"""获取有效的Token"""
token_info = self.tokens.get(user_id)
if not token_info:
raise HTTPException(status_code=401, detail="No token found")
from datetime import datetime
if datetime.now() > datetime.fromisoformat(token_info["expires_at"]):
new_tokens = await self.refresh_access_token(token_info["refresh_token"])
self.tokens[user_id] = {
"access_token": new_tokens["access_token"],
"refresh_token": new_tokens.get("refresh_token", token_info["refresh_token"]),
"expires_at": (datetime.now() + timedelta(seconds=new_tokens["expires_in"])).isoformat()
}
return self.tokens[user_id]["access_token"]
3. 安全配置建议
import os
from dataclasses import dataclass
@dataclass
class SecurityConfig:
"""安全配置"""
api_key_length: int = 32
token_expire_seconds: int = 3600
refresh_token_expire_days: int = 30
max_failed_attempts: int = 5
rate_limit_per_minute: int = 60
@classmethod
def from_env(cls) -> "SecurityConfig":
return cls(
api_key_length=int(os.getenv("MCP_API_KEY_LENGTH", "32")),
token_expire_seconds=int(os.getenv("MCP_TOKEN_EXPIRE", "3600")),
refresh_token_expire_days=int(os.getenv("MCP_REFRESH_EXPIRE", "30")),
max_failed_attempts=int(os.getenv("MCP_MAX_FAILED", "5")),
rate_limit_per_minute=int(os.getenv("MCP_RATE_LIMIT", "60"))
)
security_config = SecurityConfig.from_env()
认证常见问题
问题一:Token过期处理
async def handle_expired_token(token: str, ctx: Context):
"""处理Token过期"""
try:
result = await verify_token(token)
return result
except HTTPException as e:
if e.status_code == 401:
await ctx.warning("Token已过期,请重新授权")
raise HTTPException(
status_code=401,
detail="Token expired. Please re-authenticate."
)
raise
问题二:跨域认证
from fastapi.middleware.cors import CORSMiddleware
app = mcp.http_app()
app.add_middleware(
CORSMiddleware,
allow_origins=["https://trusted-client.com"],
allow_credentials=True,
allow_methods=["GET", "POST"],
allow_headers=["Authorization", "Content-Type", "X-API-Key"]
)
问题三:多租户隔离
@mcp.tool()
async def tenant_isolated_tool(
query: str,
token_info: dict = Depends(verify_token),
ctx: Context
) -> str:
"""租户隔离的工具"""
tenant_id = token_info.get("tenant_id")
user_id = token_info.get("sub")
await ctx.info(f"租户 {tenant_id} 用户 {user_id} 发起查询")
result = await query_tenant_data(tenant_id, query)
return result
核心要点回顾
- FastMCP让开发变简单:几行代码就能创建功能完整的MCP服务器
- Inspector是调试神器:可视化测试,事半功倍
- 坑点都有解:理解原理,问题迎刃而解
- 认证很重要:API Key适合快速开发,OAuth 2.1适合生产环境
学习资源
“MCP让AI从’能说’变成’能做’,这是AI应用落地的重要一步。”
希望这份指南能帮助你顺利踏上MCP开发之旅!有问题随时交流,一起探索这个充满可能性的新世界!
AtomGit 是由开放原子开源基金会联合 CSDN 等生态伙伴共同推出的新一代开源与人工智能协作平台。平台坚持“开放、中立、公益”的理念,把代码托管、模型共享、数据集托管、智能体开发体验和算力服务整合在一起,为开发者提供从开发、训练到部署的一站式体验。
更多推荐


所有评论(0)