AI大模型之旅之MCP深入理解

一、基础概念

1.1 什么是MCP?

MCP (Model Context Protocol,模型上下文协议),2024年11月底,由Anthropic推出的一种开放标准,旨在统一大型语言模型(LLM)与外部数据源和工具之间的通 信协议。MCP的主要目的在于解决当前Al模型因数据孤岛限制而无法充分发挥潜力 的难题,MCP使得AI应用能够安全地访问和操作本地及远程数据,为AI应用提供了 连接万物的接口

使用 MCP,您可以:

  1. 构建为 LLMs 提供工具和数据的服务器
  2. 将这些服务器连接到兼容 MCP 的客户端
  3. 通过自定义功能扩展 LLM 的能力
1.2 MCP与Function Calling的区别
类别 MCP Function Calling
定义 Model Context Protocol,模型上下文协议是一个标准协议,使AI模型与API无缝交互 Function Calling是Al模型调用函数的机制
性质 协议 功能
范围 通用(多数据源、多功能) 特定场景(单一数据源或功能)
目标 统一接口,实现互操作 扩展模型能力
实现 基于标准协议 依赖特定模型
开发复杂度 低:通过统一协议实现多源兼容 高:需要为每个任务单独开发函数
灵活性 高:支持动态适配和扩展 低:功能扩展需要额外开发
1.3 工作原理

MCP协议采用了一种独特的架构设计,它将LLM与资源之间的通信划分为三个主要 部分:客户端、服务器和资源。客户端负责发送请求给MCP服务器,服务器则将这些请求转发给相应的资源。这种分层的设计使得MCP协议能够更好地控制访问权限,确保只有经过授权的用户才能 访问特定的资源。

以下是MCP的基本工作流程:

  • 初始化连接:客户端向服务器发送连接请求,建立通信通道。
  • 发送请求:客户端根据需求构建请求消息,并发送给服务器。
  • 处理请求:服务器接收到请求后,解析请求内容,执行相应的为操作(如查询数据 库、读取文件等)。
  • 返回结果:服务器将处理结果封装成响应消息,发送回客户站瑞。
  • 断开连接:任务完成后,客户端可以主动关闭连接或等待服务各器超时关

在这里插入图片描述

1.4 三种传输方式
传输方式 适用场景 特点
stdio 本地 Server 最简单,通过标准输入输出通信
HTTP + SSE 远程 Server Server-Sent Events,兼容性好
Streamable HTTP 远程 Server(新) 更灵活,支持双向流

mcp.tool()的作用是对函数进行注册,将函数转化为可供MCP框架使用的工具

1.5 MCP架构

在这里插入图片描述

1.5 MCP 服务器可以提供三种主要类型的功能
  • 工具:可被 LLM 调用的函数(需要用户批准)
  • 资源:可被客户端读取的类文件数据(如 API 响应或文件内容)
  • 提示词:帮助用户完成特定任务的预设模板
能力 说明 场景 示例
@mcp.tool() 用于 注册一个工具(Tool),
让大模型可以像调用函数一样调用这个能力
查询数据库
调用外部 API
执行业务逻辑
计算或处理数据
@mcp.tool()
def add(a: int, b: int) -> int:
“”“计算两个数字之和”“”
return a + b
@mcp.resource() 用于 注册资源(Resource),
向大模型提供 只读数据
规则文档
FAQ
说明书
知识库
@mcp.resource(“docs://refund_policy”)
def refund_policy():
return “”“
退款规则:
1. 7天无理由退货
2. 已发货订单需承担运费
”“”
@mcp.prompt() 用于 注册 Prompt 模板,供客户端或大模型使用 系统 Prompt
任务 Prompt
@mcp.model() 用于 在 MCP Server 中注册模型能力 提供内部 LLM
提供 embedding
提供 AI 推理能力
mcp.run() 用于 启动 MCP Server 服务。
1.6 mcp架构

在这里插入图片描述

工作流程:

  1. ⽤⼾提交查询
  2. 智能体连接到 MCP 服务器以发现⼯具
  3. 根据查询,智能体调⽤合适的⼯具并获取上下⽂
  4. 智能体返回上下⽂感知的响应
1.6 中文官网

https://mcpcn.com/docs/

二、一个简单的LangGraph Agent + MCP Server + MCP Client + Tool 调用

2.1 代码结构

ai-agent-mcp

├── mcp_server
│ └── server.py

├── agent
│ └── client.py

├── client
│ └── run_agent.py

├── requirements.txt

2.2 代码

requirements.txt

langchain==1.2.8
langgraph==1.0.8
mcp==1.26.0
fastapi
uvicorn
pymysql
redis
weaviate-client
langchain-weaviate
langchain-openai
python-dotenv
google-search-results

server.py

from fastmcp import FastMCP
from datetime import datetime

mcp = FastMCP("AI Tool Server")


@mcp.tool()
def add(a: int, b: int) -> int:
    """计算两个数的和"""
    return a + b


@mcp.tool()
def multiply(a: int, b: int) -> int:
    """计算两个数的乘积"""
    return a * b


@mcp.tool()
def get_time() -> str:
    """获取当前时间"""
    return datetime.now().strftime("%Y-%m-%d %H:%M:%S")


@mcp.tool()
def get_weather(city: str) -> str:
    """获取城市天气"""
    return f"{city}天气:23℃ 晴"


if __name__ == "__main__":
    mcp.run(
        transport="sse",
        host="0.0.0.0",
        port=9000
    )

client.py

import dotenv
from langchain_core.messages import HumanMessage
from langgraph.graph import StateGraph, END
from typing import TypedDict
from mcp.client.session import ClientSession
from mcp.client.sse import sse_client
import logging
from langchain.chat_models import init_chat_model

# =========================================================
# 1️⃣ 加载 .env 环境变量 注意:这里面有
# OPENAI_API_KEY
# OPENAI_BASE_UR
# 等配置
# =========================================================
dotenv.load_dotenv()

# =========================================================
# 2️⃣ 配置日志系统
# level: 日志等级
# format: 输出格式
# datefmt: 时间格式
# =========================================================
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    datefmt='%Y-%m-%d %H:%M:%S'
)

# 创建 logger 对象
# __name__ 表示当前模块名称
logger = logging.getLogger(__name__)

# =========================================================
# 3️⃣ 初始化大模型
# init_chat_model 是 LangChain 统一模型入口
# 这里使用 gpt-4o-mini
# =========================================================
model = init_chat_model("gpt-4o-mini")


# =========================================================
# 4️⃣ 定义 LangGraph 的 State
# Graph 中节点之间通过 State 传递数据
# =========================================================
class State(TypedDict):
    # 用户问题
    question: str

    # 最终答案
    answer: str


# =========================================================
# 5️⃣ 封装 LLM 调用函数
# =========================================================
async def call_llm(question: str):
    # ainvoke = 异步调用模型
    response = await model.ainvoke(
        [HumanMessage(content=question)]
    )

    # 返回模型生成的文本
    return response.content


# =========================================================
# 6️⃣ Agent 节点
# 这个节点负责:
# 1. 获取 MCP 工具
# 2. 让 LLM 判断是否需要调用工具
# 3. 调用工具
# 4. 再次调用 LLM 生成最终答案
# =========================================================
async def agent_node(state: State):
    # 从 State 中获取用户问题
    question = state["question"]

    # =====================================================
    # 连接 MCP Server
    # SSE = Server Sent Event
    # =====================================================
    async with sse_client("http://localhost:9000/sse") as streams:

        # 创建 MCP 会话
        async with ClientSession(streams[0], streams[1]) as session:

            # 初始化 MCP 会话
            await session.initialize()

            # =================================================
            # 获取 MCP Server 所有可用工具
            # =================================================
            tools = (await session.list_tools()).tools

            logger.info("获取可用tools:%s", tools)

            # =================================================
            # 构造工具描述字符串
            # 格式:
            # tool1:description
            # tool2:description
            # =================================================
            tool_desc = "\n".join(
                [f"{t.name}:{t.description}" for t in tools]
            )

            logger.info("获取可用tool_desc:%s", tool_desc)

            # =================================================
            # 构造 Prompt
            # 让 LLM 决策是否调用工具
            # =================================================
            prompt = f"""
                你可以调用以下工具:

                {tool_desc}

                用户问题:{question}

                如果需要工具返回JSON:
                {{"tool":"工具名","tool_input":{{}}}}

                否则直接回答
                """

            # 调用大模型
            result = await call_llm(prompt)

            # =================================================
            # 判断 LLM 是否选择调用工具
            # =================================================
            if "tool" in result:

                import json

                # 解析 JSON
                tool_json = json.loads(result)

                # 获取工具名称
                tool_name = tool_json["tool"]

                # 获取工具参数
                tool_input = tool_json["tool_input"]

                # =================================================
                # 调用 MCP 工具
                # =================================================
                ret = await session.call_tool(tool_name, tool_input)

                # 工具返回结果
                tool_result = ret.content[0].text

                logger.info(f"用户问题:{question},使用工具:{tool_name}")

                # =================================================
                # 再次调用 LLM
                # 使用工具结果生成最终回答
                # =================================================
                final_answer = await call_llm(
                    f"用户问题:{question},工具返回:{tool_result}"
                )

                return {"answer": final_answer}

            else:

                # 如果不需要工具
                # 直接返回模型结果
                return {"answer": result}


# =========================================================
# 7️⃣ 构建 LangGraph
# =========================================================
def build_graph():
    # 创建 Graph Builder
    builder = StateGraph(State)

    # 添加节点
    builder.add_node("agent", agent_node)

    # 设置入口节点
    builder.set_entry_point("agent")

    # agent 执行完后结束
    builder.add_edge("agent", END)

    # 编译 Graph
    return builder.compile()

run_agent.py

import asyncio
from mcp_client.client import build_graph


graph = build_graph()


async def main():

    questions = [
        "25+38是多少",
        "356*125是多少",
        "现在时间",
        "Shanghai天气",
        "讲个笑话"
    ]

    for q in questions:

        result = await graph.ainvoke({
            "question": q
        })

        print("问题:", q)
        print("回答:", result["answer"])
        print("-" * 40)


if __name__ == "__main__":
    asyncio.run(main())
2.3 测试

1.启动服务端,直接点击server.py的main函数启动
2.启动客户端,直接点击run_agent.py的main函数即可启动

输出如下:
…… 太多了

三、一个企业级智能客户系统(SSE)

3.1 需求及使用到的知识点

1.使用:Langchain+LangGrap+MCP+agent+rag
2.使用向量数据库weaviate 向量化必要数据,如公司信息、退款规则等(提供单独接口可供postman访问,调用接口向向量数据库更新或存储新知识)
3.使用redis作记忆存储等。redis设置过期时间,防止数据爆炸
4.查询mysql数据库,查询订单信息,商品信息,支付信息等,但要注意安全控制仅允许查询语句(给出用到的SQL建表语句以及少量的数据)
5.可以联网搜索答案(使用GoogleSerperAPIWrapper),可以闲聊查询天气、旅游景点、美食等
6.使用MCP 的SSE 模式完成。server+client

3.2 流程图

在这里插入图片描述

3.3 项目代码结构
fast-mcp04
│
├── main.py
│
├── agent/
│   └── graph_agent.py
│	 └── prompts.py
│	└── schema_loader.py
│
├── api/
│   └── chat_api.py
│   
│
├── mcp_server/
│   └── server.py
├── mcp_client/
│   └── client.py
│
├── rag/
│   └── weaviate_rag.py
│
├── mcp_memory/
│   └── redis_memory.py
│
├── db/
│   └── mysql_schema.sql
├── rag/
│   └── weaviate_client.py
│
├── tools/
│   ├── search_tool.py
│   └── mysql_tool.py
│
└── requirements.txt
3.4 代码展示

mysql_schema.sql

CREATE DATABASE ecommerce;

USE ecommerce;

CREATE TABLE users (
 id INT PRIMARY KEY AUTO_INCREMENT,
 name VARCHAR(50),
 email VARCHAR(100)
);

CREATE TABLE products (
 id INT PRIMARY KEY AUTO_INCREMENT,
 name VARCHAR(100),
 price DECIMAL(10,2)
);

CREATE TABLE orders (
 id INT PRIMARY KEY AUTO_INCREMENT,
 user_id INT,
 product_id INT,
 status VARCHAR(50),
 created_at DATETIME
);

INSERT INTO users VALUES
(1,'Alice','alice@test.com'),
(2,'Bob','bob@test.com');

INSERT INTO products VALUES
(1,'iPhone 15',6999),
(2,'MacBook Pro',15999);

INSERT INTO orders VALUES
(1,1,1,'PAID',NOW()),
(2,2,2,'SHIPPED',NOW());

requirements.txt

fastapi~=0.135.1
pydantic~=2.12.5
python-dotenv~=1.2.1
weaviate-client~=4.19.2
langchain-core~=1.2.15
langchain-openai~=1.1.7
langchain-text-splitters~=1.1.0
langgraph~=1.0.10
langchain~=1.2.10
SQLAlchemy~=2.0.35
langchain-community~=0.4.1
uvicorn~=0.40.0
llama-index-core~=0.14.15
fastmcp~=3.1.0

graph_agent.py

import dotenv
import logging
from typing import Annotated, TypedDict

from langchain_core.tools import StructuredTool
from langchain_core.messages import HumanMessage, SystemMessage, BaseMessage, AIMessage
from langchain.chat_models import init_chat_model
from langgraph.graph import StateGraph
from langgraph.graph.message import add_messages

from pydantic import BaseModel, SecretStr

from mcp_client.client import init_mcp
from mcp_memory.redis_memory import init_redis, get_redis_saver
from agent.schema_loader import load_schema
from agent.prompts import build_system_prompt

dotenv.load_dotenv()

# =========================================================
# 1.日志配置
# =========================================================
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s | %(levelname)s | %(message)s"
)

log = logging.getLogger(__name__)

# =========================================================
# 2.LLM 初始化
# =========================================================
# 方式一: OpenAI远端模型
llm = init_chat_model(model="gpt-4o-mini")
# 方式二: 本地模型
# llm = init_chat_model(model="qwen2.5:3b",model_provider="ollama",base_url="http://localhost:11434",api_key=SecretStr("ollama"))

graph = None

# =========================================================
# 3.Agent State
# =========================================================
class AgentState(TypedDict):
    """
    LangGraph 状态结构

    add_messages 作用:
    自动累加 messages,实现对话记忆
    """
    messages: Annotated[list[BaseMessage], add_messages]


class QueryInput(BaseModel):
    query: str


# =========================================================
# 4.MCP 工具注册
# =========================================================
def build_tools(session, tools):
    """
    将 MCP 工具包装为 LangChain Tool
    """
    lc_tools = []

    for t in tools:
        tool_name = t.metadata.name
        desc = t.metadata.description or f"MCP tool: {tool_name}"

        log.info("发现 MCP 工具: %s", tool_name)

        async def run(query: str, name=tool_name):
            log.info("调用 MCP 工具: %s", name)
            log.info("工具参数: %s", query)

            result = await session.call_tool(
                name,
                {"query": query}
            )

            tool_result = result.content[0].text

            log.info("工具返回结果: %s", tool_result)

            return tool_result

        lc_tools.append(
            StructuredTool.from_function(
                name=tool_name,
                description=desc,
                coroutine=run,
                args_schema=QueryInput
            )
        )

    return lc_tools


# =========================================================
# 5.构建 Graph
# =========================================================
async def build_graph():
    """
    系统初始化

    1 初始化 Redis Memory
    2 加载数据库 Schema
    3 初始化 MCP 工具
    4 构建 LangGraph Agent
    """

    global graph


    log.info("===== 系统启动 =====")

    # 5.1初始化 Redis
    await init_redis()
    saver = get_redis_saver()

    # 5.2加载数据库结构
    log.info("加载数据库 Schema")
    schema_text = load_schema()

    # 5.3构建系统 Prompt
    full_system_prompt = build_system_prompt(schema_text)

    # 5.4连接 MCP
    log.info("连接 MCP Server")
    session, tools = await init_mcp()

    # 5.5注册 MCP 工具
    lc_tools = build_tools(session, tools)

    log.info("工具注册完成,共 %d 个工具", len(lc_tools))

    # 5.6绑定工具
    llm_with_tools = llm.bind_tools(lc_tools)

    # 5.7 创建了一个“图结构的工作流构建器”,并定义了整个 Agent 的状态结构
    builder = StateGraph(AgentState)

    # =========================================================
    # 5.8 Agent 节点
    # =========================================================
    async def agent(state: AgentState):

        messages = state["messages"]
        question = messages[-1].content

        log.info("用户问题: %s", question)

        # 5.9 第一次 LLM 调用 作用:让模型判断是否需要调用工具
        resp = await llm_with_tools.ainvoke(
            [SystemMessage(content=full_system_prompt)] + messages
        )

        # =====================================================
        # 5.10 需要调用工具
        # =====================================================
        if hasattr(resp, "tool_calls") and resp.tool_calls:

            call = resp.tool_calls[0]

            tool_name = call["name"]
            args = call["args"]

            log.info("LLM 决定调用工具:LLM选择工具: %s,工具参数: %s", tool_name, args)

            # 调用 MCP 工具
            ret = await session.call_tool(tool_name, args)

            tool_result = ret.content[0].text

            log.info("工具返回: %s", tool_result)

            # 第二次调用 LLM
            # 作用:根据工具返回结果生成自然语言回答
            final_answer = await call_llm(
                messages,
                question,
                full_system_prompt,
                tool_result
            )

            return {
                "messages": messages + [
                    AIMessage(content=final_answer)
                ]
            }

        # =====================================================
        # 5.11 不需要工具
        # =====================================================
        else:

            log.info("LLM 判断无需调用工具")

            return {
                "messages": messages + [
                    AIMessage(content=resp.content)
                ]
            }

    builder.add_node("agent", agent)
    builder.set_entry_point("agent")
    # 构建 LangGraph 带redis存储
    graph = builder.compile(checkpointer=saver)

    log.info("LangGraph 构建完成")


# =========================================================
# 6.Agent 调用入口
# =========================================================
async def run_agent(user_id: str, msg: str):
    """
    Agent 执行入口
    """

    log.info("收到请求 user_id=%s", user_id)

    result = await graph.ainvoke(
        {
            "messages": [HumanMessage(content=msg)]
        },
        config={
            "configurable": {
                "thread_id": user_id
            }
        }
    )

    answer = result["messages"][-1].content

    log.info("Agent 返回结果: %s", answer)

    return answer


# =========================================================
# 7.二次 LLM 调用
# =========================================================
async def call_llm(messages, question, system_prompt, tool_result):
    """
    第二次调用 LLM

    作用:
    将工具返回结果整理成用户可读的回答
    """

    log.info("LLM 根据工具结果生成回答")

    response = await llm.ainvoke(
        [SystemMessage(content=system_prompt)]
        + messages
        + [
            HumanMessage(content=f"""
用户问题:
{question}

工具返回:
{tool_result}

请根据结果回答用户问题
""")
        ]
    )

    return response.content

prompts.py

def build_system_prompt(schema_text: str) -> str:
    """
    构建系统提示词

    作用:
    1 让大模型知道数据库结构
    2 约束 SQL 生成规则
    3 指导工具使用
    """

    tool_guideline = """
你是电商智能客服助手“小蜜蜂”,请使用礼貌、专业语气回答客户问题。

可用工具:

1. mysql_query
   查询商品、订单、库存等 MySQL 数据库信息
   参数: { "query": "SQL语句" }

2. rag
   查询公司概况、公告、退款规则等知识库
   参数: { "query": "自然语言问题" }

3. search
   查询互联网公开信息,闲聊、查询天气、旅游景点、美食等
   参数: { "query": "搜索关键词" }

工具使用规则:

- 商品 / 订单 / 库存 → mysql_query
- 公司介绍 / 退款规则 → rag
- 新闻 / 互联网信息 → search
"""

    db_prompt = f"""
你是 MySQL 数据库专家。

数据库结构:

{schema_text}

SQL规则:

1 只允许 SELECT
2 禁止 UPDATE DELETE INSERT
3 SQL 必须基于提供的表结构
4 SQL 语句必须正确
"""

    return tool_guideline + "\n" + db_prompt

schema_loader.py

import os
import dotenv
from sqlalchemy import create_engine, text

dotenv.load_dotenv()

MYSQL_HOST = os.getenv("MYSQL_HOST")
MYSQL_PORT = os.getenv("MYSQL_PORT")
MYSQL_USER = os.getenv("MYSQL_USER")
MYSQL_PASSWORD = os.getenv("MYSQL_PASSWORD")
MYSQL_DB = os.getenv("MYSQL_DB")

MYSQL_URL = f"mysql+pymysql://{MYSQL_USER}:{MYSQL_PASSWORD}@{MYSQL_HOST}:{MYSQL_PORT}/{MYSQL_DB}"

engine = create_engine(MYSQL_URL)

# 读取数据库表结构
def load_schema():

    sql = """
    SELECT
        table_name AS table_name,
        column_name AS column_name,
        data_type AS data_type
    FROM information_schema.columns
    WHERE table_schema = :db
    ORDER BY table_name
    """

    with engine.connect() as conn:

        result = conn.execute(text(sql), {"db": MYSQL_DB})

        tables = {}

        for r in result:

            row = dict(r._mapping)

            table = row["table_name"]
            column = row["column_name"]
            dtype = row["data_type"]

            tables.setdefault(table, []).append(f"{column} ({dtype})")

    schema_text = ""

    for table, cols in tables.items():

        schema_text += f"\n表 {table}\n"

        for c in cols:
            schema_text += f" - {c}\n"

    return schema_text

chat_api.py

from fastapi import APIRouter
from agent.graph_agent import run_agent
from rag.weaviate_client import add_knowledge, search_knowledge
from pydantic import BaseModel
import logging

# =========================================================
# 1.日志配置
# =========================================================
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s | %(levelname)s | %(message)s"
)
log = logging.getLogger(__name__)

router = APIRouter()


# 1️.定义客户聊天请求模型
class ChatRequest(BaseModel):
    msg: str
    user_id: str


# 2.定义添加向量数据库请求模型
class AddEmbeddingRequest(BaseModel):
    content: str


@router.post("/chat")
async def chat(request: ChatRequest):
    log.info("收到请求,用户:%s,消息:%s", request.user_id, request.msg)
    answer = await run_agent(request.user_id, request.msg)

    return {"answer": answer}


@router.post("/add_knowledge")
def add(text: AddEmbeddingRequest):
    context = text.content
    log.info("收到请求,存储向量数据库消息:%s", context)
    add_knowledge(context)

    return {"msg": "ok"}


@router.post("/search_knowledge")
def add(text: AddEmbeddingRequest):
    content = text.content
    log.info("收到请求,搜索向量数据库消息:%s", content)
    resp = search_knowledge(content)
    return {"msg": resp}

client.py

from llama_index.tools.mcp import BasicMCPClient, McpToolSpec
import os
import dotenv

dotenv.load_dotenv()
tools = None
MCP_SERVER_URL = os.getenv("MCP_SERVER_URL")

# 连接MCP 服务器并获取可用工具
async def init_mcp():
    global tools

    client = BasicMCPClient(MCP_SERVER_URL)

    tool_spec = McpToolSpec(client)

    tools = await tool_spec.to_tool_list_async()


    return client, tools

redis_memory.py


```python
import os
import dotenv
import logging
from langgraph.checkpoint.redis import AsyncRedisSaver

dotenv.load_dotenv()

log = logging.getLogger(__name__)

REDIS_URL = os.getenv("REDIS_URL")

redis_saver: AsyncRedisSaver | None = None


# ===================================
# 带 TTL 的 RedisSaver
# ===================================
class TTLRedisSaver(AsyncRedisSaver):

    def __init__(self, *args, ttl_seconds=3600, **kwargs):
        super().__init__(*args, **kwargs)
        self.ttl_seconds = ttl_seconds

    async def aput(self, config, checkpoint, metadata, new_versions):

        result = await super().aput(
            config,
            checkpoint,
            metadata,
            new_versions
        )

        await self._apply_ttl(config)

        return result

    async def aget(self, config):

        data = await super().aget(config)

        await self._apply_ttl(config)

        return data

    async def _apply_ttl(self, config):

        try:

            thread_id = config["configurable"]["thread_id"]

            async for key in self._redis.scan_iter(f"*{thread_id}*"):

                ttl = await self._redis.ttl(key)

                if ttl == -1:  # 只有没有 TTL 的 key 才设置
                    await self._redis.expire(key, self.ttl_seconds)

        except Exception as e:
            log.warning("TTL设置失败: %s", e)
# ===================================
# 初始化 Redis
# ===================================
async def init_redis():
    global redis_saver

    log.info("初始化 Redis Checkpoint Saver")

    cm = TTLRedisSaver.from_conn_string(REDIS_URL)

    redis_saver = await cm.__aenter__()
    redis_saver.ttl_seconds = 600

def get_redis_saver() -> AsyncRedisSaver:

    if redis_saver is None:
        raise RuntimeError("RedisSaver 未初始化")

    return redis_saver

server.py

from fastmcp import FastMCP
from tools.mysql_tool import safe_query
from tools.search_tool import search_web
from rag.weaviate_client import search_knowledge
import logging

# =========================================================
# 日志配置
# =========================================================
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s | %(levelname)s | %(message)s"
)

log = logging.getLogger(__name__)

mcp = FastMCP("ecommerce-tools")


@mcp.tool()
def mysql_query(query: str):


    log.info("调用 mysql_query")
    log.info(f"SQL: {query}")
    logging.debug("mysql_query:", query)
    sql = query

    return safe_query(sql)


@mcp.tool()
def rag(query: str):
    print("调用 RAG")

    return search_knowledge(query)


@mcp.tool()
def search(query: str):
    print("调用 search")

    return search_web(query)


if __name__ == "__main__":
    mcp.run(
        transport="sse",
        host="0.0.0.0",
        port=9000
    )

weaviate_client.py

`mysql_tool.py`
```python
import os
import dotenv
import weaviate
from langchain_core.documents import Document
from langchain_weaviate import WeaviateVectorStore
from langchain_openai import OpenAIEmbeddings
from langchain_text_splitters import RecursiveCharacterTextSplitter

dotenv.load_dotenv()

WEAVIATE_HOST = os.getenv("WEAVIATE_URL", "localhost")

# ==========================
# 1 初始化 Weaviate Client
# ==========================
client = weaviate.connect_to_local(
    host=WEAVIATE_HOST,
    port=8088,
    grpc_port=50051
)

# ==========================
# 2 Embedding
# ==========================
embeddings = OpenAIEmbeddings(
    model="text-embedding-3-small"
)

# ==========================
# 3 Vector Store
# ==========================
vector_store = WeaviateVectorStore(
    client=client,
    embedding=embeddings,
    index_name="yunFanDb2",
    text_key="text",
)

print("✅ Weaviate 向量数据库连接成功")


# 搜索向量数据库
def search_knowledge(query):
    retriever = vector_store.as_retriever(search_kwargs={"k": 2})
    resp = retriever.invoke(query)
    return "\n".join([d.page_content for d in resp])
    # return vector_store.similarity_search(query)


# 向量化数据
def add_knowledge(req: str):
    documents = [
        Document(page_content=req)
    ]
    # 文档分割器
    text_splitter = RecursiveCharacterTextSplitter(chunk_size=100, chunk_overlap=0,
                                                   separators=["。", "?", "\n\n", "\n", " ", ""])
    docs = text_splitter.split_documents(documents)
    vector_store.add_documents(docs)
    return {"msg": "知识添加成功"}

mysql_tool.py

import os

import dotenv
from sqlalchemy import create_engine, text
import re

dotenv.load_dotenv()

MYSQL_HOST = os.getenv("MYSQL_HOST")
MYSQL_PORT = os.getenv("MYSQL_PORT")
MYSQL_USER = os.getenv("MYSQL_USER")
MYSQL_PASSWORD = os.getenv("MYSQL_PASSWORD")
MYSQL_DB = os.getenv("MYSQL_DB")
MYSQL_URL = f"mysql+pymysql://{MYSQL_USER}:{MYSQL_PASSWORD}@{MYSQL_HOST}:{MYSQL_PORT}/{MYSQL_DB}"
engine = create_engine(MYSQL_URL)


def safe_query(sql):

    print("执行SQL:", sql)

    if not re.match(r"^\s*select", sql, re.I):
        return "只允许SELECT查询"

    with engine.connect() as conn:
        result = conn.execute(text(sql))

        rows = [dict(r._mapping) for r in result]

    return rows

search_tool.py

import os

import dotenv
from langchain_community.utilities import GoogleSerperAPIWrapper

dotenv.load_dotenv()
SERPER_API_KEY = os.getenv("SERPER_API_KEY")

search=GoogleSerperAPIWrapper(
    serper_api_key=SERPER_API_KEY
)

def search_web(query):

    print("联网搜索:",query)

    return search.run(query)

main.py

from fastapi import FastAPI
from contextlib import asynccontextmanager
import uvicorn
import logging
from agent.graph_agent import build_graph
from api.chat_api import router

# =========================================================
# 1.日志配置
# =========================================================
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s | %(levelname)s | %(message)s"
)
log = logging.getLogger(__name__)


@asynccontextmanager
async def lifespan(app: FastAPI):
    log.info("初始化 LangGraph Agent ...")

    await build_graph()

    log.info("Agent 初始化完成")

    yield

    log.info("===== 系统关闭 =====")


def create_app() -> FastAPI:
    app = FastAPI(
        title="AI 电商智能客服",
        version="1.0",
        lifespan=lifespan
    )

    app.include_router(router)

    return app


app = create_app()

if __name__ == "__main__":
    uvicorn.run(
        "main:app",
        host="0.0.0.0",
        port=8888,
        reload=True
    )
3.5 测试

请求一:查询谷歌互联网

http://localhost:8888/chat
{
	"msg":"今天有哪些新闻",
	"user_id":"20260310-009"
}
返回:
{
    "answer": "今天的新闻内容包括:\n\n1. 最新发布的视频证实,击中伊朗学校附近海军基地的是一枚战斧巡航导弹,表明美军是这场冲突中唯一使用战斧导弹的武装力量。\n2. 伊朗向美以展现强硬态度,涉及的潜在军事冲突分析。\n3. 美国及伊朗在战争走向上的误判。\n4. 富裕国家考虑释放石油储备,以应对油价波动。\n5. 特朗普即将访华之际,中国出口机器依然强劲。\n6. 中国“养龙虾”热潮提振科技股,加速人工智能转型。\n\n如果您需要更详细的报道或其他信息,欢迎随时告知我!"
}

请求二:查询mysql

http://localhost:8888/chat
{
	"msg":"查询订单号为1 的商品状态",
	"user_id":"20260310-009"
}
返回:
{
    "answer": "您好,订单号为1的商品状态是:已支付(paid)。如果您还有其他问题,请随时告诉我!"
}

请求三:查询向量数据库weaviate

http://localhost:8888/chat
{
	"msg":"请介绍字节公司",
	"user_id":"20260310-009"
}
{
    "answer": "字节跳动(ByteDance)是一家成立于2012年的中国互联网科技企业,是最早将人工智能应用于移动场景的科技公司之一。公司的使命是“激发创造、丰富生活”,致力于建设全球创作与交流平台。凭借技术出海战略与本土化运营,字节跳动的业务已覆盖全球150个国家和地区,是全球最具价值的科技独角兽之一。如果您还有其他问题,欢迎随时询问!"
}

四、由SSE模式改造为Streamable HTTP模式

4.1 简介

Streamable-http streamable HTTP是MCP协议在2025年3月引入的一种新传输机制,旨在取代之前的HTTP+SSE传输模式。它的设计理念是在保留SSE优点的同时克服其限 制,特别是提供更好的可扩展性和企业环境兼容性。 Streamable HTTP的核心思想是提供一个统一的HTTP端点点,同时支持POST和 GET方法:
POST方法: 用于客户端向服务器发送请求和接收响应
GET方法(可选): 用于建立SSE流,接收服务器实时推送的消自息
与传统HTTP+SSE不同,StreamableHTTP不要求维护单独的初始化连接和消 息端点,简化了协议设计并提高了可靠性

4.2 原理

1.连接:在streamableHTTP模式下,并没有和SSE模式类似的"连接"过程(在 sse_client调用时),因为无需事先创建SSE连接
2. 客户端发起初始化请求(Initialize):如果是有状态模式,+会在返回消息的 HTTP头中携带session-id;
3. 客户端发起初始化确认(Initialized):此时如果已有session-id(有状态),客 户端会首先发起一次HTTPGet请求,以建立独立的SSE通道
4.后续正常交互:普通的交互都是通过Post通道来进行,只有两种情况会使用 SSE通道:服务端发起的通知与请求、以及会话恢复的事件发送。

4.3 Streamable HTTP对比SSE的优势

1.简化的通信模型
传统的HTTP+SSE方法需要两个不同的端点:一个用于建立立连接,另一个用于发送消息。而StreamableHTTP提供了一个统一的端点,简化了客户端和服务器之间的交 互。
2.支持无状态模式
StreamableHTTP的一个重要创新是支持完全无状态操作。通过设 sessionIdGenerator:() =>undefined,服务器可以在不维护会话状态的情况下 处理请求,非常适合无服务器环境。比如:部署在AWSLambda、Azure Functions 等无服务器环境。短暂交互而非长期连接的场景。
3.更好的可伸缩性
由于StreamableHTTP可以在无状态模式下运行,它非常适合容器化和自动扩展场景。服务器不需要维护长期连接,可以根据请求动态分配资源,显著提高可伸缩性。这解决了SSE的一个主要问题:当有大量客户端时,每个客户端都需要维持一个长连接,可能导致服务器资源耗尽。使用StreamableHTTP的无状态模式,服务器只在处 理请求时分配资源,处理完成后即可释放。
4.提高的可靠性
StreamableHTTP的简化设计减少了出错机会:

  • 会话管理:在有状态模式下,会话ID通过HTTP头而非查询参数传递,减少安 全风险
  • 重连处理:客户端可以在会话有效期内随时重连,无需复杂的重连逻辑
  • 错误恢复:简化的协议使错误处理和恢复更加直观

5.更好的企业环境兼容性
在企业环境中,代理服务器和防火墙常常会阻止非标准HTTP连主接。Streamable HTTP使用标准HTTP通信,大大减少了这类问题:

  • 使用标准HTTPPOST和GET,无需特殊配置 不
  • 依赖长连接,减少代理超时问题
  • 会话ID通过HTTP头传递,更符合企业安全要
4.4client.py改造
import os
import dotenv
import logging

from mcp.client.streamable_http import streamable_http_client
from mcp import ClientSession

dotenv.load_dotenv()

log = logging.getLogger(__name__)

MCP_SERVER_URL = os.getenv("MCP_SERVER_URL")


# =========================================================
# 获取 MCP tools schema
# =========================================================
async def load_mcp_tools():

    log.info("加载 MCP tools")

    async with streamable_http_client(MCP_SERVER_URL) as (read, write, _):

        async with ClientSession(read, write) as session:

            await session.initialize()

            tools = await session.list_tools()

            log.info("MCP 工具列表: %s", [t.name for t in tools.tools])

            return tools


# =========================================================
# 调用 MCP tool
# =========================================================
async def call_mcp_tool(tool_name, args):

    log.info("创建 MCP session 调用工具")

    async with streamable_http_client(MCP_SERVER_URL) as (read, write, _):

        async with ClientSession(read, write) as session:

            await session.initialize()

            result = await session.call_tool(tool_name, args)

            return result

async def close_mcp():

    global ctx

    if ctx:
        await ctx.__aexit__(None, None, None)
4.5 graph_agent.py改造
import dotenv
import logging
from typing import Annotated, TypedDict

from langchain_core.tools import StructuredTool
from langchain_core.messages import HumanMessage, SystemMessage, BaseMessage, AIMessage
from langchain.chat_models import init_chat_model
from langgraph.graph import StateGraph
from langgraph.graph.message import add_messages
from pydantic import BaseModel
from mcp_client.client import load_mcp_tools, call_mcp_tool
from mcp_memory.redis_memory import init_redis, get_redis_saver
from agent.schema_loader import load_schema
from agent.prompts import build_system_prompt

dotenv.load_dotenv()

# =========================================================
# 1.日志配置
# =========================================================
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s | %(levelname)s | %(message)s"
)

log = logging.getLogger(__name__)

# =========================================================
# 2.LLM 初始化
# =========================================================
# 方式一: OpenAI远端模型
llm = init_chat_model(model="gpt-4o-mini")
# 方式二: 本地模型
# llm = init_chat_model(model="qwen2.5:3b",model_provider="ollama",base_url="http://localhost:11434",api_key=SecretStr("ollama"))

graph = None


# =========================================================
# 3.Agent State
# =========================================================
class AgentState(TypedDict):
    """
    LangGraph 状态结构

    add_messages 作用:
    自动累加 messages,实现对话记忆
    """
    messages: Annotated[list[BaseMessage], add_messages]


class QueryInput(BaseModel):
    query: str


# =========================================================
# 4.MCP 工具注册
# =========================================================
def build_tools(tools_result):
    lc_tools = []

    for t in tools_result.tools:
        tool_name = t.name
        desc = t.description or f"MCP tool: {tool_name}"

        async def run(query: str, name=tool_name):
            log.info("调用 MCP 工具: %s", name)

            result = await call_mcp_tool(
                name,
                {"query": query}
            )

            tool_result = result.content[0].text

            log.info("工具返回结果: %s", tool_result)

            return tool_result

        lc_tools.append(
            StructuredTool.from_function(
                name=tool_name,
                description=desc,
                coroutine=run,
                args_schema=QueryInput
            )
        )

    return lc_tools


# =========================================================
# 5.构建 Graph
# =========================================================
async def build_graph():
    """
    系统初始化

    1 初始化 Redis Memory
    2 加载数据库 Schema
    3 初始化 MCP 工具
    4 构建 LangGraph Agent
    """

    global graph

    log.info("===== 系统启动 =====")

    # 5.1初始化 Redis
    await init_redis()
    saver = get_redis_saver()

    # 5.2加载数据库结构
    log.info("加载数据库 Schema")
    schema_text = load_schema()

    # 5.3构建系统 Prompt
    full_system_prompt = build_system_prompt(schema_text)

    # 5.4连接 MCP
    tools = await load_mcp_tools()
    lc_tools = build_tools(tools)

    log.info("工具注册完成,共 %d 个工具", len(lc_tools))

    # 5.6绑定工具
    llm_with_tools = llm.bind_tools(lc_tools)

    # 5.7 创建了一个“图结构的工作流构建器”,并定义了整个 Agent 的状态结构
    builder = StateGraph(AgentState)

    # =========================================================
    # 5.8 Agent 节点
    # =========================================================
    async def agent(state: AgentState):

        messages = state["messages"]
        question = messages[-1].content

        log.info("用户问题: %s", question)

        # 5.9 第一次 LLM 调用 作用:让模型判断是否需要调用工具
        resp = await llm_with_tools.ainvoke(
            [SystemMessage(content=full_system_prompt)] + messages
        )

        # =====================================================
        # 5.10 需要调用工具
        # =====================================================
        if hasattr(resp, "tool_calls") and resp.tool_calls:

            call = resp.tool_calls[0]

            tool_name = call["name"]
            args = call["args"]

            log.info("LLM 决定调用工具:LLM选择工具: %s,工具参数: %s", tool_name, args)

            # 调用 MCP 工具
            ret = await call_mcp_tool(tool_name, args)

            tool_result = ret.content[0].text

            log.info("工具返回: %s", tool_result)

            # 第二次调用 LLM
            # 作用:根据工具返回结果生成自然语言回答
            final_answer = await call_llm(
                messages,
                question,
                full_system_prompt,
                tool_result
            )

            return {
                "messages": messages + [
                    AIMessage(content=final_answer)
                ]
            }

        # =====================================================
        # 5.11 不需要工具
        # =====================================================
        else:

            log.info("LLM 判断无需调用工具")

            return {
                "messages": messages + [
                    AIMessage(content=resp.content)
                ]
            }

    builder.add_node("agent", agent)
    builder.set_entry_point("agent")
    # 构建 LangGraph 带redis存储
    graph = builder.compile(checkpointer=saver)

    log.info("LangGraph 构建完成")


# =========================================================
# 6.Agent 调用入口
# =========================================================
async def run_agent(user_id: str, msg: str):
    """
    Agent 执行入口
    """

    log.info("收到请求 user_id=%s", user_id)

    result = await graph.ainvoke(
        {
            "messages": [HumanMessage(content=msg)]
        },
        config={
            "configurable": {
                "thread_id": user_id
            }
        }
    )

    answer = result["messages"][-1].content

    log.info("Agent 返回结果: %s", answer)

    return answer


# =========================================================
# 7.二次 LLM 调用
# =========================================================
async def call_llm(messages, question, system_prompt, tool_result):
    """
    第二次调用 LLM

    作用:
    将工具返回结果整理成用户可读的回答
    """

    log.info("LLM 根据工具结果生成回答")

    response = await llm.ainvoke(
        [SystemMessage(content=system_prompt)]
        + messages
        + [
            HumanMessage(content=f"""
用户问题:
{question}

工具返回:
{tool_result}

请根据结果回答用户问题
""")
        ]
    )

    return response.content

五、FASTMCP 与MCP的区别与联系

5.1 对比FASTMCP 与MCP
对比 MCP FASTMCP
类型 协议 框架
全称 Model Context Protocol Fast Model Context Protocol
作用 定义LLM如何调用工具 快速实现MCP Server
类似 HTTP Fast API
用途 规范接口工具 写服务工具

总结:MCP 是client与server的通信标准,FastMCP 是server端的实现。是两个不同的东西

5.2 对比多种实现MCP服务端的方式

FastMCP 只是一个“快速实现服务器”的框架。如果不用 FastMCP,其实还有多种方式实现 MCP Server
以下是多种实现MCP Server的方法

方式 一:Fast MCP

from fastmcp import FastMCP
mcp = FastMCP("ecommerce-tools")
@mcp.tool()
def mysql_query(query: str):
    log.info("调用 mysql_query")
    log.info(f"SQL: {query}")

    return safe_query(query)

方式 二:MCP Server(官方)

from mcp.server import Server

server = Server("demo-server")

@server.tool()
async def add(a: int, b: int) -> int:
    """加法计算"""
    return a + b

方式三 Fast API

from fastapi import FastAPI
app = FastAPI()
@app.post("/mcp")
async def mcp_endpoint(req: dict):
Logo

AtomGit 是由开放原子开源基金会联合 CSDN 等生态伙伴共同推出的新一代开源与人工智能协作平台。平台坚持“开放、中立、公益”的理念,把代码托管、模型共享、数据集托管、智能体开发体验和算力服务整合在一起,为开发者提供从开发、训练到部署的一站式体验。

更多推荐