前言

FastAPI 是目前最流行的 Python Web 框架之一,凭借其异步性能、自动文档生成和类型提示支持,几乎成为了 AI 后端开发的事实标准。而 MCP(Model Context Protocol)正在成为 AI 工具连接的标准协议。

那么问题来了:能不能把现有的 FastAPI API 直接暴露为 MCP Server,让 AI 客户端直接调用?

答案是:完全可以,而且只需要很少的代码改动。 本文将通过完整实战示例,手把手教你如何用最快的方式将现有 FastAPI 应用改造成 MCP Server。


一、为什么这个组合值得关注?

1.1 场景分析

在 MCP 出现之前,AI 应用调用后端 API 的流程是这样的:

AI Client → 解析用户意图 → 构造 HTTP 请求 → 调用后端 API → 解析响应 → 返回结果

问题在于:每接入一个新的后端服务,就需要写一套 Prompt + 请求构造 + 响应解析的代码。 这些代码与业务逻辑无关,但维护成本极高。

MCP 的出现改变了这一切:

AI Client → MCP Client → MCP Server → 你的业务代码
                    ↑ 标准化接口
                    不用改业务代码!

1.2 FastAPI + MCP 的优势

对比项 传统方式 FastAPI + MCP
接入新服务 写适配代码 写一个装饰器
参数校验 手动校验 Pydantic 自动处理
文档生成 需要额外维护 自动生成
错误处理 分散各处 统一异常处理
类型安全 类型提示 类型提示 + 运行时校验

二、基础概念快速回顾

2.1 FastAPI 核心要素

from fastapi import FastAPI, HTTPException
from pydantic import BaseModel

app = FastAPI()

class Item(BaseModel):
    name: str
    price: float
    tags: list[str] = []

@app.post("/items/")
async def create_item(item: Item):
    if item.price < 0:
        raise HTTPException(status_code=400, detail="价格不能为负")
    return {"id": 1, **item.model_dump()}

FastAPI 的核心优势:

  • 自动 OpenAPI 文档/docs/redoc 自动生成
  • Pydantic 集成:请求/响应自动校验
  • 异步支持:使用 async def,性能优异
  • 依赖注入:可复用的认证、校验逻辑

2.2 MCP 核心要素

from mcp.server.fastmcp import FastMCP

mcp = FastMCP("my-server")

@mcp.tool()
def calculate_area(width: float, height: float) -> str:
    """计算矩形面积"""
    return f"面积为 {width * height} 平方单位"

if __name__ == "__main__":
    mcp.run(transport="stdio")

MCP 的核心概念:

  • Tool:AI 可调用的函数
  • Resource:AI 可访问的数据
  • Prompt:预定义的提示词模板
  • 传输层:stdio / streamable-http / SSE

三、快速上手:5 分钟改造现有 API

3.1 项目结构

假设我们有一个现有的订单管理 FastAPI 应用:

order-api/
├── main.py              # FastAPI 应用入口
├── models.py            # 数据模型
├── routers/
│   └── orders.py        # 订单路由
└── services/
    └── order_service.py # 业务逻辑

3.2 现有代码(改造前)

# models.py
from pydantic import BaseModel
from datetime import datetime
from typing import Optional

class OrderItem(BaseModel):
    product_id: str
    product_name: str
    quantity: int
    unit_price: float

class Order(BaseModel):
    id: Optional[int] = None
    user_id: str
    items: list[OrderItem]
    status: str = "pending"
    created_at: Optional[datetime] = None
    total_amount: Optional[float] = None

class OrderCreate(BaseModel):
    user_id: str
    items: list[OrderItem]
# services/order_service.py
from datetime import datetime
from models import Order, OrderCreate

class OrderService:
    def __init__(self):
        # 模拟数据库
        self.orders = {}
        self.next_id = 1
    
    def create_order(self, order_data: OrderCreate) -> Order:
        total = sum(item.quantity * item.unit_price for item in order_data.items)
        order = Order(
            id=self.next_id,
            user_id=order_data.user_id,
            items=order_data.items,
            status="pending",
            created_at=datetime.now(),
            total_amount=total
        )
        self.orders[self.next_id] = order
        self.next_id += 1
        return order
    
    def get_order(self, order_id: int) -> Order | None:
        return self.orders.get(order_id)
    
    def list_orders(self, user_id: str, status: str = None) -> list[Order]:
        result = [o for o in self.orders.values() if o.user_id == user_id]
        if status:
            result = [o for o in result if o.status == status]
        return result
    
    def update_status(self, order_id: int, status: str) -> Order | None:
        if order_id in self.orders:
            self.orders[order_id].status = status
            return self.orders[order_id]
        return None

# 全局单例
order_service = OrderService()
# routers/orders.py
from fastapi import APIRouter, HTTPException
from models import Order, OrderCreate

router = APIRouter(prefix="/api/orders", tags=["订单管理"])

@router.post("/", response_model=Order)
async def create_order(order_data: OrderCreate):
    order = order_service.create_order(order_data)
    return order

@router.get("/{order_id}", response_model=Order)
async def get_order(order_id: int):
    order = order_service.get_order(order_id)
    if not order:
        raise HTTPException(status_code=404, detail="订单不存在")
    return order

@router.get("/", response_model=list[Order])
async def list_orders(user_id: str, status: str = None):
    return order_service.list_orders(user_id, status)

@router.patch("/{order_id}/status")
async def update_order_status(order_id: int, status: str):
    order = order_service.update_status(order_id, status)
    if not order:
        raise HTTPException(status_code=404, detail="订单不存在")
    return order
# main.py
from fastapi import FastAPI
from routers import orders

app = FastAPI(title="订单管理系统", version="1.0.0")
app.include_router(orders.router)

if __name__ == "__main__":
    import uvicorn
    uvicorn.run(app, host="0.0.0.0", port=8000)

3.3 快速改造:三行代码暴露 MCP 接口

核心思路: 利用 FastMCPfrom_fastapi 方法,直接从现有的 FastAPI 应用生成 MCP Server。

# mcp_server.py
from fastapi import FastAPI
from mcp.server.fastmcp import FastMCP
from routers import orders  # 复用现有路由

# 1. 创建 FastAPI 应用(现有代码)
app = FastAPI(title="订单管理系统", version="1.0.0")
app.include_router(orders.router)

# 2. 创建 MCP Server
mcp = FastMCP("order-mcp-server")

# 3. 一行代码绑定!自动将所有路由暴露为 MCP Tools
mcp.mount_app(app)

if __name__ == "__main__":
    # 可以选择以哪种方式运行
    import sys
    if len(sys.argv) > 1 and sys.argv[1] == "--web":
        # Web 模式(HTTP)
        import uvicorn
        uvicorn.run(app, host="0.0.0.0", port=8000)
    else:
        # MCP 模式(stdio)
        mcp.run(transport="stdio")

运行测试:

# MCP 模式(供 AI Client 调用)
python mcp_server.py

# Web 模式(HTTP API)
python mcp_server.py --web

这就是全部改造代码! 3 行额外代码,现有的 4 个 API 端点全部暴露为 MCP Tools。

3.4 自动生成的 MCP Tools

mount_app() 会自动将 FastAPI 路由转换为 MCP Tools:

FastAPI 端点 自动生成的 MCP Tool
POST /api/orders/ create_order
GET /api/orders/{order_id} get_order
GET /api/orders/ list_orders
PATCH /api/orders/{order_id}/status update_order_status

每个 Tool 的描述会从路由的 docstring 和参数描述自动生成。


四、精细化改造:自定义 Tool 行为

mount_app() 虽然方便,但有时候我们需要更精细的控制。

4.1 手动暴露特定端点

# mcp_server_advanced.py
from fastapi import FastAPI, HTTPException
from mcp.server.fastmcp import FastMCP
from mcp.types import Tool, CallToolResult
import json

app = FastAPI()
mcp = FastMCP("order-mcp-server")

# 全局业务服务实例
from services.order_service import order_service

# ─── 方式 1:手动定义 Tool ────────────────────────────────

@mcp.tool()
def create_order(user_id: str, items_json: str) -> str:
    """
    创建新订单
    
    Args:
        user_id: 用户ID
        items_json: 商品列表 JSON 字符串,格式:
            [{"product_id": "P001", "product_name": "商品A", "quantity": 2, "unit_price": 99.9}]
    
    Returns:
        创建成功的订单信息
    """
    import json
    from models import OrderCreate, OrderItem
    
    try:
        items_data = json.loads(items_json)
        items = [OrderItem(**item) for item in items_data]
        order_data = OrderCreate(user_id=user_id, items=items)
        order = order_service.create_order(order_data)
        
        return json.dumps({
            "success": True,
            "order_id": order.id,
            "total_amount": order.total_amount,
            "status": order.status
        }, ensure_ascii=False, indent=2)
    except Exception as e:
        return json.dumps({"success": False, "error": str(e)}, ensure_ascii=False)

@mcp.tool()
def query_orders(user_id: str, status: str = None) -> str:
    """
    查询用户的订单列表
    
    Args:
        user_id: 用户ID
        status: 可选,按订单状态过滤 (pending/paid/shipped/completed/cancelled)
    
    Returns:
        订单列表
    """
    import json
    orders = order_service.list_orders(user_id, status)
    
    return json.dumps([{
        "order_id": o.id,
        "user_id": o.user_id,
        "total_amount": o.total_amount,
        "status": o.status,
        "created_at": o.created_at.isoformat() if o.created_at else None,
        "items_count": len(o.items)
    } for o in orders], ensure_ascii=False, indent=2)

@mcp.tool()
def get_order_details(order_id: int) -> str:
    """
    获取订单详细信息
    
    Args:
        order_id: 订单ID
    """
    import json
    order = order_service.get_order(order_id)
    
    if not order:
        return json.dumps({"success": False, "error": "订单不存在"}, ensure_ascii=False)
    
    return json.dumps({
        "success": True,
        "order": {
            "order_id": order.id,
            "user_id": order.user_id,
            "status": order.status,
            "total_amount": order.total_amount,
            "created_at": order.created_at.isoformat() if order.created_at else None,
            "items": [{
                "product_id": item.product_id,
                "product_name": item.product_name,
                "quantity": item.quantity,
                "unit_price": item.unit_price,
                "subtotal": item.quantity * item.unit_price
            } for item in order.items]
        }
    }, ensure_ascii=False, indent=2)

@mcp.tool()
def update_order_status(order_id: int, new_status: str) -> str:
    """
    更新订单状态
    
    Args:
        order_id: 订单ID
        new_status: 新状态 (pending/paid/shipped/completed/cancelled)
    """
    import json
    
    valid_statuses = ["pending", "paid", "shipped", "completed", "cancelled"]
    if new_status not in valid_statuses:
        return json.dumps({
            "success": False, 
            "error": f"无效状态,可选值:{valid_status}"
        }, ensure_ascii=False)
    
    order = order_service.update_status(order_id, new_status)
    if not order:
        return json.dumps({"success": False, "error": "订单不存在"}, ensure_ascii=False)
    
    return json.dumps({
        "success": True,
        "order_id": order.id,
        "new_status": order.status
    }, ensure_ascii=False)

# ─── 启动服务 ─────────────────────────────────────────────

if __name__ == "__main__":
    mcp.run(transport="stdio")

4.2 添加 Resources(数据查询接口)

除了 Tools,MCP 还支持 Resources——让 AI 可以主动查询数据:

# 添加 Resources
@mcp.resource("orders://recent")
def get_recent_orders() -> str:
    """获取最近的订单(用于上下文注入)"""
    import json
    all_orders = list(order_service.orders.values())
    recent = sorted(all_orders, key=lambda x: x.created_at or 0, reverse=True)[:10]
    
    return json.dumps([{
        "id": o.id,
        "user_id": o.user_id,
        "status": o.status,
        "total": o.total_amount,
        "item_count": len(o.items)
    } for o in recent], ensure_ascii=False)

@mcp.resource("orders://stats/{user_id}")
def get_user_order_stats(user_id: str) -> str:
    """获取用户订单统计"""
    import json
    user_orders = order_service.list_orders(user_id)
    
    stats = {
        "total_orders": len(user_orders),
        "pending": len([o for o in user_orders if o.status == "pending"]),
        "completed": len([o for o in user_orders if o.status == "completed"]),
        "total_spent": sum(o.total_amount or 0 for o in user_orders)
    }
    
    return json.dumps(stats, ensure_ascii=False)

4.3 添加 Prompts(预定义模板)

# 添加 Prompt 模板
@mcp.prompt()
def order_summary_prompt(user_id: str) -> str:
    """生成用户订单摘要的 Prompt"""
    return f"""请分析用户 {user_id} 的订单情况,生成一份摘要报告。

报告应包含:
1. 订单总数和各状态分布
2. 消费总额和平均订单金额
3. 购买频率分析
4. 购买建议(可选)"""

五、实战:构建完整的 AI 订单助手

5.1 系统架构

用户:查询我的订单状态
   ↓
AI Client (Claude / Cursor)
   ↓ MCP
MCP Server (FastAPI + 订单服务)
   ↓
数据库 / 业务逻辑
   ↓
返回结果给 AI
   ↓
AI 生成自然语言回答
   ↓
用户看到:您的订单 #12345 已发货,预计明天送达

5.2 完整实现代码

# ai_order_assistant.py
"""
AI 订单助手 - FastAPI + MCP 完整实现
"""
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel, Field
from typing import Optional, Literal
from datetime import datetime
import json

from mcp.server.fastmcp import FastMCP
from mcp.types import (
    TextContent,
    CallToolResult,
    EmbeddedResource,
    PromptMessage
)

# ════════════════════════════════════════════════════════════
# 1. 数据模型
# ════════════════════════════════════════════════════════════

class OrderItem(BaseModel):
    product_id: str
    product_name: str
    quantity: int = Field(..., gt=0)
    unit_price: float = Field(..., gt=0)

class OrderCreate(BaseModel):
    user_id: str
    items: list[OrderItem]

class Order(BaseModel):
    id: int
    user_id: str
    items: list[OrderItem]
    status: Literal["pending", "paid", "shipped", "completed", "cancelled"]
    created_at: datetime
    total_amount: float

# ════════════════════════════════════════════════════════════
# 2. 业务服务层
# ════════════════════════════════════════════════════════════

class OrderService:
    """订单服务"""
    
    def __init__(self):
        self.orders: dict[int, Order] = {}
        self.next_id = 1
    
    def create_order(self, data: OrderCreate) -> Order:
        total = sum(item.quantity * item.unit_price for item in data.items)
        order = Order(
            id=self.next_id,
            user_id=data.user_id,
            items=data.items,
            status="pending",
            created_at=datetime.now(),
            total_amount=total
        )
        self.orders[self.next_id] = order
        self.next_id += 1
        return order
    
    def get_order(self, order_id: int) -> Optional[Order]:
        return self.orders.get(order_id)
    
    def list_orders(self, user_id: str, status: str = None) -> list[Order]:
        orders = [o for o in self.orders.values() if o.user_id == user_id]
        if status:
            orders = [o for o in orders if o.status == status]
        return sorted(orders, key=lambda x: x.created_at, reverse=True)
    
    def update_status(self, order_id: int, status: str) -> Optional[Order]:
        if order_id in self.orders and status in ["pending", "paid", "shipped", "completed", "cancelled"]:
            self.orders[order_id].status = status
            return self.orders[order_id]
        return None

order_service = OrderService()

# ════════════════════════════════════════════════════════════
# 3. FastAPI 应用
# ════════════════════════════════════════════════════════════

app = FastAPI(title="AI 订单助手 API", version="1.0.0")

@app.post("/orders/")
async def api_create_order(data: OrderCreate):
    order = order_service.create_order(data)
    return order

@app.get("/orders/{order_id}")
async def api_get_order(order_id: int):
    order = order_service.get_order(order_id)
    if not order:
        raise HTTPException(status_code=404, detail="订单不存在")
    return order

@app.get("/orders/")
async def api_list_orders(user_id: str, status: str = None):
    return order_service.list_orders(user_id, status)

# ════════════════════════════════════════════════════════════
# 4. MCP Server
# ════════════════════════════════════════════════════════════

mcp = FastMCP("AI-Order-Assistant")

# ─── Tools ────────────────────────────────────────────────

@mcp.tool()
def create_order(user_id: str, items: list[dict]) -> str:
    """
    创建新订单
    
    参数:
    - user_id: 用户ID,用于标识下单用户
    - items: 商品列表,每项包含 product_id, product_name, quantity, unit_price
    
    返回:创建成功的订单信息,包含订单号和总金额
    """
    try:
        order_data = OrderCreate(
            user_id=user_id,
            items=[OrderItem(**item) for item in items]
        )
        order = order_service.create_order(order_data)
        
        return json.dumps({
            "success": True,
            "order_id": order.id,
            "user_id": order.user_id,
            "total_amount": f"¥{order.total_amount:.2f}",
            "status": order.status,
            "items_count": len(order.items),
            "created_at": order.created_at.strftime("%Y-%m-%d %H:%M:%S")
        }, ensure_ascii=False, indent=2)
    except Exception as e:
        return json.dumps({"success": False, "error": str(e)}, ensure_ascii=False)

@mcp.tool()
def get_order(order_id: int) -> str:
    """
    根据订单号查询订单详情
    
    参数:
    - order_id: 订单号(数字)
    
    返回:订单的完整信息,包括商品列表、金额、状态等
    """
    order = order_service.get_order(order_id)
    
    if not order:
        return json.dumps({
            "success": False,
            "error": f"订单 #{order_id} 不存在"
        }, ensure_ascii=False)
    
    return json.dumps({
        "success": True,
        "order_id": order.id,
        "user_id": order.user_id,
        "status": order.status,
        "status_text": _get_status_text(order.status),
        "total_amount": f"¥{order.total_amount:.2f}",
        "created_at": order.created_at.strftime("%Y-%m-%d %H:%M:%S"),
        "items": [{
            "product_id": item.product_id,
            "product_name": item.product_name,
            "quantity": item.quantity,
            "unit_price": f"¥{item.unit_price:.2f}",
            "subtotal": f"¥{item.quantity * item.unit_price:.2f}"
        } for item in order.items]
    }, ensure_ascii=False, indent=2)

@mcp.tool()
def list_user_orders(user_id: str, status: str = None) -> str:
    """
    查询用户的所有订单
    
    参数:
    - user_id: 用户ID
    - status: 可选,按状态过滤 (pending/paid/shipped/completed/cancelled)
    
    返回:订单列表,按创建时间倒序排列
    """
    orders = order_service.list_orders(user_id, status)
    
    if not orders:
        return json.dumps({
            "success": True,
            "message": f"用户 {user_id} 暂无订单",
            "orders": []
        }, ensure_ascii=False, indent=2)
    
    return json.dumps({
        "success": True,
        "user_id": user_id,
        "total_orders": len(orders),
        "orders": [{
            "order_id": o.id,
            "status": o.status,
            "status_text": _get_status_text(o.status),
            "total_amount": f"¥{o.total_amount:.2f}",
            "items_count": len(o.items),
            "created_at": o.created_at.strftime("%Y-%m-%d %H:%M:%S")
        } for o in orders]
    }, ensure_ascii=False, indent=2)

@mcp.tool()
def update_order_status(order_id: int, new_status: str) -> str:
    """
    更新订单状态
    
    参数:
    - order_id: 订单号
    - new_status: 新状态,可选值:paid, shipped, completed, cancelled
    
    注意:不能将已完成的订单改为其他状态
    """
    valid_statuses = ["pending", "paid", "shipped", "completed", "cancelled"]
    
    if new_status not in valid_statuses:
        return json.dumps({
            "success": False,
            "error": f"无效状态,可选值:{valid_statuses}"
        }, ensure_ascii=False)
    
    # 检查订单是否存在
    order = order_service.get_order(order_id)
    if not order:
        return json.dumps({
            "success": False,
            "error": f"订单 #{order_id} 不存在"
        }, ensure_ascii=False)
    
    # 业务规则:已完成或已取消的订单不能修改
    if order.status in ["completed", "cancelled"]:
        return json.dumps({
            "success": False,
            "error": f"订单 #{order_id} 已是终态({order.status}),无法修改"
        }, ensure_ascii=False)
    
    updated = order_service.update_status(order_id, new_status)
    
    return json.dumps({
        "success": True,
        "order_id": updated.id,
        "old_status": order.status,
        "new_status": updated.status,
        "new_status_text": _get_status_text(updated.status),
        "message": f"订单状态已更新:{_get_status_text(order.status)}{_get_status_text(updated.status)}"
    }, ensure_ascii=False, indent=2)

# ─── Resources ────────────────────────────────────────────

@mcp.resource("orders://summary/{user_id}")
def get_order_summary(user_id: str) -> str:
    """
    获取用户订单摘要
    用于向 AI 提供用户订单的概览信息
    """
    orders = order_service.list_orders(user_id)
    
    if not orders:
        return json.dumps({
            "user_id": user_id,
            "summary": "暂无订单记录"
        }, ensure_ascii=False)
    
    by_status = {}
    for o in orders:
        by_status.setdefault(o.status, 0)
        by_status[o.status] += 1
    
    return json.dumps({
        "user_id": user_id,
        "total_orders": len(orders),
        "total_spent": f"¥{sum(o.total_amount for o in orders):.2f}",
        "average_order_value": f"¥{sum(o.total_amount for o in orders) / len(orders):.2f}",
        "by_status": by_status
    }, ensure_ascii=False, indent=2)

# ─── 辅助函数 ────────────────────────────────────────────

def _get_status_text(status: str) -> str:
    status_map = {
        "pending": "待支付",
        "paid": "已支付",
        "shipped": "已发货",
        "completed": "已完成",
        "cancelled": "已取消"
    }
    return status_map.get(status, status)

# ════════════════════════════════════════════════════════════
# 5. 启动入口
# ════════════════════════════════════════════════════════════

if __name__ == "__main__":
    import sys
    
    if len(sys.argv) > 1:
        if sys.argv[1] == "--http":
            # HTTP API 模式
            import uvicorn
            uvicorn.run(app, host="0.0.0.0", port=8000)
        elif sys.argv[1] == "--both":
            # 同时启动 HTTP 和 MCP
            import threading
            import uvicorn
            
            def run_http():
                uvicorn.run(app, host="0.0.0.0", port=8000)
            
            http_thread = threading.Thread(target=run_http, daemon=True)
            http_thread.start()
            
            print("🚀 HTTP API 已启动: http://localhost:8000")
            print("📡 MCP Server 准备就绪...")
            mcp.run(transport="stdio")
    else:
        # 默认 MCP 模式
        mcp.run(transport="stdio")

5.3 测试 MCP Server

# 启动 MCP Server
python ai_order_assistant.py

# 测试工具调用(通过 MCP Inspector 或 Claude Desktop)
# MCP Inspector 命令:
npx @modelcontextprotocol/inspector python ai_order_assistant.py

六、生产环境最佳实践

6.1 添加认证机制

# auth_mcp_server.py
from fastapi import FastAPI, Depends, HTTPException
from fastapi.security import HTTPBearer
from mcp.server.fastmcp import FastMCP

app = FastAPI()
mcp = FastMCP("secure-order-server")
security = HTTPBearer()

# 模拟用户数据库
USERS = {
    "user1": "secret1",
    "user2": "secret2"
}

def verify_token(token: str) -> str:
    """验证 token,返回用户 ID"""
    for user_id, secret in USERS.items():
        if secret == token:
            return user_id
    raise HTTPException(status_code=401, detail="无效的认证凭证")

# 安全封装的 Tool
@mcp.tool()
def secure_get_orders(token: str, status: str = None) -> str:
    """查询订单(需要认证)
    
    参数:
    - token: 认证 token(用户的密钥)
    - status: 可选,按状态过滤
    """
    try:
        user_id = verify_token(token)
        orders = order_service.list_orders(user_id, status)
        
        return json.dumps({
            "success": True,
            "user_id": user_id,
            "orders": [...]
        }, ensure_ascii=False)
    except HTTPException as e:
        return json.dumps({"success": False, "error": "认证失败"}, ensure_ascii=False)

6.2 添加限流保护

# rate_limited_mcp.py
import time
from collections import defaultdict
from mcp.server.fastmcp import FastMCP

mcp = FastMCP("rate-limited-server")

# 简单的内存限流
call_counts = defaultdict(list)
RATE_LIMIT = 10  # 每分钟最多 10 次
RATE_WINDOW = 60  # 时间窗口 60 秒

def check_rate_limit(tool_name: str, client_id: str = "default") -> bool:
    """检查是否超过限流"""
    key = f"{tool_name}:{client_id}"
    now = time.time()
    
    # 清理过期的记录
    call_counts[key] = [t for t in call_counts[key] if now - t < RATE_WINDOW]
    
    if len(call_counts[key]) >= RATE_LIMIT:
        return False
    
    call_counts[key].append(now)
    return True

@mcp.tool()
def rate_limited_query(user_id: str) -> str:
    """带限流的查询"""
    if not check_rate_limit("query", user_id):
        return json.dumps({
            "success": False,
            "error": "请求过于频繁,请稍后再试"
        }, ensure_ascii=False)
    
    # 正常业务逻辑
    ...

6.3 日志与监控

# logging_mcp.py
import logging
from datetime import datetime

logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s [%(levelname)s] %(message)s"
)
logger = logging.getLogger("mcp-server")

@mcp.tool()
def logged_create_order(user_id: str, items: list[dict]) -> str:
    """带完整日志的创建订单"""
    logger.info(f"创建订单请求: user_id={user_id}, items_count={len(items)}")
    
    start_time = datetime.now()
    try:
        result = create_order(user_id, items)
        duration = (datetime.now() - start_time).total_seconds()
        logger.info(f"订单创建成功: {result}, 耗时: {duration:.3f}s")
        return result
    except Exception as e:
        duration = (datetime.now() - start_time).total_seconds()
        logger.error(f"订单创建失败: {e}, 耗时: {duration:.3f}s")
        raise

6.4 健康检查与调试端点

# debug_endpoints.py
from fastapi import FastAPI
from pydantic import BaseModel

app = FastAPI()

class HealthResponse(BaseModel):
    status: str
    timestamp: datetime
    tools_count: int
    total_orders: int

@app.get("/health", response_model=HealthResponse)
async def health_check():
    """健康检查端点"""
    return HealthResponse(
        status="healthy",
        timestamp=datetime.now(),
        tools_count=len(mcp._tool_manager.tools),  # 暴露的工具数量
        total_orders=len(order_service.orders)
    )

@app.get("/debug/tools")
async def list_tools():
    """列出所有可用的 MCP Tools(调试用)"""
    return {
        "tools": [
            {
                "name": tool.name,
                "description": tool.description,
                "parameters": tool.input_schema
            }
            for tool in mcp._tool_manager.tools
        ]
    }

七、与主流 AI 平台的集成

7.1 Claude Desktop

~/.claude/settings.json 中添加:

{
  "mcpServers": {
    "order-assistant": {
      "command": "python",
      "args": ["/path/to/ai_order_assistant.py"]
    }
  }
}

7.2 Cursor

在 Cursor 设置中添加 MCP Server:

{
  "mcpServers": {
    "order-assistant": {
      "command": "python",
      "args": ["/path/to/ai_order_assistant.py"]
    }
  }
}

7.3 自定义 AI 客户端

# client_example.py
from mcp.client.fastmcp import FastMCPClient

async def use_order_assistant():
    async with FastMCPClient("python /path/to/ai_order_assistant.py") as client:
        # 调用 Tool
        result = await client.call_tool("list_user_orders", {"user_id": "user1"})
        print(result)
        
        # 读取 Resource
        resource = await client.read_resource("orders://summary/user1")
        print(resource)

八、常见问题 FAQ

Q1: mount_app() 和手动定义 Tools 哪个更好?

取决于场景:

  • 快速原型 / 内部工具mount_app() 更方便,自动映射所有端点
  • 生产环境 / 需要精细控制:手动定义更灵活,可以添加详细的参数描述、输入校验和错误处理

Q2: MCP Server 支持热更新吗?

不支持热更新。每次修改代码后需要重启服务。如果需要频繁更新,建议:

  1. 将业务逻辑封装为独立服务
  2. MCP Server 只做薄薄的适配层
  3. 业务更新不需要重启 MCP Server

Q3: 如何处理 Tool 执行超时?

在 Tool 内部添加超时控制:

from functools import partial
import asyncio

@mcp.tool()
def long_running_task(param: str) -> str:
    try:
        # 使用 asyncio.wait_for 设置超时
        loop = asyncio.get_event_loop()
        result = loop.run_until_complete(
            asyncio.wait_for(async_task(param), timeout=30)
        )
        return result
    except asyncio.TimeoutError:
        return json.dumps({"error": "任务执行超时"})

Q4: MCP 和 Web API 可以同时运行吗?

可以!参考上面的 --both 模式:

python ai_order_assistant.py --both

这样既可以通过 MCP 供 AI 调用,也可以通过 HTTP 供其他系统调用。

Q5: 如何处理 Tool 的复杂返回(如图、文件等)?

MCP 支持返回多种格式:

  • 文本:直接返回字符串
  • 结构化 JSON:返回序列化的 JSON 字符串
  • 图片/文件:返回 base64 编码或文件路径
  • 资源引用:返回 EmbeddedResource 类型

总结

核心要点

  1. FastAPI + MCP 是黄金组合:FastAPI 的异步性能 + 自动文档 + 类型校验,配合 MCP 的标准化接口,如虎添翼

  2. 三行代码完成基础改造mount_app() 方法可以一键将 FastAPI 应用暴露为 MCP Server

  3. 手动定义更灵活:对于生产环境,建议手动定义 Tool,可以精细控制参数描述、错误处理和业务规则

  4. 完整的示例代码:本文提供了从基础到生产级别的完整实现,可直接参考使用

代码复用优势

使用 FastAPI + MCP 架构,你的代码复用效率大幅提升:

一份业务代码
├── 暴露为 HTTP API(供其他系统调用)
├── 暴露为 MCP Server(供 AI 客户端调用)
└── 同时运行(--both 模式)

适用场景

场景 推荐方式
快速验证 MCP 概念 mount_app()
企业内部工具服务 手动定义 + 认证 + 限流
需要同时服务 AI 和其他系统 --both 双模式
高并发生产环境 独立部署 + 负载均衡
Logo

AtomGit 是由开放原子开源基金会联合 CSDN 等生态伙伴共同推出的新一代开源与人工智能协作平台。平台坚持“开放、中立、公益”的理念,把代码托管、模型共享、数据集托管、智能体开发体验和算力服务整合在一起,为开发者提供从开发、训练到部署的一站式体验。

更多推荐